mirror of
https://github.com/nvbn/thefuck.git
synced 2025-01-19 04:21:14 +00:00
257 lines
7.5 KiB
Python
257 lines
7.5 KiB
Python
import dbm
|
|
import os
|
|
import pickle
|
|
import pkg_resources
|
|
import re
|
|
import shelve
|
|
from .conf import settings
|
|
from contextlib import closing
|
|
from decorator import decorator
|
|
from difflib import get_close_matches
|
|
from functools import wraps
|
|
from inspect import getargspec
|
|
from pathlib import Path
|
|
from warnings import warn
|
|
|
|
# Writable file object on the OS null device (`os.devnull`). It is opened
# once, when this module is imported; no code visible here closes it.
DEVNULL = open(os.devnull, 'w')
|
|
|
|
|
|
def memoize(fn):
    """Cache the results of `fn`, keyed by its pickled call arguments.

    Each wrapper owns its own in-memory cache. While `memoize.disabled` is
    truthy the cache is neither read nor written and every call goes
    straight to `fn`.

    """
    results = {}

    @wraps(fn)
    def wrapper(*args, **kwargs):
        if memoize.disabled:
            # Caching is switched off: always call through.
            return fn(*args, **kwargs)

        # Pickle the arguments so the dictionary key is hashable even when
        # the arguments themselves are not.
        cache_key = pickle.dumps((args, kwargs))
        if cache_key in results:
            return results[cache_key]

        outcome = fn(*args, **kwargs)
        results[cache_key] = outcome
        return outcome

    return wrapper


memoize.disabled = False
|
|
|
|
|
|
@memoize
def which(program):
    """Returns `program` path or `None`.

    When `program` contains a directory component it is checked exactly as
    given. Otherwise each directory listed in the `PATH` environment
    variable is tried in order and the first executable match is returned.
    If `PATH` is not set at all, there is nothing to search and `None` is
    returned.

    """

    def is_exe(fpath):
        # A regular file that the current process is allowed to execute.
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, _ = os.path.split(program)
    if fpath:
        return program if is_exe(program) else None

    # `PATH` may be missing from the environment; treat that as "no
    # directories to search" instead of failing with `KeyError`.
    search_path = os.environ.get("PATH")
    if search_path is None:
        return None

    for path in search_path.split(os.pathsep):
        # Drop double quotes surrounding an entry before joining.
        exe_file = os.path.join(path.strip('"'), program)
        if is_exe(exe_file):
            return exe_file

    return None
|
|
|
|
|
|
def default_settings(params):
    """Decorator factory that fills in default settings before a rule runs.

    Right before the decorated function is called, every key/value pair of
    `params` is passed to `settings.setdefault`, so values that are already
    configured are left alone.

    Usage:

        @default_settings({'apt': '/usr/bin/apt'})
        def match(command):
            print(settings.apt)

    """
    def _default_settings(fn, command):
        for name, default in params.items():
            settings.setdefault(name, default)
        return fn(command)

    return decorator(_default_settings)
|
|
|
|
|
|
def get_closest(word, possibilities, n=3, cutoff=0.6, fallback_to_first=True):
    """Pick the entry of `possibilities` that is most similar to `word`.

    `n` and `cutoff` are handed to `difflib.get_close_matches`. When
    nothing is similar enough, the first possibility is returned if
    `fallback_to_first` is true, otherwise `None`.

    """
    # Materialise once: `possibilities` may be a one-shot iterable and is
    # needed both for matching and for the fallback.
    candidates = list(possibilities)
    matches = get_close_matches(word, candidates, n, cutoff)
    if matches:
        return matches[0]
    if fallback_to_first:
        return candidates[0]
    return None
|
|
|
|
|
|
@memoize
def get_all_executables():
    """Returns names of the commands and aliases available to the user.

    The result is a list made of:

    * the names of all non-directory entries found in the directories
      listed in `PATH`, except names that are console scripts of the
      installed `thefuck` distribution;
    * the shell aliases reported by `get_aliases()`, except the alias of
      thefuck itself.

    """
    from thefuck.shells import thefuck_alias, get_aliases

    def _safe(fn, fallback):
        # Call `fn`, substituting `fallback` when it fails with `OSError`
        # (e.g. a `PATH` entry that cannot be listed).
        try:
            return fn()
        except OSError:
            return fallback

    tf_alias = thefuck_alias()
    tf_entry_points = (get_installation_info()
                       .get_entry_map()
                       .get('console_scripts', {})
                       .keys())
    # Split on `os.pathsep` instead of a hard-coded ':' so the separator
    # of the current platform is honoured, as `which` already does.
    bins = [exe.name
            for path in os.environ.get('PATH', '').split(os.pathsep)
            for exe in _safe(lambda: list(Path(path).iterdir()), [])
            # An entry whose type cannot be determined is treated as a
            # directory and therefore skipped.
            if not _safe(exe.is_dir, True)
            and exe.name not in tf_entry_points]
    aliases = [alias for alias in get_aliases() if alias != tf_alias]
    return bins + aliases
|
|
|
|
|
|
def replace_argument(script, from_, to):
    """Replaces command line argument.

    The argument `from_` is first looked for at the very end of `script`
    (preceded by a space). If the script does not end with it, the first
    occurrence surrounded by spaces is replaced instead. When neither form
    is present, `script` is returned unchanged.

    Both `from_` and `to` are treated as literal text.

    """
    # `re.escape` keeps regex metacharacters in `from_` (such as `+`, `(`
    # or `.`) from being interpreted as pattern syntax. The callable
    # replacement inserts `to` verbatim, so backslashes in it are not
    # parsed as template escapes.
    replaced_in_the_end = re.sub(u' {}$'.format(re.escape(from_)),
                                 lambda _match: u' {}'.format(to),
                                 script, count=1)
    if replaced_in_the_end != script:
        return replaced_in_the_end

    return script.replace(u' {} '.format(from_), u' {} '.format(to), 1)
|
|
|
|
|
|
@decorator
def eager(fn, *args, **kwargs):
    """Calls `fn` and returns everything it produces collected in a list."""
    produced = fn(*args, **kwargs)
    return list(produced)
|
|
|
|
|
|
@eager
def get_all_matched_commands(stderr, separator='Did you mean'):
    """Collects the suggestion lines that follow `separator` in `stderr`.

    Lines are scanned in order. Lines containing `separator` are never
    returned themselves; once the first such line has been seen, every
    later non-empty line is returned with surrounding whitespace stripped.

    """
    separator_seen = False
    for raw_line in stderr.split('\n'):
        if separator in raw_line:
            separator_seen = True
            continue
        if separator_seen and raw_line:
            yield raw_line.strip()
|
|
|
|
|
|
def replace_command(command, broken, matched):
    """Helper for *_no_command rules.

    Looks up the entries of `matched` that resemble `broken` (with a very
    permissive cutoff of 0.1) and returns one fixed script per entry, in
    which `broken` is replaced by the stripped entry in `command.script`.

    """
    candidates = get_close_matches(broken, matched, cutoff=0.1)
    fixed_scripts = []
    for candidate in candidates:
        fixed_scripts.append(
            replace_argument(command.script, broken, candidate.strip()))
    return fixed_scripts
|
|
|
|
|
|
@memoize
def is_app(command, *app_names, **kwargs):
    """Returns `True` if command is call to one of passed app names.

    The only accepted keyword argument is `at_least` (default `0`): the
    command matches only when `command.script_parts` holds more than
    `at_least` items and its first item is one of `app_names`.

    Raises `TypeError` when any other keyword argument is passed.

    """
    at_least = kwargs.pop('at_least', 0)
    if kwargs:
        # Report the offending keyword names themselves; formatting
        # `kwargs.keys()` directly would put the repr of the keys
        # collection into the message.
        raise TypeError("got an unexpected keyword argument '{}'"
                        .format("', '".join(sorted(kwargs))))

    if command.script_parts is not None and len(command.script_parts) > at_least:
        return command.script_parts[0] in app_names

    return False
|
|
|
|
|
|
def for_app(*app_names, **kwargs):
    """Restricts the decorated function to commands of the given apps.

    The decorated function is called only when
    `is_app(command, *app_names, **kwargs)` holds; for every other command
    the result is `False` and the function is not invoked.

    """
    def _for_app(fn, command):
        if not is_app(command, *app_names, **kwargs):
            return False
        return fn(command)

    return decorator(_for_app)
|
|
|
|
|
|
def cache(*depends_on):
    """Caches function result in temporary file.

    Results are kept in a `shelve` database named `thefuck` inside
    `$XDG_CACHE_HOME` (`~/.cache` when the variable is not set).

    Cache will be expired when modification date of files from `depends_on`
    will be changed. Names in `depends_on` are resolved relative to the
    user's home directory; a file that cannot be stat'ed counts as
    modification time `0`.

    Function wrapped in `cache` should be arguments agnostic: the cache key
    identifies the function only, not the arguments it was called with.

    Setting `cache.disabled` to a truthy value bypasses the cache.

    """
    def _get_mtime(name):
        path = os.path.join(os.path.expanduser('~'), name)
        try:
            return str(os.path.getmtime(path))
        except OSError:
            return '0'

    def _get_cache_path():
        default_xdg_cache_dir = os.path.expanduser("~/.cache")
        cache_dir = os.getenv("XDG_CACHE_HOME", default_xdg_cache_dir)
        cache_path = Path(cache_dir).joinpath('thefuck').as_posix()

        # Ensure the directory that holds the cache file exists, Python 2
        # does not have the exist_ok parameter
        try:
            os.makedirs(cache_dir)
        except OSError:
            if not os.path.isdir(cache_dir):
                raise

        return cache_path

    def _remove_cache_files(cache_path):
        # The dbm backend behind `shelve` may append an extension to the
        # base name or create several files, so try every usual candidate.
        # A candidate that does not exist is expected and skipped.
        for suffix in ('', '.db', '.dat', '.dir', '.bak'):
            try:
                os.remove(cache_path + suffix)
            except OSError:
                pass

    def _compute_and_store(db, key, etag, fn, args, kwargs):
        # Call the wrapped function and remember its result under `key`.
        value = fn(*args, **kwargs)
        db[key] = {'etag': etag, 'value': value}
        return value

    @decorator
    def _cache(fn, *args, **kwargs):
        if cache.disabled:
            return fn(*args, **kwargs)

        # A bit obscure, but simplest way to generate unique key for
        # functions and methods in python 2 and 3: take the repr up to the
        # memory address. Split on ' at ' (with spaces), because a bare
        # 'at' would also cut inside names such as `match`.
        key = '{}.{}'.format(fn.__module__, repr(fn).split(' at ')[0])

        etag = '.'.join(_get_mtime(name) for name in depends_on)
        cache_path = _get_cache_path()

        try:
            with closing(shelve.open(cache_path)) as db:
                if db.get(key, {}).get('etag') == etag:
                    return db[key]['value']
                return _compute_and_store(db, key, etag, fn, args, kwargs)
        except dbm.error:
            # Caused when going from Python 2 to Python 3
            warn("Removing possibly out-dated cache")
            _remove_cache_files(cache_path)

            with closing(shelve.open(cache_path)) as db:
                return _compute_and_store(db, key, etag, fn, args, kwargs)

    return _cache


cache.disabled = False
|
|
|
|
|
|
def compatibility_call(fn, *args):
    """Special call for compatibility with user-defined old-style rules
    with `settings` param.

    New-style rule functions are called with `args` as given. An old-style
    function additionally gets `settings` appended as the last argument,
    and a deprecation warning is emitted. A function is recognised as
    old-style when it is `match` or `get_new_command` taking two
    arguments, or `side_effect` taking three.

    Returns whatever `fn` returns.

    """
    fn_args_count = len(getargspec(fn).args)
    if fn.__name__ in ('match', 'get_new_command') and fn_args_count == 2:
        warn("Two arguments `{}` from rule `{}` is deprecated, please "
             "remove `settings` argument and use "
             "`from thefuck.conf import settings` instead."
             .format(fn.__name__, fn.__module__))
        args += (settings,)
    if fn.__name__ == 'side_effect' and fn_args_count == 3:
        # The message has a single placeholder, for the rule module; the
        # function name is already spelled out in the text.
        warn("Three arguments `side_effect` from rule `{}` is deprecated, "
             "please remove `settings` argument and use `from thefuck.conf "
             "import settings` instead."
             .format(fn.__module__))
        args += (settings,)
    return fn(*args)
|
|
|
|
|
|
def get_installation_info():
    """Returns the first item `pkg_resources.require('thefuck')` yields."""
    distributions = pkg_resources.require('thefuck')
    return distributions[0]
|