mirror of
https://github.com/nvbn/thefuck.git
synced 2025-01-31 02:01:13 +00:00
Merge pull request #366 from nvbn/unned-abstractions
Improve code structure
This commit is contained in:
commit
6e886c6b4f
@ -41,3 +41,8 @@ def functional(request):
|
||||
if request.node.get_marker('functional') \
|
||||
and not request.config.getoption('enable_functional'):
|
||||
pytest.skip('functional tests are disabled')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def source_root():
|
||||
return Path(__file__).parent.parent.resolve()
|
||||
|
@ -31,29 +31,33 @@ def proc(request, spawnu, run_without_docker):
|
||||
|
||||
@pytest.mark.functional
|
||||
@pytest.mark.once_without_docker
|
||||
def test_with_confirmation(proc, TIMEOUT):
|
||||
def test_with_confirmation(proc, TIMEOUT, run_without_docker):
|
||||
with_confirmation(proc, TIMEOUT)
|
||||
if not run_without_docker:
|
||||
history_changed(proc, TIMEOUT, u'echo test')
|
||||
|
||||
|
||||
@pytest.mark.functional
|
||||
@pytest.mark.once_without_docker
|
||||
def test_select_command_with_arrows(proc, TIMEOUT):
|
||||
def test_select_command_with_arrows(proc, TIMEOUT, run_without_docker):
|
||||
select_command_with_arrows(proc, TIMEOUT)
|
||||
if not run_without_docker:
|
||||
history_changed(proc, TIMEOUT, u'git help')
|
||||
|
||||
|
||||
@pytest.mark.functional
|
||||
@pytest.mark.once_without_docker
|
||||
def test_refuse_with_confirmation(proc, TIMEOUT):
|
||||
def test_refuse_with_confirmation(proc, TIMEOUT, run_without_docker):
|
||||
refuse_with_confirmation(proc, TIMEOUT)
|
||||
if not run_without_docker:
|
||||
history_not_changed(proc, TIMEOUT)
|
||||
|
||||
|
||||
@pytest.mark.functional
|
||||
@pytest.mark.once_without_docker
|
||||
def test_without_confirmation(proc, TIMEOUT):
|
||||
def test_without_confirmation(proc, TIMEOUT, run_without_docker):
|
||||
without_confirmation(proc, TIMEOUT)
|
||||
if not run_without_docker:
|
||||
history_changed(proc, TIMEOUT, u'echo test')
|
||||
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
from thefuck.main import _get_current_version
|
||||
from thefuck.utils import get_installation_info
|
||||
|
||||
envs = ((u'bash', 'thefuck/ubuntu-bash', u'''
|
||||
FROM ubuntu:latest
|
||||
@ -18,7 +18,8 @@ def test_installation(spawnu, shell, TIMEOUT, tag, dockerfile):
|
||||
proc = spawnu(tag, dockerfile, shell)
|
||||
proc.sendline(u'cat /src/install.sh | sh - && $0')
|
||||
proc.sendline(u'thefuck --version')
|
||||
assert proc.expect([TIMEOUT, u'thefuck {}'.format(_get_current_version())],
|
||||
version = get_installation_info().version
|
||||
assert proc.expect([TIMEOUT, u'thefuck {}'.format(version)],
|
||||
timeout=600)
|
||||
proc.sendline(u'fuck')
|
||||
assert proc.expect([TIMEOUT, u'No fucks given'])
|
||||
|
@ -2,7 +2,6 @@ import pytest
|
||||
import os
|
||||
from thefuck.rules.fix_file import match, get_new_command
|
||||
from tests.utils import Command
|
||||
from thefuck.types import Settings
|
||||
|
||||
|
||||
# (script, file, line, col (or None), stdout, stderr)
|
||||
|
@ -2,15 +2,6 @@ import pytest
|
||||
import six
|
||||
from mock import Mock
|
||||
from thefuck import conf
|
||||
from tests.utils import Rule
|
||||
|
||||
|
||||
@pytest.mark.parametrize('enabled, rules, result', [
|
||||
(True, conf.DEFAULT_RULES, True),
|
||||
(False, conf.DEFAULT_RULES, False),
|
||||
(False, conf.DEFAULT_RULES + ['test'], True)])
|
||||
def test_default(enabled, rules, result):
|
||||
assert (Rule('test', enabled_by_default=enabled) in rules) == result
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -28,7 +19,7 @@ def environ(monkeypatch):
|
||||
@pytest.mark.usefixture('environ')
|
||||
def test_settings_defaults(load_source, settings):
|
||||
load_source.return_value = object()
|
||||
conf.init_settings(Mock())
|
||||
settings.init()
|
||||
for key, val in conf.DEFAULT_SETTINGS.items():
|
||||
assert getattr(settings, key) == val
|
||||
|
||||
@ -42,7 +33,7 @@ class TestSettingsFromFile(object):
|
||||
no_colors=True,
|
||||
priority={'vim': 100},
|
||||
exclude_rules=['git'])
|
||||
conf.init_settings(Mock())
|
||||
settings.init()
|
||||
assert settings.rules == ['test']
|
||||
assert settings.wait_command == 10
|
||||
assert settings.require_confirmation is True
|
||||
@ -56,7 +47,7 @@ class TestSettingsFromFile(object):
|
||||
exclude_rules=[],
|
||||
require_confirmation=True,
|
||||
no_colors=True)
|
||||
conf.init_settings(Mock())
|
||||
settings.init()
|
||||
assert settings.rules == conf.DEFAULT_RULES + ['test']
|
||||
|
||||
|
||||
@ -69,7 +60,7 @@ class TestSettingsFromEnv(object):
|
||||
'THEFUCK_REQUIRE_CONFIRMATION': 'true',
|
||||
'THEFUCK_NO_COLORS': 'false',
|
||||
'THEFUCK_PRIORITY': 'bash=10:lisp=wrong:vim=15'})
|
||||
conf.init_settings(Mock())
|
||||
settings.init()
|
||||
assert settings.rules == ['bash', 'lisp']
|
||||
assert settings.exclude_rules == ['git', 'vim']
|
||||
assert settings.wait_command == 55
|
||||
@ -79,26 +70,26 @@ class TestSettingsFromEnv(object):
|
||||
|
||||
def test_from_env_with_DEFAULT(self, environ, settings):
|
||||
environ.update({'THEFUCK_RULES': 'DEFAULT_RULES:bash:lisp'})
|
||||
conf.init_settings(Mock())
|
||||
settings.init()
|
||||
assert settings.rules == conf.DEFAULT_RULES + ['bash', 'lisp']
|
||||
|
||||
|
||||
class TestInitializeSettingsFile(object):
|
||||
def test_ignore_if_exists(self):
|
||||
def test_ignore_if_exists(self, settings):
|
||||
settings_path_mock = Mock(is_file=Mock(return_value=True), open=Mock())
|
||||
user_dir_mock = Mock(joinpath=Mock(return_value=settings_path_mock))
|
||||
conf.initialize_settings_file(user_dir_mock)
|
||||
settings.user_dir = Mock(joinpath=Mock(return_value=settings_path_mock))
|
||||
settings._init_settings_file()
|
||||
assert settings_path_mock.is_file.call_count == 1
|
||||
assert not settings_path_mock.open.called
|
||||
|
||||
def test_create_if_doesnt_exists(self):
|
||||
def test_create_if_doesnt_exists(self, settings):
|
||||
settings_file = six.StringIO()
|
||||
settings_path_mock = Mock(
|
||||
is_file=Mock(return_value=False),
|
||||
open=Mock(return_value=Mock(
|
||||
__exit__=lambda *args: None, __enter__=lambda *args: settings_file)))
|
||||
user_dir_mock = Mock(joinpath=Mock(return_value=settings_path_mock))
|
||||
conf.initialize_settings_file(user_dir_mock)
|
||||
settings.user_dir = Mock(joinpath=Mock(return_value=settings_path_mock))
|
||||
settings._init_settings_file()
|
||||
settings_file_contents = settings_file.getvalue()
|
||||
assert settings_path_mock.is_file.call_count == 1
|
||||
assert settings_path_mock.open.call_count == 1
|
||||
|
@ -1,24 +1,8 @@
|
||||
import pytest
|
||||
from pathlib import PosixPath, Path
|
||||
from mock import Mock
|
||||
from thefuck import corrector, conf, types
|
||||
from pathlib import PosixPath
|
||||
from thefuck import corrector, conf
|
||||
from tests.utils import Rule, Command, CorrectedCommand
|
||||
from thefuck.corrector import make_corrected_commands, get_corrected_commands
|
||||
|
||||
|
||||
def test_load_rule(mocker):
|
||||
match = object()
|
||||
get_new_command = object()
|
||||
load_source = mocker.patch(
|
||||
'thefuck.corrector.load_source',
|
||||
return_value=Mock(match=match,
|
||||
get_new_command=get_new_command,
|
||||
enabled_by_default=True,
|
||||
priority=900,
|
||||
requires_output=True))
|
||||
assert corrector.load_rule(Path('/rules/bash.py')) \
|
||||
== Rule('bash', match, get_new_command, priority=900)
|
||||
load_source.assert_called_once_with('bash', '/rules/bash.py')
|
||||
from thefuck.corrector import get_corrected_commands, organize_commands
|
||||
|
||||
|
||||
class TestGetRules(object):
|
||||
@ -31,18 +15,12 @@ class TestGetRules(object):
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def load_source(self, monkeypatch):
|
||||
monkeypatch.setattr('thefuck.corrector.load_source',
|
||||
monkeypatch.setattr('thefuck.types.load_source',
|
||||
lambda x, _: Rule(x))
|
||||
|
||||
def _compare_names(self, rules, names):
|
||||
assert {r.name for r in rules} == set(names)
|
||||
|
||||
def _prepare_rules(self, rules):
|
||||
if rules == conf.DEFAULT_RULES:
|
||||
return rules
|
||||
else:
|
||||
return types.RulesNamesList(rules)
|
||||
|
||||
@pytest.mark.parametrize('paths, conf_rules, exclude_rules, loaded_rules', [
|
||||
(['git.py', 'bash.py'], conf.DEFAULT_RULES, [], ['git', 'bash']),
|
||||
(['git.py', 'bash.py'], ['git'], [], ['git']),
|
||||
@ -51,44 +29,13 @@ class TestGetRules(object):
|
||||
def test_get_rules(self, glob, settings, paths, conf_rules, exclude_rules,
|
||||
loaded_rules):
|
||||
glob([PosixPath(path) for path in paths])
|
||||
settings.update(rules=self._prepare_rules(conf_rules),
|
||||
settings.update(rules=conf_rules,
|
||||
priority={},
|
||||
exclude_rules=self._prepare_rules(exclude_rules))
|
||||
exclude_rules=exclude_rules)
|
||||
rules = corrector.get_rules()
|
||||
self._compare_names(rules, loaded_rules)
|
||||
|
||||
|
||||
class TestIsRuleMatch(object):
|
||||
def test_no_match(self):
|
||||
assert not corrector.is_rule_match(
|
||||
Command('ls'), Rule('', lambda _: False))
|
||||
|
||||
def test_match(self):
|
||||
rule = Rule('', lambda x: x.script == 'cd ..')
|
||||
assert corrector.is_rule_match(Command('cd ..'), rule)
|
||||
|
||||
@pytest.mark.usefixtures('no_colors')
|
||||
def test_when_rule_failed(self, capsys):
|
||||
rule = Rule('test', Mock(side_effect=OSError('Denied')),
|
||||
requires_output=False)
|
||||
assert not corrector.is_rule_match(Command('ls'), rule)
|
||||
assert capsys.readouterr()[1].split('\n')[0] == '[WARN] Rule test:'
|
||||
|
||||
|
||||
class TestMakeCorrectedCommands(object):
|
||||
def test_with_rule_returns_list(self):
|
||||
rule = Rule(get_new_command=lambda x: [x.script + '!', x.script + '@'],
|
||||
priority=100)
|
||||
assert list(make_corrected_commands(Command(script='test'), rule)) \
|
||||
== [CorrectedCommand(script='test!', priority=100),
|
||||
CorrectedCommand(script='test@', priority=200)]
|
||||
|
||||
def test_with_rule_returns_command(self):
|
||||
rule = Rule(get_new_command=lambda x: x.script + '!',
|
||||
priority=100)
|
||||
assert list(make_corrected_commands(Command(script='test'), rule)) \
|
||||
== [CorrectedCommand(script='test!', priority=100)]
|
||||
|
||||
def test_get_corrected_commands(mocker):
|
||||
command = Command('test', 'test', 'test')
|
||||
rules = [Rule(match=lambda _: False),
|
||||
@ -100,3 +47,13 @@ def test_get_corrected_commands(mocker):
|
||||
mocker.patch('thefuck.corrector.get_rules', return_value=rules)
|
||||
assert [cmd.script for cmd in get_corrected_commands(command)] \
|
||||
== ['test!', 'test@', 'test;']
|
||||
|
||||
|
||||
def test_organize_commands():
|
||||
"""Ensures that the function removes duplicates and sorts commands."""
|
||||
commands = [CorrectedCommand('ls'), CorrectedCommand('ls -la', priority=9000),
|
||||
CorrectedCommand('ls -lh', priority=100),
|
||||
CorrectedCommand('ls -lh', priority=9999)]
|
||||
assert list(organize_commands(iter(commands))) \
|
||||
== [CorrectedCommand('ls'), CorrectedCommand('ls -lh', priority=100),
|
||||
CorrectedCommand('ls -la', priority=9000)]
|
||||
|
@ -1,46 +0,0 @@
|
||||
import pytest
|
||||
from subprocess import PIPE
|
||||
from mock import Mock
|
||||
from thefuck import main
|
||||
from tests.utils import Command
|
||||
|
||||
|
||||
class TestGetCommand(object):
|
||||
@pytest.fixture(autouse=True)
|
||||
def Popen(self, monkeypatch):
|
||||
Popen = Mock()
|
||||
Popen.return_value.stdout.read.return_value = b'stdout'
|
||||
Popen.return_value.stderr.read.return_value = b'stderr'
|
||||
monkeypatch.setattr('thefuck.main.Popen', Popen)
|
||||
return Popen
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def prepare(self, monkeypatch):
|
||||
monkeypatch.setattr('thefuck.main.os.environ', {})
|
||||
monkeypatch.setattr('thefuck.main.wait_output', lambda *_: True)
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def generic_shell(self, monkeypatch):
|
||||
monkeypatch.setattr('thefuck.shells.from_shell', lambda x: x)
|
||||
monkeypatch.setattr('thefuck.shells.to_shell', lambda x: x)
|
||||
|
||||
def test_get_command_calls(self, Popen, settings):
|
||||
settings.env = {}
|
||||
assert main.get_command(['thefuck', 'apt-get', 'search', 'vim']) \
|
||||
== Command('apt-get search vim', 'stdout', 'stderr')
|
||||
Popen.assert_called_once_with('apt-get search vim',
|
||||
shell=True,
|
||||
stdout=PIPE,
|
||||
stderr=PIPE,
|
||||
env={})
|
||||
|
||||
@pytest.mark.parametrize('args, result', [
|
||||
(['thefuck', ''], None),
|
||||
(['thefuck', '', ''], None),
|
||||
(['thefuck', 'ls', '-la'], 'ls -la'),
|
||||
(['thefuck', 'ls'], 'ls')])
|
||||
def test_get_command_script(self, args, result):
|
||||
if result:
|
||||
assert main.get_command(args).script == result
|
||||
else:
|
||||
assert main.get_command(args) is None
|
@ -1,12 +1,8 @@
|
||||
from tests.utils import root
|
||||
|
||||
|
||||
def test_readme():
|
||||
with root.joinpath('README.md').open() as f:
|
||||
def test_readme(source_root):
|
||||
with source_root.joinpath('README.md').open() as f:
|
||||
readme = f.read()
|
||||
|
||||
bundled = root \
|
||||
.joinpath('thefuck') \
|
||||
bundled = source_root.joinpath('thefuck') \
|
||||
.joinpath('rules') \
|
||||
.glob('*.py')
|
||||
|
||||
|
@ -1,43 +1,10 @@
|
||||
from thefuck.types import RulesNamesList, Settings, \
|
||||
SortedCorrectedCommandsSequence
|
||||
from tests.utils import Rule, CorrectedCommand
|
||||
|
||||
|
||||
def test_rules_names_list():
|
||||
assert RulesNamesList(['bash', 'lisp']) == ['bash', 'lisp']
|
||||
assert RulesNamesList(['bash', 'lisp']) == RulesNamesList(['bash', 'lisp'])
|
||||
assert Rule('lisp') in RulesNamesList(['lisp'])
|
||||
assert Rule('bash') not in RulesNamesList(['lisp'])
|
||||
|
||||
|
||||
class TestSortedCorrectedCommandsSequence(object):
|
||||
def test_realises_generator_only_on_demand(self, settings):
|
||||
should_realise = False
|
||||
|
||||
def gen():
|
||||
yield CorrectedCommand('git commit')
|
||||
yield CorrectedCommand('git branch', priority=200)
|
||||
assert should_realise
|
||||
yield CorrectedCommand('git checkout', priority=100)
|
||||
|
||||
commands = SortedCorrectedCommandsSequence(gen())
|
||||
assert commands[0] == CorrectedCommand('git commit')
|
||||
should_realise = True
|
||||
assert commands[1] == CorrectedCommand('git checkout', priority=100)
|
||||
assert commands[2] == CorrectedCommand('git branch', priority=200)
|
||||
|
||||
def test_remove_duplicates(self):
|
||||
side_effect = lambda *_: None
|
||||
seq = SortedCorrectedCommandsSequence(
|
||||
iter([CorrectedCommand('ls', priority=100),
|
||||
CorrectedCommand('ls', priority=200),
|
||||
CorrectedCommand('ls', side_effect, 300)]))
|
||||
assert set(seq) == {CorrectedCommand('ls', priority=100),
|
||||
CorrectedCommand('ls', side_effect, 300)}
|
||||
|
||||
def test_with_blank(self):
|
||||
seq = SortedCorrectedCommandsSequence(iter([]))
|
||||
assert list(seq) == []
|
||||
from subprocess import PIPE
|
||||
from mock import Mock
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
from tests.utils import CorrectedCommand, Rule, Command
|
||||
from thefuck import conf
|
||||
from thefuck.exceptions import EmptyCommand
|
||||
|
||||
|
||||
class TestCorrectedCommand(object):
|
||||
@ -51,3 +18,108 @@ class TestCorrectedCommand(object):
|
||||
def test_hashable(self):
|
||||
assert {CorrectedCommand('ls', None, 100),
|
||||
CorrectedCommand('ls', None, 200)} == {CorrectedCommand('ls')}
|
||||
|
||||
|
||||
class TestRule(object):
|
||||
def test_from_path(self, mocker):
|
||||
match = object()
|
||||
get_new_command = object()
|
||||
load_source = mocker.patch(
|
||||
'thefuck.types.load_source',
|
||||
return_value=Mock(match=match,
|
||||
get_new_command=get_new_command,
|
||||
enabled_by_default=True,
|
||||
priority=900,
|
||||
requires_output=True))
|
||||
assert Rule.from_path(Path('/rules/bash.py')) \
|
||||
== Rule('bash', match, get_new_command, priority=900)
|
||||
load_source.assert_called_once_with('bash', '/rules/bash.py')
|
||||
|
||||
@pytest.mark.parametrize('rules, exclude_rules, rule, is_enabled', [
|
||||
(conf.DEFAULT_RULES, [], Rule('git', enabled_by_default=True), True),
|
||||
(conf.DEFAULT_RULES, [], Rule('git', enabled_by_default=False), False),
|
||||
([], [], Rule('git', enabled_by_default=False), False),
|
||||
([], [], Rule('git', enabled_by_default=True), False),
|
||||
(conf.DEFAULT_RULES + ['git'], [], Rule('git', enabled_by_default=False), True),
|
||||
(['git'], [], Rule('git', enabled_by_default=False), True),
|
||||
(conf.DEFAULT_RULES, ['git'], Rule('git', enabled_by_default=True), False),
|
||||
(conf.DEFAULT_RULES, ['git'], Rule('git', enabled_by_default=False), False),
|
||||
([], ['git'], Rule('git', enabled_by_default=True), False),
|
||||
([], ['git'], Rule('git', enabled_by_default=False), False)])
|
||||
def test_is_enabled(self, settings, rules, exclude_rules, rule, is_enabled):
|
||||
settings.update(rules=rules,
|
||||
exclude_rules=exclude_rules)
|
||||
assert rule.is_enabled == is_enabled
|
||||
|
||||
def test_isnt_match(self):
|
||||
assert not Rule('', lambda _: False).is_match(
|
||||
Command('ls'))
|
||||
|
||||
def test_is_match(self):
|
||||
rule = Rule('', lambda x: x.script == 'cd ..')
|
||||
assert rule.is_match(Command('cd ..'))
|
||||
|
||||
@pytest.mark.usefixtures('no_colors')
|
||||
def test_isnt_match_when_rule_failed(self, capsys):
|
||||
rule = Rule('test', Mock(side_effect=OSError('Denied')),
|
||||
requires_output=False)
|
||||
assert not rule.is_match(Command('ls'))
|
||||
assert capsys.readouterr()[1].split('\n')[0] == '[WARN] Rule test:'
|
||||
|
||||
def test_get_corrected_commands_with_rule_returns_list(self):
|
||||
rule = Rule(get_new_command=lambda x: [x.script + '!', x.script + '@'],
|
||||
priority=100)
|
||||
assert list(rule.get_corrected_commands(Command(script='test'))) \
|
||||
== [CorrectedCommand(script='test!', priority=100),
|
||||
CorrectedCommand(script='test@', priority=200)]
|
||||
|
||||
def test_get_corrected_commands_with_rule_returns_command(self):
|
||||
rule = Rule(get_new_command=lambda x: x.script + '!',
|
||||
priority=100)
|
||||
assert list(rule.get_corrected_commands(Command(script='test'))) \
|
||||
== [CorrectedCommand(script='test!', priority=100)]
|
||||
|
||||
|
||||
class TestCommand(object):
|
||||
@pytest.fixture(autouse=True)
|
||||
def Popen(self, monkeypatch):
|
||||
Popen = Mock()
|
||||
Popen.return_value.stdout.read.return_value = b'stdout'
|
||||
Popen.return_value.stderr.read.return_value = b'stderr'
|
||||
monkeypatch.setattr('thefuck.types.Popen', Popen)
|
||||
return Popen
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def prepare(self, monkeypatch):
|
||||
monkeypatch.setattr('thefuck.types.os.environ', {})
|
||||
monkeypatch.setattr('thefuck.types.Command._wait_output',
|
||||
staticmethod(lambda *_: True))
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def generic_shell(self, monkeypatch):
|
||||
monkeypatch.setattr('thefuck.shells.from_shell', lambda x: x)
|
||||
monkeypatch.setattr('thefuck.shells.to_shell', lambda x: x)
|
||||
|
||||
def test_from_script_calls(self, Popen, settings):
|
||||
settings.env = {}
|
||||
assert Command.from_raw_script(
|
||||
['apt-get', 'search', 'vim']) == Command(
|
||||
'apt-get search vim', 'stdout', 'stderr')
|
||||
Popen.assert_called_once_with('apt-get search vim',
|
||||
shell=True,
|
||||
stdout=PIPE,
|
||||
stderr=PIPE,
|
||||
env={})
|
||||
|
||||
@pytest.mark.parametrize('script, result', [
|
||||
([''], None),
|
||||
(['', ''], None),
|
||||
(['ls', '-la'], 'ls -la'),
|
||||
(['ls'], 'ls')])
|
||||
def test_from_script(self, script, result):
|
||||
if result:
|
||||
assert Command.from_raw_script(script).script == result
|
||||
else:
|
||||
with pytest.raises(EmptyCommand):
|
||||
Command.from_raw_script(script)
|
||||
|
||||
|
@ -1,10 +1,9 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
from mock import Mock
|
||||
import pytest
|
||||
from itertools import islice
|
||||
from thefuck import ui
|
||||
from thefuck.types import CorrectedCommand, SortedCorrectedCommandsSequence
|
||||
from thefuck.types import CorrectedCommand
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -41,10 +40,8 @@ def test_read_actions(patch_getch):
|
||||
|
||||
|
||||
def test_command_selector():
|
||||
selector = ui.CommandSelector([1, 2, 3])
|
||||
selector = ui.CommandSelector(iter([1, 2, 3]))
|
||||
assert selector.value == 1
|
||||
changes = []
|
||||
selector.on_change(changes.append)
|
||||
selector.next()
|
||||
assert selector.value == 2
|
||||
selector.next()
|
||||
@ -53,58 +50,55 @@ def test_command_selector():
|
||||
assert selector.value == 1
|
||||
selector.previous()
|
||||
assert selector.value == 3
|
||||
assert changes == [1, 2, 3, 1, 3]
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('no_colors')
|
||||
class TestSelectCommand(object):
|
||||
@pytest.fixture
|
||||
def commands_with_side_effect(self):
|
||||
return SortedCorrectedCommandsSequence(
|
||||
iter([CorrectedCommand('ls', lambda *_: None, 100),
|
||||
CorrectedCommand('cd', lambda *_: None, 100)]))
|
||||
return [CorrectedCommand('ls', lambda *_: None, 100),
|
||||
CorrectedCommand('cd', lambda *_: None, 100)]
|
||||
|
||||
@pytest.fixture
|
||||
def commands(self):
|
||||
return SortedCorrectedCommandsSequence(
|
||||
iter([CorrectedCommand('ls', None, 100),
|
||||
CorrectedCommand('cd', None, 100)]))
|
||||
return [CorrectedCommand('ls', None, 100),
|
||||
CorrectedCommand('cd', None, 100)]
|
||||
|
||||
def test_without_commands(self, capsys):
|
||||
assert ui.select_command([]) is None
|
||||
assert ui.select_command(iter([])) is None
|
||||
assert capsys.readouterr() == ('', 'No fucks given\n')
|
||||
|
||||
def test_without_confirmation(self, capsys, commands, settings):
|
||||
settings.require_confirmation = False
|
||||
assert ui.select_command(commands) == commands[0]
|
||||
assert ui.select_command(iter(commands)) == commands[0]
|
||||
assert capsys.readouterr() == ('', 'ls\n')
|
||||
|
||||
def test_without_confirmation_with_side_effects(
|
||||
self, capsys, commands_with_side_effect, settings):
|
||||
settings.require_confirmation = False
|
||||
assert ui.select_command(commands_with_side_effect) \
|
||||
assert ui.select_command(iter(commands_with_side_effect)) \
|
||||
== commands_with_side_effect[0]
|
||||
assert capsys.readouterr() == ('', 'ls (+side effect)\n')
|
||||
|
||||
def test_with_confirmation(self, capsys, patch_getch, commands):
|
||||
patch_getch(['\n'])
|
||||
assert ui.select_command(commands) == commands[0]
|
||||
assert ui.select_command(iter(commands)) == commands[0]
|
||||
assert capsys.readouterr() == ('', u'\x1b[1K\rls [enter/↑/↓/ctrl+c]\n')
|
||||
|
||||
def test_with_confirmation_abort(self, capsys, patch_getch, commands):
|
||||
patch_getch([KeyboardInterrupt])
|
||||
assert ui.select_command(commands) is None
|
||||
assert ui.select_command(iter(commands)) is None
|
||||
assert capsys.readouterr() == ('', u'\x1b[1K\rls [enter/↑/↓/ctrl+c]\nAborted\n')
|
||||
|
||||
def test_with_confirmation_with_side_effct(self, capsys, patch_getch,
|
||||
commands_with_side_effect):
|
||||
patch_getch(['\n'])
|
||||
assert ui.select_command(commands_with_side_effect)\
|
||||
assert ui.select_command(iter(commands_with_side_effect))\
|
||||
== commands_with_side_effect[0]
|
||||
assert capsys.readouterr() == ('', u'\x1b[1K\rls (+side effect) [enter/↑/↓/ctrl+c]\n')
|
||||
|
||||
def test_with_confirmation_select_second(self, capsys, patch_getch, commands):
|
||||
patch_getch(['\x1b', '[', 'B', '\n'])
|
||||
assert ui.select_command(commands) == commands[1]
|
||||
assert ui.select_command(iter(commands)) == commands[1]
|
||||
assert capsys.readouterr() == (
|
||||
'', u'\x1b[1K\rls [enter/↑/↓/ctrl+c]\x1b[1K\rcd [enter/↑/↓/ctrl+c]\n')
|
||||
|
@ -1,25 +1,25 @@
|
||||
from pathlib import Path
|
||||
from thefuck import types
|
||||
from thefuck.conf import DEFAULT_PRIORITY
|
||||
|
||||
|
||||
def Command(script='', stdout='', stderr=''):
|
||||
return types.Command(script, stdout, stderr)
|
||||
class Command(types.Command):
|
||||
def __init__(self, script='', stdout='', stderr=''):
|
||||
super(Command, self).__init__(script, stdout, stderr)
|
||||
|
||||
|
||||
def Rule(name='', match=lambda *_: True,
|
||||
class Rule(types.Rule):
|
||||
def __init__(self, name='', match=lambda *_: True,
|
||||
get_new_command=lambda *_: '',
|
||||
enabled_by_default=True,
|
||||
side_effect=None,
|
||||
priority=DEFAULT_PRIORITY,
|
||||
requires_output=True):
|
||||
return types.Rule(name, match, get_new_command,
|
||||
super(Rule, self).__init__(name, match, get_new_command,
|
||||
enabled_by_default, side_effect,
|
||||
priority, requires_output)
|
||||
|
||||
|
||||
def CorrectedCommand(script='', side_effect=None, priority=DEFAULT_PRIORITY):
|
||||
return types.CorrectedCommand(script, side_effect, priority)
|
||||
|
||||
|
||||
root = Path(__file__).parent.parent.resolve()
|
||||
class CorrectedCommand(types.CorrectedCommand):
|
||||
def __init__(self, script='', side_effect=None, priority=DEFAULT_PRIORITY):
|
||||
super(CorrectedCommand, self).__init__(
|
||||
script, side_effect, priority)
|
||||
|
116
thefuck/conf.py
116
thefuck/conf.py
@ -1,26 +1,12 @@
|
||||
from imp import load_source
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from six import text_type
|
||||
from .types import RulesNamesList, Settings
|
||||
|
||||
|
||||
class _DefaultRulesNames(RulesNamesList):
|
||||
def __add__(self, items):
|
||||
return _DefaultRulesNames(list(self) + items)
|
||||
|
||||
def __contains__(self, item):
|
||||
return item.enabled_by_default or \
|
||||
super(_DefaultRulesNames, self).__contains__(item)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, _DefaultRulesNames):
|
||||
return super(_DefaultRulesNames, self).__eq__(other)
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
DEFAULT_RULES = _DefaultRulesNames([])
|
||||
ALL_ENABLED = object()
|
||||
DEFAULT_RULES = [ALL_ENABLED]
|
||||
DEFAULT_PRIORITY = 1000
|
||||
|
||||
DEFAULT_SETTINGS = {'rules': DEFAULT_RULES,
|
||||
@ -53,24 +39,62 @@ SETTINGS_HEADER = u"""# ~/.thefuck/settings.py: The Fuck settings file
|
||||
"""
|
||||
|
||||
|
||||
def _settings_from_file(user_dir):
|
||||
class Settings(dict):
|
||||
def __getattr__(self, item):
|
||||
return self.get(item)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
self[key] = value
|
||||
|
||||
def init(self):
|
||||
"""Fills `settings` with values from `settings.py` and env."""
|
||||
from .logs import exception
|
||||
|
||||
self._setup_user_dir()
|
||||
self._init_settings_file()
|
||||
|
||||
try:
|
||||
self.update(self._settings_from_file())
|
||||
except Exception:
|
||||
exception("Can't load settings from file", sys.exc_info())
|
||||
|
||||
try:
|
||||
self.update(self._settings_from_env())
|
||||
except Exception:
|
||||
exception("Can't load settings from env", sys.exc_info())
|
||||
|
||||
def _init_settings_file(self):
|
||||
settings_path = self.user_dir.joinpath('settings.py')
|
||||
if not settings_path.is_file():
|
||||
with settings_path.open(mode='w') as settings_file:
|
||||
settings_file.write(SETTINGS_HEADER)
|
||||
for setting in DEFAULT_SETTINGS.items():
|
||||
settings_file.write(u'# {} = {}\n'.format(*setting))
|
||||
|
||||
def _setup_user_dir(self):
|
||||
"""Returns user config dir, create it when it doesn't exist."""
|
||||
user_dir = Path(os.path.expanduser('~/.thefuck'))
|
||||
rules_dir = user_dir.joinpath('rules')
|
||||
if not rules_dir.is_dir():
|
||||
rules_dir.mkdir(parents=True)
|
||||
self.user_dir = user_dir
|
||||
|
||||
def _settings_from_file(self):
|
||||
"""Loads settings from file."""
|
||||
settings = load_source('settings',
|
||||
text_type(user_dir.joinpath('settings.py')))
|
||||
settings = load_source(
|
||||
'settings', text_type(self.user_dir.joinpath('settings.py')))
|
||||
return {key: getattr(settings, key)
|
||||
for key in DEFAULT_SETTINGS.keys()
|
||||
if hasattr(settings, key)}
|
||||
|
||||
|
||||
def _rules_from_env(val):
|
||||
def _rules_from_env(self, val):
|
||||
"""Transforms rules list from env-string to python."""
|
||||
val = val.split(':')
|
||||
if 'DEFAULT_RULES' in val:
|
||||
val = DEFAULT_RULES + [rule for rule in val if rule != 'DEFAULT_RULES']
|
||||
return val
|
||||
|
||||
|
||||
def _priority_from_env(val):
|
||||
def _priority_from_env(self, val):
|
||||
"""Gets priority pairs from env."""
|
||||
for part in val.split(':'):
|
||||
try:
|
||||
@ -79,14 +103,13 @@ def _priority_from_env(val):
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
|
||||
def _val_from_env(env, attr):
|
||||
def _val_from_env(self, env, attr):
|
||||
"""Transforms env-strings to python."""
|
||||
val = os.environ[env]
|
||||
if attr in ('rules', 'exclude_rules'):
|
||||
return _rules_from_env(val)
|
||||
return self._rules_from_env(val)
|
||||
elif attr == 'priority':
|
||||
return dict(_priority_from_env(val))
|
||||
return dict(self._priority_from_env(val))
|
||||
elif attr == 'wait_command':
|
||||
return int(val)
|
||||
elif attr in ('require_confirmation', 'no_colors', 'debug'):
|
||||
@ -94,44 +117,11 @@ def _val_from_env(env, attr):
|
||||
else:
|
||||
return val
|
||||
|
||||
|
||||
def _settings_from_env():
|
||||
def _settings_from_env(self):
|
||||
"""Loads settings from env."""
|
||||
return {attr: _val_from_env(env, attr)
|
||||
return {attr: self._val_from_env(env, attr)
|
||||
for env, attr in ENV_TO_ATTR.items()
|
||||
if env in os.environ}
|
||||
|
||||
|
||||
settings = Settings(DEFAULT_SETTINGS)
|
||||
|
||||
|
||||
def init_settings(user_dir):
|
||||
"""Fills `settings` with values from `settings.py` and env."""
|
||||
from .logs import exception
|
||||
|
||||
settings.user_dir = user_dir
|
||||
|
||||
try:
|
||||
settings.update(_settings_from_file(user_dir))
|
||||
except Exception:
|
||||
exception("Can't load settings from file", sys.exc_info())
|
||||
|
||||
try:
|
||||
settings.update(_settings_from_env())
|
||||
except Exception:
|
||||
exception("Can't load settings from env", sys.exc_info())
|
||||
|
||||
if not isinstance(settings['rules'], RulesNamesList):
|
||||
settings.rules = RulesNamesList(settings['rules'])
|
||||
|
||||
if not isinstance(settings.exclude_rules, RulesNamesList):
|
||||
settings.exclude_rules = RulesNamesList(settings.exclude_rules)
|
||||
|
||||
|
||||
def initialize_settings_file(user_dir):
|
||||
settings_path = user_dir.joinpath('settings.py')
|
||||
if not settings_path.is_file():
|
||||
with settings_path.open(mode='w') as settings_file:
|
||||
settings_file.write(SETTINGS_HEADER)
|
||||
for setting in DEFAULT_SETTINGS.items():
|
||||
settings_file.write(u'# {} = {}\n'.format(*setting))
|
||||
|
@ -1,38 +1,29 @@
|
||||
import sys
|
||||
from imp import load_source
|
||||
from pathlib import Path
|
||||
from .conf import settings, DEFAULT_PRIORITY
|
||||
from .types import Rule, CorrectedCommand, SortedCorrectedCommandsSequence
|
||||
from .utils import compatibility_call
|
||||
from .conf import settings
|
||||
from .types import Rule
|
||||
from . import logs
|
||||
|
||||
|
||||
def load_rule(rule):
|
||||
"""Imports rule module and returns it."""
|
||||
name = rule.name[:-3]
|
||||
with logs.debug_time(u'Importing rule: {};'.format(name)):
|
||||
rule_module = load_source(name, str(rule))
|
||||
priority = getattr(rule_module, 'priority', DEFAULT_PRIORITY)
|
||||
return Rule(name, rule_module.match,
|
||||
rule_module.get_new_command,
|
||||
getattr(rule_module, 'enabled_by_default', True),
|
||||
getattr(rule_module, 'side_effect', None),
|
||||
settings.priority.get(name, priority),
|
||||
getattr(rule_module, 'requires_output', True))
|
||||
def get_loaded_rules(rules_paths):
|
||||
"""Yields all available rules.
|
||||
|
||||
:type rules_paths: [Path]
|
||||
:rtype: Iterable[Rule]
|
||||
|
||||
def get_loaded_rules(rules):
|
||||
"""Yields all available rules."""
|
||||
for rule in rules:
|
||||
if rule.name != '__init__.py':
|
||||
loaded_rule = load_rule(rule)
|
||||
if loaded_rule in settings.rules and \
|
||||
loaded_rule not in settings.exclude_rules:
|
||||
yield loaded_rule
|
||||
"""
|
||||
for path in rules_paths:
|
||||
if path.name != '__init__.py':
|
||||
rule = Rule.from_path(path)
|
||||
if rule.is_enabled:
|
||||
yield rule
|
||||
|
||||
|
||||
def get_rules():
|
||||
"""Returns all enabled rules."""
|
||||
"""Returns all enabled rules.
|
||||
|
||||
:rtype: [Rule]
|
||||
|
||||
"""
|
||||
bundled = Path(__file__).parent \
|
||||
.joinpath('rules') \
|
||||
.glob('*.py')
|
||||
@ -41,34 +32,44 @@ def get_rules():
|
||||
key=lambda rule: rule.priority)
|
||||
|
||||
|
||||
def is_rule_match(command, rule):
|
||||
"""Returns first matched rule for command."""
|
||||
script_only = command.stdout is None and command.stderr is None
|
||||
def organize_commands(corrected_commands):
|
||||
"""Yields sorted commands without duplicates.
|
||||
|
||||
if script_only and rule.requires_output:
|
||||
return False
|
||||
:type corrected_commands: Iterable[thefuck.types.CorrectedCommand]
|
||||
:rtype: Iterable[thefuck.types.CorrectedCommand]
|
||||
|
||||
"""
|
||||
try:
|
||||
with logs.debug_time(u'Trying rule: {};'.format(rule.name)):
|
||||
if compatibility_call(rule.match, command):
|
||||
return True
|
||||
except Exception:
|
||||
logs.rule_failed(rule, sys.exc_info())
|
||||
first_command = next(corrected_commands)
|
||||
yield first_command
|
||||
except StopIteration:
|
||||
return
|
||||
|
||||
without_duplicates = {
|
||||
command for command in sorted(
|
||||
corrected_commands, key=lambda command: command.priority)
|
||||
if command != first_command}
|
||||
|
||||
def make_corrected_commands(command, rule):
|
||||
new_commands = compatibility_call(rule.get_new_command, command)
|
||||
if not isinstance(new_commands, list):
|
||||
new_commands = (new_commands,)
|
||||
for n, new_command in enumerate(new_commands):
|
||||
yield CorrectedCommand(script=new_command,
|
||||
side_effect=rule.side_effect,
|
||||
priority=(n + 1) * rule.priority)
|
||||
sorted_commands = sorted(
|
||||
without_duplicates,
|
||||
key=lambda corrected_command: corrected_command.priority)
|
||||
|
||||
logs.debug('Corrected commands: '.format(
|
||||
', '.join(str(cmd) for cmd in [first_command] + sorted_commands)))
|
||||
|
||||
for command in sorted_commands:
|
||||
yield command
|
||||
|
||||
|
||||
def get_corrected_commands(command):
|
||||
"""Returns generator with sorted and unique corrected commands.
|
||||
|
||||
:type command: thefuck.types.Command
|
||||
:rtype: Iterable[thefuck.types.CorrectedCommand]
|
||||
|
||||
"""
|
||||
corrected_commands = (
|
||||
corrected for rule in get_rules()
|
||||
if is_rule_match(command, rule)
|
||||
for corrected in make_corrected_commands(command, rule))
|
||||
return SortedCorrectedCommandsSequence(corrected_commands)
|
||||
if rule.is_match(command)
|
||||
for corrected in rule.get_corrected_commands(command))
|
||||
return organize_commands(corrected_commands)
|
||||
|
6
thefuck/exceptions.py
Normal file
6
thefuck/exceptions.py
Normal file
@ -0,0 +1,6 @@
|
||||
class EmptyCommand(Exception):
|
||||
"""Raised when empty command passed to `thefuck`."""
|
||||
|
||||
|
||||
class NoRuleMatched(Exception):
|
||||
"""Raised when no rule matched for some command."""
|
104
thefuck/main.py
104
thefuck/main.py
@ -1,114 +1,38 @@
|
||||
from argparse import ArgumentParser
|
||||
from warnings import warn
|
||||
from pathlib import Path
|
||||
from os.path import expanduser
|
||||
from pprint import pformat
|
||||
import pkg_resources
|
||||
from subprocess import Popen, PIPE
|
||||
import os
|
||||
import sys
|
||||
from psutil import Process, TimeoutExpired
|
||||
import colorama
|
||||
import six
|
||||
from . import logs, types, shells
|
||||
from .conf import initialize_settings_file, init_settings, settings
|
||||
from .conf import settings
|
||||
from .corrector import get_corrected_commands
|
||||
from .utils import compatibility_call
|
||||
from .exceptions import EmptyCommand
|
||||
from .utils import get_installation_info
|
||||
from .ui import select_command
|
||||
|
||||
|
||||
def setup_user_dir():
|
||||
"""Returns user config dir, create it when it doesn't exist."""
|
||||
user_dir = Path(expanduser('~/.thefuck'))
|
||||
rules_dir = user_dir.joinpath('rules')
|
||||
if not rules_dir.is_dir():
|
||||
rules_dir.mkdir(parents=True)
|
||||
initialize_settings_file(user_dir)
|
||||
return user_dir
|
||||
|
||||
|
||||
def wait_output(popen):
|
||||
"""Returns `True` if we can get output of the command in the
|
||||
`settings.wait_command` time.
|
||||
|
||||
Command will be killed if it wasn't finished in the time.
|
||||
|
||||
"""
|
||||
proc = Process(popen.pid)
|
||||
try:
|
||||
proc.wait(settings.wait_command)
|
||||
return True
|
||||
except TimeoutExpired:
|
||||
for child in proc.children(recursive=True):
|
||||
child.kill()
|
||||
proc.kill()
|
||||
return False
|
||||
|
||||
|
||||
def get_command(args):
|
||||
"""Creates command from `args` and executes it."""
|
||||
if six.PY2:
|
||||
script = ' '.join(arg.decode('utf-8') for arg in args[1:])
|
||||
else:
|
||||
script = ' '.join(args[1:])
|
||||
|
||||
script = script.strip()
|
||||
if not script:
|
||||
return
|
||||
|
||||
script = shells.from_shell(script)
|
||||
env = dict(os.environ)
|
||||
env.update(settings.env)
|
||||
|
||||
with logs.debug_time(u'Call: {}; with env: {};'.format(script, env)):
|
||||
result = Popen(script, shell=True, stdout=PIPE, stderr=PIPE, env=env)
|
||||
if wait_output(result):
|
||||
stdout = result.stdout.read().decode('utf-8')
|
||||
stderr = result.stderr.read().decode('utf-8')
|
||||
|
||||
logs.debug(u'Received stdout: {}'.format(stdout))
|
||||
logs.debug(u'Received stderr: {}'.format(stderr))
|
||||
|
||||
return types.Command(script, stdout, stderr)
|
||||
else:
|
||||
logs.debug(u'Execution timed out!')
|
||||
return types.Command(script, None, None)
|
||||
|
||||
|
||||
def run_command(old_cmd, command):
|
||||
"""Runs command from rule for passed command."""
|
||||
if command.side_effect:
|
||||
compatibility_call(command.side_effect, old_cmd, command.script)
|
||||
shells.put_to_history(command.script)
|
||||
print(command.script)
|
||||
|
||||
|
||||
# Entry points:
|
||||
|
||||
def fix_command():
|
||||
"""Fixes previous command. Used when `thefuck` called without arguments."""
|
||||
colorama.init()
|
||||
user_dir = setup_user_dir()
|
||||
init_settings(user_dir)
|
||||
settings.init()
|
||||
with logs.debug_time('Total'):
|
||||
logs.debug(u'Run with settings: {}'.format(pformat(settings)))
|
||||
|
||||
command = get_command(sys.argv)
|
||||
|
||||
if not command:
|
||||
try:
|
||||
command = types.Command.from_raw_script(sys.argv[1:])
|
||||
except EmptyCommand:
|
||||
logs.debug('Empty command, nothing to do')
|
||||
return
|
||||
|
||||
corrected_commands = get_corrected_commands(command)
|
||||
selected_command = select_command(corrected_commands)
|
||||
|
||||
if selected_command:
|
||||
run_command(command, selected_command)
|
||||
|
||||
|
||||
def _get_current_version():
|
||||
return pkg_resources.require('thefuck')[0].version
|
||||
selected_command.run(command)
|
||||
|
||||
|
||||
def print_alias(entry_point=True):
|
||||
"""Prints alias for current shell."""
|
||||
if entry_point:
|
||||
warn('`thefuck-alias` is deprecated, use `thefuck --alias` instead.')
|
||||
position = 1
|
||||
@ -128,16 +52,16 @@ def how_to_configure_alias():
|
||||
|
||||
"""
|
||||
colorama.init()
|
||||
user_dir = setup_user_dir()
|
||||
init_settings(user_dir)
|
||||
settings.init()
|
||||
logs.how_to_configure_alias(shells.how_to_configure())
|
||||
|
||||
|
||||
def main():
|
||||
parser = ArgumentParser(prog='thefuck')
|
||||
version = get_installation_info().version
|
||||
parser.add_argument('-v', '--version',
|
||||
action='version',
|
||||
version='%(prog)s {}'.format(_get_current_version()))
|
||||
version='%(prog)s {}'.format(version))
|
||||
parser.add_argument('-a', '--alias',
|
||||
action='store_true',
|
||||
help='[custom-alias-name] prints alias for current shell')
|
||||
|
@ -27,6 +27,6 @@ def git_support(fn, command):
|
||||
expansion = ' '.join(map(quote, split(search.group(2))))
|
||||
new_script = command.script.replace(alias, expansion)
|
||||
|
||||
command = Command._replace(command, script=new_script)
|
||||
command = command.update(script=new_script)
|
||||
|
||||
return fn(command)
|
||||
|
@ -9,9 +9,7 @@ def sudo_support(fn, command):
|
||||
if not command.script.startswith('sudo '):
|
||||
return fn(command)
|
||||
|
||||
result = fn(Command(command.script[5:],
|
||||
command.stdout,
|
||||
command.stderr))
|
||||
result = fn(command.update(script=command.script[5:]))
|
||||
|
||||
if result and isinstance(result, six.string_types):
|
||||
return u'sudo {}'.format(result)
|
||||
|
319
thefuck/types.py
319
thefuck/types.py
@ -1,15 +1,245 @@
|
||||
from collections import namedtuple
|
||||
from traceback import format_stack
|
||||
from imp import load_source
|
||||
import os
|
||||
from subprocess import Popen, PIPE
|
||||
import sys
|
||||
from psutil import Process, TimeoutExpired
|
||||
import six
|
||||
from .conf import settings, DEFAULT_PRIORITY, ALL_ENABLED
|
||||
from .utils import compatibility_call
|
||||
from .exceptions import EmptyCommand
|
||||
from . import logs, shells
|
||||
|
||||
Command = namedtuple('Command', ('script', 'stdout', 'stderr'))
|
||||
|
||||
Rule = namedtuple('Rule', ('name', 'match', 'get_new_command',
|
||||
'enabled_by_default', 'side_effect',
|
||||
'priority', 'requires_output'))
|
||||
class Command(object):
|
||||
"""Command that should be fixed."""
|
||||
|
||||
def __init__(self, script, stdout, stderr):
|
||||
"""Initializes command with given values.
|
||||
|
||||
:type script: basestring
|
||||
:type stdout: basestring
|
||||
:type stderr: basestring
|
||||
|
||||
"""
|
||||
self.script = script
|
||||
self.stdout = stdout
|
||||
self.stderr = stderr
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Command):
|
||||
return (self.script, self.stdout, self.stderr) \
|
||||
== (other.script, other.stdout, other.stderr)
|
||||
else:
|
||||
return False
|
||||
|
||||
def __repr__(self):
|
||||
return 'Command(script={}, stdout={}, stderr={})'.format(
|
||||
self.script, self.stdout, self.stderr)
|
||||
|
||||
def update(self, **kwargs):
|
||||
"""Returns new command with replaced fields.
|
||||
|
||||
:rtype: Command
|
||||
|
||||
"""
|
||||
kwargs.setdefault('script', self.script)
|
||||
kwargs.setdefault('stdout', self.stdout)
|
||||
kwargs.setdefault('stderr', self.stderr)
|
||||
return Command(**kwargs)
|
||||
|
||||
@staticmethod
|
||||
def _wait_output(popen):
|
||||
"""Returns `True` if we can get output of the command in the
|
||||
`settings.wait_command` time.
|
||||
|
||||
Command will be killed if it wasn't finished in the time.
|
||||
|
||||
:type popen: Popen
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
proc = Process(popen.pid)
|
||||
try:
|
||||
proc.wait(settings.wait_command)
|
||||
return True
|
||||
except TimeoutExpired:
|
||||
for child in proc.children(recursive=True):
|
||||
child.kill()
|
||||
proc.kill()
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _prepare_script(raw_script):
|
||||
"""Creates single script from a list of script parts.
|
||||
|
||||
:type raw_script: [basestring]
|
||||
:rtype: basestring
|
||||
|
||||
"""
|
||||
if six.PY2:
|
||||
script = ' '.join(arg.decode('utf-8') for arg in raw_script)
|
||||
else:
|
||||
script = ' '.join(raw_script)
|
||||
|
||||
script = script.strip()
|
||||
return shells.from_shell(script)
|
||||
|
||||
@classmethod
|
||||
def from_raw_script(cls, raw_script):
|
||||
"""Creates instance of `Command` from a list of script parts.
|
||||
|
||||
:type raw_script: [basestring]
|
||||
:rtype: Command
|
||||
:raises: EmptyCommand
|
||||
|
||||
"""
|
||||
script = cls._prepare_script(raw_script)
|
||||
if not script:
|
||||
raise EmptyCommand
|
||||
|
||||
env = dict(os.environ)
|
||||
env.update(settings.env)
|
||||
|
||||
with logs.debug_time(u'Call: {}; with env: {};'.format(script, env)):
|
||||
result = Popen(script, shell=True, stdout=PIPE, stderr=PIPE, env=env)
|
||||
if cls._wait_output(result):
|
||||
stdout = result.stdout.read().decode('utf-8')
|
||||
stderr = result.stderr.read().decode('utf-8')
|
||||
|
||||
logs.debug(u'Received stdout: {}'.format(stdout))
|
||||
logs.debug(u'Received stderr: {}'.format(stderr))
|
||||
|
||||
return cls(script, stdout, stderr)
|
||||
else:
|
||||
logs.debug(u'Execution timed out!')
|
||||
return cls(script, None, None)
|
||||
|
||||
|
||||
class Rule(object):
|
||||
"""Rule for fixing commands."""
|
||||
|
||||
def __init__(self, name, match, get_new_command,
|
||||
enabled_by_default, side_effect,
|
||||
priority, requires_output):
|
||||
"""Initializes rule with given fields.
|
||||
|
||||
:type name: basestring
|
||||
:type match: (Command) -> bool
|
||||
:type get_new_command: (Command) -> (basestring | [basestring])
|
||||
:type enabled_by_default: boolean
|
||||
:type side_effect: (Command, basestring) -> None
|
||||
:type priority: int
|
||||
:type requires_output: bool
|
||||
|
||||
"""
|
||||
self.name = name
|
||||
self.match = match
|
||||
self.get_new_command = get_new_command
|
||||
self.enabled_by_default = enabled_by_default
|
||||
self.side_effect = side_effect
|
||||
self.priority = priority
|
||||
self.requires_output = requires_output
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Rule):
|
||||
return (self.name, self.match, self.get_new_command,
|
||||
self.enabled_by_default, self.side_effect,
|
||||
self.priority, self.requires_output) \
|
||||
== (other.name, other.match, other.get_new_command,
|
||||
other.enabled_by_default, other.side_effect,
|
||||
other.priority, other.requires_output)
|
||||
else:
|
||||
return False
|
||||
|
||||
def __repr__(self):
|
||||
return 'Rule(name={}, match={}, get_new_command={}, ' \
|
||||
'enabled_by_default={}, side_effect={}, ' \
|
||||
'priority={}, requires_output)'.format(
|
||||
self.name, self.match, self.get_new_command,
|
||||
self.enabled_by_default, self.side_effect,
|
||||
self.priority, self.requires_output)
|
||||
|
||||
@classmethod
|
||||
def from_path(cls, path):
|
||||
"""Creates rule instance from path.
|
||||
|
||||
:type path: pathlib.Path
|
||||
:rtype: Rule
|
||||
|
||||
"""
|
||||
name = path.name[:-3]
|
||||
with logs.debug_time(u'Importing rule: {};'.format(name)):
|
||||
rule_module = load_source(name, str(path))
|
||||
priority = getattr(rule_module, 'priority', DEFAULT_PRIORITY)
|
||||
return cls(name, rule_module.match,
|
||||
rule_module.get_new_command,
|
||||
getattr(rule_module, 'enabled_by_default', True),
|
||||
getattr(rule_module, 'side_effect', None),
|
||||
settings.priority.get(name, priority),
|
||||
getattr(rule_module, 'requires_output', True))
|
||||
|
||||
@property
|
||||
def is_enabled(self):
|
||||
"""Returns `True` when rule enabled.
|
||||
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if self.name in settings.exclude_rules:
|
||||
return False
|
||||
elif self.name in settings.rules:
|
||||
return True
|
||||
elif self.enabled_by_default and ALL_ENABLED in settings.rules:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def is_match(self, command):
|
||||
"""Returns `True` if rule matches the command.
|
||||
|
||||
:type command: Command
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
script_only = command.stdout is None and command.stderr is None
|
||||
|
||||
if script_only and self.requires_output:
|
||||
return False
|
||||
|
||||
try:
|
||||
with logs.debug_time(u'Trying rule: {};'.format(self.name)):
|
||||
if compatibility_call(self.match, command):
|
||||
return True
|
||||
except Exception:
|
||||
logs.rule_failed(self, sys.exc_info())
|
||||
|
||||
def get_corrected_commands(self, command):
|
||||
"""Returns generator with corrected commands.
|
||||
|
||||
:type command: Command
|
||||
:rtype: Iterable[CorrectedCommand]
|
||||
|
||||
"""
|
||||
new_commands = compatibility_call(self.get_new_command, command)
|
||||
if not isinstance(new_commands, list):
|
||||
new_commands = (new_commands,)
|
||||
for n, new_command in enumerate(new_commands):
|
||||
yield CorrectedCommand(script=new_command,
|
||||
side_effect=self.side_effect,
|
||||
priority=(n + 1) * self.priority)
|
||||
|
||||
|
||||
class CorrectedCommand(object):
|
||||
"""Corrected by rule command."""
|
||||
|
||||
def __init__(self, script, side_effect, priority):
|
||||
"""Initializes instance with given fields.
|
||||
|
||||
:type script: basestring
|
||||
:type side_effect: (Command, basestring) -> None
|
||||
:type priority: int
|
||||
|
||||
"""
|
||||
self.script = script
|
||||
self.side_effect = side_effect
|
||||
self.priority = priority
|
||||
@ -29,76 +259,13 @@ class CorrectedCommand(object):
|
||||
return 'CorrectedCommand(script={}, side_effect={}, priority={})'.format(
|
||||
self.script, self.side_effect, self.priority)
|
||||
|
||||
def run(self, old_cmd):
|
||||
"""Runs command from rule for passed command.
|
||||
|
||||
class RulesNamesList(list):
|
||||
"""Wrapper a top of list for storing rules names."""
|
||||
|
||||
def __contains__(self, item):
|
||||
return super(RulesNamesList, self).__contains__(item.name)
|
||||
|
||||
|
||||
class Settings(dict):
|
||||
def __getattr__(self, item):
|
||||
return self.get(item)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
self[key] = value
|
||||
|
||||
|
||||
class SortedCorrectedCommandsSequence(object):
|
||||
"""List-like collection/wrapper around generator, that:
|
||||
|
||||
- immediately gives access to the first commands through [];
|
||||
- realises generator and sorts commands on first access to other
|
||||
commands through [], or when len called.
|
||||
:type old_cmd: Command
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, commands):
|
||||
self._commands = commands
|
||||
self._cached = self._realise_first()
|
||||
self._realised = False
|
||||
|
||||
def _realise_first(self):
|
||||
try:
|
||||
return [next(self._commands)]
|
||||
except StopIteration:
|
||||
return []
|
||||
|
||||
def _remove_duplicates(self, corrected_commands):
|
||||
"""Removes low-priority duplicates."""
|
||||
commands = {command
|
||||
for command in sorted(corrected_commands,
|
||||
key=lambda command: -command.priority)
|
||||
if command.script != self._cached[0]}
|
||||
return commands
|
||||
|
||||
def _realise(self):
|
||||
"""Realises generator, removes duplicates and sorts commands."""
|
||||
from .logs import debug
|
||||
|
||||
if self._cached:
|
||||
commands = self._remove_duplicates(self._commands)
|
||||
self._cached = [self._cached[0]] + sorted(
|
||||
commands, key=lambda corrected_command: corrected_command.priority)
|
||||
self._realised = True
|
||||
debug('SortedCommandsSequence was realised with: {}, after: {}'.format(
|
||||
self._cached, '\n'.join(format_stack())))
|
||||
|
||||
def __getitem__(self, item):
|
||||
if item != 0 and not self._realised:
|
||||
self._realise()
|
||||
return self._cached[item]
|
||||
|
||||
def __bool__(self):
|
||||
return bool(self._cached)
|
||||
|
||||
def __len__(self):
|
||||
if not self._realised:
|
||||
self._realise()
|
||||
return len(self._cached)
|
||||
|
||||
def __iter__(self):
|
||||
if not self._realised:
|
||||
self._realise()
|
||||
return iter(self._cached)
|
||||
if self.side_effect:
|
||||
compatibility_call(self.side_effect, old_cmd, self.script)
|
||||
shells.put_to_history(self.script)
|
||||
print(self.script)
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
import sys
|
||||
from .conf import settings
|
||||
from .exceptions import NoRuleMatched
|
||||
from . import logs
|
||||
|
||||
try:
|
||||
@ -50,27 +51,36 @@ def read_actions():
|
||||
|
||||
|
||||
class CommandSelector(object):
|
||||
"""Helper for selecting rule from rules list."""
|
||||
|
||||
def __init__(self, commands):
|
||||
self._commands = commands
|
||||
""":type commands: Iterable[thefuck.types.CorrectedCommand]"""
|
||||
self._commands_gen = commands
|
||||
try:
|
||||
self._commands = [next(self._commands_gen)]
|
||||
except StopIteration:
|
||||
raise NoRuleMatched
|
||||
self._realised = False
|
||||
self._index = 0
|
||||
self._on_change = lambda x: x
|
||||
|
||||
def _realise(self):
|
||||
if not self._realised:
|
||||
self._commands += list(self._commands_gen)
|
||||
self._realised = True
|
||||
|
||||
def next(self):
|
||||
self._realise()
|
||||
self._index = (self._index + 1) % len(self._commands)
|
||||
self._on_change(self.value)
|
||||
|
||||
def previous(self):
|
||||
self._realise()
|
||||
self._index = (self._index - 1) % len(self._commands)
|
||||
self._on_change(self.value)
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
""":rtype hefuck.types.CorrectedCommand"""
|
||||
return self._commands[self._index]
|
||||
|
||||
def on_change(self, fn):
|
||||
self._on_change = fn
|
||||
fn(self.value)
|
||||
|
||||
|
||||
def select_command(corrected_commands):
|
||||
"""Returns:
|
||||
@ -79,17 +89,22 @@ def select_command(corrected_commands):
|
||||
- None when ctrl+c pressed;
|
||||
- selected command.
|
||||
|
||||
:type corrected_commands: Iterable[thefuck.types.CorrectedCommand]
|
||||
:rtype: thefuck.types.CorrectedCommand | None
|
||||
|
||||
"""
|
||||
if not corrected_commands:
|
||||
try:
|
||||
selector = CommandSelector(corrected_commands)
|
||||
except NoRuleMatched:
|
||||
logs.failed('No fucks given')
|
||||
return
|
||||
|
||||
selector = CommandSelector(corrected_commands)
|
||||
if not settings.require_confirmation:
|
||||
logs.show_corrected_command(selector.value)
|
||||
return selector.value
|
||||
|
||||
selector.on_change(lambda val: logs.confirm_text(val))
|
||||
logs.confirm_text(selector.value)
|
||||
|
||||
for action in read_actions():
|
||||
if action == SELECT:
|
||||
sys.stderr.write('\n')
|
||||
@ -99,5 +114,7 @@ def select_command(corrected_commands):
|
||||
return
|
||||
elif action == PREVIOUS:
|
||||
selector.previous()
|
||||
logs.confirm_text(selector.value)
|
||||
elif action == NEXT:
|
||||
selector.next()
|
||||
logs.confirm_text(selector.value)
|
||||
|
@ -4,7 +4,6 @@ import shelve
|
||||
from warnings import warn
|
||||
from decorator import decorator
|
||||
from contextlib import closing
|
||||
import tempfile
|
||||
|
||||
import os
|
||||
import pickle
|
||||
@ -99,8 +98,7 @@ def get_all_executables():
|
||||
return fallback
|
||||
|
||||
tf_alias = thefuck_alias()
|
||||
tf_entry_points = pkg_resources.require('thefuck')[0]\
|
||||
.get_entry_map()\
|
||||
tf_entry_points = get_installation_info().get_entry_map()\
|
||||
.get('console_scripts', {})\
|
||||
.keys()
|
||||
bins = [exe.name
|
||||
@ -224,3 +222,7 @@ def compatibility_call(fn, *args):
|
||||
.format(fn.__name__, fn.__module__))
|
||||
args += (settings,)
|
||||
return fn(*args)
|
||||
|
||||
|
||||
def get_installation_info():
|
||||
return pkg_resources.require('thefuck')[0]
|
||||
|
Loading…
x
Reference in New Issue
Block a user