1
0
mirror of https://github.com/nvbn/thefuck.git synced 2025-01-31 02:01:13 +00:00

#682: Implement experimental instant mode

This commit is contained in:
Vladimir Iakovlev 2017-08-25 11:44:07 +02:00
parent f76d2061d1
commit 20e678a38a
10 changed files with 168 additions and 66 deletions

View File

@ -33,7 +33,7 @@ elif (3, 0) < version < (3, 3):
VERSION = '3.21'
install_requires = ['psutil', 'colorama', 'six', 'decorator']
install_requires = ['psutil', 'colorama', 'six', 'decorator', 'pyte']
extras_require = {':python_version<"3.4"': ['pathlib2'],
":sys_platform=='win32'": ['win_unicode_console']}

View File

@ -98,7 +98,7 @@ class Settings(dict):
elif attr in ('wait_command', 'history_limit', 'wait_slow_command'):
return int(val)
elif attr in ('require_confirmation', 'no_colors', 'debug',
'alter_history'):
'alter_history', 'instant_mode'):
return val.lower() == 'true'
elif attr == 'slow_commands':
return val.split(':')

View File

@ -35,6 +35,7 @@ DEFAULT_SETTINGS = {'rules': DEFAULT_RULES,
'slow_commands': ['lein', 'react-native', 'gradle',
'./gradlew', 'vagrant'],
'repeat': False,
'instant_mode': False,
'env': {'LC_ALL': 'C', 'LANG': 'C', 'GIT_TRACE': '1'}}
ENV_TO_ATTR = {'THEFUCK_RULES': 'rules',
@ -48,7 +49,8 @@ ENV_TO_ATTR = {'THEFUCK_RULES': 'rules',
'THEFUCK_ALTER_HISTORY': 'alter_history',
'THEFUCK_WAIT_SLOW_COMMAND': 'wait_slow_command',
'THEFUCK_SLOW_COMMANDS': 'slow_commands',
'THEFUCK_REPEAT': 'repeat'}
'THEFUCK_REPEAT': 'repeat',
'THEFUCK_INSTANT_MODE': 'instant_mode'}
SETTINGS_HEADER = u"""# The Fuck settings file
#
@ -65,3 +67,7 @@ SETTINGS_HEADER = u"""# The Fuck settings file
ARGUMENT_PLACEHOLDER = 'THEFUCK_ARGUMENT_PLACEHOLDER'
CONFIGURATION_TIMEOUT = 60
USER_COMMAND_MARK = u'\u200B' * 10
LOG_SIZE = 1000

View File

@ -4,3 +4,7 @@ class EmptyCommand(Exception):
class NoRuleMatched(Exception):
"""Raised when no rule matched for some command."""
class ScriptNotInLog(Exception):
"""Script not found in log."""

View File

@ -6,6 +6,7 @@ import sys
from traceback import format_exception
import colorama
from .conf import settings
from . import const
def color(color_):
@ -39,7 +40,8 @@ def failed(msg):
def show_corrected_command(corrected_command):
sys.stderr.write(u'{bold}{script}{reset}{side_effect}\n'.format(
sys.stderr.write(u'{prefix}{bold}{script}{reset}{side_effect}\n'.format(
prefix=const.USER_COMMAND_MARK,
script=corrected_command.script,
side_effect=u' (+side effect)' if corrected_command.side_effect else u'',
bold=color(colorama.Style.BRIGHT),
@ -48,9 +50,10 @@ def show_corrected_command(corrected_command):
def confirm_text(corrected_command):
sys.stderr.write(
(u'{clear}{bold}{script}{reset}{side_effect} '
(u'{prefix}{clear}{bold}{script}{reset}{side_effect} '
u'[{green}enter{reset}/{blue}{reset}/{blue}{reset}'
u'/{red}ctrl+c{reset}]').format(
prefix=const.USER_COMMAND_MARK,
script=corrected_command.script,
side_effect=' (+side effect)' if corrected_command.side_effect else '',
clear='\033[1K\r',

View File

@ -0,0 +1,9 @@
from ..conf import settings
from . import read_log, rerun
def get_output(script):
if settings.instant_mode:
return read_log.get_output(script)
else:
return rerun.get_output(script)

View File

@ -0,0 +1,68 @@
import os
import shlex
from shutil import get_terminal_size
from warnings import warn
import pyte
from ..exceptions import ScriptNotInLog
from .. import const
def _group_by_calls(log):
script_line = None
lines = []
for line in log:
try:
line = line.decode()
except UnicodeDecodeError:
continue
if const.USER_COMMAND_MARK in line:
if script_line:
yield script_line, lines
script_line = line
lines = [line]
elif script_line is not None:
lines.append(line)
if script_line:
yield script_line, lines
def _get_script_group_lines(grouped, script):
parts = shlex.split(script)
for script_line, lines in reversed(grouped):
if all(part in script_line for part in parts):
return lines
raise ScriptNotInLog
def _get_output_lines(script, log_file):
lines = log_file.readlines()[-const.LOG_SIZE:]
grouped = list(_group_by_calls(lines))
script_lines = _get_script_group_lines(grouped, script)
screen = pyte.Screen(get_terminal_size().columns, len(script_lines))
stream = pyte.Stream(screen)
stream.feed(''.join(script_lines))
return screen.display
def get_output(script):
if 'THEFUCK_OUTPUT_LOG' not in os.environ:
warn("Output log isn't specified")
return None, None
try:
with open(os.environ['THEFUCK_OUTPUT_LOG'], 'rb') as log_file:
lines = _get_output_lines(script, log_file)
output = '\n'.join(lines).strip()
return output, output
except FileNotFoundError:
warn("Can't read output log")
return None, None
except ScriptNotInLog:
warn("Script not found in output log")
return None, None

50
thefuck/output/rerun.py Normal file
View File

@ -0,0 +1,50 @@
import os
import shlex
from subprocess import Popen, PIPE
from psutil import Process, TimeoutExpired
from .. import logs
from ..conf import settings
def _wait_output(popen, is_slow):
"""Returns `True` if we can get output of the command in the
`settings.wait_command` time.
Command will be killed if it wasn't finished in the time.
:type popen: Popen
:rtype: bool
"""
proc = Process(popen.pid)
try:
proc.wait(settings.wait_slow_command if is_slow
else settings.wait_command)
return True
except TimeoutExpired:
for child in proc.children(recursive=True):
child.kill()
proc.kill()
return False
def get_output(script):
env = dict(os.environ)
env.update(settings.env)
is_slow = shlex.split(script) in settings.slow_commands
with logs.debug_time(u'Call: {}; with env: {}; is slow: '.format(
script, env, is_slow)):
result = Popen(script, shell=True, stdin=PIPE,
stdout=PIPE, stderr=PIPE, env=env)
if _wait_output(result, is_slow):
stdout = result.stdout.read().decode('utf-8')
stderr = result.stderr.read().decode('utf-8')
logs.debug(u'Received stdout: {}'.format(stdout))
logs.debug(u'Received stderr: {}'.format(stderr))
return stdout, stderr
else:
logs.debug(u'Execution timed out!')
return None, None

View File

@ -2,14 +2,13 @@ from imp import load_source
from subprocess import Popen, PIPE
import os
import sys
import six
from psutil import Process, TimeoutExpired
from . import logs
from .shells import shell
from .conf import settings
from .const import DEFAULT_PRIORITY, ALL_ENABLED
from .exceptions import EmptyCommand
from .utils import get_alias
from .utils import get_alias, format_raw_script
from .output import get_output
class Command(object):
@ -61,44 +60,6 @@ class Command(object):
kwargs.setdefault('stderr', self.stderr)
return Command(**kwargs)
@staticmethod
def _wait_output(popen, is_slow):
"""Returns `True` if we can get output of the command in the
`settings.wait_command` time.
Command will be killed if it wasn't finished in the time.
:type popen: Popen
:rtype: bool
"""
proc = Process(popen.pid)
try:
proc.wait(settings.wait_slow_command if is_slow
else settings.wait_command)
return True
except TimeoutExpired:
for child in proc.children(recursive=True):
child.kill()
proc.kill()
return False
@staticmethod
def _prepare_script(raw_script):
"""Creates single script from a list of script parts.
:type raw_script: [basestring]
:rtype: basestring
"""
if six.PY2:
script = ' '.join(arg.decode('utf-8') for arg in raw_script)
else:
script = ' '.join(raw_script)
script = script.strip()
return shell.from_shell(script)
@classmethod
def from_raw_script(cls, raw_script):
"""Creates instance of `Command` from a list of script parts.
@ -108,29 +69,12 @@ class Command(object):
:raises: EmptyCommand
"""
script = cls._prepare_script(raw_script)
script = format_raw_script(raw_script)
if not script:
raise EmptyCommand
env = dict(os.environ)
env.update(settings.env)
is_slow = script.split(' ')[0] in settings.slow_commands
with logs.debug_time(u'Call: {}; with env: {}; is slow: '.format(
script, env, is_slow)):
result = Popen(script, shell=True, stdin=PIPE,
stdout=PIPE, stderr=PIPE, env=env)
if cls._wait_output(result, is_slow):
stdout = result.stdout.read().decode('utf-8')
stderr = result.stderr.read().decode('utf-8')
logs.debug(u'Received stdout: {}'.format(stdout))
logs.debug(u'Received stderr: {}'.format(stderr))
stdout, stderr = get_output(script)
return cls(script, stdout, stderr)
else:
logs.debug(u'Execution timed out!')
return cls(script, None, None)
class Rule(object):

View File

@ -282,3 +282,21 @@ def get_valid_history_without_current(command):
return [line for line in _not_corrected(history, tf_alias)
if not line.startswith(tf_alias) and not line == command.script
and line.split(' ')[0] in executables]
def format_raw_script(raw_script):
"""Creates single script from a list of script parts.
:type raw_script: [basestring]
:rtype: basestring
"""
from .shells import shell
if six.PY2:
script = ' '.join(arg.decode('utf-8') for arg in raw_script)
else:
script = ' '.join(raw_script)
script = script.strip()
return shell.from_shell(script)