1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-09-02 03:12:34 +01:00

WA3 Existing Code

This commit is contained in:
Marc Bonnici
2017-02-21 13:37:11 +00:00
parent 067f76adf3
commit 1f1f2b12c6
54 changed files with 9239 additions and 0 deletions

0
wa/utils/__init__.py Normal file
View File

46
wa/utils/counter.py Normal file
View File

@@ -0,0 +1,46 @@
"""
An auto incremeting value (kind of like an AUTO INCREMENT field in SQL).
Optionally, the name of the counter to be used is specified (each counter
increments separately).
Counts start at 1, not 0.
"""
from collections import defaultdict
__all__ = [
'next',
'reset',
'reset_all',
'counter',
]
__counters = defaultdict(int)
def next(name=None):
__counters[name] += 1
value = __counters[name]
return value
def reset_all(value=0):
for k in __counters:
reset(k, value)
def reset(name=None, value=0):
__counters[name] = value
class counter(object):
def __init__(self, name):
self.name = name
def next(self):
return next(self.name)
def reset(self, value=0):
return reset(self.name, value)

81
wa/utils/diff.py Normal file
View File

@@ -0,0 +1,81 @@
from wa.utils.misc import write_table
def diff_interrupt_files(before, after, result):  # pylint: disable=R0914
    """
    Diff two copies of an interrupts file (e.g. /proc/interrupts snapshots
    taken before and after a workload) and write a column-aligned diff table
    to the file at ``result``.

    Rows are matched on their first token (the interrupt label); rows that
    appear only in the ``after`` file are emitted prefixed with '>'.

    NOTE(review): relies on ``izip`` and ``diff_tokens`` being imported at
    module level (the imports are not visible in this view) -- confirm.
    """
    output_lines = []
    with open(before) as bfh:
        with open(after) as ofh:
            for bline, aline in izip(bfh, ofh):
                bchunks = bline.strip().split()
                # advance through `after` lines until the label matches the
                # current `before` label, collecting new-only rows on the way
                while True:
                    achunks = aline.strip().split()
                    if achunks[0] == bchunks[0]:
                        # labels match: diff the remaining columns pairwise
                        diffchunks = ['']
                        diffchunks.append(achunks[0])
                        diffchunks.extend([diff_tokens(b, a) for b, a
                                           in zip(bchunks[1:], achunks[1:])])
                        output_lines.append(diffchunks)
                        break
                    else:  # new category appeared in the after file
                        diffchunks = ['>'] + achunks
                        output_lines.append(diffchunks)
                        try:
                            aline = ofh.next()  # Python 2 file iterator protocol
                        except StopIteration:
                            break
    # Offset heading columns by one to allow for row labels on subsequent
    # lines.
    output_lines[0].insert(0, '')
    # Any "columns" that do not have headings in the first row are not actually
    # columns -- they are a single column where space-separated words got
    # split. Merge them back together to prevent them from being
    # column-aligned by write_table.
    table_rows = [output_lines[0]]
    num_cols = len(output_lines[0])
    for row in output_lines[1:]:
        table_row = row[:num_cols]
        table_row.append(' '.join(row[num_cols:]))
        table_rows.append(table_row)
    with open(result, 'w') as wfh:
        write_table(table_rows, wfh)
def diff_sysfs_dirs(before, after, result):  # pylint: disable=R0914
    """
    Diff two recursive snapshots of a sysfs directory tree, writing one diff
    file per snapshot file under ``result`` (mirroring the tree layout of
    ``before``).

    NOTE(review): depends on module-level ``os``, ``re``, ``logger``,
    ``izip_longest``, ``diff_tokens`` and ``_f`` (presumably
    ``ensure_file_directory_exists`` aliased on import) which are not visible
    in this view -- confirm against the module's imports.
    """
    before_files = []
    # os.path.walk is Python 2 only; collects every path under `before`
    os.path.walk(before,
                 lambda arg, dirname, names: arg.extend([os.path.join(dirname, f) for f in names]),
                 before_files
                 )
    before_files = filter(os.path.isfile, before_files)
    files = [os.path.relpath(f, before) for f in before_files]
    after_files = [os.path.join(after, f) for f in files]
    diff_files = [os.path.join(result, f) for f in files]
    for bfile, afile, dfile in zip(before_files, after_files, diff_files):
        if not os.path.isfile(afile):
            logger.debug('sysfs_diff: {} does not exist or is not a file'.format(afile))
            continue
        with open(bfile) as bfh, open(afile) as afh:  # pylint: disable=C0321
            with open(_f(dfile), 'w') as dfh:
                # enumerate from 1 so log messages use 1-based line numbers
                for i, (bline, aline) in enumerate(izip_longest(bfh, afh), 1):
                    if aline is None:
                        logger.debug('Lines missing from {}'.format(afile))
                        break
                    # split into word/non-word tokens, keeping the separators
                    bchunks = re.split(r'(\W+)', bline)
                    achunks = re.split(r'(\W+)', aline)
                    if len(bchunks) != len(achunks):
                        logger.debug('Token length mismatch in {} on line {}'.format(bfile, i))
                        dfh.write('xxx ' + bline)
                        continue
                    if ((len([c for c in bchunks if c.strip()]) == len([c for c in achunks if c.strip()]) == 2) and
                            (bchunks[0] == achunks[0])):
                        # if there are only two columns and the first column is the
                        # same, assume it's a "header" column and do not diff it.
                        dchunks = [bchunks[0]] + [diff_tokens(b, a) for b, a in zip(bchunks[1:], achunks[1:])]
                    else:
                        dchunks = [diff_tokens(b, a) for b, a in zip(bchunks, achunks)]
                    dfh.write(''.join(dchunks))

307
wa/utils/doc.py Normal file
View File

@@ -0,0 +1,307 @@
# Copyright 2014-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Utilities for working with and formatting documentation.
"""
import os
import re
import inspect
from itertools import cycle
USER_HOME = os.path.expanduser('~')
BULLET_CHARS = '-*'
def get_summary(aclass):
    """
    Return the summary description for an extension class: the first
    paragraph (blank-line separated) of the text produced by
    ``get_description`` (the class' ``description`` attribute or docstring).
    """
    description = get_description(aclass)
    return description.split('\n\n')[0]
def get_description(aclass):
    """
    Return the full description of the specified extension class, taken from
    its ``description`` attribute when that is set, otherwise from its
    docstring; falls back to a placeholder message when neither exists.
    """
    explicit = getattr(aclass, 'description', None)
    if explicit:
        return inspect.cleandoc(explicit)
    if aclass.__doc__:
        return inspect.getdoc(aclass)
    return 'no documentation found for {}'.format(aclass.__name__)
def get_type_name(obj):
    """Returns the name of the type object or function specified. In case of a lambda,
    the definition is returned with the parameter replaced by "value"."""
    # e.g. "<type 'int'>" (py2), "<class 'int'>" (py3), "<function foo at 0x..>"
    match = re.search(r"<(type|class|function) '?(.*?)'?>", str(obj))
    if isinstance(obj, tuple):
        # assumes a (something, name)-style pair -- TODO confirm against callers
        name = obj[1]
    elif match.group(1) == 'function':
        # NOTE(review): if str(obj) does not match the pattern above (e.g. an
        # object with a custom repr), match is None and this line raises
        # AttributeError -- presumably callers only pass types/functions.
        text = str(obj)
        name = text.split()[1]
        if name == '<lambda>':
            # recover the lambda's source and rename its parameter to "value"
            source = inspect.getsource(obj).strip().replace('\n', ' ')
            match = re.search(r'lambda\s+(\w+)\s*:\s*(.*?)\s*[\n,]', source)
            if not match:
                raise ValueError('could not get name for {}'.format(obj))
            name = match.group(2).replace(match.group(1), 'value')
    else:
        name = match.group(2)
    if '.' in name:
        # strip the module/class qualification, keep the leaf name
        name = name.split('.')[-1]
    return name
def count_leading_spaces(text):
    """
    Return the number of leading space characters (U+0020 only) in ``text``.

    Tabs and other whitespace are deliberately not counted, as Python source
    should not contain tabs.
    """
    return len(text) - len(text.lstrip(' '))
def format_column(text, width):
    """
    Wrap ``text`` into a column of the specified width. Lines that are too
    long are broken on word boundaries; continuation lines keep the leading
    indentation of the line they were broken from.

    Note: lines that are already short enough are left alone (no re-joining).
    """
    out_lines = []
    for line in text.split('\n'):
        if len(line) <= width:
            out_lines.append(line)
            continue
        words = line.split(' ')
        current = words[0]
        for word in words[1:]:
            if len(current) + len(word) + 1 < width:
                current = current + ' ' + word
            else:
                out_lines.append(current)
                # continuation keeps the indentation of the broken line
                current = ' ' * count_leading_spaces(current) + word
        out_lines.append(current)
    return '\n'.join(out_lines)
def format_bullets(text, width, char='-', shift=3, outchar=None):
    """
    Formats text into a bulleted list. Assumes each line of input that starts with
    ``char`` (possibly preceded with whitespace) is a new bullet point. Note: leading
    whitespace in the input will *not* be preserved. Instead, it will be determined by
    the ``shift`` parameter.

    :text: the text to be formatted
    :width: format width (note: must be at least ``shift`` + 4).
    :char: character that indicates a new bullet point in the input text.
    :shift: How far bulleted entries will be indented. This indicates the indentation
            level of the bullet point. Text indentation level will be ``shift`` + 3.
    :outchar: character that will be used to mark bullet points in the output. If
              left as ``None``, ``char`` will be used.
    """
    bullet_lines = []
    output = ''

    def __process_bullet(bullet_lines):
        # Flush the accumulated lines of the current bullet as one wrapped,
        # indented paragraph, overwriting the character at index 3 with the
        # bullet marker. NOTE(review): the fixed 3/4 indices and ``shift + 2``
        # indent appear to assume the default ``shift=3`` -- confirm whether
        # other shift values render correctly.
        if bullet_lines:
            bullet = format_paragraph(indent(' '.join(bullet_lines), shift + 2), width)
            bullet = bullet[:3] + outchar + bullet[4:]
            del bullet_lines[:]  # reset the shared accumulator in place
            return bullet + '\n'
        else:
            return ''

    if outchar is None:
        outchar = char
    for line in text.split('\n'):
        line = line.strip()
        if line.startswith(char):  # new bullet
            output += __process_bullet(bullet_lines)
            line = line[1:].strip()
        bullet_lines.append(line)
    output += __process_bullet(bullet_lines)
    return output
def format_simple_table(rows, headers=None, align='>', show_borders=True, borderchar='='):  # pylint: disable=R0914
    """
    Format ``rows`` (a list of lists) as a simple column-aligned table string,
    optionally with a header row and border rows made of ``borderchar``.
    """
    if not rows:
        return ''
    rows = [[str(cell) for cell in row] for row in rows]
    num_cols = len(rows[0])
    # Recycle the alignment spec until there is one entry per column; this is
    # consistent with how such cases are handled in R, pandas, etc.
    aligns = [a for a, _ in zip(cycle(align), range(num_cols))]
    col_widths = [max(len(cell) for cell in column) for column in zip(*rows)]
    if headers:
        col_widths = [max(len(h), w) for h, w in zip(headers, col_widths)]
    row_format = ' '.join(['{:%s%s}' % (aligns[i], w) for i, w in enumerate(col_widths)])
    row_format += '\n'
    border = row_format.format(*[borderchar * w for w in col_widths])
    result = border if show_borders else ''
    if headers:
        result += row_format.format(*headers)
        result += border
    for row in rows:
        result += row_format.format(*row)
    if show_borders:
        result += border
    return result
def format_paragraph(text, width):
    """
    Format ``text`` as a single paragraph wrapped to ``width``. Existing line
    breaks are collapsed (the text is treated as one paragraph), but the
    leading spaces of the initial line are preserved.
    """
    collapsed = re.sub('\n\n*\\s*', ' ', text.strip('\n'))
    return format_column(collapsed, width)
def format_body(text, width):
    """
    Format ``text`` -- a "body" of one or more paragraphs separated by blank
    lines -- into a column of the specified width. Paragraphs starting with a
    bullet character are rendered as bulleted lists; other formatting inside
    each paragraph may be clobbered.
    """
    # normalise all-whitespace lines to plain blank lines first
    text = re.sub('\n\\s*\n', '\n\n', text.strip('\n'))
    formatted = []
    for paragraph in re.split('\n\n+', text):
        stripped = paragraph.strip()
        if stripped and stripped[0] in BULLET_CHARS:
            formatted.append(format_bullets(paragraph, width))
        else:
            formatted.append(format_paragraph(paragraph, width))
    return '\n\n'.join(formatted)
def strip_inlined_text(text):
    """
    This function processes multiline inlined text (e.g. from docstrings)
    to strip away leading spaces and leading and trailing new lines.
    """
    text = text.strip('\n')
    lines = [ln.rstrip() for ln in text.split('\n')]
    # first line is special as it may not have the indent that follows the
    # others, e.g. if it starts on the same line as the multiline quote (""").
    nspaces = count_leading_spaces(lines[0])
    if len([ln for ln in lines if ln]) > 1:
        # common indent shared by the remaining non-empty lines
        to_strip = min(count_leading_spaces(ln) for ln in lines[1:] if ln)
        if nspaces >= to_strip:
            stripped = [lines[0][to_strip:]]
        else:
            stripped = [lines[0][nspaces:]]
        stripped += [ln[to_strip:] for ln in lines[1:]]
    else:
        stripped = [lines[0][nspaces:]]
    return '\n'.join(stripped).strip('\n')
def indent(text, spaces=4):
    """Indent the lines in the specified text by ``spaces`` spaces; empty
    lines are left untouched."""
    prefix = ' ' * spaces
    return '\n'.join(prefix + line if line else line
                     for line in text.split('\n'))
def format_literal(lit):
    """Render a literal value as inline reST: strings are quoted, compiled
    regexes are shown as r'' literals, anything else uses its repr."""
    if isinstance(lit, basestring):
        return '``\'{}\'``'.format(lit)
    if hasattr(lit, 'pattern'):  # regex
        return '``r\'{}\'``'.format(lit.pattern)
    return '``{}``'.format(lit)
def get_params_rst(ext):
    """
    Render the ``parameters`` of the given extension as reST definition-list
    text: name/type (plus a "(mandatory)" tag) on one line, with the indented
    description, allowed values/constraint and default below it.
    """
    text = ''
    for param in ext.parameters:
        text += '{} : {} {}\n'.format(param.name, get_type_name(param.kind),
                                      param.mandatory and '(mandatory)' or ' ')
        desc = strip_inlined_text(param.description or '')
        text += indent('{}\n'.format(desc))
        if param.allowed_values:
            text += indent('\nallowed values: {}\n'.format(', '.join(map(format_literal, param.allowed_values))))
        elif param.constraint:
            text += indent('\nconstraint: ``{}``\n'.format(get_type_name(param.constraint)))
        if param.default:
            # NOTE(review): falsy defaults (0, False, '') are not rendered --
            # presumably intentional, but confirm against callers.
            value = param.default
            if isinstance(value, basestring) and value.startswith(USER_HOME):
                # keep generated docs user-agnostic
                value = value.replace(USER_HOME, '~')
            text += indent('\ndefault: {}\n'.format(format_literal(value)))
        text += '\n'
    return text
def underline(text, symbol='='):
    """Return ``text`` with a same-length underline row of ``symbol`` and a
    trailing blank line (reST section heading)."""
    bar = symbol * len(text)
    return '{}\n{}\n\n'.format(text, bar)
def get_rst_from_extension(ext):
    """
    Render an extension (name heading, description and parameter list) as a
    reST document fragment.
    """
    rendered = underline(ext.name, '-')
    if hasattr(ext, 'description'):
        raw = ext.description or ''
    else:
        raw = ext.__doc__ or ''
    desc = strip_inlined_text(raw) if raw else ''
    rendered += desc + '\n\n'
    params_rst = get_params_rst(ext)
    if params_rst:
        rendered += underline('parameters', '~') + params_rst
    return rendered + '\n'

643
wa/utils/misc.py Normal file
View File

@@ -0,0 +1,643 @@
# Copyright 2013-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Miscellaneous functions that don't fit anywhere else.
"""
from __future__ import division
import os
import sys
import re
import math
import imp
import uuid
import string
import threading
import signal
import subprocess
import pkgutil
import traceback
import logging
import random
from datetime import datetime, timedelta
from operator import mul, itemgetter
from StringIO import StringIO
from itertools import cycle, groupby
from distutils.spawn import find_executable
import yaml
from dateutil import tz
from wa.framework.version import get_wa_version
# ABI --> architectures list
# Maps an Android-style ABI name to the architecture strings (as reported by
# e.g. ``uname -m``) that are considered compatible with it.
ABI_MAP = {
    'armeabi': ['armeabi', 'armv7', 'armv7l', 'armv7el', 'armv7lh'],
    'arm64': ['arm64', 'armv8', 'arm64-v8a'],
}
def preexec_function():
    """
    Runs in the child process just before the target program is exec'ed
    (passed as ``preexec_fn`` to ``subprocess.Popen`` by ``check_output``).
    """
    # Ignore the SIGINT signal by setting the handler to the standard
    # signal handler SIG_IGN.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # Change process group in case we have to kill the subprocess and all of
    # its children later.
    # TODO: this is Unix-specific; would be good to find an OS-agnostic way
    # to do this in case we wanna port WA to Windows.
    os.setpgrp()
check_output_logger = logging.getLogger('check_output')


# Defined here rather than in wlauto.exceptions due to module load dependencies
class TimeoutError(Exception):
    """Raised when a subprocess command times out. This is basically a ``WAError``-derived version
    of ``subprocess.CalledProcessError``, the thinking being that while a timeout could be due to
    programming error (e.g. not setting long enough timers), it is often due to some failure in the
    environment, and there fore should be classed as a "user error"."""

    def __init__(self, command, output):
        self.command = command
        self.output = output
        super(TimeoutError, self).__init__('Timed out: {}'.format(command))

    def __str__(self):
        # NOTE: ``self.message`` is the (deprecated) Python 2 Exception
        # attribute populated by __init__ above.
        parts = [self.message, 'OUTPUT:', self.output or '']
        return '\n'.join(parts)
def check_output(command, timeout=None, ignore=None, **kwargs):
    """This is a version of subprocess.check_output that adds a timeout parameter to kill
    the subprocess if it does not return within the specified time.

    :command: passed straight through to ``subprocess.Popen``.
    :timeout: seconds to wait before SIGKILLing the child's process group.
    :ignore: exit code(s) that should *not* raise CalledProcessError; an int,
             a list of ints, or the string 'all'.

    Returns ``(stdout, stderr)``. Raises ``TimeoutError`` when the child was
    killed (exit code -9 is assumed to come from the timeout callback --
    another SIGKILL source would be indistinguishable).
    """
    # pylint: disable=too-many-branches
    if ignore is None:
        ignore = []
    elif isinstance(ignore, int):
        ignore = [ignore]
    elif not isinstance(ignore, list) and ignore != 'all':
        message = 'Invalid value for ignore parameter: "{}"; must be an int or a list'
        raise ValueError(message.format(ignore))
    if 'stdout' in kwargs:
        raise ValueError('stdout argument not allowed, it will be overridden.')

    def callback(pid):
        # invoked from the timer thread; kill the whole process group created
        # by preexec_function so children die with the child.
        try:
            check_output_logger.debug('{} timed out; sending SIGKILL'.format(pid))
            os.killpg(pid, signal.SIGKILL)
        except OSError:
            pass  # process may have already terminated.

    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               preexec_fn=preexec_function, **kwargs)
    if timeout:
        timer = threading.Timer(timeout, callback, [process.pid, ])
        timer.start()
    try:
        output, error = process.communicate()
    finally:
        # always cancel the timer so it cannot fire after a normal exit
        if timeout:
            timer.cancel()
    retcode = process.poll()
    if retcode:
        if retcode == -9:  # killed, assume due to timeout callback
            raise TimeoutError(command, output='\n'.join([output, error]))
        elif ignore != 'all' and retcode not in ignore:
            raise subprocess.CalledProcessError(retcode, command, output='\n'.join([output, error]))
    return output, error
def init_argument_parser(parser):
    """
    Add the standard WA top-level command-line options (additional config
    file, verbosity, debug mode and version reporting) to ``parser`` and
    return it.
    """
    parser.add_argument('-c', '--config', help='specify an additional config.py')
    parser.add_argument('-v', '--verbose', action='count',
                        help='The scripts will produce verbose output.')
    # --debug is interpreted by the caller; it implies maximum verbosity
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode. Note: this implies --verbose.')
    parser.add_argument('--version', action='version', version='%(prog)s {}'.format(get_wa_version()))
    return parser
def walk_modules(path):
    """
    Given a path to a Python package, iterate over all the modules and
    sub-packages in that package.

    Yields the imported root module first, then every submodule and (via
    recursion) subpackage content. ImportErrors are annotated with the
    offending dotted path (``e.path``) and re-raised.
    """
    try:
        # fromlist=[''] forces __import__ to return the leaf module rather
        # than the top-level package
        root_mod = __import__(path, {}, {}, [''])
        yield root_mod
    except ImportError as e:
        e.path = path
        raise e
    if not hasattr(root_mod, '__path__'):  # module, not package
        return
    for _, name, ispkg in pkgutil.iter_modules(root_mod.__path__):
        try:
            submod_path = '.'.join([path, name])
            if ispkg:
                for submod in walk_modules(submod_path):
                    yield submod
            else:
                yield __import__(submod_path, {}, {}, [''])
        except ImportError as e:
            e.path = submod_path
            raise e
def ensure_directory_exists(dirpath):
    """
    A filter for directory paths to ensure they exist: creates ``dirpath``
    (including any intermediate directories) if necessary and returns it.
    """
    if not os.path.isdir(dirpath):
        try:
            os.makedirs(dirpath)
        except OSError:
            # Another process/thread may have created the directory between
            # the isdir() check and makedirs() (TOCTOU race); only re-raise
            # if the directory still does not exist.
            if not os.path.isdir(dirpath):
                raise
    return dirpath
def ensure_file_directory_exists(filepath):
    """
    A filter for file paths: make sure the directory that would contain
    ``filepath`` exists (creating it if needed), then return ``filepath``
    unchanged. The file itself is *not* created.
    """
    dirpath = os.path.dirname(filepath)
    ensure_directory_exists(dirpath)
    return filepath
def diff_tokens(before_token, after_token):
    """
    Produce the diff of two single tokens:

    - both whitespace: the after token is returned unchanged (whitespace
      tokens are considered equal regardless of their exact characters);
    - both numeric: the numeric difference is returned as a string;
    - identical tokens: returned as-is;
    - anything else: rendered as "[before -> after]".
    """
    if before_token.isspace() and after_token.isspace():
        return after_token
    if before_token.isdigit() and after_token.isdigit():
        # isdigit() accepts some non-ASCII digit characters that int()
        # rejects, hence the fallback.
        try:
            return str(int(after_token) - int(before_token))
        except ValueError:
            return "[%s -> %s]" % (before_token, after_token)
    if before_token == after_token:
        return after_token
    return "[%s -> %s]" % (before_token, after_token)
def prepare_table_rows(rows):
    """Given a list of lists, make sure they are prepared to be formatted into a table
    by making sure each row has the same number of columns and stringifying all values."""
    rows = [[str(cell) for cell in row] for row in rows]
    max_cols = max(len(row) for row in rows)
    for row in rows:
        # pad short rows with empty cells on the right
        row.extend([''] * (max_cols - len(row)))
    return rows
def write_table(rows, wfh, align='>', headers=None):  # pylint: disable=R0914
    """Write a column-aligned table to the specified file object."""
    if not rows:
        return
    rows = prepare_table_rows(rows)
    num_cols = len(rows[0])
    # recycle the alignment spec until there is one entry per column; this is
    # consistent with how such cases are handled in R, pandas, etc.
    aligns = [a for a, _ in zip(cycle(align), range(num_cols))]
    col_widths = [max(len(cell) for cell in column) for column in zip(*rows)]
    row_format = ' '.join(['{:%s%s}' % (aligns[i], w) for i, w in enumerate(col_widths)])
    row_format += '\n'
    if headers:
        wfh.write(row_format.format(*headers))
        underlines = ['-' * len(h) for h in headers]
        wfh.write(row_format.format(*underlines))
    for row in rows:
        wfh.write(row_format.format(*row))
def get_null():
    """Returns the correct null sink based on the OS."""
    if os.name == 'nt':
        return 'NUL'
    return '/dev/null'
def get_traceback(exc=None):
    """
    Return the formatted traceback for ``exc`` (a ``sys.exc_info()``-style
    tuple), or for the exception currently being handled when ``exc`` is not
    specified.
    """
    if exc is None:
        exc = sys.exc_info()
    if not exc:
        return None
    sio = StringIO()
    tb = exc[2]
    traceback.print_tb(tb, file=sio)
    # drop the frame reference explicitly to break the reference cycle;
    # see http://docs.python.org/2/library/sys.html#sys.exc_info
    del tb
    return sio.getvalue()
def normalize(value, dict_type=dict):
    """Normalize values. Recursively normalizes dict keys to be lower case,
    no surrounding whitespace, underscore-delimited strings."""
    if isinstance(value, dict):
        result = dict_type()
        for key, val in value.iteritems():
            if isinstance(key, basestring):
                key = key.strip().lower().replace(' ', '_')
            result[key] = normalize(val, dict_type)
        return result
    if isinstance(value, list):
        return [normalize(item, dict_type) for item in value]
    if isinstance(value, tuple):
        return tuple([normalize(item, dict_type) for item in value])
    return value
# Matches a (possibly fractional) number followed by an optional unit
# suffix, e.g. "10 ms" or "3.3V".
VALUE_REGEX = re.compile(r'(\d+(?:\.\d+)?)\s*(\w*)')

# Abbreviated unit --> canonical (long) unit name.
UNITS_MAP = {
    's': 'seconds',
    'ms': 'milliseconds',
    'us': 'microseconds',
    'ns': 'nanoseconds',
    'V': 'volts',
    'A': 'amps',
    'mA': 'milliamps',
    'J': 'joules',
}


def parse_value(value_string):
    """parses a string representing a numerical value and returns
    a tuple (value, units), where value will be either int or float,
    and units will be a string representing the units or None."""
    match = VALUE_REGEX.search(value_string)
    if not match:
        return (value_string, None)
    number = match.group(1)
    value = float(number) if '.' in number else int(number)
    suffix = match.group(2)
    # unknown suffixes are passed through unchanged
    return (value, UNITS_MAP.get(suffix, suffix))
def get_meansd(values):
    """Returns mean and (population) standard deviation of the specified
    values; ``(nan, nan)`` for an empty sequence."""
    if not values:
        return float('nan'), float('nan')
    n = len(values)
    mean = sum(values) / n
    variance = sum((v - mean) ** 2 for v in values) / n
    return mean, math.sqrt(variance)
def geomean(values):
    """Returns the geometric mean of the values."""
    product = reduce(mul, values)
    return product ** (1.0 / len(values))
def capitalize(text):
    """Capitalises the specified text: first letter upper case,
    all subsequent letters lower case."""
    if not text:
        return ''
    head, tail = text[0], text[1:]
    return head.upper() + tail.lower()
def convert_new_lines(text):
    """Convert all newline variants to a common format ('\\n')."""
    # CRLF must be handled before bare CR so it is not converted twice
    text = text.replace('\r\n', '\n')
    return text.replace('\r', '\n')
def escape_quotes(text):
    """Escape quotes, and escaped quotes, in the specified text.

    Already-escaped quotes (``\\"`` / ``\\'``) have their backslash doubled
    first, then every remaining quote of either kind is backslash-escaped.
    """
    return re.sub(r'\\("|\')', r'\\\\\1', text).replace('\'', '\\\'').replace('\"', '\\\"')
def escape_single_quotes(text):
    """Escape single quotes, and escaped single quotes, in the specified text.

    Single quotes are rewritten as ``'\\''`` (the shell-safe idiom for
    embedding a single quote inside a single-quoted string).
    """
    return re.sub(r'\\("|\')', r'\\\\\1', text).replace('\'', '\'\\\'\'')
def escape_double_quotes(text):
    """Escape double quotes, and escaped double quotes, in the specified text.

    Already-escaped quotes have their backslash doubled first, then every
    remaining double quote is backslash-escaped.
    """
    return re.sub(r'\\("|\')', r'\\\\\1', text).replace('\"', '\\\"')
def getch(count=1):
    """Read ``count`` characters from standard input.

    On Windows uses msvcrt; on Unix temporarily switches the controlling
    terminal into raw mode so characters are returned immediately, without
    waiting for a newline.
    """
    if os.name == 'nt':
        import msvcrt  # pylint: disable=F0401
        return ''.join([msvcrt.getch() for _ in xrange(count)])
    else:  # assume Unix
        import tty  # NOQA
        import termios  # NOQA
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setraw(sys.stdin.fileno())
            ch = sys.stdin.read(count)
        finally:
            # always restore the previous terminal settings
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
        return ch
def isiterable(obj):
    """Returns ``True`` if the specified object is iterable and
    *is not a string type*, ``False`` otherwise."""
    if isinstance(obj, basestring):
        return False
    return hasattr(obj, '__iter__')
def utc_to_local(dt):
    """Convert naive datetime to local time zone, assuming UTC."""
    aware = dt.replace(tzinfo=tz.tzutc())
    return aware.astimezone(tz.tzlocal())
def local_to_utc(dt):
    """Convert naive datetime to UTC, assuming local time zone."""
    aware = dt.replace(tzinfo=tz.tzlocal())
    return aware.astimezone(tz.tzutc())
def as_relative(path):
    """Convert path to relative by stripping away the leading '/' on UNIX or
    the equivalent on other platforms."""
    without_drive = os.path.splitdrive(path)[1]
    return without_drive.lstrip(os.sep)
def get_cpu_mask(cores):
    """Return a string with the hex for the cpu mask for the specified core numbers."""
    mask = 0
    for core in cores:
        mask |= 1 << core
    return '0x{0:x}'.format(mask)
def load_class(classpath):
    """Loads the specified Python class. ``classpath`` must be a fully-qualified
    class name (i.e. namspaced under module/package)."""
    modname, clsname = classpath.rsplit('.', 1)
    # A bare __import__(modname) returns the *top-level* package for dotted
    # module paths (e.g. __import__('os.path') returns os), so the getattr
    # below would fail for anything nested. Passing a non-empty fromlist
    # makes __import__ return the leaf module instead.
    mod = __import__(modname, fromlist=[clsname])
    return getattr(mod, clsname)
def get_pager():
    """Returns the name of the system pager program."""
    # honour the user's $PAGER first, then fall back to the common pagers
    pager = os.getenv('PAGER')
    for fallback in ('less', 'more'):
        if pager is not None:
            break
        pager = find_executable(fallback)
    return pager
def enum_metaclass(enum_param, return_name=False, start=0):
    """
    Returns a ``type`` subclass that may be used as a metaclass for
    an enum.

    Parameters:

        :enum_param: the name of class attribute that defines enum values.
                     The metaclass will add a class attribute for each value in
                     ``enum_param``. The value of the attribute depends on the type
                     of ``enum_param`` and on the values of ``return_name``. If
                     ``return_name`` is ``True``, then the value of the new attribute is
                     the name of that attribute; otherwise, if ``enum_param`` is a ``list``
                     or a ``tuple``, the value will be the index of that param in
                     ``enum_param``, optionally offset by ``start``, otherwise, it will
                     be assumed that ``enum_param`` implements a dict-like interface and
                     the value will be ``enum_param[attr_name]``.
        :return_name: If ``True``, the enum values will be the names of enum attributes. If
                      ``False``, the default, the values will depend on the type of
                      ``enum_param`` (see above).
        :start: If ``enum_param`` is a list or a tuple, and ``return_name`` is ``False``,
                this specifies an "offset" that will be added to the index of the attribute
                within ``enum_param`` to form the value.
    """
    class __EnumMeta(type):
        def __new__(mcs, clsname, bases, attrs):
            cls = type.__new__(mcs, clsname, bases, attrs)
            values = getattr(cls, enum_param, [])
            if return_name:
                for name in values:
                    setattr(cls, name, name)
            elif isinstance(values, (list, tuple)):
                for i, name in enumerate(values):
                    setattr(cls, name, i + start)
            else:  # assume dict-like
                for name in values:
                    setattr(cls, name, values[name])
            return cls
    return __EnumMeta
def which(name):
    """Platform-independent version of UNIX which utility.

    Returns the full path of ``name`` on the PATH, or ``None`` if it cannot
    be found.
    """
    if os.name == 'nt':
        # emulate cmd.exe lookup: try the bare name in each PATH directory,
        # then the name with each PATHEXT extension appended.
        paths = os.getenv('PATH').split(os.pathsep)
        exts = os.getenv('PATHEXT').split(os.pathsep)
        for path in paths:
            testpath = os.path.join(path, name)
            if os.path.isfile(testpath):
                return testpath
            for ext in exts:
                testpathext = testpath + ext
                if os.path.isfile(testpathext):
                    return testpathext
        return None
    else:  # assume UNIX-like
        try:
            result = check_output(['which', name])[0]
            return result.strip()  # pylint: disable=E1103
        except subprocess.CalledProcessError:
            # `which` exits non-zero when the name is not found
            return None
# Matches ANSI/bash SGR escape sequences, e.g. "\x1b[1;31m".
_bash_color_regex = re.compile('\x1b\\[[0-9;]+m')


def strip_bash_colors(text):
    """Return ``text`` with any ANSI color escape sequences removed."""
    return _bash_color_regex.sub('', text)
def format_duration(seconds, sep=' ', order=['day', 'hour', 'minute', 'second']):  # pylint: disable=dangerous-default-value
    """
    Formats the specified number of seconds (or a ``timedelta``) into a
    human-readable duration, e.g. ``"1 hour 2 minutes"``. Zero-valued units
    are omitted; an empty string is returned for a zero duration.
    """
    if isinstance(seconds, timedelta):
        td = seconds
    else:
        td = timedelta(seconds=seconds)
    # adding the delta to year-1 lets datetime fields act as duration parts
    dt = datetime(1, 1, 1) + td
    result = []
    for item in order:
        value = getattr(dt, item, None)
        # Fix: was `item is 'day'` -- identity comparison of strings is an
        # implementation detail of interning and must not be relied upon.
        if item == 'day':
            value -= 1  # datetime days are 1-based
        if not value:
            continue
        suffix = '' if value == 1 else 's'
        result.append('{} {}{}'.format(value, item, suffix))
    return sep.join(result)
def get_article(word):
    """
    Returns the appropriate indefinite article for the word (ish).

    .. note:: Indefinite article assignment in English is based on
              sound rather than spelling, so this will not work correctly
              in all cases; e.g. this will return ``"a hour"``.
    """
    vowels = 'aoeiu'
    return 'an' if word[0] in vowels else 'a'
def get_random_string(length):
    """Return a random alphanumeric ASCII string of the specified length."""
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
# presumably the length cap for randomly generated module names used
# elsewhere in WA -- confirm usage
RAND_MOD_NAME_LEN = 30
# characters that are not valid inside a Python identifier
BAD_CHARS = string.punctuation + string.whitespace
# Python 2 translation table mapping each bad character to '_'
# (``string.maketrans`` does not exist in Python 3)
TRANS_TABLE = string.maketrans(BAD_CHARS, '_' * len(BAD_CHARS))
def to_identifier(text):
    """Converts text to a valid Python identifier by replacing all
    whitespace and punctuation with underscores (runs are collapsed to a
    single '_'); a leading digit gets an underscore prefix."""
    collapsed = re.sub('_+', '_', text.translate(TRANS_TABLE))
    if collapsed and collapsed[0] in string.digits:
        return '_' + collapsed
    return collapsed
def unique(alist):
    """
    Returns a list containing only unique elements from the input list (but preserves
    order, unlike sets).
    """
    # membership is tested against a list rather than a set so that
    # unhashable elements are supported too
    seen = []
    for element in alist:
        if element not in seen:
            seen.append(element)
    return seen
def open_file(filepath):
    """
    Open the specified file path with the associated launcher in an OS-agnostic way.

    NOTE: the return value differs per platform (``os.startfile`` returns
    ``None`` on Windows; the subprocess exit code elsewhere).
    """
    if os.name == 'nt':  # Windows
        return os.startfile(filepath)  # pylint: disable=no-member
    elif sys.platform == 'darwin':  # Mac OSX
        return subprocess.call(['open', filepath])
    else:  # assume Linux or similar running a freedesktop-compliant GUI
        return subprocess.call(['xdg-open', filepath])
def ranges_to_list(ranges_string):
    """Converts a sysfs-style ranges string, e.g. ``"0,2-4"``, into a list, e.g. ``[0,2,3,4]``"""
    values = []
    for chunk in ranges_string.split(','):
        if '-' in chunk:
            start, end = [int(part) for part in chunk.split('-')]
            values.extend(range(start, end + 1))
        else:
            values.append(int(chunk))
    return values
def list_to_ranges(values):
    """Converts a list, e.g ``[0,2,3,4]``, into a sysfs-style ranges string, e.g. ``"0,2-4"``"""
    # consecutive values share a constant (index - value), so group on that
    range_strings = []
    for _, group_iter in groupby(enumerate(values), lambda pair: pair[0] - pair[1]):
        group = [value for _, value in group_iter]
        if len(group) == 1:
            range_strings.append(str(group[0]))
        else:
            range_strings.append('{}-{}'.format(group[0], group[-1]))
    return ','.join(range_strings)
def list_to_mask(values, base=0x0):
    """Converts the specified list of integer values into
    a bit mask for those values. Optionally, the list can be
    applied on top of an existing ``base`` mask."""
    mask = base
    for value in values:
        mask |= (1 << value)
    return mask
def mask_to_list(mask):
    """Converts the specified integer bitmask into a list of
    indexes of bits that are set in the mask (highest bit first)."""
    size = len(bin(mask)) - 2  # strip the "0b" prefix
    indexes = []
    for i in range(size):
        bit = size - i - 1
        if mask & (1 << bit):
            indexes.append(bit)
    return indexes
class Namespace(dict):
    """
    A dict-like object that allows treating keys and attributes
    interchangeably (this means that keys are restricted to strings
    that are valid Python identifiers).
    """

    def __getattr__(self, name):
        if name in self:
            return self[name]
        raise AttributeError(name)

    def __setattr__(self, name, value):
        self[name] = value

    def __setitem__(self, name, value):
        # reject keys that could not also be used as attribute names
        if to_identifier(name) != name:
            message = 'Key must be a valid identifier; got "{}"'
            raise ValueError(message.format(name))
        dict.__setitem__(self, name, value)

245
wa/utils/serializer.py Normal file
View File

@@ -0,0 +1,245 @@
"""
This module contains wrappers for Python serialization modules for
common formats that make it easier to serialize/deserialize WA
Plain Old Data structures (serializable WA classes implement
``to_pod()``/``from_pod()`` methods for converting between POD
structures and Python class instances).
The modifications to standard serialization procedures are:
- mappings are deserialized as ``OrderedDict``\ 's rather than standard
Python ``dict``\ 's. This allows for cleaner syntax in certain parts
of WA configuration (e.g. values to be written to files can be specified
as a dict, and they will be written in the order specified in the config).
- regular expressions are automatically encoded/decoded. This allows for
configuration values to be transparently specified as strings or regexes
in the POD config.
This module exports the "wrapped" versions of serialization libraries,
and this should be imported and used instead of importing the libraries
directly. i.e. ::
from wa.utils.serializer import yaml
pod = yaml.load(fh)
instead of ::
import yaml
pod = yaml.load(fh)
It's also possible to use the serializer directly::
from wa.utils import serializer
pod = serializer.load(fh)
This can also be used to ``dump()`` POD structures. By default,
``dump()`` will produce JSON, but ``fmt`` parameter may be used to
specify an alternative format (``yaml`` or ``python``). ``load()`` will
use the file extension to guess the format, but ``fmt`` may also be used
to specify it explicitly.
"""
import os
import re
import sys
import json as _json
from collections import OrderedDict
from datetime import datetime
import yaml as _yaml
import dateutil.parser
from wa.framework.exception import SerializerSyntaxError
from wa.utils.types import regex_type
from wa.utils.misc import isiterable
__all__ = [
'json',
'yaml',
'read_pod',
'dump',
'load',
]
class WAJSONEncoder(_json.JSONEncoder):
    """JSON encoder that knows how to serialize WA types: objects with a
    ``to_pod()`` method, compiled regular expressions, and ``datetime``\ s
    (the latter two are written as tagged strings that ``WAJSONDecoder``
    knows how to reverse)."""

    def default(self, obj):
        to_pod = getattr(obj, 'to_pod', None)
        if to_pod is not None:
            return to_pod()
        if isinstance(obj, regex_type):
            return 'REGEX:{}:{}'.format(obj.flags, obj.pattern)
        if isinstance(obj, datetime):
            return 'DATET:{}'.format(obj.isoformat())
        return _json.JSONEncoder.default(self, obj)
class WAJSONDecoder(_json.JSONDecoder):
    """JSON decoder that reverses ``WAJSONEncoder``'s encoding:
    ``REGEX:``/``DATET:`` tagged strings are revived as compiled regexes
    and ``datetime``\ s, and mappings are returned as ``OrderedDict``\ s."""

    def decode(self, s):
        decoded = _json.JSONDecoder.decode(self, s)

        def revive(value):
            # Tagged strings are produced by WAJSONEncoder.default().
            if isinstance(value, basestring) and value.startswith('REGEX:'):
                _, flags, pattern = value.split(':', 2)
                return re.compile(pattern, int(flags or 0))
            if isinstance(value, basestring) and value.startswith('DATET:'):
                _, text = value.split(':', 1)
                return dateutil.parser.parse(text)
            return value

        def revive_mapping(mapping):
            result = OrderedDict()
            for key, value in mapping.iteritems():
                if hasattr(value, 'iteritems'):
                    result[key] = revive_mapping(value)
                elif isiterable(value):
                    result[key] = [revive(item) for item in value]
                else:
                    result[key] = revive(value)
            return result

        return revive_mapping(decoded)
class json(object):
    """Thin wrapper around the standard ``json`` module that plugs in the
    WA encoder/decoder and reports parse problems as
    ``SerializerSyntaxError``."""

    @staticmethod
    def dump(o, wfh, indent=4, *args, **kwargs):
        """Serialize ``o`` to the open file handle ``wfh``."""
        return _json.dump(o, wfh, cls=WAJSONEncoder, indent=indent,
                          *args, **kwargs)

    @staticmethod
    def load(fh, *args, **kwargs):
        """Deserialize a POD structure from the open file handle ``fh``."""
        try:
            return _json.load(fh, cls=WAJSONDecoder,
                              object_pairs_hook=OrderedDict, *args, **kwargs)
        except ValueError as e:
            raise SerializerSyntaxError(e.message)

    @staticmethod
    def loads(s, *args, **kwargs):
        """Deserialize a POD structure from the string ``s``."""
        try:
            return _json.loads(s, cls=WAJSONDecoder,
                               object_pairs_hook=OrderedDict, *args, **kwargs)
        except ValueError as e:
            raise SerializerSyntaxError(e.message)
# Tag PyYAML uses for plain mappings; re-registering a constructor for it
# below makes every YAML mapping load as an OrderedDict.
_mapping_tag = _yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
# Custom YAML tag used to round-trip compiled regular expressions.
_regex_tag = u'tag:wa:regex'
def _wa_dict_representer(dumper, data):
    """Represent an ``OrderedDict`` as a plain YAML mapping, preserving
    the order of its entries."""
    items = data.iteritems()
    return dumper.represent_mapping(_mapping_tag, items)
def _wa_regex_representer(dumper, data):
    """Represent a compiled regex as a ``tag:wa:regex`` scalar carrying
    both its flags and its pattern."""
    return dumper.represent_scalar(_regex_tag,
                                   '{}:{}'.format(data.flags, data.pattern))
def _wa_dict_constructor(loader, node):
    """Construct an ``OrderedDict`` from a YAML mapping node, rejecting
    duplicate keys instead of silently keeping the last value."""
    result = OrderedDict()
    for key, value in loader.construct_pairs(node):
        if key in result:
            raise ValueError('Duplicate entry: {}'.format(key))
        result[key] = value
    return result
def _wa_regex_constructor(loader, node):
    """Rebuild a compiled regex from a ``tag:wa:regex`` scalar produced
    by ``_wa_regex_representer``."""
    flags, pattern = loader.construct_scalar(node).split(':', 1)
    return re.compile(pattern, int(flags or 0))
# Register the custom representers/constructors globally with PyYAML so
# that dumps preserve mapping order and regexes survive a round trip.
_yaml.add_representer(OrderedDict, _wa_dict_representer)
_yaml.add_representer(regex_type, _wa_regex_representer)
_yaml.add_constructor(_mapping_tag, _wa_dict_constructor)
_yaml.add_constructor(_regex_tag, _wa_regex_constructor)
class yaml(object):
    """Thin wrapper around PyYAML that reports parse failures as
    ``SerializerSyntaxError`` (with a line number when available)."""

    @staticmethod
    def dump(o, wfh, *args, **kwargs):
        """Serialize ``o`` to the open file handle ``wfh``."""
        return _yaml.dump(o, wfh, *args, **kwargs)

    @staticmethod
    def load(fh, *args, **kwargs):
        """Deserialize a POD structure from the open file handle ``fh``."""
        try:
            return _yaml.load(fh, *args, **kwargs)
        except _yaml.YAMLError as e:
            lineno = None
            if hasattr(e, 'problem_mark'):
                lineno = e.problem_mark.line
            raise SerializerSyntaxError(e.message, lineno)

    # PyYAML does not distinguish between string and stream sources.
    loads = load
class python(object):
    """Serializer that treats the source as Python code: ``load()``
    exec's the text and returns the resulting globals as a POD dict
    (double-underscore names are stripped).

    .. warning:: this executes arbitrary code -- only ever use it on
                 trusted configuration files.
    """

    @staticmethod
    def dump(o, wfh, *args, **kwargs):
        # PODs are only ever read from Python sources, never written.
        raise NotImplementedError()

    @classmethod
    def load(cls, fh, *args, **kwargs):
        """Deserialize a POD structure from the open file handle ``fh``."""
        return cls.loads(fh.read())

    @staticmethod
    def loads(s, *args, **kwargs):
        """Execute ``s`` and return its resulting globals as a dict.

        :raises SerializerSyntaxError: if ``s`` is not valid Python.
        """
        pod = {}
        try:
            # exec(s, pod) is valid in both Python 2 and 3; the original
            # "exec s in pod" statement is a Python 3 syntax error.
            exec(s, pod)  # pylint: disable=exec-used
        except SyntaxError as e:
            # e.msg carries the actual message in both Python versions
            # (e.message is empty for multi-arg exceptions).
            raise SerializerSyntaxError(e.msg, e.lineno)
        # Drop dunder entries (including the __builtins__ that exec
        # injects); iterate over a copy since the dict is mutated.
        for k in list(pod.keys()):
            if k.startswith('__'):
                del pod[k]
        return pod
def read_pod(source, fmt=None):
    """Read a POD structure from ``source``, which may be either a path
    to a file or an open file handle. ``fmt`` ("yaml", "json" or "py")
    overrides the format otherwise guessed from the file extension.

    :raises ValueError: if ``source`` is neither of the above.
    """
    if isinstance(source, basestring):
        with open(source) as fh:
            return _read_pod(fh, fmt)
    # A handle must either carry a .name to guess the format from, or
    # have the format specified explicitly. (The original referenced an
    # undefined name "sourc" here, raising NameError for every handle.)
    elif hasattr(source, 'read') and (hasattr(source, 'name') or fmt):
        return _read_pod(source, fmt)
    else:
        message = 'source must be a path or an open file handle; got {}'
        raise ValueError(message.format(type(source)))
def dump(o, wfh, fmt='json', *args, **kwargs):
    """Serialize ``o`` to the open file handle ``wfh`` in the specified
    format: "json" (the default), "yaml", or "python"/"py".

    :raises ValueError: if ``fmt`` is not one of the above.
    """
    serializers = {'yaml': yaml, 'json': json, 'python': python, 'py': python}
    try:
        serializer = serializers[fmt]
    except KeyError:
        raise ValueError('Unknown serialization format: "{}"'.format(fmt))
    serializer.dump(o, wfh, *args, **kwargs)
def load(s, fmt='json', *args, **kwargs):
    """Deserialize a POD structure from ``s`` (a path or an open file
    handle), delegating to ``read_pod``.

    NOTE(review): ``*args``/``**kwargs`` are accepted for symmetry with
    ``dump()`` but are silently ignored here.
    """
    return read_pod(s, fmt=fmt)
def _read_pod(fh, fmt=None):
    """Deserialize a POD from the open file handle ``fh``, guessing the
    format from the file extension when ``fmt`` is not specified.

    :raises ValueError: if the format is not "yaml", "json" or "py".
    """
    if fmt is None:
        fmt = os.path.splitext(fh.name)[1].lower().strip('.')
    if fmt == 'yaml':
        return yaml.load(fh)
    elif fmt == 'json':
        return json.load(fh)
    elif fmt == 'py':
        return python.load(fh)
    else:
        # The original formatted with an undefined name "path", raising
        # NameError instead of the intended ValueError.
        message = 'Unknown format "{}": {}'
        raise ValueError(message.format(fmt, getattr(fh, 'name', '<none>')))

497
wa/utils/types.py Normal file
View File

@@ -0,0 +1,497 @@
# Copyright 2014-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Routines for doing various type conversions. These usually embody some higher-level
semantics than are present in standard Python types (e.g. ``boolean`` will convert the
string ``"false"`` to ``False``, whereas non-empty strings are usually considered to be
``True``).
A lot of these are intended to specify type conversions declaratively in places like
``Parameter``'s ``kind`` argument. These are basically "hacks" around the fact that Python
is not the best language to use for configuration.
"""
import os
import re
import math
import shlex
import numbers
from bisect import insort
from collections import defaultdict
from wa.utils.misc import isiterable, to_identifier
def identifier(text):
    """Converts text to a valid Python identifier by replacing all
    whitespace and punctuation (delegates to
    ``wa.utils.misc.to_identifier``)."""
    return to_identifier(text)
def boolean(value):
    """
    Interpret ``value`` as a bool. Unlike the builtin ``bool()``, string
    representations of falsehood are honoured: ``boolean('0')``,
    ``boolean('no')`` and ``boolean('false')`` (or any case-insensitive
    prefix of ``'false'``) all yield ``False``.
    """
    if isinstance(value, basestring):
        lowered = value.lower()
        if lowered in ('', '0', 'n', 'no') or 'false'.startswith(lowered):
            return False
        return True
    return bool(value)
def integer(value):
    """Convert ``value`` to an ``int``, accepting string representations
    in any base recognised by ``int(x, 0)`` (decimal, binary ``0b``,
    octal and hex ``0x``)."""
    if not isinstance(value, basestring):
        return int(value)
    return int(value, 0)
def numeric(value):
    """
    Convert ``value`` to a number, preferring ``int`` when the value is
    integral and falling back to ``float`` otherwise.

    :raises ValueError: if ``value`` has no straightforward numeric
        conversion.
    """
    if isinstance(value, int):
        return value
    try:
        as_float = float(value)
    except ValueError:
        raise ValueError('Not numeric: {}'.format(value))
    # NaN and infinities cannot be represented as ints; return as-is.
    if math.isnan(as_float) or math.isinf(as_float):
        return as_float
    as_int = int(as_float)
    # Best-effort demotion to int when no precision is lost.
    return as_int if as_int == as_float else as_float
def list_of_strs(value):
    """
    Convert each element of the iterable ``value`` to a string,
    returning the results as a list.

    :raises ValueError: if ``value`` is not iterable.
    """
    if not isiterable(value):
        raise ValueError(value)
    return [str(v) for v in value]


list_of_strings = list_of_strs
def list_of_ints(value):
    """
    Convert each element of the iterable ``value`` to an ``int``,
    returning the results as a list.

    :raises ValueError: if ``value`` is not iterable.
    """
    if not isiterable(value):
        raise ValueError(value)
    return [int(v) for v in value]


list_of_integers = list_of_ints
def list_of_numbers(value):
    """
    Convert each element of the iterable ``value`` to a number (``int``
    or ``float`` depending on the element), returning the results as a
    list.

    :raises ValueError: if ``value`` is not iterable.
    """
    if not isiterable(value):
        raise ValueError(value)
    return [numeric(v) for v in value]
def list_of_bools(value, interpret_strings=True):
    """
    Convert each element of the iterable ``value`` to a ``bool``,
    returning the results as a list.

    .. note:: By default ``boolean()`` is used for the conversion, so
              strings such as ``"0"`` or ``"false"`` are interpreted as
              ``False``. Set ``interpret_strings`` to ``False`` to use
              the plain ``bool()`` builtin instead.

    :raises ValueError: if ``value`` is not iterable.
    """
    if not isiterable(value):
        raise ValueError(value)
    convert = boolean if interpret_strings else bool
    return [convert(v) for v in value]
def list_of(type_):
    """Generate a ``list`` subclass that coerces everything put into it
    to ``type_`` -- on construction, ``append``, ``extend`` and item
    assignment -- raising ``ValueError`` on conversion failure."""

    def __init__(self, values):
        list.__init__(self, map(type_, values))

    def append(self, value):
        list.append(self, type_(value))

    def extend(self, other):
        list.extend(self, map(type_, other))

    def __setitem__(self, idx, value):
        list.__setitem__(self, idx, type_(value))

    methods = {
        '__init__': __init__,
        '__setitem__': __setitem__,
        'append': append,
        'extend': extend,
    }
    return type('list_of_{}s'.format(type_.__name__), (list,), methods)
def list_or_string(value):
    """
    Convert ``value`` into a list of strings: a string becomes a
    one-element list; any other iterable is converted to a list as-is;
    a non-iterable value is stringified and wrapped in a one-element
    list.
    """
    if isinstance(value, basestring):
        return [value]
    try:
        return list(value)
    except TypeError:
        # was "except ValueError": list() raises TypeError for
        # non-iterables, so the documented fallback was unreachable.
        return [str(value)]
def list_or_caseless_string(value):
    """
    Convert ``value`` into a list of ``caseless_string``'s: a string
    becomes a one-element list; any other iterable is converted
    element-wise; a non-iterable value is stringified and wrapped in a
    one-element list.
    """
    if isinstance(value, basestring):
        return [caseless_string(value)]
    try:
        return [caseless_string(v) for v in value]
    except TypeError:
        # was "except ValueError": iterating a non-iterable raises
        # TypeError, so the documented fallback was unreachable.
        return [caseless_string(value)]
def list_or(type_):
    """
    Generate a "list or" type: its constructor accepts either a single
    value or an iterable of values and produces a list with every
    element converted to ``type_``.
    """
    base = list_of(type_)

    class list_or_type(base):

        def __init__(self, value):
            # pylint: disable=non-parent-init-called,super-init-not-called
            values = value if isiterable(value) else [value]
            base.__init__(self, values)

    return list_or_type
# Commonly used "single value or a list of values" parameter kinds.
list_or_integer = list_or(integer)
list_or_number = list_or(numeric)
list_or_bool = list_or(boolean)
# Type of a compiled regular expression (not exported by the re module).
regex_type = type(re.compile(''))
def regex(value):
    """
    Regular expression parameter kind: an already-compiled pattern is
    passed through unchanged; a string is compiled with no flags
    (precompile the pattern yourself when flags are needed).
    """
    if not isinstance(value, regex_type):
        value = re.compile(value)
    return value
class caseless_string(str):
    """
    Just like the built-in string, except that comparisons are
    case-insensitive; the original case is preserved otherwise.
    """

    def __eq__(self, other):
        if isinstance(other, basestring):
            other = other.lower()
        return self.lower() == other

    def __ne__(self, other):
        return not self.__eq__(other)

    def __cmp__(self, other):
        # was: isinstance(basestring, other) -- the arguments were
        # swapped, so ordered comparisons always raised TypeError.
        if isinstance(other, basestring):
            other = other.lower()
        return cmp(self.lower(), other)

    def __hash__(self):
        # __eq__ is case-insensitive, so equal values must hash equally;
        # the inherited str.__hash__ would break set/dict membership for
        # differently-cased equal strings.
        return hash(self.lower())

    def format(self, *args, **kwargs):
        return caseless_string(super(caseless_string, self).format(*args, **kwargs))
class arguments(list):
    """
    Represents command line arguments to be passed to a program: a string
    is tokenized with ``shlex``, any other iterable is stringified
    element-wise, ``None`` yields an empty argument list, and any other
    value becomes a single stringified argument.
    """

    def __init__(self, value=None):
        if isiterable(value):
            initial = map(str, value)
        elif isinstance(value, basestring):
            # shlex quoting rules differ on Windows.
            initial = shlex.split(value, posix=(os.name != 'nt'))
        elif value is None:
            initial = []
        else:
            initial = [str(value)]
        super(arguments, self).__init__(initial)

    def append(self, value):
        return super(arguments, self).append(str(value))

    def extend(self, values):
        return super(arguments, self).extend([str(v) for v in values])

    def __str__(self):
        return ' '.join(self)
class prioritylist(object):
    """
    A list-like container that keeps its elements grouped by an integer
    priority. Indexing (``__getitem__``/``index``/``__delitem__``) views
    elements in ascending priority order, while iteration yields
    highest-priority elements first (see the NOTE on ``__iter__``).
    """

    def __init__(self):
        """
        Returns an object that externally behaves like a list but
        maintains the order of its elements according to their priority.
        """
        self.elements = defaultdict(list)  # priority -> elements, in insertion order
        self.is_ordered = True
        self.priorities = []               # sorted list of priorities in use
        self.size = 0                      # total element count across all priorities
        self._cached_elements = None       # memoized flat, ascending-priority view

    def add(self, new_element, priority=0):
        """
        adds a new item in the list.

        - ``new_element`` the element to be inserted in the prioritylist
        - ``priority`` is the priority of the element which specifies its
          order within the list
        """
        self._add_element(new_element, priority)

    def add_before(self, new_element, element):
        # Insert new_element at element's priority, immediately before it.
        priority, index = self._priority_index(element)
        self._add_element(new_element, priority, index)

    def add_after(self, new_element, element):
        # Insert new_element at element's priority, immediately after it.
        priority, index = self._priority_index(element)
        self._add_element(new_element, priority, index + 1)

    def index(self, element):
        # Position of element in the flat, ascending-priority view.
        return self._to_list().index(element)

    def remove(self, element):
        # Remove the first occurrence (in ascending-priority order).
        index = self.index(element)
        self.__delitem__(index)

    def _priority_index(self, element):
        # Return (priority, index within that priority's bucket).
        for priority, elements in self.elements.iteritems():
            if element in elements:
                return (priority, elements.index(element))
        raise IndexError(element)

    def _to_list(self):
        # Build (and cache) the flat view in ascending priority order.
        if self._cached_elements is None:
            self._cached_elements = []
            for priority in self.priorities:
                self._cached_elements += self.elements[priority]
        return self._cached_elements

    def _add_element(self, element, priority, index=None):
        if index is None:
            self.elements[priority].append(element)
        else:
            self.elements[priority].insert(index, element)
        self.size += 1
        self._cached_elements = None  # invalidate the flat view
        if priority not in self.priorities:
            insort(self.priorities, priority)  # keep priorities sorted

    def _delete(self, priority, priority_index):
        # Remove one element; drop the priority bucket if now empty.
        del self.elements[priority][priority_index]
        self.size -= 1
        if len(self.elements[priority]) == 0:
            self.priorities.remove(priority)
        self._cached_elements = None

    def __iter__(self):
        # NOTE(review): iteration is highest-priority-first, the REVERSE
        # of the order used by __getitem__/index/__delitem__ -- confirm
        # this asymmetry is intentional.
        for priority in reversed(self.priorities):  # highest priority first
            for element in self.elements[priority]:
                yield element

    def __getitem__(self, index):
        return self._to_list()[index]

    def __delitem__(self, index):
        # Deletes by position(s) in the flat, ascending-priority view.
        if isinstance(index, numbers.Integral):
            index = int(index)
            if index < 0:
                index_range = [len(self) + index]
            else:
                index_range = [index]
        elif isinstance(index, slice):
            # NOTE(review): a slice with stop=None (e.g. pl[1:]) makes
            # range() raise TypeError, and negative slice bounds are not
            # normalized -- confirm slice deletion is actually used.
            index_range = range(index.start or 0, index.stop, index.step or 1)
        else:
            raise ValueError('Invalid index {}'.format(index))
        current_global_offset = 0
        # Snapshot of per-priority bucket sizes taken before any deletion.
        priority_counts = {priority: count for (priority, count) in
                           zip(self.priorities, [len(self.elements[p]) for p in self.priorities])}
        for priority in self.priorities:
            if not index_range:
                break
            priority_offset = 0
            while index_range:
                del_index = index_range[0]
                if priority_counts[priority] + current_global_offset <= del_index:
                    # del_index falls in a later priority bucket.
                    current_global_offset += priority_counts[priority]
                    break
                # priority_offset compensates for elements already
                # deleted from this bucket during this call.
                within_priority_index = del_index - \
                    (current_global_offset + priority_offset)
                self._delete(priority, within_priority_index)
                priority_offset += 1
                index_range.pop(0)

    def __len__(self):
        return self.size
class TreeNode(object):
    """
    Generic tree node: tracks a single parent and an ordered list of
    children, with traversal helpers defined on the class.
    """

    @property
    def is_root(self):
        # A node with no parent is the root of its tree.
        return self.parent is None

    @property
    def is_leaf(self):
        # A node with no children is a leaf.
        return not self.children

    @property
    def parent(self):
        return self._parent

    @parent.setter
    def parent(self, parent):
        # Re-parenting detaches the node from its current parent first;
        # add_child/remove_child keep the _parent back-reference in sync.
        if self._parent:
            self._parent.remove_child(self)
        self._parent = parent
        if self._parent:
            self._parent.add_child(self)

    @property
    def children(self):
        # Return a shallow copy so callers cannot mutate the internal list.
        return [c for c in self._children]

    def __init__(self):
        self._parent = None   # managed via the parent property/add_child
        self._children = []   # ordered list of direct children
def add_child(self, node):
if node == self:
raise ValueError('A node cannot be its own child.')
if node in self._children:
return
for ancestor in self.iter_ancestors():
if ancestor == node:
raise ValueError('Can\'t add {} as a child, as it already an ancestor')
if node.parent and node.parent != self:
raise ValueError('Cannot add {}, as it already has a parent.'.format(node))
self._children.append(node)
node._parent = self
def remove_child(self, node):
if node not in self._children:
message = 'Cannot remove: {} is not a child of {}'
raise ValueError(message.format(node, self))
self._children.remove(node)
node._parent = None
    def iter_ancestors(self, after=None, upto=None):
        """Iterate over this node's ancestor chain, starting with the
        node itself and walking towards the root.

        ``upto``: stop before yielding this node (exclusive bound).
        ``after``: skip ahead to this node before yielding anything.

        NOTE(review): despite its name, ``after`` itself IS yielded (the
        skip loop stops once ``ancestor == after``) -- confirm this is
        intended. Passing an ``after`` that is not an ancestor walks off
        the root and raises AttributeError on ``None.parent``.
        """
        if upto == self:
            return
        ancestor = self
        if after:
            while ancestor != after:
                ancestor = ancestor.parent
        while ancestor and ancestor != upto:
            yield ancestor
            ancestor = ancestor.parent
def iter_descendants(self):
for child in self.children:
yield child
for grandchild in child.iter_descendants():
yield grandchild
def iter_leaves(self):
for descendant in self.iter_descendants():
if descendant.is_leaf:
yield descendant
def get_common_ancestor(self, other):
if self.has_ancestor(other):
return other
if other.has_ancestor(self):
return self
for my_ancestor in self.iter_ancestors():
for other_ancestor in other.iter_ancestors():
if my_ancestor == other_ancestor:
return my_ancestor
def get_root(self):
node = self
while not node.is_root:
node = node.parent
return node
def has_ancestor(self, other):
for ancestor in self.iter_ancestors():
if other == ancestor:
return True
return False
def has_descendant(self, other):
for descendant in self.iter_descendants():
if other == descendant:
return True
return False