1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-02-20 20:09:11 +00:00

utils: Removed utils code that is also in devlib

Since WA3 is now very much dependant on devlib there is no need
to duplicate utility code between the two projects.

Quite a few of the modules aren't even needed by WA because they
were for communicating with devices.
This commit is contained in:
Sebastian Goscik 2016-03-18 14:51:33 +00:00
parent 0057a531fd
commit 2d3254470c
8 changed files with 11 additions and 1491 deletions

View File

@ -12,5 +12,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -28,37 +28,7 @@ import re
from wlauto.exceptions import DeviceError, ConfigError, HostError
from wlauto.utils.misc import check_output, escape_single_quotes, escape_double_quotes, get_null
MAX_TRIES = 5
logger = logging.getLogger('android')
# See:
# http://developer.android.com/guide/topics/manifest/uses-sdk-element.html#ApiLevels
ANDROID_VERSION_MAP = {
22: 'LOLLIPOP_MR1',
21: 'LOLLIPOP',
20: 'KITKAT_WATCH',
19: 'KITKAT',
18: 'JELLY_BEAN_MR2',
17: 'JELLY_BEAN_MR1',
16: 'JELLY_BEAN',
15: 'ICE_CREAM_SANDWICH_MR1',
14: 'ICE_CREAM_SANDWICH',
13: 'HONEYCOMB_MR2',
12: 'HONEYCOMB_MR1',
11: 'HONEYCOMB',
10: 'GINGERBREAD_MR1',
9: 'GINGERBREAD',
8: 'FROYO',
7: 'ECLAIR_MR1',
6: 'ECLAIR_0_1',
5: 'ECLAIR',
4: 'DONUT',
3: 'CUPCAKE',
2: 'BASE_1_1',
1: 'BASE',
}
from devlib.utils.android import ANDROID_VERSION_MAP, adb_command, ApkInfo
# See:
# http://developer.android.com/guide/topics/security/normal-permissions.html
@ -97,311 +67,3 @@ ANDROID_NORMAL_PERMISSIONS = [
'INSTALL_SHORTCUT',
'UNINSTALL_SHORTCUT',
]
# TODO: these are set to their actual values near the bottom of the file. There
# is some HACKery involved to ensure that ANDROID_HOME does not need to be set
# or adb added to path for root when installing as root, and the whole
# implemenationt is kinda clunky and messier than I'd like. The only file that
# rivals this one in levels of mess is bootstrap.py (for very much the same
# reasons). There must be a neater way to ensure that enviromental dependencies
# are met when they are needed, and are not imposed when they are not.
android_home = None
platform_tools = None
adb = None
aapt = None
fastboot = None
class _AndroidEnvironment(object):
def __init__(self):
self.android_home = None
self.platform_tools = None
self.adb = None
self.aapt = None
self.fastboot = None
class AndroidProperties(object):
def __init__(self, text):
self._properties = {}
self.parse(text)
def parse(self, text):
self._properties = dict(re.findall(r'\[(.*?)\]:\s+\[(.*?)\]', text))
def __iter__(self):
return iter(self._properties)
def __getattr__(self, name):
return self._properties.get(name)
__getitem__ = __getattr__
class ApkInfo(object):
version_regex = re.compile(r"name='(?P<name>[^']+)' versionCode='(?P<vcode>[^']+)' versionName='(?P<vname>[^']+)'")
name_regex = re.compile(r"name='(?P<name>[^']+)'")
def __init__(self, path=None):
self.path = path
self.package = None
self.activity = None
self.label = None
self.version_name = None
self.version_code = None
self.parse(path)
def parse(self, apk_path):
_check_env()
command = [aapt, 'dump', 'badging', apk_path]
logger.debug(' '.join(command))
output = subprocess.check_output(command)
for line in output.split('\n'):
if line.startswith('application-label:'):
self.label = line.split(':')[1].strip().replace('\'', '')
elif line.startswith('package:'):
match = self.version_regex.search(line)
if match:
self.package = match.group('name')
self.version_code = match.group('vcode')
self.version_name = match.group('vname')
elif line.startswith('launchable-activity:'):
match = self.name_regex.search(line)
self.activity = match.group('name')
else:
pass # not interested
def fastboot_command(command, timeout=None):
_check_env()
full_command = "fastboot {}".format(command)
logger.debug(full_command)
output, _ = check_output(full_command, timeout, shell=True)
return output
def fastboot_flash_partition(partition, path_to_image):
command = 'flash {} {}'.format(partition, path_to_image)
fastboot_command(command)
def adb_get_device():
"""
Returns the serial number of a connected android device.
If there are more than one device connected to the machine, or it could not
find any device connected, :class:`wlauto.exceptions.ConfigError` is raised.
"""
_check_env()
# TODO this is a hacky way to issue a adb command to all listed devices
# The output of calling adb devices consists of a heading line then
# a list of the devices sperated by new line
# The last line is a blank new line. in otherwords, if there is a device found
# then the output length is 2 + (1 for each device)
output = adb_command('0', "devices").splitlines() # pylint: disable=E1103
output_length = len(output)
if output_length == 3:
# output[1] is the 2nd line in the output which has the device name
# Splitting the line by '\t' gives a list of two indexes, which has
# device serial in 0 number and device type in 1.
return output[1].split('\t')[0]
elif output_length > 3:
raise ConfigError('Number of discovered devices is {}, it should be 1'.format(output_length - 2))
else:
raise ConfigError('No device is connected and available')
def adb_connect(device, timeout=None):
_check_env()
command = "adb connect " + device
if ":" in device:
port = device.split(':')[-1]
logger.debug(command)
output, _ = check_output(command, shell=True, timeout=timeout)
logger.debug(output)
#### due to a rare adb bug sometimes an extra :5555 is appended to the IP address
if output.find('{}:{}'.format(port, port)) != -1:
logger.debug('ADB BUG with extra port')
command = "adb connect " + device.replace(':{}'.format(port), '')
tries = 0
output = None
while not poll_for_file(device, "/proc/cpuinfo"):
logger.debug("adb connect failed, retrying now...")
tries += 1
if tries > MAX_TRIES:
raise DeviceError('Cannot connect to adb server on the device.')
logger.debug(command)
output, _ = check_output(command, shell=True, timeout=timeout)
time.sleep(10)
if tries and output.find('connected to') == -1:
raise DeviceError('Could not connect to {}'.format(device))
def adb_disconnect(device):
_check_env()
if ":5555" in device:
command = "adb disconnect " + device
logger.debug(command)
retval = subprocess.call(command, stdout=open(os.devnull, 'wb'), shell=True)
if retval:
raise DeviceError('"{}" returned {}'.format(command, retval))
def poll_for_file(device, dfile):
_check_env()
device_string = '-s {}'.format(device) if device else ''
command = "adb " + device_string + " shell \" if [ -f " + dfile + " ] ; then true ; else false ; fi\" "
logger.debug(command)
result = subprocess.call(command, stderr=subprocess.PIPE, shell=True)
return not bool(result)
am_start_error = re.compile(r"Error: Activity class {[\w|.|/]*} does not exist")
def adb_shell(device, command, timeout=None, check_exit_code=False, as_root=False): # NOQA
_check_env()
if as_root:
command = 'echo \'{}\' | su'.format(escape_single_quotes(command))
device_string = '-s {}'.format(device) if device else ''
full_command = 'adb {} shell "{}"'.format(device_string, escape_double_quotes(command))
logger.debug(full_command)
if check_exit_code:
actual_command = "adb {} shell '({}); echo; echo $?'".format(device_string, escape_single_quotes(command))
raw_output, error = check_output(actual_command, timeout, shell=True)
if raw_output:
try:
output, exit_code, _ = raw_output.rsplit('\r\n', 2)
except ValueError:
exit_code, _ = raw_output.rsplit('\r\n', 1)
output = ''
else: # raw_output is empty
exit_code = '969696' # just because
output = ''
exit_code = exit_code.strip()
if exit_code.isdigit():
if int(exit_code):
message = 'Got exit code {}\nfrom: {}\nSTDOUT: {}\nSTDERR: {}'.format(exit_code, full_command,
output, error)
raise DeviceError(message)
elif am_start_error.findall(output):
message = 'Could not start activity; got the following:'
message += '\n{}'.format(am_start_error.findall(output)[0])
raise DeviceError(message)
else: # not all digits
if am_start_error.findall(output):
message = 'Could not start activity; got the following:'
message += '\n{}'.format(am_start_error.findall(output)[0])
raise DeviceError(message)
else:
raise DeviceError('adb has returned early; did not get an exit code. Was kill-server invoked?')
else: # do not check exit code
output, _ = check_output(full_command, timeout, shell=True)
return output
def adb_background_shell(device, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False):
"""Runs the sepcified command in a subprocess, returning the the Popen object."""
_check_env()
if as_root:
command = 'echo \'{}\' | su'.format(escape_single_quotes(command))
device_string = '-s {}'.format(device) if device else ''
full_command = 'adb {} shell "{}"'.format(device_string, escape_double_quotes(command))
logger.debug(full_command)
return subprocess.Popen(full_command, stdout=stdout, stderr=stderr, shell=True)
class AdbDevice(object):
def __init__(self, name, status):
self.name = name
self.status = status
def __cmp__(self, other):
if isinstance(other, AdbDevice):
return cmp(self.name, other.name)
else:
return cmp(self.name, other)
def adb_list_devices():
_check_env()
output = adb_command(None, 'devices')
devices = []
for line in output.splitlines():
parts = [p.strip() for p in line.split()]
if len(parts) == 2:
devices.append(AdbDevice(*parts))
return devices
def adb_command(device, command, timeout=None):
_check_env()
device_string = '-s {}'.format(device) if device else ''
full_command = "adb {} {}".format(device_string, command)
logger.debug(full_command)
output, _ = check_output(full_command, timeout, shell=True)
return output
# Messy environment initialisation stuff...
def _initialize_with_android_home(env):
logger.debug('Using ANDROID_HOME from the environment.')
env.android_home = android_home
env.platform_tools = os.path.join(android_home, 'platform-tools')
os.environ['PATH'] += os.pathsep + env.platform_tools
_init_common(env)
return env
def _initialize_without_android_home(env):
if os.name == 'nt':
raise HostError('Please set ANDROID_HOME to point to the location of the Android SDK.')
# Assuming Unix in what follows.
if subprocess.call('adb version >{}'.format(get_null()), shell=True):
raise HostError('ANDROID_HOME is not set and adb is not in PATH. Have you installed Android SDK?')
logger.debug('Discovering ANDROID_HOME from adb path.')
env.platform_tools = os.path.dirname(subprocess.check_output('which adb', shell=True))
env.android_home = os.path.dirname(env.platform_tools)
_init_common(env)
return env
def _init_common(env):
logger.debug('ANDROID_HOME: {}'.format(env.android_home))
build_tools_directory = os.path.join(env.android_home, 'build-tools')
if not os.path.isdir(build_tools_directory):
msg = 'ANDROID_HOME ({}) does not appear to have valid Android SDK install (cannot find build-tools)'
raise HostError(msg.format(env.android_home))
versions = os.listdir(build_tools_directory)
for version in reversed(sorted(versions)):
aapt_path = os.path.join(build_tools_directory, version, 'aapt')
if os.path.isfile(aapt_path):
logger.debug('Using aapt for version {}'.format(version))
env.aapt = aapt_path
break
else:
raise HostError('aapt not found. Please make sure at least one Android platform is installed.')
def _check_env():
global android_home, platform_tools, adb, aapt # pylint: disable=W0603
if not android_home:
android_home = os.getenv('ANDROID_HOME')
if android_home:
_env = _initialize_with_android_home(_AndroidEnvironment())
else:
_env = _initialize_without_android_home(_AndroidEnvironment())
android_home = _env.android_home
platform_tools = _env.platform_tools
adb = _env.adb
aapt = _env.aapt

View File

@ -43,31 +43,20 @@ from distutils.spawn import find_executable
import yaml
from dateutil import tz
# ABI --> architectures list
ABI_MAP = {
'armeabi': ['armeabi', 'armv7', 'armv7l', 'armv7el', 'armv7lh'],
'arm64': ['arm64', 'armv8', 'arm64-v8a', 'aarch64'],
}
def preexec_function():
# Ignore the SIGINT signal by setting the handler to the standard
# signal handler SIG_IGN.
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Change process group in case we have to kill the subprocess and all of
# its children later.
# TODO: this is Unix-specific; would be good to find an OS-agnostic way
# to do this in case we wanna port WA to Windows.
os.setpgrp()
from devlib.utils.misc import ABI_MAP, check_output, walk_modules, \
ensure_directory_exists, ensure_file_directory_exists, \
merge_dicts, merge_lists, normalize, convert_new_lines, \
escape_quotes, escape_single_quotes, escape_double_quotes, \
isiterable, getch, as_relative, ranges_to_list, \
list_to_ranges, list_to_mask, mask_to_list, which, \
get_cpu_mask, unique
check_output_logger = logging.getLogger('check_output')
# Defined here rather than in wlauto.exceptions due to module load dependencies
class TimeoutError(Exception):
"""Raised when a subprocess command times out. This is basically a ``WAError``-derived version
"""Raised when a subprocess command times out. This is basically a ``WAError``-derived version{}
of ``subprocess.CalledProcessError``, the thinking being that while a timeout could be due to
programming error (e.g. not setting long enough timers), it is often due to some failure in the
environment, and there fore should be classed as a "user error"."""
@ -81,86 +70,6 @@ class TimeoutError(Exception):
return '\n'.join([self.message, 'OUTPUT:', self.output or ''])
def check_output(command, timeout=None, ignore=None, **kwargs):
"""This is a version of subprocess.check_output that adds a timeout parameter to kill
the subprocess if it does not return within the specified time."""
# pylint: disable=too-many-branches
if ignore is None:
ignore = []
elif isinstance(ignore, int):
ignore = [ignore]
elif not isinstance(ignore, list) and ignore != 'all':
message = 'Invalid value for ignore parameter: "{}"; must be an int or a list'
raise ValueError(message.format(ignore))
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it will be overridden.')
def callback(pid):
try:
check_output_logger.debug('{} timed out; sending SIGKILL'.format(pid))
os.killpg(pid, signal.SIGKILL)
except OSError:
pass # process may have already terminated.
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
preexec_fn=preexec_function, **kwargs)
if timeout:
timer = threading.Timer(timeout, callback, [process.pid, ])
timer.start()
try:
output, error = process.communicate()
finally:
if timeout:
timer.cancel()
retcode = process.poll()
if retcode:
if retcode == -9: # killed, assume due to timeout callback
raise TimeoutError(command, output='\n'.join([output, error]))
elif ignore != 'all' and retcode not in ignore:
raise subprocess.CalledProcessError(retcode, command, output='\n'.join([output, error]))
return output, error
def walk_modules(path):
"""
Given package name, return a list of all modules (including submodules, etc)
in that package.
"""
root_mod = __import__(path, {}, {}, [''])
mods = [root_mod]
for _, name, ispkg in pkgutil.iter_modules(root_mod.__path__):
submod_path = '.'.join([path, name])
if ispkg:
mods.extend(walk_modules(submod_path))
else:
submod = __import__(submod_path, {}, {}, [''])
mods.append(submod)
return mods
def ensure_directory_exists(dirpath):
"""A filter for directory paths to ensure they exist."""
if not os.path.isdir(dirpath):
os.makedirs(dirpath)
return dirpath
def ensure_file_directory_exists(filepath):
"""
A filter for file paths to ensure the directory of the
file exists and the file can be created there. The file
itself is *not* going to be created if it doesn't already
exist.
"""
ensure_directory_exists(os.path.dirname(filepath))
return filepath
def diff_tokens(before_token, after_token):
"""
Creates a diff of two tokens.
@ -249,131 +158,6 @@ def get_traceback(exc=None):
return sio.getvalue()
def merge_dicts(*args, **kwargs):
if len(args) < 2:
raise ValueError('Must specify at least two dicts to merge.')
func = partial(_merge_two_dicts, **kwargs)
return reduce(func, args)
def _merge_two_dicts(base, other, list_duplicates='all', match_types=False, # pylint: disable=R0912,R0914
dict_type=dict, should_normalize=True, should_merge_lists=True):
"""Merge dicts normalizing their keys."""
merged = dict_type()
base_keys = base.keys()
other_keys = other.keys()
norm = normalize if should_normalize else lambda x, y: x
base_only = []
other_only = []
both = []
union = []
for k in base_keys:
if k in other_keys:
both.append(k)
else:
base_only.append(k)
union.append(k)
for k in other_keys:
if k in base_keys:
union.append(k)
else:
union.append(k)
other_only.append(k)
for k in union:
if k in base_only:
merged[k] = norm(base[k], dict_type)
elif k in other_only:
merged[k] = norm(other[k], dict_type)
elif k in both:
base_value = base[k]
other_value = other[k]
base_type = type(base_value)
other_type = type(other_value)
if (match_types and (base_type != other_type) and
(base_value is not None) and (other_value is not None)):
raise ValueError('Type mismatch for {} got {} ({}) and {} ({})'.format(k, base_value, base_type,
other_value, other_type))
if isinstance(base_value, dict):
merged[k] = _merge_two_dicts(base_value, other_value, list_duplicates, match_types, dict_type)
elif isinstance(base_value, list):
if should_merge_lists:
merged[k] = _merge_two_lists(base_value, other_value, list_duplicates, dict_type)
else:
merged[k] = _merge_two_lists([], other_value, list_duplicates, dict_type)
elif isinstance(base_value, set):
merged[k] = norm(base_value.union(other_value), dict_type)
else:
merged[k] = norm(other_value, dict_type)
else: # Should never get here
raise AssertionError('Unexpected merge key: {}'.format(k))
return merged
def merge_lists(*args, **kwargs):
if len(args) < 2:
raise ValueError('Must specify at least two lists to merge.')
func = partial(_merge_two_lists, **kwargs)
return reduce(func, args)
def _merge_two_lists(base, other, duplicates='all', dict_type=dict): # pylint: disable=R0912
"""
Merge lists, normalizing their entries.
parameters:
:base, other: the two lists to be merged. ``other`` will be merged on
top of base.
:duplicates: Indicates the strategy of handling entries that appear
in both lists. ``all`` will keep occurrences from both
lists; ``first`` will only keep occurrences from
``base``; ``last`` will only keep occurrences from
``other``;
.. note:: duplicate entries that appear in the *same* list
will never be removed.
"""
if not isiterable(base):
base = [base]
if not isiterable(other):
other = [other]
if duplicates == 'all':
merged_list = []
for v in normalize(base, dict_type) + normalize(other, dict_type):
if not _check_remove_item(merged_list, v):
merged_list.append(v)
return merged_list
elif duplicates == 'first':
base_norm = normalize(base, dict_type)
merged_list = normalize(base, dict_type)
for v in base_norm:
_check_remove_item(merged_list, v)
for v in normalize(other, dict_type):
if not _check_remove_item(merged_list, v):
if v not in base_norm:
merged_list.append(v) # pylint: disable=no-member
return merged_list
elif duplicates == 'last':
other_norm = normalize(other, dict_type)
merged_list = []
for v in normalize(base, dict_type):
if not _check_remove_item(merged_list, v):
if v not in other_norm:
merged_list.append(v)
for v in other_norm:
if not _check_remove_item(merged_list, v):
merged_list.append(v)
return merged_list
else:
raise ValueError('Unexpected value for list duplicates argument: {}. '.format(duplicates) +
'Must be in {"all", "first", "last"}.')
def _check_remove_item(the_list, item):
"""Helper function for merge_lists that implements checking wether an items
should be removed from the list and doing so if needed. Returns ``True`` if
@ -388,24 +172,6 @@ def _check_remove_item(the_list, item):
return True
def normalize(value, dict_type=dict):
"""Normalize values. Recursively normalizes dict keys to be lower case,
no surrounding whitespace, underscore-delimited strings."""
if isinstance(value, dict):
normalized = dict_type()
for k, v in value.iteritems():
if isinstance(k, basestring):
k = k.strip().lower().replace(' ', '_')
normalized[k] = normalize(v, dict_type)
return normalized
elif isinstance(value, list):
return [normalize(v, dict_type) for v in value]
elif isinstance(value, tuple):
return tuple([normalize(v, dict_type) for v in value])
else:
return value
VALUE_REGEX = re.compile(r'(\d+(?:\.\d+)?)\s*(\w*)')
UNITS_MAP = {
@ -457,50 +223,6 @@ def capitalize(text):
return text[0].upper() + text[1:].lower()
def convert_new_lines(text):
""" Convert new lines to a common format. """
return text.replace('\r\n', '\n').replace('\r', '\n')
def escape_quotes(text):
"""Escape quotes, and escaped quotes, in the specified text."""
return re.sub(r'\\("|\')', r'\\\\\1', text).replace('\'', '\\\'').replace('\"', '\\\"')
def escape_single_quotes(text):
"""Escape single quotes, and escaped single quotes, in the specified text."""
return re.sub(r'\\("|\')', r'\\\\\1', text).replace('\'', '\'\\\'\'')
def escape_double_quotes(text):
"""Escape double quotes, and escaped double quotes, in the specified text."""
return re.sub(r'\\("|\')', r'\\\\\1', text).replace('\"', '\\\"')
def getch(count=1):
"""Read ``count`` characters from standard input."""
if os.name == 'nt':
import msvcrt # pylint: disable=F0401
return ''.join([msvcrt.getch() for _ in xrange(count)])
else: # assume Unix
import tty # NOQA
import termios # NOQA
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(count)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
def isiterable(obj):
"""Returns ``True`` if the specified object is iterable and
*is not a string type*, ``False`` otherwise."""
return hasattr(obj, '__iter__') and not isinstance(obj, basestring)
def utc_to_local(dt):
"""Convert naive datetime to local time zone, assuming UTC."""
return dt.replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal())
@ -511,21 +233,6 @@ def local_to_utc(dt):
return dt.replace(tzinfo=tz.tzlocal()).astimezone(tz.tzutc())
def as_relative(path):
"""Convert path to relative by stripping away the leading '/' on UNIX or
the equivant on other platforms."""
path = os.path.splitdrive(path)[1]
return path.lstrip(os.sep)
def get_cpu_mask(cores):
"""Return a string with the hex for the cpu mask for the specified core numbers."""
mask = 0
for i in cores:
mask |= 1 << i
return '0x{0:x}'.format(mask)
def load_class(classpath):
"""Loads the specified Python class. ``classpath`` must be a fully-qualified
class name (i.e. namspaced under module/package)."""
@ -587,27 +294,6 @@ def enum_metaclass(enum_param, return_name=False, start=0):
return __EnumMeta
def which(name):
"""Platform-independent version of UNIX which utility."""
if os.name == 'nt':
paths = os.getenv('PATH').split(os.pathsep)
exts = os.getenv('PATHEXT').split(os.pathsep)
for path in paths:
testpath = os.path.join(path, name)
if os.path.isfile(testpath):
return testpath
for ext in exts:
testpathext = testpath + ext
if os.path.isfile(testpathext):
return testpathext
return None
else: # assume UNIX-like
try:
return check_output(['which', name])[0].strip()
except subprocess.CalledProcessError:
return None
_bash_color_regex = re.compile('\x1b\[[0-9;]+m')
@ -733,19 +419,6 @@ def load_struct_from_file(filepath):
raise ValueError('Unknown format "{}": {}'.format(extn, filepath))
def unique(alist):
"""
Returns a list containing only unique elements from the input list (but preserves
order, unlike sets).
"""
result = []
for item in alist:
if item not in result:
result.append(item)
return result
def open_file(filepath):
"""
Open the specified file path with the associated launcher in an OS-agnostic way.
@ -759,49 +432,6 @@ def open_file(filepath):
return subprocess.call(['xdg-open', filepath])
def ranges_to_list(ranges_string):
"""Converts a sysfs-style ranges string, e.g. ``"0,2-4"``, into a list ,e.g ``[0,2,3,4]``"""
values = []
for rg in ranges_string.split(','):
if '-' in rg:
first, last = map(int, rg.split('-'))
values.extend(xrange(first, last + 1))
else:
values.append(int(rg))
return values
def list_to_ranges(values):
"""Converts a list, e.g ``[0,2,3,4]``, into a sysfs-style ranges string, e.g. ``"0,2-4"``"""
range_groups = []
for _, g in groupby(enumerate(values), lambda (i, x): i - x):
range_groups.append(map(itemgetter(1), g))
range_strings = []
for group in range_groups:
if len(group) == 1:
range_strings.append(str(group[0]))
else:
range_strings.append('{}-{}'.format(group[0], group[-1]))
return ','.join(range_strings)
def list_to_mask(values, base=0x0):
"""Converts the specified list of integer values into
a bit mask for those values. Optinally, the list can be
applied to an existing mask."""
for v in values:
base |= (1 << v)
return base
def mask_to_list(mask):
"""Converts the specfied integer bitmask into a list of
indexes of bits that are set in the mask."""
size = len(bin(mask)) - 2 # because of "0b"
return [size - i - 1 for i in xrange(size)
if mask & (1 << size - i - 1)]
def sha256(path, chunk=2048):
"""Calculates SHA256 hexdigest of the file at the specified path."""
h = hashlib.sha256()

View File

@ -27,11 +27,8 @@ if V(pexpect.__version__) < V('4.0.0'):
else:
from pexpect import fdpexpect
# Adding pexpect exceptions into this module's namespace
from pexpect import EOF, TIMEOUT # NOQA pylint: disable=W0611
from wlauto.exceptions import HostError
from wlauto.utils.log import LogWriter
from devlib.utils.serial_port import pulse_dtr, get_connection, open_serial_connection
class PexpectLogger(LogWriter):
@ -52,71 +49,3 @@ class PexpectLogger(LogWriter):
self.kind = kind
logger_name = 'serial_{}'.format(kind) if kind else 'serial'
super(PexpectLogger, self).__init__(logger_name)
def pulse_dtr(conn, state=True, duration=0.1):
"""Set the DTR line of the specified serial connection to the specified state
for the specified duration (note: the initial state of the line is *not* checked."""
conn.setDTR(state)
time.sleep(duration)
conn.setDTR(not state)
def get_connection(timeout, init_dtr=None, *args, **kwargs):
if init_dtr is not None:
kwargs['dsrdtr'] = True
try:
conn = serial.Serial(*args, **kwargs)
except serial.SerialException as e:
raise HostError(e.message)
if init_dtr is not None:
conn.setDTR(init_dtr)
conn.nonblocking()
conn.flushOutput()
target = fdpexpect.fdspawn(conn.fileno(), timeout=timeout)
target.logfile_read = PexpectLogger('read')
target.logfile_send = PexpectLogger('send')
# Monkey-patching sendline to introduce a short delay after
# chacters are sent to the serial. If two sendline s are issued
# one after another the second one might start putting characters
# into the serial device before the first one has finished, causing
# corruption. The delay prevents that.
tsln = target.sendline
def sendline(x):
tsln(x)
time.sleep(0.1)
target.sendline = sendline
return target, conn
@contextmanager
def open_serial_connection(timeout, get_conn=False, init_dtr=None, *args, **kwargs):
"""
Opens a serial connection to a device.
:param timeout: timeout for the fdpexpect spawn object.
:param conn: ``bool`` that specfies whether the underlying connection
object should be yielded as well.
:param init_dtr: specifies the initial DTR state stat should be set.
All arguments are passed into the __init__ of serial.Serial. See
pyserial documentation for details:
http://pyserial.sourceforge.net/pyserial_api.html#serial.Serial
:returns: a pexpect spawn object connected to the device.
See: http://pexpect.sourceforge.net/pexpect.html
"""
target, conn = get_connection(timeout, init_dtr=init_dtr, *args, **kwargs)
if get_conn:
yield target, conn
else:
yield target
target.close() # Closes the file descriptor used by the conn.
del conn

View File

@ -1,276 +0,0 @@
# Copyright 2014-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import stat
import logging
import subprocess
import re
import threading
import tempfile
import shutil
from pexpect import EOF, TIMEOUT, spawn, pxssh
from wlauto.exceptions import HostError, DeviceError, TimeoutError, ConfigError
from wlauto.utils.misc import which, strip_bash_colors, escape_single_quotes, check_output
ssh = None
scp = None
sshpass = None
logger = logging.getLogger('ssh')
def ssh_get_shell(host, username, password=None, keyfile=None, port=None, timeout=10, telnet=False):
_check_env()
if telnet:
if keyfile:
raise ConfigError('keyfile may not be used with a telnet connection.')
conn = TelnetConnection()
else: # ssh
conn = pxssh.pxssh() # pylint: disable=redefined-variable-type
try:
if keyfile:
conn.login(host, username, ssh_key=keyfile, port=port, login_timeout=timeout)
else:
conn.login(host, username, password, port=port, login_timeout=timeout)
except EOF:
raise DeviceError('Could not connect to {}; is the host name correct?'.format(host))
return conn
class TelnetConnection(pxssh.pxssh):
# pylint: disable=arguments-differ
def login(self, server, username, password='', original_prompt=r'[#$]', login_timeout=10,
auto_prompt_reset=True, sync_multiplier=1, port=23):
cmd = 'telnet -l {} {} {}'.format(username, server, port)
spawn._spawn(self, cmd) # pylint: disable=protected-access
try:
i = self.expect('(?i)(?:password)', timeout=login_timeout)
if i == 0:
self.sendline(password)
i = self.expect([original_prompt, 'Login incorrect'], timeout=login_timeout)
if i:
raise pxssh.ExceptionPxssh('could not log in: password was incorrect')
except TIMEOUT:
if not password:
# There was no password prompt before TIMEOUT, and we didn't
# have a password to enter. Assume everything is OK.
pass
else:
raise pxssh.ExceptionPxssh('could not log in: did not see a password prompt')
if not self.sync_original_prompt(sync_multiplier):
self.close()
raise pxssh.ExceptionPxssh('could not synchronize with original prompt')
if auto_prompt_reset:
if not self.set_unique_prompt():
self.close()
message = 'could not set shell prompt (recieved: {}, expected: {}).'
raise pxssh.ExceptionPxssh(message.format(self.before, self.PROMPT))
return True
def check_keyfile(keyfile):
"""
keyfile must have the right access premissions in order to be useable. If the specified
file doesn't, create a temporary copy and set the right permissions for that.
Returns either the ``keyfile`` (if the permissions on it are correct) or the path to a
temporary copy with the right permissions.
"""
desired_mask = stat.S_IWUSR | stat.S_IRUSR
actual_mask = os.stat(keyfile).st_mode & 0xFF
if actual_mask != desired_mask:
tmp_file = os.path.join(tempfile.gettempdir(), os.path.basename(keyfile))
shutil.copy(keyfile, tmp_file)
os.chmod(tmp_file, desired_mask)
return tmp_file
else: # permissions on keyfile are OK
return keyfile
class SshShell(object):
default_password_prompt = '[sudo] password'
max_cancel_attempts = 5
def __init__(self, password_prompt=None, timeout=10, telnet=False):
self.password_prompt = password_prompt if password_prompt is not None else self.default_password_prompt
self.timeout = timeout
self.telnet = telnet
self.conn = None
self.lock = threading.Lock()
self.connection_lost = False
def login(self, host, username, password=None, keyfile=None, port=None, timeout=None):
# pylint: disable=attribute-defined-outside-init
logger.debug('Logging in {}@{}'.format(username, host))
self.host = host
self.username = username
self.password = password
self.keyfile = check_keyfile(keyfile) if keyfile else keyfile
self.port = port
timeout = self.timeout if timeout is None else timeout
self.conn = ssh_get_shell(host, username, password, self.keyfile, port, timeout, self.telnet)
def push_file(self, source, dest, timeout=30):
dest = '{}@{}:{}'.format(self.username, self.host, dest)
return self._scp(source, dest, timeout)
def pull_file(self, source, dest, timeout=30):
source = '{}@{}:{}'.format(self.username, self.host, source)
return self._scp(source, dest, timeout)
def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
port_string = '-p {}'.format(self.port) if self.port else ''
keyfile_string = '-i {}'.format(self.keyfile) if self.keyfile else ''
command = '{} {} {} {}@{} {}'.format(ssh, keyfile_string, port_string, self.username, self.host, command)
logger.debug(command)
if self.password:
command = _give_password(self.password, command)
return subprocess.Popen(command, stdout=stdout, stderr=stderr, shell=True)
def reconnect(self):
self.conn = ssh_get_shell(self.host, self.username, self.password,
self.keyfile, self.port, self.timeout, self.telnet)
def execute(self, command, timeout=None, check_exit_code=True, as_root=False, strip_colors=True):
try:
with self.lock:
if self.connection_lost:
logger.debug('Attempting to reconnect...')
self.reconnect()
self.connection_lost = False
output = self._execute_and_wait_for_prompt(command, timeout, as_root, strip_colors)
if check_exit_code:
exit_code_text = self._execute_and_wait_for_prompt('echo $?', strip_colors=strip_colors, log=False)
try:
exit_code = int(exit_code_text.split()[0])
if exit_code:
message = 'Got exit code {}\nfrom: {}\nOUTPUT: {}'
raise DeviceError(message.format(exit_code, command, output))
except (ValueError, IndexError):
logger.warning('Could not get exit code for "{}",\ngot: "{}"'.format(command, exit_code_text))
return output
except EOF:
logger.error('Dropped connection detected.')
self.connection_lost = True
raise
def logout(self):
logger.debug('Logging out {}@{}'.format(self.username, self.host))
self.conn.logout()
def cancel_running_command(self):
# simulate impatiently hitting ^C until command prompt appears
logger.debug('Sending ^C')
for _ in xrange(self.max_cancel_attempts):
self.conn.sendline(chr(3))
if self.conn.prompt(0.1):
return True
return False
def _execute_and_wait_for_prompt(self, command, timeout=None, as_root=False, strip_colors=True, log=True):
self.conn.prompt(0.1) # clear an existing prompt if there is one.
if as_root:
command = "sudo -- sh -c '{}'".format(escape_single_quotes(command))
if log:
logger.debug(command)
self.conn.sendline(command)
index = self.conn.expect_exact([self.password_prompt, TIMEOUT], timeout=0.5)
if index == 0:
self.conn.sendline(self.password)
timed_out = self._wait_for_prompt(timeout)
output = re.sub(r' \r([^\n])', r'\1', self.conn.before)
output = process_backspaces(output)
output = re.sub(r'.*?{}'.format(re.escape(command)), '', output, 1).strip()
else:
if log:
logger.debug(command)
self.conn.sendline(command)
timed_out = self._wait_for_prompt(timeout)
# the regex removes line breaks potential introduced when writing
# command to shell.
output = re.sub(r' \r([^\n])', r'\1', self.conn.before)
output = process_backspaces(output)
command_index = output.find(command)
output = output[command_index + len(command):].strip()
if timed_out:
self.cancel_running_command()
raise TimeoutError(command, output)
if strip_colors:
output = strip_bash_colors(output)
return output
def _wait_for_prompt(self, timeout=None):
if timeout:
return not self.conn.prompt(timeout)
else: # cannot timeout; wait forever
while not self.conn.prompt(self.timeout):
pass
return False
def _scp(self, source, dest, timeout=30):
# NOTE: the version of scp in Ubuntu 12.04 occasionally (and bizarrely)
# fails to connect to a device if port is explicitly specified using -P
# option, even if it is the default port, 22. To minimize this problem,
# only specify -P for scp if the port is *not* the default.
port_string = '-P {}'.format(self.port) if (self.port and self.port != 22) else ''
keyfile_string = '-i {}'.format(self.keyfile) if self.keyfile else ''
command = '{} -r {} {} {} {}'.format(scp, keyfile_string, port_string, source, dest)
pass_string = ''
logger.debug(command)
if self.password:
command = _give_password(self.password, command)
try:
check_output(command, timeout=timeout, shell=True)
except subprocess.CalledProcessError as e:
raise subprocess.CalledProcessError(e.returncode, e.cmd.replace(pass_string, ''), e.output)
except TimeoutError as e:
raise TimeoutError(e.command.replace(pass_string, ''), e.output)
def _give_password(password, command):
if not sshpass:
raise HostError('Must have sshpass installed on the host in order to use password-based auth.')
pass_string = "sshpass -p '{}' ".format(password)
return pass_string + command
def _check_env():
global ssh, scp, sshpass # pylint: disable=global-statement
if not ssh:
ssh = which('ssh')
scp = which('scp')
sshpass = which('sshpass')
if not (ssh and scp):
raise HostError('OpenSSH must be installed on the host.')
def process_backspaces(text):
chars = []
for c in text:
if c == chr(8) and chars: # backspace
chars.pop()
else:
chars.append(c)
return ''.join(chars)

View File

@ -33,55 +33,7 @@ from bisect import insort
from collections import defaultdict
from wlauto.utils.misc import isiterable, to_identifier
def identifier(text):
"""Converts text to a valid Python identifier by replacing all
whitespace and punctuation."""
return to_identifier(text)
def boolean(value):
"""
Returns bool represented by the value. This is different from
calling the builtin bool() in that it will interpret string representations.
e.g. boolean('0') and boolean('false') will both yield False.
"""
false_strings = ['', '0', 'n', 'no', 'off']
if isinstance(value, basestring):
value = value.lower()
if value in false_strings or 'false'.startswith(value):
return False
return bool(value)
def integer(value):
"""Handles conversions for string respresentations of binary, octal and hex."""
if isinstance(value, basestring):
return int(value, 0)
else:
return int(value)
def numeric(value):
"""
Returns the value as number (int if possible, or float otherwise), or
raises ``ValueError`` if the specified ``value`` does not have a straight
forward numeric conversion.
"""
if isinstance(value, int):
return value
try:
fvalue = float(value)
except ValueError:
raise ValueError('Not numeric: {}'.format(value))
if not math.isnan(fvalue) and not math.isinf(fvalue):
ivalue = int(fvalue)
if ivalue == fvalue: # yeah, yeah, I know. Whatever. This is best-effort.
return ivalue
return fvalue
from devlib.utils.types import identifier, boolean, integer, numeric, caseless_string
def list_of_strs(value):
@ -251,30 +203,6 @@ def counter(name=None):
return value
class caseless_string(str):
"""
Just like built-in Python string except case-insensitive on comparisons. However, the
case is preserved otherwise.
"""
def __eq__(self, other):
if isinstance(other, basestring):
other = other.lower()
return self.lower() == other
def __ne__(self, other):
return not self.__eq__(other)
def __cmp__(self, other):
if isinstance(basestring, other):
other = other.lower()
return cmp(self.lower(), other)
def format(self, *args, **kwargs):
return caseless_string(super(caseless_string, self).format(*args, **kwargs))
class arguments(list):
"""
Represents command line arguments to be passed to a program.

View File

@ -1,116 +0,0 @@
# Copyright 2014-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re
import time
import logging
from wlauto.utils.serial_port import TIMEOUT
logger = logging.getLogger('U-Boot')
class UbootMenu(object):
"""
Allows navigating Das U-boot menu over serial (it relies on a pexpect connection).
"""
option_regex = re.compile(r'^\[(\d+)\]\s+([^\r]+)\r\n', re.M)
prompt_regex = re.compile(r'^([^\r\n]+):\s*', re.M)
invalid_regex = re.compile(r'Invalid input \(max (\d+)\)', re.M)
load_delay = 1 # seconds
default_timeout = 60 # seconds
def __init__(self, conn, start_prompt='Hit any key to stop autoboot'):
"""
:param conn: A serial connection as returned by ``pexect.spawn()``.
:param prompt: U-Boot menu prompt
:param start_prompt: The starting prompt to wait for during ``open()``.
"""
self.conn = conn
self.conn.crlf = '\n\r' # TODO: this has *got* to be a bug in U-Boot...
self.start_prompt = start_prompt
self.options = {}
self.prompt = None
def open(self, timeout=default_timeout):
"""
"Open" the UEFI menu by sending an interrupt on STDIN after seeing the
starting prompt (configurable upon creation of the ``UefiMenu`` object.
"""
self.conn.expect(self.start_prompt, timeout)
self.conn.sendline('')
time.sleep(self.load_delay)
self.conn.readline() # garbage
self.conn.sendline('')
self.prompt = self.conn.readline().strip()
def getenv(self):
output = self.enter('printenv')
result = {}
for line in output.split('\n'):
if '=' in line:
variable, value = line.split('=', 1)
result[variable.strip()] = value.strip()
return result
def setenv(self, variable, value, force=False):
force_str = ' -f' if force else ''
if value is not None:
command = 'setenv{} {} {}'.format(force_str, variable, value)
else:
command = 'setenv{} {}'.format(force_str, variable)
return self.enter(command)
def boot(self):
self.write_characters('boot')
def nudge(self):
"""Send a little nudge to ensure there is something to read. This is useful when you're not
sure if all out put from the serial has been read already."""
self.enter('')
def enter(self, value, delay=load_delay):
"""Like ``select()`` except no resolution is performed -- the value is sent directly
to the serial connection."""
# Empty the buffer first, so that only response to the input about to
# be sent will be processed by subsequent commands.
value = str(value)
self.empty_buffer()
self.write_characters(value)
self.conn.expect(self.prompt, timeout=delay)
return self.conn.before
def write_characters(self, line):
line = line.rstrip('\r\n')
for c in line:
self.conn.send(c)
time.sleep(0.05)
self.conn.sendline('')
def empty_buffer(self):
try:
while True:
time.sleep(0.1)
self.conn.read_nonblocking(size=1024, timeout=0.1)
except TIMEOUT:
pass
self.conn.buffer = ''

View File

@ -1,235 +0,0 @@
# Copyright 2014-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re
import time
import logging
from copy import copy
from wlauto.exceptions import ConfigError
from wlauto.utils.serial_port import TIMEOUT
from wlauto.utils.types import boolean
logger = logging.getLogger('UEFI')
class UefiConfig(object):
def __init__(self, config_dict):
if isinstance(config_dict, UefiConfig):
self.__dict__ = copy(config_dict.__dict__)
else:
try:
self.image_name = config_dict['image_name']
self.image_args = config_dict['image_args']
self.fdt_support = boolean(config_dict['fdt_support'])
except KeyError as e:
raise ConfigError('Missing mandatory parameter for UEFI entry config: "{}"'.format(e))
self.initrd = config_dict.get('initrd')
self.fdt_path = config_dict.get('fdt_path')
if self.fdt_path and not self.fdt_support:
raise ConfigError('FDT path has been specfied for UEFI entry, when FDT support is "False"')
class UefiMenu(object):
"""
Allows navigating UEFI menu over serial (it relies on a pexpect connection).
"""
option_regex = re.compile(r'^\[(\d+)\]\s+([^\r]+)\r\n', re.M)
prompt_regex = re.compile(r'^(\S[^\r\n]+):\s*', re.M)
invalid_regex = re.compile(r'Invalid input \(max (\d+)\)', re.M)
load_delay = 1 # seconds
default_timeout = 60 # seconds
def __init__(self, conn, prompt='The default boot selection will start in'):
"""
:param conn: A serial connection as returned by ``pexect.spawn()``.
:param prompt: The starting prompt to wait for during ``open()``.
"""
self.conn = conn
self.start_prompt = prompt
self.options = {}
self.prompt = None
def open(self, timeout=default_timeout):
"""
"Open" the UEFI menu by sending an interrupt on STDIN after seeing the
starting prompt (configurable upon creation of the ``UefiMenu`` object.
"""
self.conn.expect(self.start_prompt, timeout)
self.conn.sendline('')
time.sleep(self.load_delay)
def create_entry(self, name, config):
"""Create a new UEFI entry using the parameters. The menu is assumed
to be at the top level. Upon return, the menu will be at the top level."""
logger.debug('Creating UEFI entry {}'.format(name))
self.nudge()
self.select('Boot Manager')
self.select('Add Boot Device Entry')
self.select('NOR Flash')
self.enter(config.image_name)
self.enter('y' if config.fdt_support else 'n')
if config.initrd:
self.enter('y')
self.enter(config.initrd)
else:
self.enter('n')
self.enter(config.image_args)
self.enter(name)
if config.fdt_path:
self.select('Update FDT path')
self.enter(config.fdt_path)
self.select('Return to main menu')
def delete_entry(self, name):
"""Delete the specified UEFI entry. The menu is assumed
to be at the top level. Upon return, the menu will be at the top level."""
logger.debug('Removing UEFI entry {}'.format(name))
self.nudge()
self.select('Boot Manager')
self.select('Remove Boot Device Entry')
self.select(name)
self.select('Return to main menu')
def select(self, option, timeout=default_timeout):
"""
Select the specified option from the current menu.
:param option: Could be an ``int`` index of the option, or a string/regex to
match option text against.
:param timeout: If a non-``int`` option is specified, the option list may need
need to be parsed (if it hasn't been already), this may block
and the timeout is used to cap that , resulting in a ``TIMEOUT``
exception.
:param delay: A fixed delay to wait after sending the input to the serial connection.
This should be set if input this action is known to result in a
long-running operation.
"""
if isinstance(option, basestring):
option = self.get_option_index(option, timeout)
self.enter(option)
def enter(self, value, delay=load_delay):
"""Like ``select()`` except no resolution is performed -- the value is sent directly
to the serial connection."""
# Empty the buffer first, so that only response to the input about to
# be sent will be processed by subsequent commands.
value = str(value)
self._reset()
self.write_characters(value)
# TODO: in case the value is long an complicated, things may get
# screwed up (e.g. there may be line breaks injected), additionally,
# special chars might cause regex to fail. To avoid these issues i'm
# only matching against the first 5 chars of the value. This is
# entirely arbitrary and I'll probably have to find a better way of
# doing this at some point.
self.conn.expect(value[:5], timeout=delay)
time.sleep(self.load_delay)
def read_menu(self, timeout=default_timeout):
"""Parse serial output to get the menu options and the following prompt."""
attempting_timeout_retry = False
attempting_invalid_retry = False
while True:
index = self.conn.expect([self.option_regex, self.prompt_regex, self.invalid_regex, TIMEOUT],
timeout=timeout)
match = self.conn.match
if index == 0: # matched menu option
self.options[match.group(1)] = match.group(2)
elif index == 1: # matched prompt
self.prompt = match.group(1)
break
elif index == 2: # matched invalid selection
# We've sent an invalid input (which includes an empty line) at
# the top-level menu. To get back the menu options, it seems we
# need to enter what the error reports as the max + 1, so...
if not attempting_invalid_retry:
attempting_invalid_retry = True
val = int(match.group(1)) + 1
self.empty_buffer()
self.enter(val)
else: # OK, that didn't work; panic!
raise RuntimeError('Could not read menu entries stuck on "{}" prompt'.format(self.prompt))
elif index == 3: # timed out
if not attempting_timeout_retry:
attempting_timeout_retry = True
self.nudge()
else: # Didn't help. Run away!
raise RuntimeError('Did not see a valid UEFI menu.')
else:
raise AssertionError('Unexpected response waiting for UEFI menu') # should never get here
def get_option_index(self, text, timeout=default_timeout):
"""Returns the menu index of the specified option text (uses regex matching). If the option
is not in the current menu, ``LookupError`` will be raised."""
if not self.prompt:
self.read_menu(timeout)
for k, v in self.options.iteritems():
if re.search(text, v):
return k
raise LookupError(text)
def has_option(self, text, timeout=default_timeout):
"""Returns ``True`` if at least one of the options in the current menu has
matched (using regex) the specified text."""
try:
self.get_option_index(text, timeout)
return True
except LookupError:
return False
def nudge(self):
"""Send a little nudge to ensure there is something to read. This is useful when you're not
sure if all out put from the serial has been read already."""
self.enter('')
def empty_buffer(self):
"""Read everything from the serial and clear the internal pexpect buffer. This ensures
that the next ``expect()`` call will time out (unless further input will be sent to the
serial beforehand. This is used to create a "known" state and avoid unexpected matches."""
try:
while True:
time.sleep(0.1)
self.conn.read_nonblocking(size=1024, timeout=0.1)
except TIMEOUT:
pass
self.conn.buffer = ''
def write_characters(self, line):
"""Write a single line out to serial charcter-by-character. This will ensure that nothing will
be dropped for longer lines."""
line = line.rstrip('\r\n')
for c in line:
self.conn.send(c)
time.sleep(0.05)
self.conn.sendline('')
def _reset(self):
self.options = {}
self.prompt = None
self.empty_buffer()