1
0
mirror of https://github.com/nvbn/thefuck.git synced 2025-03-15 07:08:49 +00:00
thefuck/thefuck/utils.py

195 lines
5.1 KiB
Python
Raw Normal View History

from difflib import get_close_matches
from functools import wraps
2015-09-02 11:10:03 +03:00
import shelve
2015-08-27 16:52:26 +03:00
from decorator import decorator
2015-09-02 11:54:58 +03:00
from contextlib import closing
2015-09-02 11:10:03 +03:00
import tempfile
import os
2015-05-22 17:07:01 +03:00
import pickle
2015-07-16 20:23:31 +02:00
import re
from pathlib import Path
import six
2015-05-06 13:17:14 +02:00
DEVNULL = open(os.devnull, 'w')
2015-07-04 16:45:30 +02:00
if six.PY2:
from pipes import quote
2015-07-04 16:45:30 +02:00
else:
from shlex import quote
2015-07-04 16:45:30 +02:00
2015-05-06 13:17:14 +02:00
def memoize(fn):
"""Caches previous calls to the function."""
memo = {}
@wraps(fn)
def wrapper(*args, **kwargs):
key = pickle.dumps((args, kwargs))
if key not in memo or memoize.disabled:
memo[key] = fn(*args, **kwargs)
return memo[key]
return wrapper
memoize.disabled = False
@memoize
def which(program):
"""Returns `program` path or `None`."""
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
fpath, fname = os.path.split(program)
if fpath:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
path = path.strip('"')
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
def wrap_settings(params):
"""Adds default values to settings if it not presented.
Usage:
@wrap_settings({'apt': '/usr/bin/apt'})
def match(command, settings):
print(settings.apt)
"""
2015-08-27 16:52:26 +03:00
def _wrap_settings(fn, command, settings):
return fn(command, settings.update(**params))
return decorator(_wrap_settings)
def get_closest(word, possibilities, n=3, cutoff=0.6, fallback_to_first=True):
"""Returns closest match or just first from possibilities."""
possibilities = list(possibilities)
try:
return get_close_matches(word, possibilities, n, cutoff)[0]
except IndexError:
if fallback_to_first:
return possibilities[0]
@memoize
def get_all_executables():
from thefuck.shells import thefuck_alias, get_aliases
def _safe(fn, fallback):
try:
return fn()
except OSError:
return fallback
tf_alias = thefuck_alias()
return [exe.name
for path in os.environ.get('PATH', '').split(':')
for exe in _safe(lambda: list(Path(path).iterdir()), [])
if not _safe(exe.is_dir, True)] + [
alias for alias in get_aliases() if alias != tf_alias]
2015-07-24 00:39:56 +03:00
def replace_argument(script, from_, to):
"""Replaces command line argument."""
replaced_in_the_end = re.sub(u' {}$'.format(from_), u' {}'.format(to),
script, count=1)
if replaced_in_the_end != script:
return replaced_in_the_end
else:
return script.replace(
u' {} '.format(from_), u' {} '.format(to), 1)
2015-08-27 16:52:26 +03:00
@decorator
def eager(fn, *args, **kwargs):
return list(fn(*args, **kwargs))
@eager
def get_all_matched_commands(stderr, separator='Did you mean'):
should_yield = False
for line in stderr.split('\n'):
if separator in line:
should_yield = True
elif should_yield and line:
yield line.strip()
def replace_command(command, broken, matched):
"""Helper for *_no_command rules."""
new_cmds = get_close_matches(broken, matched, cutoff=0.1)
return [replace_argument(command.script, broken, new_cmd.strip())
for new_cmd in new_cmds]
2015-08-27 16:08:29 +03:00
@memoize
def is_app(command, *app_names):
"""Returns `True` if command is call to one of passed app names."""
for name in app_names:
2015-08-27 16:10:50 +03:00
if command.script == name \
or command.script.startswith(u'{} '.format(name)):
2015-08-27 16:08:29 +03:00
return True
return False
def for_app(*app_names):
"""Specifies that matching script is for on of app names."""
2015-08-27 16:52:26 +03:00
def _for_app(fn, command, settings):
if is_app(command, *app_names):
return fn(command, settings)
else:
return False
return decorator(_for_app)
2015-09-02 11:10:03 +03:00
def cache(*depends_on):
"""Caches function result in temporary file.
Cache will be expired when modification date of files from `depends_on`
will be changed.
Function wrapped in `cache` should be arguments agnostic.
"""
def _get_mtime(name):
path = os.path.join(os.path.expanduser('~'), name)
try:
return str(os.path.getmtime(path))
except OSError:
return '0'
@decorator
def _cache(fn, *args, **kwargs):
if cache.disabled:
return fn(*args, **kwargs)
cache_path = os.path.join(tempfile.gettempdir(), '.thefuck-cache')
2015-09-02 11:54:58 +03:00
# A bit obscure, but simplest way to generate unique key for
# functions and methods in python 2 and 3:
2015-09-02 11:10:03 +03:00
key = '{}.{}'.format(fn.__module__, repr(fn).split('at')[0])
etag = '.'.join(_get_mtime(name) for name in depends_on)
2015-09-02 11:54:58 +03:00
with closing(shelve.open(cache_path)) as db:
2015-09-02 11:10:03 +03:00
if db.get(key, {}).get('etag') == etag:
return db[key]['value']
else:
value = fn(*args, **kwargs)
db[key] = {'etag': etag, 'value': value}
return value
return _cache
cache.disabled = False