1
0
mirror of https://github.com/nvbn/thefuck.git synced 2025-03-14 22:58:31 +00:00
thefuck/thefuck/utils.py

338 lines
9.0 KiB
Python
Raw Normal View History

2017-10-10 08:30:26 +02:00
import atexit
import os
2015-05-22 17:07:01 +03:00
import pickle
2015-07-16 20:23:31 +02:00
import re
import shelve
2018-10-18 00:35:18 +02:00
import sys
import six
from decorator import decorator
from difflib import get_close_matches as difflib_get_close_matches
from functools import wraps
2018-10-18 00:35:18 +02:00
from .logs import warn, exception
2016-08-14 15:15:03 +03:00
from .conf import settings
from .system import Path
2015-05-06 13:17:14 +02:00
# Write-only handle on the null device, opened once when the module is
# imported.
DEVNULL = open(os.devnull, 'w')

# `shelve_open_error` is the `error` attribute of the dbm module matching the
# running interpreter (`anydbm` on Python 2, `dbm` otherwise).  It is
# concatenated with `(ImportError,)` in the `except` clause that guards
# `shelve.open` in `Cache._setup_db`.
if six.PY2:
    import anydbm
    shelve_open_error = anydbm.error
else:
    import dbm
    shelve_open_error = dbm.error
2015-05-06 13:17:14 +02:00
def memoize(fn):
    """Decorator that remembers results of earlier calls to `fn`.

    Results are stored per call signature, using the pickled
    `(args, kwargs)` pair as the lookup key.  While `memoize.disabled` is
    true the stored results are neither read nor updated and `fn` is
    called directly.
    """
    results = {}

    @wraps(fn)
    def wrapper(*args, **kwargs):
        if memoize.disabled:
            # Memoization is switched off, always call through.
            return fn(*args, **kwargs)

        cache_key = pickle.dumps((args, kwargs))
        if cache_key not in results:
            results[cache_key] = fn(*args, **kwargs)
        return results[cache_key]

    return wrapper


memoize.disabled = False
@memoize
def which(program):
    """Returns `program` path or `None`.

    `shutil.which` is used when it can be imported.  Otherwise a manual
    lookup is performed: a `program` that contains a directory part is
    checked as given, a bare name is searched for in every `PATH` entry.

    :param program: executable name or path to look up.
    :returns: path of an existing, executable file, or `None`.

    """
    try:
        from shutil import which as shutil_which
    except ImportError:
        # `shutil.which` is not available on this interpreter (it was added
        # in Python 3.3); fall through to the manual lookup below.
        pass
    else:
        return shutil_which(program)

    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, _ = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        # An unset PATH must not raise KeyError; fall back to `os.defpath`,
        # the same default `shutil.which` uses.
        for path in os.environ.get("PATH", os.defpath).split(os.pathsep):
            exe_file = os.path.join(path.strip('"'), program)
            if is_exe(exe_file):
                return exe_file

    return None
2015-09-07 12:12:16 +03:00
def default_settings(params):
    """Adds default values to settings if they are not present.

    Before the decorated function runs, every `(key, value)` pair of
    `params` is passed to `settings.setdefault`.

    Usage:

        @default_settings({'apt': '/usr/bin/apt'})
        def match(command):
            print(settings.apt)

    """
    def _default_settings(fn, command):
        for name, default in params.items():
            settings.setdefault(name, default)
        return fn(command)

    return decorator(_default_settings)
def get_closest(word, possibilities, cutoff=0.6, fallback_to_first=True):
    """Returns the best close match for `word`, or the first possibility.

    When nothing reaches `cutoff`, the first element of `possibilities` is
    returned if `fallback_to_first` is true, otherwise `None`.
    """
    candidates = list(possibilities)
    matches = difflib_get_close_matches(word, candidates, 1, cutoff)
    if matches:
        return matches[0]
    if fallback_to_first:
        return candidates[0]
    return None
def get_close_matches(word, possibilities, n=None, cutoff=0.6):
    """Wraps `difflib.get_close_matches` to control the argument `n`.

    When `n` is not given, `settings.num_close_matches` is used instead.
    """
    limit = settings.num_close_matches if n is None else n
    return difflib_get_close_matches(word, possibilities, limit, cutoff)
@memoize
def get_all_executables():
    """Returns names of entries found in `PATH` plus shell aliases.

    Directories and the `thefuck`/`fuck` entry points are left out of the
    `PATH` part; the current alias (see `get_alias`) is left out of the
    alias part.  On Python 2 names are decoded from UTF-8.
    """
    from thefuck.shells import shell

    def _safe(fn, fallback):
        # Calls `fn`, substituting `fallback` when it raises OSError.
        try:
            return fn()
        except OSError:
            return fallback

    def _as_text(name):
        return name.decode('utf8') if six.PY2 else name

    tf_alias = get_alias()
    tf_entry_points = ['thefuck', 'fuck']

    bins = []
    for path in os.environ.get('PATH', '').split(os.pathsep):
        for exe in _safe(lambda: list(Path(path).iterdir()), []):
            # An entry whose type cannot be determined is treated as a
            # directory and therefore skipped.
            if _safe(exe.is_dir, True):
                continue
            if exe.name in tf_entry_points:
                continue
            bins.append(_as_text(exe.name))

    aliases = []
    for alias in shell.get_aliases():
        if alias != tf_alias:
            aliases.append(_as_text(alias))

    return bins + aliases
2015-07-24 00:39:56 +03:00
def replace_argument(script, from_, to):
    """Replaces command line argument.

    The argument is replaced once: at the very end of `script` when it is
    the last argument, otherwise at its first space-delimited occurrence.

    :param script: full command line.
    :param from_: argument to look for (matched literally).
    :param to: text to put in its place (inserted literally).
    :returns: the updated script, or `script` unchanged if `from_` is not
        found as a separate argument.

    """
    # The replacement is given as a callable so that `to` is inserted
    # verbatim.  A plain replacement string is parsed as a regex template,
    # which turns backslash sequences in `to` into escapes or group
    # references (e.g. `\t` becomes a tab, `\d` raises `re.error`).
    replaced_in_the_end = re.sub(
        u' {}$'.format(re.escape(from_)),
        lambda _match: u' {}'.format(to),
        script, count=1)

    if replaced_in_the_end != script:
        return replaced_in_the_end

    return script.replace(
        u' {} '.format(from_), u' {} '.format(to), 1)
2015-08-27 16:52:26 +03:00
@decorator
def eager(fn, *args, **kwargs):
    """Calls `fn` and collects everything it produces into a list."""
    produced = fn(*args, **kwargs)
    return list(produced)
@eager
def get_all_matched_commands(stderr, separator='Did you mean'):
    """Collects the suggestion lines that follow a separator line.

    `separator` may be one string or a list of strings.  Lines of `stderr`
    containing any separator are not returned themselves; once the first
    such line has been seen, every later non-empty line that contains no
    separator is returned with surrounding whitespace stripped.
    """
    separators = separator if isinstance(separator, list) else [separator]
    seen_separator = False

    for line in stderr.split('\n'):
        if any(sep in line for sep in separators):
            seen_separator = True
        elif seen_separator and line:
            yield line.strip()
def replace_command(command, broken, matched):
    """Helper for *_no_command rules.

    Builds one corrected script per close match of `broken` among
    `matched` (cutoff 0.1), substituting the match for `broken` in
    `command.script`.
    """
    candidates = get_close_matches(broken, matched, cutoff=0.1)

    fixed_scripts = []
    for candidate in candidates:
        fixed_scripts.append(
            replace_argument(command.script, broken, candidate.strip()))
    return fixed_scripts
2015-08-27 16:08:29 +03:00
@memoize
def is_app(command, *app_names, **kwargs):
    """Returns `True` if command is call to one of passed app names.

    :param command: object exposing `script_parts`.
    :param app_names: accepted values for the first script part.
    :param at_least: keyword-only; the command must have more than this
        many script parts to match (default 0).
    :raises TypeError: if any keyword argument other than `at_least` is
        passed.

    """
    at_least = kwargs.pop('at_least', 0)
    if kwargs:
        # Name one offending argument instead of formatting the whole
        # `kwargs.keys()` object into the message.
        raise TypeError("got an unexpected keyword argument '{}'".format(
            sorted(kwargs)[0]))

    if len(command.script_parts) > at_least:
        return command.script_parts[0] in app_names

    return False
def for_app(*app_names, **kwargs):
    """Specifies that matching script is for one of app names.

    The decorated function is only called when `is_app` accepts the
    command; otherwise `False` is returned without calling it.
    """
    def _for_app(fn, command):
        if not is_app(command, *app_names, **kwargs):
            return False
        return fn(command)

    return decorator(_for_app)
2015-09-02 11:10:03 +03:00
2017-10-10 08:30:26 +02:00
class Cache(object):
    """Lazy read cache and save changes at exit."""

    def __init__(self):
        # The backing store is opened on first use, see `get_value`.
        self._db = None

    def _init_db(self):
        """Opens the backing store, falling back to a plain dict on error."""
        try:
            self._setup_db()
        except Exception:
            exception("Unable to init cache", sys.exc_info())
            self._db = {}

    def _setup_db(self):
        """Opens the `thefuck` shelve file inside the cache directory.

        If opening fails with a dbm error or `ImportError`, the file is
        removed and opened again.  Closing the shelve is registered with
        `atexit`.
        """
        cache_dir = self._get_cache_dir()
        cache_path = Path(cache_dir).joinpath('thefuck').as_posix()

        try:
            self._db = shelve.open(cache_path)
        except shelve_open_error + (ImportError,):
            # Caused when switching between Python versions
            warn("Removing possibly out-dated cache")
            os.remove(cache_path)
            self._db = shelve.open(cache_path)

        atexit.register(self._db.close)

    def _get_cache_dir(self):
        """Returns the cache directory, creating it when missing.

        The directory is `$XDG_CACHE_HOME` if that variable is set and
        `~/.cache` otherwise.

        :raises OSError: if the directory cannot be created and does not
            already exist.
        """
        default_xdg_cache_dir = os.path.expanduser("~/.cache")
        cache_dir = os.getenv("XDG_CACHE_HOME", default_xdg_cache_dir)

        # Ensure the cache_path exists, Python 2 does not have the exist_ok
        # parameter
        try:
            os.makedirs(cache_dir)
        except OSError:
            if not os.path.isdir(cache_dir):
                raise

        return cache_dir

    def _get_mtime(self, path):
        """Returns modification time of `path` as a string, '0' on error."""
        try:
            return str(os.path.getmtime(path))
        except OSError:
            return '0'

    def _get_key(self, fn, depends_on, args, kwargs):
        """Builds the storage key for one call of `fn`.

        The key is the stringified pickle of the function's module, its
        `repr` without the memory address, `depends_on`, `args` and
        `kwargs`.
        """
        # Cut the repr at the ' at 0x...' address suffix only.  Splitting on
        # a bare 'at' would also cut inside function names containing "at"
        # (`format_a` and `format_b` would both become '<function form'),
        # making different functions share one cache entry.
        fn_repr = repr(fn).split(' at ')[0]
        parts = (fn.__module__, fn_repr, depends_on, args, kwargs)
        return str(pickle.dumps(parts))

    def get_value(self, fn, depends_on, args, kwargs):
        """Returns the cached result of `fn(*args, **kwargs)`.

        The stored value is reused only while the modification times of all
        files in `depends_on` are unchanged; otherwise `fn` is called and
        its result is stored.

        :param fn: function whose result is cached.
        :param depends_on: iterable of file names the result depends on.
        :param args: positional arguments for `fn`.
        :param kwargs: keyword arguments for `fn`.
        """
        if self._db is None:
            self._init_db()

        depends_on = [Path(name).expanduser().absolute().as_posix()
                      for name in depends_on]
        key = self._get_key(fn, depends_on, args, kwargs)
        # The etag joins the mtimes of all dependencies; any change to one
        # of them invalidates the entry.
        etag = '.'.join(self._get_mtime(path) for path in depends_on)

        if self._db.get(key, {}).get('etag') == etag:
            return self._db[key]['value']
        else:
            value = fn(*args, **kwargs)
            self._db[key] = {'etag': etag, 'value': value}
            return value
2015-09-02 11:10:03 +03:00
2017-10-10 08:30:26 +02:00
_cache = Cache()


def cache(*depends_on):
    """Caches function result in temporary file.

    The cached result expires when the modification date of any file
    listed in `depends_on` changes.  Setting `cache.disabled` to a true
    value bypasses the file cache.

    Only functions should be wrapped in `cache`, not methods.

    """
    def cache_decorator(fn):
        @memoize
        @wraps(fn)
        def wrapper(*args, **kwargs):
            if not cache.disabled:
                return _cache.get_value(fn, depends_on, args, kwargs)
            return fn(*args, **kwargs)

        return wrapper

    return cache_decorator


cache.disabled = False
2015-09-08 15:24:49 +03:00
def get_installation_info():
    """Returns the first entry `pkg_resources.require` gives for 'thefuck'."""
    import pkg_resources

    requirements = pkg_resources.require('thefuck')
    return requirements[0]
def get_alias():
    """Returns the value of `TF_ALIAS`, or 'fuck' when it is not set."""
    return os.getenv('TF_ALIAS', 'fuck')
@memoize
def get_valid_history_without_current(command):
    """Returns shell history lines usable as suggestions for `command`.

    Lines that were immediately followed by the alias are dropped.  Of the
    rest, a line is kept only if it does not start with the alias, differs
    from `command.script`, and its first space-separated word is a known
    executable, alias or shell builtin.
    """
    def _not_corrected(history, tf_alias):
        """Returns all lines from history except that comes before `fuck`."""
        earlier = None
        for current in history:
            if earlier is not None and current != tf_alias:
                yield earlier
            earlier = current
        if history:
            yield history[-1]

    from thefuck.shells import shell

    history = shell.get_history()
    tf_alias = get_alias()
    executables = set(get_all_executables())
    executables = executables.union(shell.get_builtin_commands())

    valid_lines = []
    for line in _not_corrected(history, tf_alias):
        if line.startswith(tf_alias):
            continue
        if line == command.script:
            continue
        if line.split(' ')[0] in executables:
            valid_lines.append(line)
    return valid_lines
def format_raw_script(raw_script):
    """Joins script parts with single spaces into one stripped script.

    On Python 2 every part is decoded from UTF-8 before joining.

    :type raw_script: [basestring]
    :rtype: basestring

    """
    if six.PY2:
        parts = [arg.decode('utf-8') for arg in raw_script]
    else:
        parts = raw_script

    return ' '.join(parts).strip()