1
0
mirror of https://github.com/nvbn/thefuck.git synced 2025-01-31 10:11:14 +00:00

#298 Add ability to chose matched rule

This commit is contained in:
nvbn 2015-07-28 22:04:27 +03:00
parent 4fc18cb4e7
commit 7933e963d8
10 changed files with 407 additions and 239 deletions

View File

@ -212,7 +212,7 @@ in `~/.thefuck/rules`. The rule should contain two functions:
```python ```python
match(command: Command, settings: Settings) -> bool match(command: Command, settings: Settings) -> bool
get_new_command(command: Command, settings: Settings) -> str get_new_command(command: Command, settings: Settings) -> str | list[str]
``` ```
Also the rule can contain an optional function `side_effect(command: Command, settings: Settings) -> None` Also the rule can contain an optional function `side_effect(command: Command, settings: Settings) -> None`

View File

@ -22,7 +22,7 @@ elif (3, 0) < version < (3, 3):
VERSION = '2.5.6' VERSION = '2.5.6'
install_requires = ['psutil', 'colorama', 'six'] install_requires = ['psutil', 'colorama', 'six', 'getch']
extras_require = {':python_version<"3.4"': ['pathlib']} extras_require = {':python_version<"3.4"': ['pathlib']}
setup(name='thefuck', setup(name='thefuck',

88
tests/test_corrector.py Normal file
View File

@ -0,0 +1,88 @@
import pytest
from pathlib import PosixPath, Path
from mock import Mock
from thefuck import corrector, conf, types
from tests.utils import Rule, Command
from thefuck.corrector import make_corrected_commands, get_corrected_commands
def test_load_rule(mocker):
match = object()
get_new_command = object()
load_source = mocker.patch(
'thefuck.corrector.load_source',
return_value=Mock(match=match,
get_new_command=get_new_command,
enabled_by_default=True,
priority=900,
requires_output=True))
assert corrector.load_rule(Path('/rules/bash.py'), settings=Mock(priority={})) \
== Rule('bash', match, get_new_command, priority=900)
load_source.assert_called_once_with('bash', '/rules/bash.py')
class TestGetRules(object):
@pytest.fixture(autouse=True)
def glob(self, mocker):
return mocker.patch('thefuck.corrector.Path.glob', return_value=[])
def _compare_names(self, rules, names):
return [r.name for r in rules] == names
@pytest.mark.parametrize('conf_rules, rules', [
(conf.DEFAULT_RULES, ['bash', 'lisp', 'bash', 'lisp']),
(types.RulesNamesList(['bash']), ['bash', 'bash'])])
def test_get(self, monkeypatch, glob, conf_rules, rules):
glob.return_value = [PosixPath('bash.py'), PosixPath('lisp.py')]
monkeypatch.setattr('thefuck.corrector.load_source',
lambda x, _: Rule(x))
assert self._compare_names(
corrector.get_rules(Path('~'), Mock(rules=conf_rules, priority={})),
rules)
class TestGetMatchedRules(object):
def test_no_match(self):
assert list(corrector.get_matched_rules(
Command('ls'), [Rule('', lambda *_: False)],
Mock(no_colors=True))) == []
def test_match(self):
rule = Rule('', lambda x, _: x.script == 'cd ..')
assert list(corrector.get_matched_rules(
Command('cd ..'), [rule], Mock(no_colors=True))) == [rule]
def test_when_rule_failed(self, capsys):
all(corrector.get_matched_rules(
Command('ls'), [Rule('test', Mock(side_effect=OSError('Denied')),
requires_output=False)],
Mock(no_colors=True, debug=False)))
assert capsys.readouterr()[1].split('\n')[0] == '[WARN] Rule test:'
class TestGetCorrectedCommands(object):
def test_with_rule_returns_list(self):
rule = Rule(get_new_command=lambda x, _: [x.script + '!', x.script + '@'],
priority=100)
assert list(make_corrected_commands(Command(script='test'), [rule], None)) \
== [types.CorrectedCommand(script='test!', priority=100, side_effect=None),
types.CorrectedCommand(script='test@', priority=200, side_effect=None)]
def test_with_rule_returns_command(self):
rule = Rule(get_new_command=lambda x, _: x.script + '!',
priority=100)
assert list(make_corrected_commands(Command(script='test'), [rule], None)) \
== [types.CorrectedCommand(script='test!', priority=100, side_effect=None)]
def test_get_corrected_commands(mocker):
command = Command('test', 'test', 'test')
rules = [Rule(match=lambda *_: False),
Rule(match=lambda *_: True,
get_new_command=lambda x, _: x.script + '!', priority=100),
Rule(match=lambda *_: True,
get_new_command=lambda x, _: [x.script + '@', x.script + ';'],
priority=60)]
mocker.patch('thefuck.corrector.get_rules', return_value=rules)
assert [cmd.script for cmd in get_corrected_commands(command, None, Mock(debug=False))]\
== ['test@', 'test!', 'test;']

View File

@ -1,61 +1,8 @@
import pytest import pytest
from subprocess import PIPE from subprocess import PIPE
from pathlib import PosixPath, Path
from mock import Mock from mock import Mock
from thefuck import main, conf, types from thefuck import main
from tests.utils import Rule, Command from tests.utils import Command
def test_load_rule(mocker):
match = object()
get_new_command = object()
load_source = mocker.patch(
'thefuck.main.load_source',
return_value=Mock(match=match,
get_new_command=get_new_command,
enabled_by_default=True,
priority=900,
requires_output=True))
assert main.load_rule(Path('/rules/bash.py')) \
== Rule('bash', match, get_new_command, priority=900)
load_source.assert_called_once_with('bash', '/rules/bash.py')
class TestGetRules(object):
@pytest.fixture(autouse=True)
def glob(self, mocker):
return mocker.patch('thefuck.main.Path.glob', return_value=[])
def _compare_names(self, rules, names):
return [r.name for r in rules] == names
@pytest.mark.parametrize('conf_rules, rules', [
(conf.DEFAULT_RULES, ['bash', 'lisp', 'bash', 'lisp']),
(types.RulesNamesList(['bash']), ['bash', 'bash'])])
def test_get(self, monkeypatch, glob, conf_rules, rules):
glob.return_value = [PosixPath('bash.py'), PosixPath('lisp.py')]
monkeypatch.setattr('thefuck.main.load_source',
lambda x, _: Rule(x))
assert self._compare_names(
main.get_rules(Path('~'), Mock(rules=conf_rules, priority={})),
rules)
@pytest.mark.parametrize('priority, unordered, ordered', [
({},
[Rule('bash', priority=100), Rule('python', priority=5)],
['python', 'bash']),
({},
[Rule('lisp', priority=9999), Rule('c', priority=conf.DEFAULT_PRIORITY)],
['c', 'lisp']),
({'python': 9999},
[Rule('bash', priority=100), Rule('python', priority=5)],
['bash', 'python'])])
def test_ordered_by_priority(self, monkeypatch, priority, unordered, ordered):
monkeypatch.setattr('thefuck.main._get_loaded_rules',
lambda *_: unordered)
assert self._compare_names(
main.get_rules(Path('~'), Mock(priority=priority)),
ordered)
class TestGetCommand(object): class TestGetCommand(object):
@ -79,7 +26,7 @@ class TestGetCommand(object):
def test_get_command_calls(self, Popen): def test_get_command_calls(self, Popen):
assert main.get_command(Mock(env={}), assert main.get_command(Mock(env={}),
['thefuck', 'apt-get', 'search', 'vim']) \ ['thefuck', 'apt-get', 'search', 'vim']) \
== Command('apt-get search vim', 'stdout', 'stderr') == Command('apt-get search vim', 'stdout', 'stderr')
Popen.assert_called_once_with('apt-get search vim', Popen.assert_called_once_with('apt-get search vim',
shell=True, shell=True,
@ -95,80 +42,3 @@ class TestGetCommand(object):
assert main.get_command(Mock(env={}), args).script == result assert main.get_command(Mock(env={}), args).script == result
else: else:
assert main.get_command(Mock(env={}), args) is None assert main.get_command(Mock(env={}), args) is None
class TestGetMatchedRule(object):
def test_no_match(self):
assert main.get_matched_rule(
Command('ls'), [Rule('', lambda *_: False)],
Mock(no_colors=True)) is None
def test_match(self):
rule = Rule('', lambda x, _: x.script == 'cd ..')
assert main.get_matched_rule(
Command('cd ..'), [rule], Mock(no_colors=True)) == rule
def test_when_rule_failed(self, capsys):
main.get_matched_rule(
Command('ls'), [Rule('test', Mock(side_effect=OSError('Denied')))],
Mock(no_colors=True, debug=False))
assert capsys.readouterr()[1].split('\n')[0] == '[WARN] Rule test:'
class TestRunRule(object):
@pytest.fixture(autouse=True)
def confirm(self, mocker):
return mocker.patch('thefuck.main.confirm', return_value=True)
def test_run_rule(self, capsys):
main.run_rule(Rule(get_new_command=lambda *_: 'new-command'),
Command(), None)
assert capsys.readouterr() == ('new-command\n', '')
def test_run_rule_with_side_effect(self, capsys):
side_effect = Mock()
settings = Mock(debug=False)
command = Command()
main.run_rule(Rule(get_new_command=lambda *_: 'new-command',
side_effect=side_effect),
command, settings)
assert capsys.readouterr() == ('new-command\n', '')
side_effect.assert_called_once_with(command, settings)
def test_when_not_comfirmed(self, capsys, confirm):
confirm.return_value = False
main.run_rule(Rule(get_new_command=lambda *_: 'new-command'),
Command(), None)
assert capsys.readouterr() == ('', '')
class TestConfirm(object):
@pytest.fixture
def stdin(self, mocker):
return mocker.patch('sys.stdin.read', return_value='\n')
def test_when_not_required(self, capsys):
assert main.confirm('command', None, Mock(require_confirmation=False))
assert capsys.readouterr() == ('', 'command\n')
def test_with_side_effect_and_without_confirmation(self, capsys):
assert main.confirm('command', Mock(), Mock(require_confirmation=False))
assert capsys.readouterr() == ('', 'command (+side effect)\n')
# `stdin` fixture should be applied after `capsys`
def test_when_confirmation_required_and_confirmed(self, capsys, stdin):
assert main.confirm('command', None, Mock(require_confirmation=True,
no_colors=True))
assert capsys.readouterr() == ('', 'command [enter/ctrl+c]')
# `stdin` fixture should be applied after `capsys`
def test_when_confirmation_required_and_confirmed_with_side_effect(self, capsys, stdin):
assert main.confirm('command', Mock(), Mock(require_confirmation=True,
no_colors=True))
assert capsys.readouterr() == ('', 'command (+side effect) [enter/ctrl+c]')
def test_when_confirmation_required_and_aborted(self, capsys, stdin):
stdin.side_effect = KeyboardInterrupt
assert not main.confirm('command', None, Mock(require_confirmation=True,
no_colors=True))
assert capsys.readouterr() == ('', 'command [enter/ctrl+c]Aborted\n')

115
tests/test_ui.py Normal file
View File

@ -0,0 +1,115 @@
from mock import Mock
import pytest
from itertools import islice
from thefuck import ui
from thefuck.types import CorrectedCommand
@pytest.fixture
def patch_getch(monkeypatch):
def patch(vals):
def getch():
for val in vals:
if val == KeyboardInterrupt:
raise val
else:
yield val
getch_gen = getch()
monkeypatch.setattr('thefuck.ui.getch', lambda: next(getch_gen))
return patch
def test_read_actions(patch_getch):
patch_getch([ # Enter:
'\n',
# Enter:
'\r',
# Ignored:
'x', 'y',
# Up:
'\x1b', '[', 'A',
# Down:
'\x1b', '[', 'B',
# Ctrl+C:
KeyboardInterrupt], )
assert list(islice(ui.read_actions(), 5)) \
== [ui.SELECT, ui.SELECT, ui.PREVIOUS, ui.NEXT, ui.ABORT]
def test_command_selector():
selector = ui.CommandSelector([1, 2, 3])
assert selector.value == 1
changes = []
selector.on_change(changes.append)
selector.next()
assert selector.value == 2
selector.next()
assert selector.value == 3
selector.next()
assert selector.value == 1
selector.previous()
assert selector.value == 3
assert changes == [1, 2, 3, 1, 3]
class TestSelectCommand(object):
@pytest.fixture
def commands_with_side_effect(self):
return [CorrectedCommand('ls', lambda *_: None, 100),
CorrectedCommand('cd', lambda *_: None, 100)]
@pytest.fixture
def commands(self):
return [CorrectedCommand('ls', None, 100),
CorrectedCommand('cd', None, 100)]
def test_without_commands(self, capsys):
assert ui.select_command([], Mock(debug=False, no_color=True)) is None
assert capsys.readouterr() == ('', 'No fuck given\n')
def test_without_confirmation(self, capsys, commands):
assert ui.select_command(commands,
Mock(debug=False, no_color=True,
require_confirmation=False)) == commands[0]
assert capsys.readouterr() == ('', 'ls\n')
def test_without_confirmation_with_side_effects(self, capsys,
commands_with_side_effect):
assert ui.select_command(commands_with_side_effect,
Mock(debug=False, no_color=True,
require_confirmation=False)) \
== commands_with_side_effect[0]
assert capsys.readouterr() == ('', 'ls (+side effect)\n')
def test_with_confirmation(self, capsys, patch_getch, commands):
patch_getch(['\n'])
assert ui.select_command(commands,
Mock(debug=False, no_color=True,
require_confirmation=True)) == commands[0]
assert capsys.readouterr() == ('', '\x1b[1K\rls [enter/↑/↓/ctrl+c]\n')
def test_with_confirmation_abort(self, capsys, patch_getch, commands):
patch_getch([KeyboardInterrupt])
assert ui.select_command(commands,
Mock(debug=False, no_color=True,
require_confirmation=True)) is None
assert capsys.readouterr() == ('', '\x1b[1K\rls [enter/↑/↓/ctrl+c]\nAborted\n')
def test_with_confirmation_with_side_effct(self, capsys, patch_getch,
commands_with_side_effect):
patch_getch(['\n'])
assert ui.select_command(commands_with_side_effect,
Mock(debug=False, no_color=True,
require_confirmation=True))\
== commands_with_side_effect[0]
assert capsys.readouterr() == ('', '\x1b[1K\rls (+side effect) [enter/↑/↓/ctrl+c]\n')
def test_with_confirmation_select_second(self, capsys, patch_getch, commands):
patch_getch(['\x1b', '[', 'B', '\n'])
assert ui.select_command(commands,
Mock(debug=False, no_color=True,
require_confirmation=True)) == commands[1]
assert capsys.readouterr() == (
'', '\x1b[1K\rls [enter/↑/↓/ctrl+c]\x1b[1K\rcd [enter/↑/↓/ctrl+c]\n')

77
thefuck/corrector.py Normal file
View File

@ -0,0 +1,77 @@
from imp import load_source
from pathlib import Path
from . import conf, types, logs
import sys
def load_rule(rule, settings):
"""Imports rule module and returns it."""
name = rule.name[:-3]
rule_module = load_source(name, str(rule))
priority = getattr(rule_module, 'priority', conf.DEFAULT_PRIORITY)
return types.Rule(name, rule_module.match,
rule_module.get_new_command,
getattr(rule_module, 'enabled_by_default', True),
getattr(rule_module, 'side_effect', None),
settings.priority.get(name, priority),
getattr(rule_module, 'requires_output', True))
def get_loaded_rules(rules, settings):
"""Yields all available rules."""
for rule in rules:
if rule.name != '__init__.py':
loaded_rule = load_rule(rule, settings)
if loaded_rule in settings.rules:
yield loaded_rule
def get_rules(user_dir, settings):
"""Returns all enabled rules."""
bundled = Path(__file__).parent \
.joinpath('rules') \
.glob('*.py')
user = user_dir.joinpath('rules').glob('*.py')
return get_loaded_rules(sorted(bundled) + sorted(user), settings)
def get_matched_rules(command, rules, settings):
"""Returns first matched rule for command."""
script_only = command.stdout is None and command.stderr is None
for rule in rules:
if script_only and rule.requires_output:
continue
try:
with logs.debug_time(u'Trying rule: {};'.format(rule.name),
settings):
if rule.match(command, settings):
yield rule
except Exception:
logs.rule_failed(rule, sys.exc_info(), settings)
def make_corrected_commands(command, rules, settings):
for rule in rules:
new_commands = rule.get_new_command(command, settings)
if not isinstance(new_commands, list):
new_commands = [new_commands]
for n, new_command in enumerate(new_commands):
yield types.CorrectedCommand(script=new_command,
side_effect=rule.side_effect,
priority=(n + 1) * rule.priority)
def get_corrected_commands(command, user_dir, settings):
rules = list(get_rules(user_dir, settings))
logs.debug(
u'Loaded rules: {}'.format(', '.join(rule.name for rule in rules)),
settings)
matched = list(get_matched_rules(command, rules, settings))
logs.debug(
u'Matched rules: {}'.format(', '.join(rule.name for rule in matched)),
settings)
corrected_commands = make_corrected_commands(command, matched, settings)
return sorted(corrected_commands,
key=lambda corrected_command: corrected_command.priority)

View File

@ -28,27 +28,6 @@ def rule_failed(rule, exc_info, settings):
exception('Rule {}'.format(rule.name), exc_info, settings) exception('Rule {}'.format(rule.name), exc_info, settings)
def show_command(new_command, side_effect, settings):
sys.stderr.write('{bold}{command}{reset}{side_effect}\n'.format(
command=new_command,
side_effect=' (+side effect)' if side_effect else '',
bold=color(colorama.Style.BRIGHT, settings),
reset=color(colorama.Style.RESET_ALL, settings)))
def confirm_command(new_command, side_effect, settings):
sys.stderr.write(
'{bold}{command}{reset}{side_effect} '
'[{green}enter{reset}/{red}ctrl+c{reset}]'.format(
command=new_command,
side_effect=' (+side effect)' if side_effect else '',
bold=color(colorama.Style.BRIGHT, settings),
green=color(colorama.Fore.GREEN, settings),
red=color(colorama.Fore.RED, settings),
reset=color(colorama.Style.RESET_ALL, settings)))
sys.stderr.flush()
def failed(msg, settings): def failed(msg, settings):
sys.stderr.write('{red}{msg}{reset}\n'.format( sys.stderr.write('{red}{msg}{reset}\n'.format(
msg=msg, msg=msg,
@ -56,6 +35,27 @@ def failed(msg, settings):
reset=color(colorama.Style.RESET_ALL, settings))) reset=color(colorama.Style.RESET_ALL, settings)))
def show_corrected_command(corrected_command, settings):
sys.stderr.write('{bold}{script}{reset}{side_effect}\n'.format(
script=corrected_command.script,
side_effect=' (+side effect)' if corrected_command.side_effect else '',
bold=color(colorama.Style.BRIGHT, settings),
reset=color(colorama.Style.RESET_ALL, settings)))
def confirm_text(corrected_command, settings):
sys.stderr.write(
'\033[1K\r{bold}{script}{reset}{side_effect} '
'[{green}enter{reset}/{blue}{reset}/{blue}{reset}/{red}ctrl+c{reset}]'.format(
script=corrected_command.script,
side_effect=' (+side effect)' if corrected_command.side_effect else '',
bold=color(colorama.Style.BRIGHT, settings),
green=color(colorama.Fore.GREEN, settings),
red=color(colorama.Fore.RED, settings),
reset=color(colorama.Style.RESET_ALL, settings),
blue=color(colorama.Fore.BLUE, settings)))
def debug(msg, settings): def debug(msg, settings):
if settings.debug: if settings.debug:
sys.stderr.write(u'{blue}{bold}DEBUG:{reset} {msg}\n'.format( sys.stderr.write(u'{blue}{bold}DEBUG:{reset} {msg}\n'.format(

View File

@ -1,4 +1,3 @@
from imp import load_source
from pathlib import Path from pathlib import Path
from os.path import expanduser from os.path import expanduser
from pprint import pformat from pprint import pformat
@ -9,6 +8,8 @@ from psutil import Process, TimeoutExpired
import colorama import colorama
import six import six
from . import logs, conf, types, shells from . import logs, conf, types, shells
from .corrector import get_corrected_commands
from .ui import select_command
def setup_user_dir(): def setup_user_dir():
@ -21,37 +22,6 @@ def setup_user_dir():
return user_dir return user_dir
def load_rule(rule):
"""Imports rule module and returns it."""
rule_module = load_source(rule.name[:-3], str(rule))
return types.Rule(rule.name[:-3], rule_module.match,
rule_module.get_new_command,
getattr(rule_module, 'enabled_by_default', True),
getattr(rule_module, 'side_effect', None),
getattr(rule_module, 'priority', conf.DEFAULT_PRIORITY),
getattr(rule_module, 'requires_output', True))
def _get_loaded_rules(rules, settings):
"""Yields all available rules."""
for rule in rules:
if rule.name != '__init__.py':
loaded_rule = load_rule(rule)
if loaded_rule in settings.rules:
yield loaded_rule
def get_rules(user_dir, settings):
"""Returns all enabled rules."""
bundled = Path(__file__).parent \
.joinpath('rules') \
.glob('*.py')
user = user_dir.joinpath('rules').glob('*.py')
rules = _get_loaded_rules(sorted(bundled) + sorted(user), settings)
return sorted(rules, key=lambda rule: settings.priority.get(
rule.name, rule.priority))
def wait_output(settings, popen): def wait_output(settings, popen):
"""Returns `True` if we can get output of the command in the """Returns `True` if we can get output of the command in the
`wait_command` time. `wait_command` time.
@ -100,46 +70,12 @@ def get_command(settings, args):
return types.Command(script, None, None) return types.Command(script, None, None)
def get_matched_rule(command, rules, settings): def run_command(command, settings):
"""Returns first matched rule for command."""
script_only = command.stdout is None and command.stderr is None
for rule in rules:
if script_only and rule.requires_output:
continue
try:
with logs.debug_time(u'Trying rule: {};'.format(rule.name),
settings):
if rule.match(command, settings):
return rule
except Exception:
logs.rule_failed(rule, sys.exc_info(), settings)
def confirm(new_command, side_effect, settings):
"""Returns `True` when running of new command confirmed."""
if not settings.require_confirmation:
logs.show_command(new_command, side_effect, settings)
return True
logs.confirm_command(new_command, side_effect, settings)
try:
sys.stdin.read(1)
return True
except KeyboardInterrupt:
logs.failed('Aborted', settings)
return False
def run_rule(rule, command, settings):
"""Runs command from rule for passed command.""" """Runs command from rule for passed command."""
new_command = shells.to_shell(rule.get_new_command(command, settings)) if command.side_effect:
if confirm(new_command, rule.side_effect, settings): command.side_effect(command, settings)
if rule.side_effect: shells.put_to_history(command.script)
rule.side_effect(command, settings) print(command.script)
shells.put_to_history(new_command)
print(new_command)
# Entry points: # Entry points:
@ -152,18 +88,10 @@ def main():
logs.debug(u'Run with settings: {}'.format(pformat(settings)), settings) logs.debug(u'Run with settings: {}'.format(pformat(settings)), settings)
command = get_command(settings, sys.argv) command = get_command(settings, sys.argv)
rules = get_rules(user_dir, settings) corrected_commands = get_corrected_commands(command, user_dir, settings)
logs.debug( selected_command = select_command(corrected_commands, settings)
u'Loaded rules: {}'.format(', '.join(rule.name for rule in rules)), if selected_command:
settings) run_command(selected_command, settings)
matched_rule = get_matched_rule(command, rules, settings)
if matched_rule:
logs.debug(u'Matched rule: {}'.format(matched_rule.name), settings)
run_rule(matched_rule, command, settings)
return
logs.failed('No fuck given', settings)
def print_alias(): def print_alias():

View File

@ -3,6 +3,8 @@ from collections import namedtuple
Command = namedtuple('Command', ('script', 'stdout', 'stderr')) Command = namedtuple('Command', ('script', 'stdout', 'stderr'))
CorrectedCommand = namedtuple('CorrectedCommand', ('script', 'side_effect', 'priority'))
Rule = namedtuple('Rule', ('name', 'match', 'get_new_command', Rule = namedtuple('Rule', ('name', 'match', 'get_new_command',
'enabled_by_default', 'side_effect', 'enabled_by_default', 'side_effect',
'priority', 'requires_output')) 'priority', 'requires_output'))

88
thefuck/ui.py Normal file
View File

@ -0,0 +1,88 @@
import sys
from getch import getch
from . import logs
SELECT = 0
ABORT = 1
PREVIOUS = 2
NEXT = 3
def read_actions():
"""Yields actions for pressed keys."""
buffer = []
ch = None
while True:
try:
try:
ch = getch()
except OverflowError: # Ctrl+C, KeyboardInterrupt will be reraised
pass
except KeyboardInterrupt:
yield ABORT
if ch in ('\n', '\r'): # Enter
yield SELECT
buffer.append(ch)
buffer = buffer[-3:]
if buffer == ['\x1b', '[', 'A']: # ↑
yield PREVIOUS
if buffer == ['\x1b', '[', 'B']: # ↓
yield NEXT
class CommandSelector(object):
def __init__(self, commands):
self._commands = commands
self._index = 0
self._on_change = lambda x: x
def next(self):
self._index = (self._index + 1) % len(self._commands)
self._on_change(self.value)
def previous(self):
self._index = (self._index - 1) % len(self._commands)
self._on_change(self.value)
@property
def value(self):
return self._commands[self._index]
def on_change(self, fn):
self._on_change = fn
fn(self.value)
def select_command(corrected_commands, settings):
"""Returns:
- the first command when confirmation disabled;
- None when ctrl+c pressed;
- selected command.
"""
if not corrected_commands:
logs.failed('No fuck given', settings)
return
selector = CommandSelector(corrected_commands)
if not settings.require_confirmation:
logs.show_corrected_command(selector.value, settings)
return selector.value
selector.on_change(lambda val: logs.confirm_text(val, settings))
for key in read_actions():
if key == SELECT:
sys.stderr.write('\n')
return selector.value
elif key == ABORT:
logs.failed('\nAborted', settings)
return
elif key == PREVIOUS:
selector.previous()
elif key == NEXT:
selector.next()