1
0
mirror of https://github.com/nvbn/thefuck.git synced 2025-09-18 19:22:32 +01:00

Move commands-related logic to Command and CorrectedCommand

This commit is contained in:
nvbn
2015-09-08 15:00:57 +03:00
parent 4a27595e97
commit a8dbc48fd4
8 changed files with 149 additions and 120 deletions

View File

@@ -1,46 +0,0 @@
import pytest
from subprocess import PIPE
from mock import Mock
from thefuck import main
from tests.utils import Command
class TestGetCommand(object):
@pytest.fixture(autouse=True)
def Popen(self, monkeypatch):
Popen = Mock()
Popen.return_value.stdout.read.return_value = b'stdout'
Popen.return_value.stderr.read.return_value = b'stderr'
monkeypatch.setattr('thefuck.main.Popen', Popen)
return Popen
@pytest.fixture(autouse=True)
def prepare(self, monkeypatch):
monkeypatch.setattr('thefuck.main.os.environ', {})
monkeypatch.setattr('thefuck.main.wait_output', lambda *_: True)
@pytest.fixture(autouse=True)
def generic_shell(self, monkeypatch):
monkeypatch.setattr('thefuck.shells.from_shell', lambda x: x)
monkeypatch.setattr('thefuck.shells.to_shell', lambda x: x)
def test_get_command_calls(self, Popen, settings):
settings.env = {}
assert main.get_command(['thefuck', 'apt-get', 'search', 'vim']) \
== Command('apt-get search vim', 'stdout', 'stderr')
Popen.assert_called_once_with('apt-get search vim',
shell=True,
stdout=PIPE,
stderr=PIPE,
env={})
@pytest.mark.parametrize('args, result', [
(['thefuck', ''], None),
(['thefuck', '', ''], None),
(['thefuck', 'ls', '-la'], 'ls -la'),
(['thefuck', 'ls'], 'ls')])
def test_get_command_script(self, args, result):
if result:
assert main.get_command(args).script == result
else:
assert main.get_command(args) is None

View File

@@ -1,8 +1,10 @@
from subprocess import PIPE
from mock import Mock from mock import Mock
from pathlib import Path from pathlib import Path
import pytest import pytest
from tests.utils import CorrectedCommand, Rule, Command from tests.utils import CorrectedCommand, Rule, Command
from thefuck import conf from thefuck import conf
from thefuck.exceptions import EmptyCommand
class TestCorrectedCommand(object): class TestCorrectedCommand(object):
@@ -76,3 +78,48 @@ class TestRule(object):
priority=100) priority=100)
assert list(rule.get_corrected_commands(Command(script='test'))) \ assert list(rule.get_corrected_commands(Command(script='test'))) \
== [CorrectedCommand(script='test!', priority=100)] == [CorrectedCommand(script='test!', priority=100)]
class TestCommand(object):
@pytest.fixture(autouse=True)
def Popen(self, monkeypatch):
Popen = Mock()
Popen.return_value.stdout.read.return_value = b'stdout'
Popen.return_value.stderr.read.return_value = b'stderr'
monkeypatch.setattr('thefuck.types.Popen', Popen)
return Popen
@pytest.fixture(autouse=True)
def prepare(self, monkeypatch):
monkeypatch.setattr('thefuck.types.os.environ', {})
monkeypatch.setattr('thefuck.types.Command._wait_output',
staticmethod(lambda *_: True))
@pytest.fixture(autouse=True)
def generic_shell(self, monkeypatch):
monkeypatch.setattr('thefuck.shells.from_shell', lambda x: x)
monkeypatch.setattr('thefuck.shells.to_shell', lambda x: x)
def test_from_script_calls(self, Popen, settings):
settings.env = {}
assert Command.from_raw_script(
['apt-get', 'search', 'vim']) == Command(
'apt-get search vim', 'stdout', 'stderr')
Popen.assert_called_once_with('apt-get search vim',
shell=True,
stdout=PIPE,
stderr=PIPE,
env={})
@pytest.mark.parametrize('script, result', [
([''], None),
(['', ''], None),
(['ls', '-la'], 'ls -la'),
(['ls'], 'ls')])
def test_from_script(self, script, result):
if result:
assert Command.from_raw_script(script).script == result
else:
with pytest.raises(EmptyCommand):
Command.from_raw_script(script)

View File

@@ -3,8 +3,9 @@ from thefuck import types
from thefuck.conf import DEFAULT_PRIORITY from thefuck.conf import DEFAULT_PRIORITY
def Command(script='', stdout='', stderr=''): class Command(types.Command):
return types.Command(script, stdout, stderr) def __init__(self, script='', stdout='', stderr=''):
super(Command, self).__init__(script, stdout, stderr)
class Rule(types.Rule): class Rule(types.Rule):

2
thefuck/exceptions.py Normal file
View File

@@ -0,0 +1,2 @@
class EmptyCommand(Exception):
"""Raises when empty command passed to `thefuck`."""

View File

@@ -4,16 +4,12 @@ from pathlib import Path
from os.path import expanduser from os.path import expanduser
from pprint import pformat from pprint import pformat
import pkg_resources import pkg_resources
from subprocess import Popen, PIPE
import os
import sys import sys
from psutil import Process, TimeoutExpired
import colorama import colorama
import six
from . import logs, types, shells from . import logs, types, shells
from .conf import initialize_settings_file, init_settings, settings from .conf import initialize_settings_file, init_settings, settings
from .corrector import get_corrected_commands from .corrector import get_corrected_commands
from .utils import compatibility_call from .exceptions import EmptyCommand
from .ui import select_command from .ui import select_command
@@ -27,62 +23,6 @@ def setup_user_dir():
return user_dir return user_dir
def wait_output(popen):
"""Returns `True` if we can get output of the command in the
`settings.wait_command` time.
Command will be killed if it wasn't finished in the time.
"""
proc = Process(popen.pid)
try:
proc.wait(settings.wait_command)
return True
except TimeoutExpired:
for child in proc.children(recursive=True):
child.kill()
proc.kill()
return False
def get_command(args):
"""Creates command from `args` and executes it."""
if six.PY2:
script = ' '.join(arg.decode('utf-8') for arg in args[1:])
else:
script = ' '.join(args[1:])
script = script.strip()
if not script:
return
script = shells.from_shell(script)
env = dict(os.environ)
env.update(settings.env)
with logs.debug_time(u'Call: {}; with env: {};'.format(script, env)):
result = Popen(script, shell=True, stdout=PIPE, stderr=PIPE, env=env)
if wait_output(result):
stdout = result.stdout.read().decode('utf-8')
stderr = result.stderr.read().decode('utf-8')
logs.debug(u'Received stdout: {}'.format(stdout))
logs.debug(u'Received stderr: {}'.format(stderr))
return types.Command(script, stdout, stderr)
else:
logs.debug(u'Execution timed out!')
return types.Command(script, None, None)
def run_command(old_cmd, command):
"""Runs command from rule for passed command."""
if command.side_effect:
compatibility_call(command.side_effect, old_cmd, command.script)
shells.put_to_history(command.script)
print(command.script)
# Entry points: # Entry points:
def fix_command(): def fix_command():
@@ -92,16 +32,16 @@ def fix_command():
with logs.debug_time('Total'): with logs.debug_time('Total'):
logs.debug(u'Run with settings: {}'.format(pformat(settings))) logs.debug(u'Run with settings: {}'.format(pformat(settings)))
command = get_command(sys.argv) try:
command = types.Command.from_raw_script(sys.argv[1:])
if not command: except EmptyCommand:
logs.debug('Empty command, nothing to do') logs.debug('Empty command, nothing to do')
return return
corrected_commands = get_corrected_commands(command) corrected_commands = get_corrected_commands(command)
selected_command = select_command(corrected_commands) selected_command = select_command(corrected_commands)
if selected_command: if selected_command:
run_command(command, selected_command) selected_command.run(command)
def _get_current_version(): def _get_current_version():

View File

@@ -27,6 +27,6 @@ def git_support(fn, command):
expansion = ' '.join(map(quote, split(search.group(2)))) expansion = ' '.join(map(quote, split(search.group(2))))
new_script = command.script.replace(alias, expansion) new_script = command.script.replace(alias, expansion)
command = Command._replace(command, script=new_script) command = command.update(script=new_script)
return fn(command) return fn(command)

View File

@@ -9,9 +9,7 @@ def sudo_support(fn, command):
if not command.script.startswith('sudo '): if not command.script.startswith('sudo '):
return fn(command) return fn(command)
result = fn(Command(command.script[5:], result = fn(command.update(script=command.script[5:]))
command.stdout,
command.stderr))
if result and isinstance(result, six.string_types): if result and isinstance(result, six.string_types):
return u'sudo {}'.format(result) return u'sudo {}'.format(result)

View File

@@ -1,11 +1,91 @@
from collections import namedtuple
from imp import load_source from imp import load_source
import os
from subprocess import Popen, PIPE, TimeoutExpired
import sys import sys
from psutil import Process
import six
from .conf import settings, DEFAULT_PRIORITY, ALL_ENABLED from .conf import settings, DEFAULT_PRIORITY, ALL_ENABLED
from .utils import compatibility_call from .utils import compatibility_call
from . import logs from .exceptions import EmptyCommand
from . import logs, shells
Command = namedtuple('Command', ('script', 'stdout', 'stderr'))
class Command(object):
"""Command that should be fixed."""
def __init__(self, script, stdout, stderr):
self.script = script
self.stdout = stdout
self.stderr = stderr
def __eq__(self, other):
if isinstance(other, Command):
return (self.script, self.stdout, self.stderr) \
== (other.script, other.stdout, other.stderr)
else:
return False
def __repr__(self):
return 'Command(script={}, stdout={}, stderr={})'.format(
self.script, self.stdout, self.stderr)
def update(self, **kwargs):
"""Returns new command with replaced fields."""
kwargs.setdefault('script', self.script)
kwargs.setdefault('stdout', self.stdout)
kwargs.setdefault('stderr', self.stderr)
return Command(**kwargs)
@staticmethod
def _wait_output(popen):
"""Returns `True` if we can get output of the command in the
`settings.wait_command` time.
Command will be killed if it wasn't finished in the time.
"""
proc = Process(popen.pid)
try:
proc.wait(settings.wait_command)
return True
except TimeoutExpired:
for child in proc.children(recursive=True):
child.kill()
proc.kill()
return False
@staticmethod
def _prepare_script(raw_script):
if six.PY2:
script = ' '.join(arg.decode('utf-8') for arg in raw_script)
else:
script = ' '.join(raw_script)
script = script.strip()
return shells.from_shell(script)
@classmethod
def from_raw_script(cls, raw_script):
script = cls._prepare_script(raw_script)
if not script:
raise EmptyCommand
env = dict(os.environ)
env.update(settings.env)
with logs.debug_time(u'Call: {}; with env: {};'.format(script, env)):
result = Popen(script, shell=True, stdout=PIPE, stderr=PIPE, env=env)
if cls._wait_output(result):
stdout = result.stdout.read().decode('utf-8')
stderr = result.stderr.read().decode('utf-8')
logs.debug(u'Received stdout: {}'.format(stdout))
logs.debug(u'Received stderr: {}'.format(stderr))
return cls(script, stdout, stderr)
else:
logs.debug(u'Execution timed out!')
return cls(script, None, None)
class Rule(object): class Rule(object):
@@ -108,3 +188,10 @@ class CorrectedCommand(object):
def __repr__(self): def __repr__(self):
return 'CorrectedCommand(script={}, side_effect={}, priority={})'.format( return 'CorrectedCommand(script={}, side_effect={}, priority={})'.format(
self.script, self.side_effect, self.priority) self.script, self.side_effect, self.priority)
def run(self, old_cmd):
"""Runs command from rule for passed command."""
if self.side_effect:
compatibility_call(self.side_effect, old_cmd, self.script)
shells.put_to_history(self.script)
print(self.script)