# mirror of https://github.com/ARM-software/devlib.git
# synced 2025-04-04 17:00:03 +01:00 (671 lines, 22 KiB, Python)
# Copyright 2013-2018 ARM Limited
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
|
|
"""
|
|
Miscellaneous functions that don't fit anywhere else.
|
|
|
|
"""
|
|
from __future__ import division
|
|
from functools import partial, reduce
|
|
from itertools import groupby
|
|
from operator import itemgetter
|
|
|
|
import ctypes
|
|
import logging
|
|
import os
|
|
import pkgutil
|
|
import random
|
|
import re
|
|
import signal
|
|
import string
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import wrapt
|
|
|
|
|
|
from past.builtins import basestring
|
|
|
|
# pylint: disable=redefined-builtin
|
|
from devlib.exception import HostError, TimeoutError
|
|
|
|
|
|
# ABI --> architectures list
# Maps a canonical ABI name to all architecture strings that should be
# treated as compatible with that ABI.
ABI_MAP = {
    'armeabi': ['armeabi', 'armv7', 'armv7l', 'armv7el', 'armv7lh', 'armeabi-v7a'],
    'arm64': ['arm64', 'armv8', 'arm64-v8a', 'aarch64'],
}
|
|
|
|
# Vendor ID --> CPU part ID --> CPU variant ID --> Core Name
# None means variant is not used.
# Used by get_cpu_name() below to translate the numeric IDs into a
# human-readable core name.
CPU_PART_MAP = {
    0x41: {  # ARM
        0x926: {None: 'ARM926'},
        0x946: {None: 'ARM946'},
        0x966: {None: 'ARM966'},
        0xb02: {None: 'ARM11MPCore'},
        0xb36: {None: 'ARM1136'},
        0xb56: {None: 'ARM1156'},
        0xb76: {None: 'ARM1176'},
        0xc05: {None: 'A5'},
        0xc07: {None: 'A7'},
        0xc08: {None: 'A8'},
        0xc09: {None: 'A9'},
        0xc0e: {None: 'A17'},
        0xc0f: {None: 'A15'},
        0xc14: {None: 'R4'},
        0xc15: {None: 'R5'},
        0xc17: {None: 'R7'},
        0xc18: {None: 'R8'},
        0xc20: {None: 'M0'},
        0xc60: {None: 'M0+'},
        0xc21: {None: 'M1'},
        0xc23: {None: 'M3'},
        0xc24: {None: 'M4'},
        0xc27: {None: 'M7'},
        0xd01: {None: 'A32'},
        0xd03: {None: 'A53'},
        0xd04: {None: 'A35'},
        0xd07: {None: 'A57'},
        0xd08: {None: 'A72'},
        0xd09: {None: 'A73'},
    },
    0x42: {  # Broadcom
        0x516: {None: 'Vulcan'},
    },
    0x43: {  # Cavium
        0x0a1: {None: 'Thunderx'},
        0x0a2: {None: 'Thunderx81xx'},
    },
    0x4e: {  # Nvidia
        0x0: {None: 'Denver'},
    },
    0x50: {  # AppliedMicro
        0x0: {None: 'xgene'},
    },
    0x51: {  # Qualcomm
        0x02d: {None: 'Scorpion'},
        0x04d: {None: 'MSM8960'},
        0x06f: {  # Krait
            0x2: 'Krait400',
            0x3: 'Krait450',
        },
        0x205: {0x1: 'KryoSilver'},
        0x211: {0x1: 'KryoGold'},
        0x800: {None: 'Falkor'},
    },
    0x53: {  # Samsung LSI
        0x001: {0x1: 'MongooseM1'},
    },
    0x56: {  # Marvell
        0x131: {
            0x2: 'Feroceon 88F6281',
        }
    },
}
|
|
|
|
|
|
def get_cpu_name(implementer, part, variant):
    """Look up the human-readable core name for the given implementer/part/
    variant IDs in CPU_PART_MAP, returning ``None`` when unknown."""
    variants = CPU_PART_MAP.get(implementer, {}).get(part, {})
    if None in variants:
        # For this vendor/part, the variant ID plays no role in naming.
        return variants[None]
    return variants.get(variant)
|
|
|
|
|
|
def preexec_function():
    """Prepare a subprocess spawned via Popen(preexec_fn=...).

    SIGINT is ignored (handler set to SIG_IGN) so a Ctrl-C in the parent does
    not reach the child directly, and the child is placed in its own process
    group so that it and all of its descendants can be killed together later.
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # TODO: os.setpgrp() is Unix-specific; an OS-agnostic equivalent would be
    # needed if Windows support is ever required.
    os.setpgrp()
|
|
|
|
|
|
# Logger used by check_output() below for timeout diagnostics.
check_output_logger = logging.getLogger('check_output')
# Popen is not thread safe. If two threads attempt to call it at the same time,
# one may lock up. See https://bugs.python.org/issue12739.
# check_output() serializes its Popen calls with this lock.
check_output_lock = threading.Lock()
|
|
|
|
|
|
def check_output(command, timeout=None, ignore=None, inputtext=None,
                 combined_output=False, **kwargs):
    """This is a version of subprocess.check_output that adds a timeout parameter to kill
    the subprocess if it does not return within the specified time.

    :param command: command to execute, as accepted by subprocess.Popen.
    :param timeout: seconds after which the subprocess's whole process group
                    is sent SIGKILL; ``None`` disables the timeout.
    :param ignore: exit code(s) that do not raise CalledProcessError: an int,
                   a list of ints, or the string ``'all'``.
    :param inputtext: data fed to the subprocess's stdin.
    :param combined_output: if ``True``, stderr is redirected into stdout.
    :returns: an ``(output, error)`` tuple.
    :raises TimeoutError: if the process was killed (exit code -9), assumed
                          to be due to the timeout.
    :raises subprocess.CalledProcessError: for any other non-ignored
                                           non-zero exit code.
    """
    # pylint: disable=too-many-branches
    if ignore is None:
        ignore = []
    elif isinstance(ignore, int):
        ignore = [ignore]
    elif not isinstance(ignore, list) and ignore != 'all':
        message = 'Invalid value for ignore parameter: "{}"; must be an int or a list'
        raise ValueError(message.format(ignore))
    if 'stdout' in kwargs:
        raise ValueError('stdout argument not allowed, it will be overridden.')

    def callback(pid):
        # Runs on the timer thread when the timeout expires: kill the whole
        # process group set up by preexec_function().
        try:
            check_output_logger.debug('{} timed out; sending SIGKILL'.format(pid))
            os.killpg(pid, signal.SIGKILL)
        except OSError:
            pass # process may have already terminated.

    with check_output_lock:
        # Serialized via check_output_lock -- see the comment at its
        # definition (https://bugs.python.org/issue12739).
        stderr = subprocess.STDOUT if combined_output else subprocess.PIPE
        process = subprocess.Popen(command,
                                   stdout=subprocess.PIPE,
                                   stderr=stderr,
                                   stdin=subprocess.PIPE,
                                   preexec_fn=preexec_function,
                                   **kwargs)

    if timeout:
        timer = threading.Timer(timeout, callback, [process.pid, ])
        timer.start()

    try:
        output, error = process.communicate(inputtext)
        if sys.version_info[0] == 3:
            # Currently errors=replace is needed as 0x8c throws an error
            output = output.decode(sys.stdout.encoding or 'utf-8', "replace")
            if error:
                error = error.decode(sys.stderr.encoding or 'utf-8', "replace")
    finally:
        # Always cancel the timer so it cannot fire after the process has
        # already finished.
        if timeout:
            timer.cancel()

    retcode = process.poll()
    if retcode:
        if retcode == -9: # killed, assume due to timeout callback
            raise TimeoutError(command, output='\n'.join([output or '', error or '']))
        elif ignore != 'all' and retcode not in ignore:
            raise subprocess.CalledProcessError(retcode, command, output='\n'.join([output or '', error or '']))
    return output, error
|
|
|
|
|
|
def walk_modules(path):
    """
    Given package name, return a list of all modules (including submodules, etc)
    in that package.

    :raises HostError: if an exception is raised while trying to import one of the
                       modules under ``path``. The exception will have additional
                       attributes set: ``module`` will be set to the qualified name
                       of the originating module, and ``orig_exc`` will contain
                       the original exception.

    """

    def __try_import(modpath):
        # Import the module, wrapping any failure in a HostError that carries
        # debugging metadata about where the import went wrong.
        try:
            return __import__(modpath, {}, {}, [''])
        except Exception as e:
            he = HostError('Could not load {}: {}'.format(modpath, str(e)))
            he.module = modpath
            he.exc_info = sys.exc_info()
            he.orig_exc = e
            raise he

    root = __try_import(path)
    modules = [root]
    if not hasattr(root, '__path__'):
        # A plain module rather than a package -- there is nothing beneath it.
        return modules
    for _, name, is_package in pkgutil.iter_modules(root.__path__):
        qualified = '.'.join([path, name])
        if is_package:
            # Recurse into sub-packages.
            modules.extend(walk_modules(qualified))
        else:
            modules.append(__try_import(qualified))
    return modules
|
|
|
|
|
|
def ensure_directory_exists(dirpath):
    """A filter for directory paths to ensure they exist.

    Creates ``dirpath`` (including any intermediate directories) if it does
    not already exist, and returns it unchanged.
    """
    if not os.path.isdir(dirpath):
        try:
            os.makedirs(dirpath)
        except OSError:
            # Another process/thread may have created the directory between
            # the isdir() check and makedirs() (TOCTOU race); only propagate
            # the error if the directory genuinely does not exist.
            if not os.path.isdir(dirpath):
                raise
    return dirpath
|
|
|
|
|
|
def ensure_file_directory_exists(filepath):
    """
    A filter for file paths to ensure the directory of the
    file exists and the file can be created there. The file
    itself is *not* going to be created if it doesn't already
    exist.

    """
    dirname = os.path.dirname(filepath)
    # A bare file name has no directory component; previously this passed ''
    # to os.makedirs(), which raises OSError. Nothing needs creating in that
    # case.
    if dirname:
        ensure_directory_exists(dirname)
    return filepath
|
|
|
|
|
|
def merge_dicts(*args, **kwargs):
    """Merge an arbitrary number of dicts, left to right.

    Keyword arguments are forwarded to the underlying two-dict merge.
    """
    if len(args) < 2:
        raise ValueError('Must specify at least two dicts to merge.')
    merge_pair = partial(_merge_two_dicts, **kwargs)
    return reduce(merge_pair, args)
|
|
|
|
|
|
def _merge_two_dicts(base, other, list_duplicates='all', match_types=False, # pylint: disable=R0912,R0914
                     dict_type=dict, should_normalize=True, should_merge_lists=True):
    """Merge dicts normalizing their keys.

    ``other`` is merged on top of ``base``: for keys present in both, nested
    dicts are merged recursively, lists via _merge_two_lists(), sets via
    union, and any other value from ``other`` wins.
    """
    merged = dict_type()
    base_keys = list(base.keys())
    other_keys = list(other.keys())
    # When normalization is disabled, substitute a pass-through with the same
    # (value, dict_type) call signature.
    norm = normalize if should_normalize else lambda x, y: x

    # Partition keys into base-only, other-only, and shared, while building
    # ``union`` in base-then-other order. NOTE(review): shared keys are
    # appended to ``union`` twice (once per loop), so their merge below is
    # computed twice -- redundant but harmless, since the result is the same.
    base_only = []
    other_only = []
    both = []
    union = []
    for k in base_keys:
        if k in other_keys:
            both.append(k)
        else:
            base_only.append(k)
        union.append(k)
    for k in other_keys:
        if k in base_keys:
            union.append(k)
        else:
            union.append(k)
            other_only.append(k)

    for k in union:
        if k in base_only:
            merged[k] = norm(base[k], dict_type)
        elif k in other_only:
            merged[k] = norm(other[k], dict_type)
        elif k in both:
            base_value = base[k]
            other_value = other[k]
            base_type = type(base_value)
            other_type = type(other_value)
            # Optionally require both sides to hold the same type; a None on
            # either side is always allowed to be overridden.
            if (match_types and (base_type != other_type) and
                    (base_value is not None) and (other_value is not None)):
                raise ValueError('Type mismatch for {} got {} ({}) and {} ({})'.format(k, base_value, base_type,
                                                                                      other_value, other_type))
            if isinstance(base_value, dict):
                # Recursively merge nested dictionaries.
                merged[k] = _merge_two_dicts(base_value, other_value, list_duplicates, match_types, dict_type)
            elif isinstance(base_value, list):
                if should_merge_lists:
                    merged[k] = _merge_two_lists(base_value, other_value, list_duplicates, dict_type)
                else:
                    # Discard the base list; still route ``other`` through the
                    # list-merge machinery for normalization and '~' removals.
                    merged[k] = _merge_two_lists([], other_value, list_duplicates, dict_type)

            elif isinstance(base_value, set):
                merged[k] = norm(base_value.union(other_value), dict_type)
            else:
                # Scalars (and any other type): the value from ``other`` wins.
                merged[k] = norm(other_value, dict_type)
        else: # Should never get here
            raise AssertionError('Unexpected merge key: {}'.format(k))

    return merged
|
|
|
|
|
|
def merge_lists(*args, **kwargs):
    """Merge an arbitrary number of lists, left to right.

    Keyword arguments are forwarded to the underlying two-list merge.
    """
    if len(args) < 2:
        raise ValueError('Must specify at least two lists to merge.')
    merge_pair = partial(_merge_two_lists, **kwargs)
    return reduce(merge_pair, args)
|
|
|
|
|
|
def _merge_two_lists(base, other, duplicates='all', dict_type=dict): # pylint: disable=R0912
    """
    Merge lists, normalizing their entries.

    parameters:

        :base, other: the two lists to be merged. ``other`` will be merged on
                      top of base.
        :duplicates: Indicates the strategy of handling entries that appear
                     in both lists. ``all`` will keep occurrences from both
                     lists; ``first`` will only keep occurrences from
                     ``base``; ``last`` will only keep occurrences from
                     ``other``;

                     .. note:: duplicate entries that appear in the *same* list
                               will never be removed.

    """
    # Scalars are promoted to single-element lists.
    if not isiterable(base):
        base = [base]
    if not isiterable(other):
        other = [other]
    if duplicates == 'all':
        merged_list = []
        # _check_remove_item() consumes '~item' removal markers; ordinary
        # entries are appended as-is.
        for v in normalize(base, dict_type) + normalize(other, dict_type):
            if not _check_remove_item(merged_list, v):
                merged_list.append(v)
        return merged_list
    elif duplicates == 'first':
        base_norm = normalize(base, dict_type)
        merged_list = normalize(base, dict_type)
        # Apply any removal markers contained in base itself.
        for v in base_norm:
            _check_remove_item(merged_list, v)
        for v in normalize(other, dict_type):
            if not _check_remove_item(merged_list, v):
                # Keep the ``base`` occurrence: only add entries from
                # ``other`` that base did not already contain.
                if v not in base_norm:
                    merged_list.append(v) # pylint: disable=no-member
        return merged_list
    elif duplicates == 'last':
        other_norm = normalize(other, dict_type)
        merged_list = []
        for v in normalize(base, dict_type):
            if not _check_remove_item(merged_list, v):
                # Keep the ``other`` occurrence: skip base entries that will
                # be re-added from ``other`` below.
                if v not in other_norm:
                    merged_list.append(v)
        for v in other_norm:
            if not _check_remove_item(merged_list, v):
                merged_list.append(v)
        return merged_list
    else:
        raise ValueError('Unexpected value for list duplicates argument: {}. '.format(duplicates) +
                         'Must be in {"all", "first", "last"}.')
|
|
|
|
|
|
def _check_remove_item(the_list, item):
    """merge_lists() helper: a string entry of the form ``'~value'`` is a
    removal marker requesting that ``'value'`` be dropped from the list being
    built. Returns ``True`` if ``item`` was such a marker (whether or not the
    target was present), ``False`` otherwise."""
    is_marker = isinstance(item, basestring) and item.startswith('~')
    if not is_marker:
        return False
    target = item[1:]
    if target in the_list:
        the_list.remove(target)
    return True
|
|
|
|
|
|
def normalize(value, dict_type=dict):
    """Normalize values. Recursively lower-cases dict keys, strips their
    surrounding whitespace and replaces internal spaces with underscores;
    lists and tuples are normalized element-wise, everything else is
    returned unchanged."""
    if isinstance(value, dict):
        result = dict_type()
        for raw_key, val in value.items():
            clean_key = raw_key.strip().lower().replace(' ', '_')
            result[clean_key] = normalize(val, dict_type)
        return result
    if isinstance(value, list):
        return [normalize(item, dict_type) for item in value]
    if isinstance(value, tuple):
        return tuple(normalize(item, dict_type) for item in value)
    return value
|
|
|
|
|
|
def convert_new_lines(text):
    """Convert new lines to a common format: CRLF and bare CR become LF."""
    for ending in ('\r\n', '\r'):
        text = text.replace(ending, '\n')
    return text
|
|
|
|
|
|
def escape_quotes(text):
    """Escape quotes, and escaped quotes, in the specified text."""
    # Double up the backslash on already-escaped quotes first, then escape
    # the remaining bare quotes of both kinds.
    escaped = re.sub(r'\\("|\')', r'\\\\\1', text)
    escaped = escaped.replace("'", "\\'")
    return escaped.replace('"', '\\"')
|
|
|
|
|
|
def escape_single_quotes(text):
    """Escape single quotes, and escaped single quotes, in the specified
    text, using the shell-friendly ``'\\''`` idiom."""
    escaped = re.sub(r'\\("|\')', r'\\\\\1', text)
    return escaped.replace("'", "'\\''")
|
|
|
|
|
|
def escape_double_quotes(text):
    """Escape double quotes, and escaped double quotes, in the specified text."""
    escaped = re.sub(r'\\("|\')', r'\\\\\1', text)
    return escaped.replace('"', '\\"')
|
|
|
|
|
|
def escape_spaces(text):
    """Escape spaces in the specified text.

    :param text: string to process.
    :returns: ``text`` with every space preceded by a backslash.
    """
    # Previously written as '\ ', which is an invalid escape sequence
    # (DeprecationWarning since Python 3.6, an error on newer versions);
    # '\\ ' denotes the identical two-character string.
    return text.replace(' ', '\\ ')
|
|
|
|
|
|
def getch(count=1):
    """Read ``count`` characters from standard input."""
    if os.name == 'nt':
        import msvcrt # pylint: disable=F0401
        return ''.join([msvcrt.getch() for _ in range(count)])
    # Assume Unix: temporarily put the terminal into raw mode so characters
    # are delivered immediately (no line buffering), and restore the saved
    # settings afterwards no matter what.
    import tty # NOQA
    import termios # NOQA
    fd = sys.stdin.fileno()
    saved_settings = termios.tcgetattr(fd)
    try:
        tty.setraw(sys.stdin.fileno())
        return sys.stdin.read(count)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, saved_settings)
|
|
|
|
|
|
def isiterable(obj):
    """Returns ``True`` if the specified object is iterable and
    *is not a string type*, ``False`` otherwise."""
    if isinstance(obj, basestring):
        return False
    return hasattr(obj, '__iter__')
|
|
|
|
|
|
def as_relative(path):
    """Convert path to relative by stripping away the leading '/' on UNIX or
    the equivalent on other platforms."""
    _, tail = os.path.splitdrive(path)
    return tail.lstrip(os.sep)
|
|
|
|
|
|
def commonprefix(file_list, sep=os.sep):
    """
    Find the lowest common base folder of a passed list of files.
    """
    char_prefix = os.path.commonprefix(file_list)
    components = char_prefix.split(sep)
    # os.path.commonprefix() works character by character, so the final
    # component may be a partial name; drop it unless it matches the
    # corresponding component of an actual path from the list.
    reference = file_list[0].split(sep)
    last = len(components) - 1
    if components[last] != reference[last]:
        components = components[:-1]
    return sep.join(components)
|
|
|
|
|
|
def get_cpu_mask(cores):
    """Return a string with the hex for the cpu mask for the specified core numbers."""
    mask = reduce(lambda acc, core: acc | (1 << core), cores, 0)
    return '0x{0:x}'.format(mask)
|
|
|
|
|
|
def which(name):
    """Platform-independent version of UNIX which utility."""
    if os.name != 'nt':  # assume UNIX-like
        try:
            return check_output(['which', name])[0].strip()  # pylint: disable=E1103
        except subprocess.CalledProcessError:
            return None
    # Windows: search each PATH directory ourselves, trying the bare name
    # first and then each PATHEXT extension.
    extensions = os.getenv('PATHEXT').split(os.pathsep)
    for directory in os.getenv('PATH').split(os.pathsep):
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate):
            return candidate
        for extension in extensions:
            with_ext = candidate + extension
            if os.path.isfile(with_ext):
                return with_ext
    return None
|
|
|
|
|
|
# This matches most ANSI escape sequences, not just colors
_bash_color_regex = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]')


def strip_bash_colors(text):
    """Return ``text`` with ANSI escape sequences (colors, cursor movement,
    etc.) removed."""
    return _bash_color_regex.sub('', text)
|
|
|
|
|
|
def get_random_string(length):
    """Returns a random ASCII string (letters and digits) of the specified
    length."""
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
class LoadSyntaxError(Exception):
    """Raised when loaded content contains a syntax error; carries the
    offending file path and line number alongside the message."""

    @property
    def message(self):
        # The first positional argument passed to Exception, if any.
        return self.args[0] if self.args else str(self)

    def __init__(self, message, filepath, lineno):
        super(LoadSyntaxError, self).__init__(message)
        self.filepath = filepath
        self.lineno = lineno

    def __str__(self):
        return 'Syntax Error in {}, line {}:\n\t{}'.format(
            self.filepath, self.lineno, self.message)
|
|
|
|
|
|
# Length used for randomly generated module names.
RAND_MOD_NAME_LEN = 30
# Characters that are not valid inside a Python identifier.
BAD_CHARS = string.punctuation + string.whitespace
# Translation table mapping each such character to '_', built with the API
# appropriate for the running major Python version (str.maketrans on 3,
# string.maketrans on 2). Used by to_identifier() below.
# pylint: disable=no-member
if sys.version_info[0] == 3:
    TRANS_TABLE = str.maketrans(BAD_CHARS, '_' * len(BAD_CHARS))
else:
    TRANS_TABLE = string.maketrans(BAD_CHARS, '_' * len(BAD_CHARS))
|
|
|
|
|
|
def to_identifier(text):
    """Converts text to a valid Python identifier by replacing all
    whitespace and punctuation and adding a prefix if starting with a digit"""
    if text[:1].isdigit():
        # Identifiers may not begin with a digit.
        text = '_' + text
    translated = str(text).translate(TRANS_TABLE)
    # Collapse runs of underscores produced by adjacent replaced characters.
    return re.sub('_+', '_', translated)
|
|
|
|
|
|
def unique(alist):
    """
    Returns a list containing only unique elements from the input list (but preserves
    order, unlike sets).

    """
    deduped = []
    for element in alist:
        if element in deduped:
            continue
        deduped.append(element)
    return deduped
|
|
|
|
|
|
def ranges_to_list(ranges_string):
    """Converts a sysfs-style ranges string, e.g. ``"0,2-4"``, into a list,
    e.g. ``[0, 2, 3, 4]``."""
    values = []
    for chunk in ranges_string.split(','):
        if '-' in chunk:
            # An inclusive range such as "2-4".
            start, end = map(int, chunk.split('-'))
            values.extend(range(start, end + 1))
        else:
            values.append(int(chunk))
    return values
|
|
|
|
|
|
def list_to_ranges(values):
    """Converts a list, e.g. ``[0, 2, 3, 4]``, into a sysfs-style ranges
    string, e.g. ``"0,2-4"``."""
    fragments = []
    # Consecutive values share a constant (index - value) difference, so
    # groupby() splits the input into runs of consecutive integers.
    for _, run in groupby(enumerate(values), lambda pair: pair[0] - pair[1]):
        members = [value for _, value in run]
        if len(members) == 1:
            fragments.append(str(members[0]))
        else:
            fragments.append('{}-{}'.format(members[0], members[-1]))
    return ','.join(fragments)
|
|
|
|
|
|
def list_to_mask(values, base=0x0):
    """Converts the specified list of integer values into
    a bit mask for those values. Optionally, the list can be
    applied to an existing mask."""
    return reduce(lambda mask, bit: mask | (1 << bit), values, base)
|
|
|
|
|
|
def mask_to_list(mask):
    """Converts the specified integer bitmask into a list of
    indexes of bits that are set in the mask (highest index first)."""
    result = []
    bit = len(bin(mask)) - 2  # number of binary digits ("0b" prefix dropped)
    while bit:
        bit -= 1
        if mask & (1 << bit):
            result.append(bit)
    return result
|
|
|
|
|
|
# Cache shared by all functions decorated with @memoized, keyed by a string
# derived from the wrapped function and its arguments.
__memo_cache = {}


def reset_memo_cache():
    """Discard all results cached by @memoized functions."""
    __memo_cache.clear()
|
|
|
|
|
|
def __get_memo_id(obj):
    """
    An object's id() may be re-used after an object is freed, so it's not
    sufficiently unique to identify params for the memo cache (two different
    params may end up with the same id). this attempts to generate a more unique
    ID string.
    """
    obj_id = id(obj)
    try:
        # For hashable objects, id plus hash is unique enough.
        return '{}/{}'.format(obj_id, hash(obj))
    except TypeError: # obj is not hashable
        # Fall back to reading the object's raw in-memory representation
        # (up to 32 bytes) via ctypes and appending it to the id.
        obj_pyobj = ctypes.cast(obj_id, ctypes.py_object)
        # TODO: Note: there is still a possibility of a clash here. If Two
        # different objects get assigned the same ID, an are large and are
        # identical in the first thirty two bytes. This shouldn't be much of an
        # issue in the current application of memoizing Target calls, as it's very
        # unlikely that a target will get passed large params; but may cause
        # problems in other applications, e.g. when memoizing results of operations
        # on large arrays. I can't really think of a good way around that apart
        # form, e.g., md5 hashing the entire raw object, which will have an
        # undesirable impact on performance.
        num_bytes = min(ctypes.sizeof(obj_pyobj), 32)
        obj_bytes = ctypes.string_at(ctypes.addressof(obj_pyobj), num_bytes)
        return '{}/{}'.format(obj_id, obj_bytes)
|
|
|
|
|
|
@wrapt.decorator
def memoized(wrapped, instance, args, kwargs): # pylint: disable=unused-argument
    """
    A decorator for memoizing functions and methods.

    .. warning:: this may not detect changes to mutable types. As long as the
                 memoized function was used with an object as an argument
                 before, the cached result will be returned, even if the
                 structure of the object (e.g. a list) has changed in the mean time.

    """
    func_id = repr(wrapped)

    def memoize_wrapper(*args, **kwargs):
        # The cache key combines the function's repr, the memo IDs of all
        # positional arguments, and the formatted keyword arguments.
        # NOTE(review): the joined pieces are not delimited from func_id, so
        # distinct calls could in principle produce colliding keys.
        id_string = func_id + ','.join([__get_memo_id(a) for a in args])
        id_string += ','.join('{}={}'.format(k, v)
                              for k, v in kwargs.items())
        if id_string not in __memo_cache:
            __memo_cache[id_string] = wrapped(*args, **kwargs)
        return __memo_cache[id_string]

    return memoize_wrapper(*args, **kwargs)
|