1
0
mirror of https://github.com/nvbn/thefuck.git synced 2025-02-22 12:58:33 +00:00

Merge pull request #684 from nvbn/682-instant-fuck-mode

682 instant fuck mode
This commit is contained in:
Vladimir Iakovlev 2017-08-28 04:35:51 +03:00 committed by GitHub
commit bf3c16816d
24 changed files with 313 additions and 92 deletions

View File

@ -4,6 +4,8 @@ Magnificent app which corrects your previous console command,
inspired by a [@liamosaur](https://twitter.com/liamosaur/)
[tweet](https://twitter.com/liamosaur/status/506975850596536320).
The Fuck is too slow? [Try experimental instant mode!](#experimental-instant-mode)
[![gif with examples][examples-link]][examples-link]
Few more examples:
@ -395,6 +397,23 @@ export THEFUCK_PRIORITY='no_command=9999:apt_get=100'
export THEFUCK_HISTORY_LIMIT='2000'
```
## Experimental instant mode
By default The Fuck reruns a previous command and that takes time,
in instant mode The Fuck logs output with [script](https://en.wikipedia.org/wiki/Script_(Unix))
and just reads the log.
[![gif with instant mode][instant-mode-gif-link]][instant-mode-gif-link]
At the moment only Python 3 with bash or zsh is supported.
For enabling instant mode you need to add `--enable-experimental-instant-mode`
to alias initialization in your `.bashrc`, `.bash_profile` or `.zshrc` like:
```bash
eval $(thefuck --alias --enable-experimental-instant-mode)
```
## Developing
Install `The Fuck` for development:
@ -443,4 +462,5 @@ Project License can be found [here](LICENSE.md).
[coverage-link]: https://coveralls.io/github/nvbn/thefuck
[license-badge]: https://img.shields.io/badge/license-MIT-007EC7.svg
[examples-link]: https://raw.githubusercontent.com/nvbn/thefuck/master/example.gif
[instant-mode-gif-link]: https://raw.githubusercontent.com/nvbn/thefuck/682-instant-fuck-mode/example_instant_mode.gif
[homebrew]: http://brew.sh/

BIN
example_instant_mode.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 535 KiB

View File

@ -33,8 +33,9 @@ elif (3, 0) < version < (3, 3):
VERSION = '3.21'
install_requires = ['psutil', 'colorama', 'six', 'decorator']
install_requires = ['psutil', 'colorama', 'six', 'decorator', 'pyte']
extras_require = {':python_version<"3.4"': ['pathlib2'],
':python_version<"3.3"': ['backports.shutil_get_terminal_size'],
":sys_platform=='win32'": ['win_unicode_console']}
setup(name='thefuck',

View File

@ -6,7 +6,8 @@ from thefuck.const import ARGUMENT_PLACEHOLDER
def _args(**override):
args = {'alias': None, 'command': [], 'yes': False,
'help': False, 'version': False, 'debug': False,
'force_command': None, 'repeat': False}
'force_command': None, 'repeat': False,
'enable_experimental_instant_mode': False}
args.update(override)
return args
@ -14,6 +15,8 @@ def _args(**override):
@pytest.mark.parametrize('argv, result', [
(['thefuck'], _args()),
(['thefuck', '-a'], _args(alias='fuck')),
(['thefuck', '--alias', '--enable-experimental-instant-mode'],
_args(alias='fuck', enable_experimental_instant_mode=True)),
(['thefuck', '-a', 'fix'], _args(alias='fix')),
(['thefuck', 'git', 'branch', ARGUMENT_PLACEHOLDER, '-y'],
_args(command=['git', 'branch'], yes=True)),

View File

@ -110,13 +110,13 @@ class TestCommand(object):
Popen = Mock()
Popen.return_value.stdout.read.return_value = b'stdout'
Popen.return_value.stderr.read.return_value = b'stderr'
monkeypatch.setattr('thefuck.types.Popen', Popen)
monkeypatch.setattr('thefuck.output_readers.rerun.Popen', Popen)
return Popen
@pytest.fixture(autouse=True)
def prepare(self, monkeypatch):
monkeypatch.setattr('thefuck.types.Command._wait_output',
staticmethod(lambda *_: True))
monkeypatch.setattr('thefuck.output_readers.rerun._wait_output',
lambda *_: True)
def test_from_script_calls(self, Popen, settings, os_environ):
settings.env = {}

View File

@ -69,34 +69,40 @@ class TestSelectCommand(object):
def test_without_confirmation(self, capsys, commands, settings):
settings.require_confirmation = False
assert ui.select_command(iter(commands)) == commands[0]
assert capsys.readouterr() == ('', 'ls\n')
assert capsys.readouterr() == ('', const.USER_COMMAND_MARK + 'ls\n')
def test_without_confirmation_with_side_effects(
self, capsys, commands_with_side_effect, settings):
settings.require_confirmation = False
assert (ui.select_command(iter(commands_with_side_effect))
== commands_with_side_effect[0])
assert capsys.readouterr() == ('', 'ls (+side effect)\n')
assert capsys.readouterr() == ('', const.USER_COMMAND_MARK + 'ls (+side effect)\n')
def test_with_confirmation(self, capsys, patch_get_key, commands):
patch_get_key(['\n'])
assert ui.select_command(iter(commands)) == commands[0]
assert capsys.readouterr() == ('', u'\x1b[1K\rls [enter/↑/↓/ctrl+c]\n')
assert capsys.readouterr() == (
'', const.USER_COMMAND_MARK + u'\x1b[1K\rls [enter/↑/↓/ctrl+c]\n')
def test_with_confirmation_abort(self, capsys, patch_get_key, commands):
patch_get_key([const.KEY_CTRL_C])
assert ui.select_command(iter(commands)) is None
assert capsys.readouterr() == ('', u'\x1b[1K\rls [enter/↑/↓/ctrl+c]\nAborted\n')
assert capsys.readouterr() == (
'', const.USER_COMMAND_MARK + u'\x1b[1K\rls [enter/↑/↓/ctrl+c]\nAborted\n')
def test_with_confirmation_with_side_effct(self, capsys, patch_get_key,
commands_with_side_effect):
patch_get_key(['\n'])
assert (ui.select_command(iter(commands_with_side_effect))
== commands_with_side_effect[0])
assert capsys.readouterr() == ('', u'\x1b[1K\rls (+side effect) [enter/↑/↓/ctrl+c]\n')
assert capsys.readouterr() == (
'', const.USER_COMMAND_MARK + u'\x1b[1K\rls (+side effect) [enter/↑/↓/ctrl+c]\n')
def test_with_confirmation_select_second(self, capsys, patch_get_key, commands):
patch_get_key([const.KEY_DOWN, '\n'])
assert ui.select_command(iter(commands)) == commands[1]
assert capsys.readouterr() == (
'', u'\x1b[1K\rls [enter/↑/↓/ctrl+c]\x1b[1K\rcd [enter/↑/↓/ctrl+c]\n')
stderr = (
u'{mark}\x1b[1K\rls [enter/↑/↓/ctrl+c]'
u'{mark}\x1b[1K\rcd [enter/↑/↓/ctrl+c]\n'
).format(mark=const.USER_COMMAND_MARK)
assert capsys.readouterr() == ('', stderr)

View File

@ -25,6 +25,10 @@ class Parser(object):
nargs='?',
const=get_alias(),
help='[custom-alias-name] prints alias for current shell')
self._parser.add_argument(
'--enable-experimental-instant-mode',
action='store_true',
help='enable experimental instant mode, use on your own risk')
self._parser.add_argument(
'-h', '--help',
action='store_true',

View File

@ -98,7 +98,7 @@ class Settings(dict):
elif attr in ('wait_command', 'history_limit', 'wait_slow_command'):
return int(val)
elif attr in ('require_confirmation', 'no_colors', 'debug',
'alter_history'):
'alter_history', 'instant_mode'):
return val.lower() == 'true'
elif attr == 'slow_commands':
return val.split(':')

View File

@ -35,6 +35,7 @@ DEFAULT_SETTINGS = {'rules': DEFAULT_RULES,
'slow_commands': ['lein', 'react-native', 'gradle',
'./gradlew', 'vagrant'],
'repeat': False,
'instant_mode': False,
'env': {'LC_ALL': 'C', 'LANG': 'C', 'GIT_TRACE': '1'}}
ENV_TO_ATTR = {'THEFUCK_RULES': 'rules',
@ -48,7 +49,8 @@ ENV_TO_ATTR = {'THEFUCK_RULES': 'rules',
'THEFUCK_ALTER_HISTORY': 'alter_history',
'THEFUCK_WAIT_SLOW_COMMAND': 'wait_slow_command',
'THEFUCK_SLOW_COMMANDS': 'slow_commands',
'THEFUCK_REPEAT': 'repeat'}
'THEFUCK_REPEAT': 'repeat',
'THEFUCK_INSTANT_MODE': 'instant_mode'}
SETTINGS_HEADER = u"""# The Fuck settings file
#
@ -65,3 +67,7 @@ SETTINGS_HEADER = u"""# The Fuck settings file
ARGUMENT_PLACEHOLDER = 'THEFUCK_ARGUMENT_PLACEHOLDER'
CONFIGURATION_TIMEOUT = 60
USER_COMMAND_MARK = u'\u200B' * 10
LOG_SIZE = 1000

View File

@ -4,3 +4,7 @@ class EmptyCommand(Exception):
class NoRuleMatched(Exception):
"""Raised when no rule matched for some command."""
class ScriptNotInLog(Exception):
"""Script not found in log."""

View File

@ -6,6 +6,7 @@ import sys
from traceback import format_exception
import colorama
from .conf import settings
from . import const
def color(color_):
@ -16,6 +17,14 @@ def color(color_):
return color_
def warn(title):
sys.stderr.write(u'{warn}[WARN] {title}{reset}\n'.format(
warn=color(colorama.Back.RED + colorama.Fore.WHITE
+ colorama.Style.BRIGHT),
reset=color(colorama.Style.RESET_ALL),
title=title))
def exception(title, exc_info):
sys.stderr.write(
u'{warn}[WARN] {title}:{reset}\n{trace}'
@ -39,7 +48,8 @@ def failed(msg):
def show_corrected_command(corrected_command):
sys.stderr.write(u'{bold}{script}{reset}{side_effect}\n'.format(
sys.stderr.write(u'{prefix}{bold}{script}{reset}{side_effect}\n'.format(
prefix=const.USER_COMMAND_MARK,
script=corrected_command.script,
side_effect=u' (+side effect)' if corrected_command.side_effect else u'',
bold=color(colorama.Style.BRIGHT),
@ -48,9 +58,10 @@ def show_corrected_command(corrected_command):
def confirm_text(corrected_command):
sys.stderr.write(
(u'{clear}{bold}{script}{reset}{side_effect} '
(u'{prefix}{clear}{bold}{script}{reset}{side_effect} '
u'[{green}enter{reset}/{blue}{reset}/{blue}{reset}'
u'/{red}ctrl+c{reset}]').format(
prefix=const.USER_COMMAND_MARK,
script=corrected_command.script,
side_effect=' (+side effect)' if corrected_command.side_effect else '',
clear='\033[1K\r',

View File

@ -5,6 +5,7 @@ init_output()
from pprint import pformat # noqa: E402
import sys # noqa: E402
import six # noqa: E402
from . import logs, types # noqa: E402
from .shells import shell # noqa: E402
from .conf import settings # noqa: E402
@ -13,6 +14,7 @@ from .exceptions import EmptyCommand # noqa: E402
from .ui import select_command # noqa: E402
from .argument_parser import Parser # noqa: E402
from .utils import get_installation_info # noqa: E402
from .logs import warn # noqa: E402
def fix_command(known_args):
@ -50,6 +52,15 @@ def main():
elif known_args.command:
fix_command(known_args)
elif known_args.alias:
print(shell.app_alias(known_args.alias))
if known_args.enable_experimental_instant_mode:
if six.PY2:
warn("Instant mode not supported with Python 2")
alias = shell.app_alias(known_args.alias)
else:
alias = shell.instant_mode_alias(known_args.alias)
else:
alias = shell.app_alias(known_args.alias)
print(alias)
else:
parser.print_usage()

View File

@ -0,0 +1,18 @@
from ..conf import settings
from . import read_log, rerun
def get_output(script, expanded):
"""Get output of the script.
:param script: Console script.
:type script: str
:param expanded: Console script with expanded aliases.
:type expanded: str
:rtype: (str, str)
"""
if settings.instant_mode:
return read_log.get_output(script)
else:
return rerun.get_output(script, expanded)

View File

@ -0,0 +1,82 @@
import os
import shlex
try:
from shutil import get_terminal_size
except ImportError:
from backports.shutil_get_terminal_size import get_terminal_size
import six
import pyte
from ..exceptions import ScriptNotInLog
from ..logs import warn
from .. import const
def _group_by_calls(log):
script_line = None
lines = []
for line in log:
try:
line = line.decode()
except UnicodeDecodeError:
continue
if const.USER_COMMAND_MARK in line:
if script_line:
yield script_line, lines
script_line = line
lines = [line]
elif script_line is not None:
lines.append(line)
if script_line:
yield script_line, lines
def _get_script_group_lines(grouped, script):
parts = shlex.split(script)
for script_line, lines in reversed(grouped):
if all(part in script_line for part in parts):
return lines
raise ScriptNotInLog
def _get_output_lines(script, log_file):
lines = log_file.readlines()[-const.LOG_SIZE:]
grouped = list(_group_by_calls(lines))
script_lines = _get_script_group_lines(grouped, script)
screen = pyte.Screen(get_terminal_size().columns, len(script_lines))
stream = pyte.Stream(screen)
stream.feed(''.join(script_lines))
return screen.display
def get_output(script):
"""Reads script output from log.
:type script: str
:rtype: (str, str)
"""
if six.PY2:
warn('Experimental instant mode is Python 3+ only')
return None, None
if 'THEFUCK_OUTPUT_LOG' not in os.environ:
warn("Output log isn't specified")
return None, None
try:
with open(os.environ['THEFUCK_OUTPUT_LOG'], 'rb') as log_file:
lines = _get_output_lines(script, log_file)
output = '\n'.join(lines).strip()
return output, output
except OSError:
warn("Can't read output log")
return None, None
except ScriptNotInLog:
warn("Script not found in output log")
return None, None

View File

@ -0,0 +1,57 @@
import os
import shlex
from subprocess import Popen, PIPE
from psutil import Process, TimeoutExpired
from .. import logs
from ..conf import settings
def _wait_output(popen, is_slow):
"""Returns `True` if we can get output of the command in the
`settings.wait_command` time.
Command will be killed if it wasn't finished in the time.
:type popen: Popen
:rtype: bool
"""
proc = Process(popen.pid)
try:
proc.wait(settings.wait_slow_command if is_slow
else settings.wait_command)
return True
except TimeoutExpired:
for child in proc.children(recursive=True):
child.kill()
proc.kill()
return False
def get_output(script, expanded):
"""Runs the script and obtains stdin/stderr.
:type script: str
:type expanded: str
:rtype: (str, str)
"""
env = dict(os.environ)
env.update(settings.env)
is_slow = shlex.split(expanded) in settings.slow_commands
with logs.debug_time(u'Call: {}; with env: {}; is slow: '.format(
script, env, is_slow)):
result = Popen(expanded, shell=True, stdin=PIPE,
stdout=PIPE, stderr=PIPE, env=env)
if _wait_output(result, is_slow):
stdout = result.stdout.read().decode('utf-8')
stderr = result.stderr.read().decode('utf-8')
logs.debug(u'Received stdout: {}'.format(stdout))
logs.debug(u'Received stderr: {}'.format(stderr))
return stdout, stderr
else:
logs.debug(u'Execution timed out!')
return None, None

View File

@ -1,3 +1,4 @@
import re
from thefuck.utils import replace_argument
from thefuck.specific.git import git_support
@ -32,5 +33,6 @@ def get_new_command(command):
if len(command_parts) > upstream_option_index:
command_parts.pop(upstream_option_index)
push_upstream = command.stderr.split('\n')[-3].strip().partition('git ')[2]
return replace_argument(" ".join(command_parts), 'push', push_upstream)
arguments = re.findall(r'git push (.*)', command.stderr)[0].strip()
return replace_argument(" ".join(command_parts), 'push',
'push {}'.format(arguments))

View File

@ -1,6 +1,7 @@
import os
from uuid import uuid4
from ..conf import settings
from ..const import ARGUMENT_PLACEHOLDER
from ..const import ARGUMENT_PLACEHOLDER, USER_COMMAND_MARK
from ..utils import memoize
from .generic import Generic
@ -27,6 +28,21 @@ class Bash(Generic):
alter_history=('history -s $TF_CMD;'
if settings.alter_history else ''))
def instant_mode_alias(self, alias_name):
if os.environ.get('THEFUCK_INSTANT_MODE', '').lower() == 'true':
return '''
export PS1="{user_command_mark}$PS1";
{app_alias}
'''.format(user_command_mark=USER_COMMAND_MARK,
app_alias=self.app_alias(alias_name))
else:
return '''
export THEFUCK_INSTANT_MODE=True;
export THEFUCK_OUTPUT_LOG={log};
script -feq {log};
exit
'''.format(log='/tmp/thefuck-script-log-{}'.format(uuid4().hex))
def _parse_alias(self, alias):
name, value = alias.replace('alias ', '', 1).split('=', 1)
if value[0] == value[-1] == '"' or value[0] == value[-1] == "'":

View File

@ -18,7 +18,7 @@ class Fish(Generic):
default.add(alias.strip())
return default
def app_alias(self, fuck):
def app_alias(self, alias_name):
if settings.alter_history:
alter_history = (' builtin history delete --exact'
' --case-sensitive -- $fucked_up_command\n'
@ -33,7 +33,7 @@ class Fish(Generic):
' if [ "$unfucked_command" != "" ]\n'
' eval $unfucked_command\n{1}'
' end\n'
'end').format(fuck, alter_history)
'end').format(alias_name, alter_history)
@memoize
@cache('.config/fish/config.fish', '.config/fish/functions')

View File

@ -3,6 +3,7 @@ import os
import shlex
import six
from collections import namedtuple
from ..logs import warn
from ..utils import memoize
from ..conf import settings
from ..system import Path
@ -32,9 +33,13 @@ class Generic(object):
"""Prepares command for running in shell."""
return command_script
def app_alias(self, fuck):
def app_alias(self, alias_name):
return "alias {0}='eval $(TF_ALIAS={0} PYTHONIOENCODING=utf-8 " \
"thefuck $(fc -ln -1))'".format(fuck)
"thefuck $(fc -ln -1))'".format(alias_name)
def instant_mode_alias(self, alias_name):
warn("Instant mode not supported by your shell")
return self.app_alias(alias_name)
def _get_history_file_name(self):
return ''

View File

@ -2,8 +2,8 @@ from .generic import Generic, ShellConfiguration
class Powershell(Generic):
def app_alias(self, fuck):
return 'function ' + fuck + ' {\n' \
def app_alias(self, alias_name):
return 'function ' + alias_name + ' {\n' \
' $history = (Get-History -Count 1).CommandLine;\n' \
' if (-not [string]::IsNullOrWhiteSpace($history)) {\n' \
' $fuck = $(thefuck $history);\n' \

View File

@ -6,10 +6,10 @@ from .generic import Generic
class Tcsh(Generic):
def app_alias(self, fuck):
def app_alias(self, alias_name):
return ("alias {0} 'setenv TF_ALIAS {0} && "
"set fucked_cmd=`history -h 2 | head -n 1` && "
"eval `thefuck ${{fucked_cmd}}`'").format(fuck)
"eval `thefuck ${{fucked_cmd}}`'").format(alias_name)
def _parse_alias(self, alias):
name, value = alias.split("\t", 1)

View File

@ -1,7 +1,8 @@
from time import time
import os
from uuid import uuid4
from ..conf import settings
from ..const import ARGUMENT_PLACEHOLDER
from ..const import ARGUMENT_PLACEHOLDER, USER_COMMAND_MARK
from ..utils import memoize
from .generic import Generic
@ -26,6 +27,21 @@ class Zsh(Generic):
alter_history=('test -n "$TF_CMD" && print -s $TF_CMD'
if settings.alter_history else ''))
def instant_mode_alias(self, alias_name):
if os.environ.get('THEFUCK_INSTANT_MODE', '').lower() == 'true':
return '''
export PS1="{user_command_mark}$PS1";
{app_alias}
'''.format(user_command_mark=USER_COMMAND_MARK,
app_alias=self.app_alias(alias_name))
else:
return '''
export THEFUCK_INSTANT_MODE=True;
export THEFUCK_OUTPUT_LOG={log};
script -feq {log};
exit
'''.format(log='/tmp/thefuck-script-log-{}'.format(uuid4().hex))
def _parse_alias(self, alias):
name, value = alias.split('=', 1)
if value[0] == value[-1] == '"' or value[0] == value[-1] == "'":

View File

@ -1,15 +1,13 @@
from imp import load_source
from subprocess import Popen, PIPE
import os
import sys
import six
from psutil import Process, TimeoutExpired
from . import logs
from .shells import shell
from .conf import settings
from .const import DEFAULT_PRIORITY, ALL_ENABLED
from .exceptions import EmptyCommand
from .utils import get_alias
from .utils import get_alias, format_raw_script
from .output_readers import get_output
class Command(object):
@ -61,44 +59,6 @@ class Command(object):
kwargs.setdefault('stderr', self.stderr)
return Command(**kwargs)
@staticmethod
def _wait_output(popen, is_slow):
"""Returns `True` if we can get output of the command in the
`settings.wait_command` time.
Command will be killed if it wasn't finished in the time.
:type popen: Popen
:rtype: bool
"""
proc = Process(popen.pid)
try:
proc.wait(settings.wait_slow_command if is_slow
else settings.wait_command)
return True
except TimeoutExpired:
for child in proc.children(recursive=True):
child.kill()
proc.kill()
return False
@staticmethod
def _prepare_script(raw_script):
"""Creates single script from a list of script parts.
:type raw_script: [basestring]
:rtype: basestring
"""
if six.PY2:
script = ' '.join(arg.decode('utf-8') for arg in raw_script)
else:
script = ' '.join(raw_script)
script = script.strip()
return shell.from_shell(script)
@classmethod
def from_raw_script(cls, raw_script):
"""Creates instance of `Command` from a list of script parts.
@ -108,29 +68,13 @@ class Command(object):
:raises: EmptyCommand
"""
script = cls._prepare_script(raw_script)
script = format_raw_script(raw_script)
if not script:
raise EmptyCommand
env = dict(os.environ)
env.update(settings.env)
is_slow = script.split(' ')[0] in settings.slow_commands
with logs.debug_time(u'Call: {}; with env: {}; is slow: '.format(
script, env, is_slow)):
result = Popen(script, shell=True, stdin=PIPE,
stdout=PIPE, stderr=PIPE, env=env)
if cls._wait_output(result, is_slow):
stdout = result.stdout.read().decode('utf-8')
stderr = result.stderr.read().decode('utf-8')
logs.debug(u'Received stdout: {}'.format(stdout))
logs.debug(u'Received stderr: {}'.format(stderr))
return cls(script, stdout, stderr)
else:
logs.debug(u'Execution timed out!')
return cls(script, None, None)
expanded = shell.from_shell(script)
stdout, stderr = get_output(script, expanded)
return cls(expanded, stdout, stderr)
class Rule(object):

View File

@ -7,7 +7,7 @@ from contextlib import closing
from decorator import decorator
from difflib import get_close_matches
from functools import wraps
from warnings import warn
from .logs import warn
from .conf import settings
from .system import Path
@ -282,3 +282,18 @@ def get_valid_history_without_current(command):
return [line for line in _not_corrected(history, tf_alias)
if not line.startswith(tf_alias) and not line == command.script
and line.split(' ')[0] in executables]
def format_raw_script(raw_script):
"""Creates single script from a list of script parts.
:type raw_script: [basestring]
:rtype: basestring
"""
if six.PY2:
script = ' '.join(arg.decode('utf-8') for arg in raw_script)
else:
script = ' '.join(raw_script)
return script.strip()