diff --git a/wlauto/utils/__init__.py b/wlauto/utils/__init__.py index cd5d64d6..3e74b613 100644 --- a/wlauto/utils/__init__.py +++ b/wlauto/utils/__init__.py @@ -12,5 +12,3 @@ # See the License for the specific language governing permissions and # limitations under the License. # - - diff --git a/wlauto/utils/android.py b/wlauto/utils/android.py index 5d00c193..79a4cdcd 100644 --- a/wlauto/utils/android.py +++ b/wlauto/utils/android.py @@ -28,37 +28,7 @@ import re from wlauto.exceptions import DeviceError, ConfigError, HostError from wlauto.utils.misc import check_output, escape_single_quotes, escape_double_quotes, get_null - -MAX_TRIES = 5 - -logger = logging.getLogger('android') - -# See: -# http://developer.android.com/guide/topics/manifest/uses-sdk-element.html#ApiLevels -ANDROID_VERSION_MAP = { - 22: 'LOLLIPOP_MR1', - 21: 'LOLLIPOP', - 20: 'KITKAT_WATCH', - 19: 'KITKAT', - 18: 'JELLY_BEAN_MR2', - 17: 'JELLY_BEAN_MR1', - 16: 'JELLY_BEAN', - 15: 'ICE_CREAM_SANDWICH_MR1', - 14: 'ICE_CREAM_SANDWICH', - 13: 'HONEYCOMB_MR2', - 12: 'HONEYCOMB_MR1', - 11: 'HONEYCOMB', - 10: 'GINGERBREAD_MR1', - 9: 'GINGERBREAD', - 8: 'FROYO', - 7: 'ECLAIR_MR1', - 6: 'ECLAIR_0_1', - 5: 'ECLAIR', - 4: 'DONUT', - 3: 'CUPCAKE', - 2: 'BASE_1_1', - 1: 'BASE', -} +from devlib.utils.android import ANDROID_VERSION_MAP, adb_command, ApkInfo # See: # http://developer.android.com/guide/topics/security/normal-permissions.html @@ -97,311 +67,3 @@ ANDROID_NORMAL_PERMISSIONS = [ 'INSTALL_SHORTCUT', 'UNINSTALL_SHORTCUT', ] - -# TODO: these are set to their actual values near the bottom of the file. There -# is some HACKery involved to ensure that ANDROID_HOME does not need to be set -# or adb added to path for root when installing as root, and the whole -# implemenationt is kinda clunky and messier than I'd like. The only file that -# rivals this one in levels of mess is bootstrap.py (for very much the same -# reasons). There must be a neater way to ensure that enviromental dependencies -# are met when they are needed, and are not imposed when they are not. -android_home = None -platform_tools = None -adb = None -aapt = None -fastboot = None - - -class _AndroidEnvironment(object): - - def __init__(self): - self.android_home = None - self.platform_tools = None - self.adb = None - self.aapt = None - self.fastboot = None - - -class AndroidProperties(object): - - def __init__(self, text): - self._properties = {} - self.parse(text) - - def parse(self, text): - self._properties = dict(re.findall(r'\[(.*?)\]:\s+\[(.*?)\]', text)) - - def __iter__(self): - return iter(self._properties) - - def __getattr__(self, name): - return self._properties.get(name) - - __getitem__ = __getattr__ - - -class ApkInfo(object): - - version_regex = re.compile(r"name='(?P[^']+)' versionCode='(?P[^']+)' versionName='(?P[^']+)'") - name_regex = re.compile(r"name='(?P[^']+)'") - - def __init__(self, path=None): - self.path = path - self.package = None - self.activity = None - self.label = None - self.version_name = None - self.version_code = None - self.parse(path) - - def parse(self, apk_path): - _check_env() - command = [aapt, 'dump', 'badging', apk_path] - logger.debug(' '.join(command)) - output = subprocess.check_output(command) - for line in output.split('\n'): - if line.startswith('application-label:'): - self.label = line.split(':')[1].strip().replace('\'', '') - elif line.startswith('package:'): - match = self.version_regex.search(line) - if match: - self.package = match.group('name') - self.version_code = match.group('vcode') - self.version_name = match.group('vname') - elif line.startswith('launchable-activity:'): - match = self.name_regex.search(line) - self.activity = match.group('name') - else: - pass # not interested - - -def fastboot_command(command, timeout=None): - _check_env() - full_command = "fastboot {}".format(command) - logger.debug(full_command) - output, _ = check_output(full_command, timeout, shell=True) - return output - - -def fastboot_flash_partition(partition, path_to_image): - command = 'flash {} {}'.format(partition, path_to_image) - fastboot_command(command) - - -def adb_get_device(): - """ - Returns the serial number of a connected android device. - - If there are more than one device connected to the machine, or it could not - find any device connected, :class:`wlauto.exceptions.ConfigError` is raised. - """ - _check_env() - # TODO this is a hacky way to issue a adb command to all listed devices - - # The output of calling adb devices consists of a heading line then - # a list of the devices sperated by new line - # The last line is a blank new line. in otherwords, if there is a device found - # then the output length is 2 + (1 for each device) - output = adb_command('0', "devices").splitlines() # pylint: disable=E1103 - output_length = len(output) - if output_length == 3: - # output[1] is the 2nd line in the output which has the device name - # Splitting the line by '\t' gives a list of two indexes, which has - # device serial in 0 number and device type in 1. - return output[1].split('\t')[0] - elif output_length > 3: - raise ConfigError('Number of discovered devices is {}, it should be 1'.format(output_length - 2)) - else: - raise ConfigError('No device is connected and available') - - -def adb_connect(device, timeout=None): - _check_env() - command = "adb connect " + device - if ":" in device: - port = device.split(':')[-1] - logger.debug(command) - - output, _ = check_output(command, shell=True, timeout=timeout) - logger.debug(output) - #### due to a rare adb bug sometimes an extra :5555 is appended to the IP address - if output.find('{}:{}'.format(port, port)) != -1: - logger.debug('ADB BUG with extra port') - command = "adb connect " + device.replace(':{}'.format(port), '') - - tries = 0 - output = None - while not poll_for_file(device, "/proc/cpuinfo"): - logger.debug("adb connect failed, retrying now...") - tries += 1 - if tries > MAX_TRIES: - raise DeviceError('Cannot connect to adb server on the device.') - logger.debug(command) - output, _ = check_output(command, shell=True, timeout=timeout) - time.sleep(10) - - if tries and output.find('connected to') == -1: - raise DeviceError('Could not connect to {}'.format(device)) - - -def adb_disconnect(device): - _check_env() - if ":5555" in device: - command = "adb disconnect " + device - logger.debug(command) - retval = subprocess.call(command, stdout=open(os.devnull, 'wb'), shell=True) - if retval: - raise DeviceError('"{}" returned {}'.format(command, retval)) - - -def poll_for_file(device, dfile): - _check_env() - device_string = '-s {}'.format(device) if device else '' - command = "adb " + device_string + " shell \" if [ -f " + dfile + " ] ; then true ; else false ; fi\" " - logger.debug(command) - result = subprocess.call(command, stderr=subprocess.PIPE, shell=True) - return not bool(result) - -am_start_error = re.compile(r"Error: Activity class {[\w|.|/]*} does not exist") - - -def adb_shell(device, command, timeout=None, check_exit_code=False, as_root=False): # NOQA - _check_env() - if as_root: - command = 'echo \'{}\' | su'.format(escape_single_quotes(command)) - device_string = '-s {}'.format(device) if device else '' - full_command = 'adb {} shell "{}"'.format(device_string, escape_double_quotes(command)) - logger.debug(full_command) - if check_exit_code: - actual_command = "adb {} shell '({}); echo; echo $?'".format(device_string, escape_single_quotes(command)) - raw_output, error = check_output(actual_command, timeout, shell=True) - if raw_output: - try: - output, exit_code, _ = raw_output.rsplit('\r\n', 2) - except ValueError: - exit_code, _ = raw_output.rsplit('\r\n', 1) - output = '' - else: # raw_output is empty - exit_code = '969696' # just because - output = '' - - exit_code = exit_code.strip() - if exit_code.isdigit(): - if int(exit_code): - message = 'Got exit code {}\nfrom: {}\nSTDOUT: {}\nSTDERR: {}'.format(exit_code, full_command, - output, error) - raise DeviceError(message) - elif am_start_error.findall(output): - message = 'Could not start activity; got the following:' - message += '\n{}'.format(am_start_error.findall(output)[0]) - raise DeviceError(message) - else: # not all digits - if am_start_error.findall(output): - message = 'Could not start activity; got the following:' - message += '\n{}'.format(am_start_error.findall(output)[0]) - raise DeviceError(message) - else: - raise DeviceError('adb has returned early; did not get an exit code. Was kill-server invoked?') - else: # do not check exit code - output, _ = check_output(full_command, timeout, shell=True) - return output - - -def adb_background_shell(device, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False): - """Runs the sepcified command in a subprocess, returning the the Popen object.""" - _check_env() - if as_root: - command = 'echo \'{}\' | su'.format(escape_single_quotes(command)) - device_string = '-s {}'.format(device) if device else '' - full_command = 'adb {} shell "{}"'.format(device_string, escape_double_quotes(command)) - logger.debug(full_command) - return subprocess.Popen(full_command, stdout=stdout, stderr=stderr, shell=True) - - -class AdbDevice(object): - - def __init__(self, name, status): - self.name = name - self.status = status - - def __cmp__(self, other): - if isinstance(other, AdbDevice): - return cmp(self.name, other.name) - else: - return cmp(self.name, other) - - -def adb_list_devices(): - _check_env() - output = adb_command(None, 'devices') - devices = [] - for line in output.splitlines(): - parts = [p.strip() for p in line.split()] - if len(parts) == 2: - devices.append(AdbDevice(*parts)) - return devices - - -def adb_command(device, command, timeout=None): - _check_env() - device_string = '-s {}'.format(device) if device else '' - full_command = "adb {} {}".format(device_string, command) - logger.debug(full_command) - output, _ = check_output(full_command, timeout, shell=True) - return output - - -# Messy environment initialisation stuff... - - -def _initialize_with_android_home(env): - logger.debug('Using ANDROID_HOME from the environment.') - env.android_home = android_home - env.platform_tools = os.path.join(android_home, 'platform-tools') - os.environ['PATH'] += os.pathsep + env.platform_tools - _init_common(env) - return env - - -def _initialize_without_android_home(env): - if os.name == 'nt': - raise HostError('Please set ANDROID_HOME to point to the location of the Android SDK.') - # Assuming Unix in what follows. - if subprocess.call('adb version >{}'.format(get_null()), shell=True): - raise HostError('ANDROID_HOME is not set and adb is not in PATH. Have you installed Android SDK?') - logger.debug('Discovering ANDROID_HOME from adb path.') - env.platform_tools = os.path.dirname(subprocess.check_output('which adb', shell=True)) - env.android_home = os.path.dirname(env.platform_tools) - _init_common(env) - return env - - -def _init_common(env): - logger.debug('ANDROID_HOME: {}'.format(env.android_home)) - build_tools_directory = os.path.join(env.android_home, 'build-tools') - if not os.path.isdir(build_tools_directory): - msg = 'ANDROID_HOME ({}) does not appear to have valid Android SDK install (cannot find build-tools)' - raise HostError(msg.format(env.android_home)) - versions = os.listdir(build_tools_directory) - for version in reversed(sorted(versions)): - aapt_path = os.path.join(build_tools_directory, version, 'aapt') - if os.path.isfile(aapt_path): - logger.debug('Using aapt for version {}'.format(version)) - env.aapt = aapt_path - break - else: - raise HostError('aapt not found. Please make sure at least one Android platform is installed.') - - -def _check_env(): - global android_home, platform_tools, adb, aapt # pylint: disable=W0603 - if not android_home: - android_home = os.getenv('ANDROID_HOME') - if android_home: - _env = _initialize_with_android_home(_AndroidEnvironment()) - else: - _env = _initialize_without_android_home(_AndroidEnvironment()) - android_home = _env.android_home - platform_tools = _env.platform_tools - adb = _env.adb - aapt = _env.aapt diff --git a/wlauto/utils/misc.py b/wlauto/utils/misc.py index 76a54443..ddd34926 100644 --- a/wlauto/utils/misc.py +++ b/wlauto/utils/misc.py @@ -43,31 +43,20 @@ from distutils.spawn import find_executable import yaml from dateutil import tz - -# ABI --> architectures list -ABI_MAP = { - 'armeabi': ['armeabi', 'armv7', 'armv7l', 'armv7el', 'armv7lh'], - 'arm64': ['arm64', 'armv8', 'arm64-v8a', 'aarch64'], -} - - -def preexec_function(): - # Ignore the SIGINT signal by setting the handler to the standard - # signal handler SIG_IGN. - signal.signal(signal.SIGINT, signal.SIG_IGN) - # Change process group in case we have to kill the subprocess and all of - # its children later. - # TODO: this is Unix-specific; would be good to find an OS-agnostic way - # to do this in case we wanna port WA to Windows. - os.setpgrp() - +from devlib.utils.misc import ABI_MAP, check_output, walk_modules, \ + ensure_directory_exists, ensure_file_directory_exists, \ + merge_dicts, merge_lists, normalize, convert_new_lines, \ + escape_quotes, escape_single_quotes, escape_double_quotes, \ + isiterable, getch, as_relative, ranges_to_list, \ + list_to_ranges, list_to_mask, mask_to_list, which, \ + get_cpu_mask, unique check_output_logger = logging.getLogger('check_output') # Defined here rather than in wlauto.exceptions due to module load dependencies class TimeoutError(Exception): - """Raised when a subprocess command times out. This is basically a ``WAError``-derived version + """Raised when a subprocess command times out. This is basically a ``WAError``-derived version{} of ``subprocess.CalledProcessError``, the thinking being that while a timeout could be due to programming error (e.g. not setting long enough timers), it is often due to some failure in the environment, and there fore should be classed as a "user error".""" @@ -81,86 +70,6 @@ class TimeoutError(Exception): return '\n'.join([self.message, 'OUTPUT:', self.output or '']) -def check_output(command, timeout=None, ignore=None, **kwargs): - """This is a version of subprocess.check_output that adds a timeout parameter to kill - the subprocess if it does not return within the specified time.""" - # pylint: disable=too-many-branches - if ignore is None: - ignore = [] - elif isinstance(ignore, int): - ignore = [ignore] - elif not isinstance(ignore, list) and ignore != 'all': - message = 'Invalid value for ignore parameter: "{}"; must be an int or a list' - raise ValueError(message.format(ignore)) - if 'stdout' in kwargs: - raise ValueError('stdout argument not allowed, it will be overridden.') - - def callback(pid): - try: - check_output_logger.debug('{} timed out; sending SIGKILL'.format(pid)) - os.killpg(pid, signal.SIGKILL) - except OSError: - pass # process may have already terminated. - - process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - preexec_fn=preexec_function, **kwargs) - - if timeout: - timer = threading.Timer(timeout, callback, [process.pid, ]) - timer.start() - - try: - output, error = process.communicate() - finally: - if timeout: - timer.cancel() - - retcode = process.poll() - if retcode: - if retcode == -9: # killed, assume due to timeout callback - raise TimeoutError(command, output='\n'.join([output, error])) - elif ignore != 'all' and retcode not in ignore: - raise subprocess.CalledProcessError(retcode, command, output='\n'.join([output, error])) - return output, error - - -def walk_modules(path): - """ - Given package name, return a list of all modules (including submodules, etc) - in that package. - - """ - root_mod = __import__(path, {}, {}, ['']) - mods = [root_mod] - for _, name, ispkg in pkgutil.iter_modules(root_mod.__path__): - submod_path = '.'.join([path, name]) - if ispkg: - mods.extend(walk_modules(submod_path)) - else: - submod = __import__(submod_path, {}, {}, ['']) - mods.append(submod) - return mods - - -def ensure_directory_exists(dirpath): - """A filter for directory paths to ensure they exist.""" - if not os.path.isdir(dirpath): - os.makedirs(dirpath) - return dirpath - - -def ensure_file_directory_exists(filepath): - """ - A filter for file paths to ensure the directory of the - file exists and the file can be created there. The file - itself is *not* going to be created if it doesn't already - exist. - - """ - ensure_directory_exists(os.path.dirname(filepath)) - return filepath - - def diff_tokens(before_token, after_token): """ Creates a diff of two tokens. @@ -249,131 +158,6 @@ def get_traceback(exc=None): return sio.getvalue() -def merge_dicts(*args, **kwargs): - if len(args) < 2: - raise ValueError('Must specify at least two dicts to merge.') - func = partial(_merge_two_dicts, **kwargs) - return reduce(func, args) - - -def _merge_two_dicts(base, other, list_duplicates='all', match_types=False, # pylint: disable=R0912,R0914 - dict_type=dict, should_normalize=True, should_merge_lists=True): - """Merge dicts normalizing their keys.""" - merged = dict_type() - base_keys = base.keys() - other_keys = other.keys() - norm = normalize if should_normalize else lambda x, y: x - - base_only = [] - other_only = [] - both = [] - union = [] - for k in base_keys: - if k in other_keys: - both.append(k) - else: - base_only.append(k) - union.append(k) - for k in other_keys: - if k in base_keys: - union.append(k) - else: - union.append(k) - other_only.append(k) - - for k in union: - if k in base_only: - merged[k] = norm(base[k], dict_type) - elif k in other_only: - merged[k] = norm(other[k], dict_type) - elif k in both: - base_value = base[k] - other_value = other[k] - base_type = type(base_value) - other_type = type(other_value) - if (match_types and (base_type != other_type) and - (base_value is not None) and (other_value is not None)): - raise ValueError('Type mismatch for {} got {} ({}) and {} ({})'.format(k, base_value, base_type, - other_value, other_type)) - if isinstance(base_value, dict): - merged[k] = _merge_two_dicts(base_value, other_value, list_duplicates, match_types, dict_type) - elif isinstance(base_value, list): - if should_merge_lists: - merged[k] = _merge_two_lists(base_value, other_value, list_duplicates, dict_type) - else: - merged[k] = _merge_two_lists([], other_value, list_duplicates, dict_type) - - elif isinstance(base_value, set): - merged[k] = norm(base_value.union(other_value), dict_type) - else: - merged[k] = norm(other_value, dict_type) - else: # Should never get here - raise AssertionError('Unexpected merge key: {}'.format(k)) - - return merged - - -def merge_lists(*args, **kwargs): - if len(args) < 2: - raise ValueError('Must specify at least two lists to merge.') - func = partial(_merge_two_lists, **kwargs) - return reduce(func, args) - - -def _merge_two_lists(base, other, duplicates='all', dict_type=dict): # pylint: disable=R0912 - """ - Merge lists, normalizing their entries. - - parameters: - - :base, other: the two lists to be merged. ``other`` will be merged on - top of base. - :duplicates: Indicates the strategy of handling entries that appear - in both lists. ``all`` will keep occurrences from both - lists; ``first`` will only keep occurrences from - ``base``; ``last`` will only keep occurrences from - ``other``; - - .. note:: duplicate entries that appear in the *same* list - will never be removed. - - """ - if not isiterable(base): - base = [base] - if not isiterable(other): - other = [other] - if duplicates == 'all': - merged_list = [] - for v in normalize(base, dict_type) + normalize(other, dict_type): - if not _check_remove_item(merged_list, v): - merged_list.append(v) - return merged_list - elif duplicates == 'first': - base_norm = normalize(base, dict_type) - merged_list = normalize(base, dict_type) - for v in base_norm: - _check_remove_item(merged_list, v) - for v in normalize(other, dict_type): - if not _check_remove_item(merged_list, v): - if v not in base_norm: - merged_list.append(v) # pylint: disable=no-member - return merged_list - elif duplicates == 'last': - other_norm = normalize(other, dict_type) - merged_list = [] - for v in normalize(base, dict_type): - if not _check_remove_item(merged_list, v): - if v not in other_norm: - merged_list.append(v) - for v in other_norm: - if not _check_remove_item(merged_list, v): - merged_list.append(v) - return merged_list - else: - raise ValueError('Unexpected value for list duplicates argument: {}. '.format(duplicates) + - 'Must be in {"all", "first", "last"}.') - - def _check_remove_item(the_list, item): """Helper function for merge_lists that implements checking wether an items should be removed from the list and doing so if needed. Returns ``True`` if @@ -388,24 +172,6 @@ def _check_remove_item(the_list, item): return True -def normalize(value, dict_type=dict): - """Normalize values. Recursively normalizes dict keys to be lower case, - no surrounding whitespace, underscore-delimited strings.""" - if isinstance(value, dict): - normalized = dict_type() - for k, v in value.iteritems(): - if isinstance(k, basestring): - k = k.strip().lower().replace(' ', '_') - normalized[k] = normalize(v, dict_type) - return normalized - elif isinstance(value, list): - return [normalize(v, dict_type) for v in value] - elif isinstance(value, tuple): - return tuple([normalize(v, dict_type) for v in value]) - else: - return value - - VALUE_REGEX = re.compile(r'(\d+(?:\.\d+)?)\s*(\w*)') UNITS_MAP = { @@ -457,50 +223,6 @@ def capitalize(text): return text[0].upper() + text[1:].lower() -def convert_new_lines(text): - """ Convert new lines to a common format. """ - return text.replace('\r\n', '\n').replace('\r', '\n') - - -def escape_quotes(text): - """Escape quotes, and escaped quotes, in the specified text.""" - return re.sub(r'\\("|\')', r'\\\\\1', text).replace('\'', '\\\'').replace('\"', '\\\"') - - -def escape_single_quotes(text): - """Escape single quotes, and escaped single quotes, in the specified text.""" - return re.sub(r'\\("|\')', r'\\\\\1', text).replace('\'', '\'\\\'\'') - - -def escape_double_quotes(text): - """Escape double quotes, and escaped double quotes, in the specified text.""" - return re.sub(r'\\("|\')', r'\\\\\1', text).replace('\"', '\\\"') - - -def getch(count=1): - """Read ``count`` characters from standard input.""" - if os.name == 'nt': - import msvcrt # pylint: disable=F0401 - return ''.join([msvcrt.getch() for _ in xrange(count)]) - else: # assume Unix - import tty # NOQA - import termios # NOQA - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setraw(sys.stdin.fileno()) - ch = sys.stdin.read(count) - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - return ch - - -def isiterable(obj): - """Returns ``True`` if the specified object is iterable and - *is not a string type*, ``False`` otherwise.""" - return hasattr(obj, '__iter__') and not isinstance(obj, basestring) - - def utc_to_local(dt): """Convert naive datetime to local time zone, assuming UTC.""" return dt.replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal()) @@ -511,21 +233,6 @@ def local_to_utc(dt): return dt.replace(tzinfo=tz.tzlocal()).astimezone(tz.tzutc()) -def as_relative(path): - """Convert path to relative by stripping away the leading '/' on UNIX or - the equivant on other platforms.""" - path = os.path.splitdrive(path)[1] - return path.lstrip(os.sep) - - -def get_cpu_mask(cores): - """Return a string with the hex for the cpu mask for the specified core numbers.""" - mask = 0 - for i in cores: - mask |= 1 << i - return '0x{0:x}'.format(mask) - - def load_class(classpath): """Loads the specified Python class. ``classpath`` must be a fully-qualified class name (i.e. namspaced under module/package).""" @@ -587,27 +294,6 @@ def enum_metaclass(enum_param, return_name=False, start=0): return __EnumMeta -def which(name): - """Platform-independent version of UNIX which utility.""" - if os.name == 'nt': - paths = os.getenv('PATH').split(os.pathsep) - exts = os.getenv('PATHEXT').split(os.pathsep) - for path in paths: - testpath = os.path.join(path, name) - if os.path.isfile(testpath): - return testpath - for ext in exts: - testpathext = testpath + ext - if os.path.isfile(testpathext): - return testpathext - return None - else: # assume UNIX-like - try: - return check_output(['which', name])[0].strip() - except subprocess.CalledProcessError: - return None - - _bash_color_regex = re.compile('\x1b\[[0-9;]+m') @@ -733,19 +419,6 @@ def load_struct_from_file(filepath): raise ValueError('Unknown format "{}": {}'.format(extn, filepath)) -def unique(alist): - """ - Returns a list containing only unique elements from the input list (but preserves - order, unlike sets). - - """ - result = [] - for item in alist: - if item not in result: - result.append(item) - return result - - def open_file(filepath): """ Open the specified file path with the associated launcher in an OS-agnostic way. @@ -759,49 +432,6 @@ def open_file(filepath): return subprocess.call(['xdg-open', filepath]) -def ranges_to_list(ranges_string): - """Converts a sysfs-style ranges string, e.g. ``"0,2-4"``, into a list ,e.g ``[0,2,3,4]``""" - values = [] - for rg in ranges_string.split(','): - if '-' in rg: - first, last = map(int, rg.split('-')) - values.extend(xrange(first, last + 1)) - else: - values.append(int(rg)) - return values - - -def list_to_ranges(values): - """Converts a list, e.g ``[0,2,3,4]``, into a sysfs-style ranges string, e.g. ``"0,2-4"``""" - range_groups = [] - for _, g in groupby(enumerate(values), lambda (i, x): i - x): - range_groups.append(map(itemgetter(1), g)) - range_strings = [] - for group in range_groups: - if len(group) == 1: - range_strings.append(str(group[0])) - else: - range_strings.append('{}-{}'.format(group[0], group[-1])) - return ','.join(range_strings) - - -def list_to_mask(values, base=0x0): - """Converts the specified list of integer values into - a bit mask for those values. Optinally, the list can be - applied to an existing mask.""" - for v in values: - base |= (1 << v) - return base - - -def mask_to_list(mask): - """Converts the specfied integer bitmask into a list of - indexes of bits that are set in the mask.""" - size = len(bin(mask)) - 2 # because of "0b" - return [size - i - 1 for i in xrange(size) - if mask & (1 << size - i - 1)] - - def sha256(path, chunk=2048): """Calculates SHA256 hexdigest of the file at the specified path.""" h = hashlib.sha256() diff --git a/wlauto/utils/serial_port.py b/wlauto/utils/serial_port.py index 3dc0f0cd..b1a419f2 100644 --- a/wlauto/utils/serial_port.py +++ b/wlauto/utils/serial_port.py @@ -27,11 +27,8 @@ if V(pexpect.__version__) < V('4.0.0'): else: from pexpect import fdpexpect -# Adding pexpect exceptions into this module's namespace -from pexpect import EOF, TIMEOUT # NOQA pylint: disable=W0611 - -from wlauto.exceptions import HostError from wlauto.utils.log import LogWriter +from devlib.utils.serial_port import pulse_dtr, get_connection, open_serial_connection class PexpectLogger(LogWriter): @@ -52,71 +49,3 @@ class PexpectLogger(LogWriter): self.kind = kind logger_name = 'serial_{}'.format(kind) if kind else 'serial' super(PexpectLogger, self).__init__(logger_name) - - -def pulse_dtr(conn, state=True, duration=0.1): - """Set the DTR line of the specified serial connection to the specified state - for the specified duration (note: the initial state of the line is *not* checked.""" - conn.setDTR(state) - time.sleep(duration) - conn.setDTR(not state) - - -def get_connection(timeout, init_dtr=None, *args, **kwargs): - if init_dtr is not None: - kwargs['dsrdtr'] = True - try: - conn = serial.Serial(*args, **kwargs) - except serial.SerialException as e: - raise HostError(e.message) - if init_dtr is not None: - conn.setDTR(init_dtr) - conn.nonblocking() - conn.flushOutput() - target = fdpexpect.fdspawn(conn.fileno(), timeout=timeout) - target.logfile_read = PexpectLogger('read') - target.logfile_send = PexpectLogger('send') - - # Monkey-patching sendline to introduce a short delay after - # chacters are sent to the serial. If two sendline s are issued - # one after another the second one might start putting characters - # into the serial device before the first one has finished, causing - # corruption. The delay prevents that. - tsln = target.sendline - - def sendline(x): - tsln(x) - time.sleep(0.1) - - target.sendline = sendline - return target, conn - - -@contextmanager -def open_serial_connection(timeout, get_conn=False, init_dtr=None, *args, **kwargs): - """ - Opens a serial connection to a device. - - :param timeout: timeout for the fdpexpect spawn object. - :param conn: ``bool`` that specfies whether the underlying connection - object should be yielded as well. - :param init_dtr: specifies the initial DTR state stat should be set. - - All arguments are passed into the __init__ of serial.Serial. See - pyserial documentation for details: - - http://pyserial.sourceforge.net/pyserial_api.html#serial.Serial - - :returns: a pexpect spawn object connected to the device. - See: http://pexpect.sourceforge.net/pexpect.html - - """ - target, conn = get_connection(timeout, init_dtr=init_dtr, *args, **kwargs) - - if get_conn: - yield target, conn - else: - yield target - - target.close() # Closes the file descriptor used by the conn. - del conn diff --git a/wlauto/utils/ssh.py b/wlauto/utils/ssh.py deleted file mode 100644 index 824bac17..00000000 --- a/wlauto/utils/ssh.py +++ /dev/null @@ -1,276 +0,0 @@ -# Copyright 2014-2015 ARM Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -import os -import stat -import logging -import subprocess -import re -import threading -import tempfile -import shutil - -from pexpect import EOF, TIMEOUT, spawn, pxssh - -from wlauto.exceptions import HostError, DeviceError, TimeoutError, ConfigError -from wlauto.utils.misc import which, strip_bash_colors, escape_single_quotes, check_output - - -ssh = None -scp = None -sshpass = None - -logger = logging.getLogger('ssh') - - -def ssh_get_shell(host, username, password=None, keyfile=None, port=None, timeout=10, telnet=False): - _check_env() - if telnet: - if keyfile: - raise ConfigError('keyfile may not be used with a telnet connection.') - conn = TelnetConnection() - else: # ssh - conn = pxssh.pxssh() # pylint: disable=redefined-variable-type - try: - if keyfile: - conn.login(host, username, ssh_key=keyfile, port=port, login_timeout=timeout) - else: - conn.login(host, username, password, port=port, login_timeout=timeout) - except EOF: - raise DeviceError('Could not connect to {}; is the host name correct?'.format(host)) - return conn - - -class TelnetConnection(pxssh.pxssh): - # pylint: disable=arguments-differ - - def login(self, server, username, password='', original_prompt=r'[#$]', login_timeout=10, - auto_prompt_reset=True, sync_multiplier=1, port=23): - cmd = 'telnet -l {} {} {}'.format(username, server, port) - - spawn._spawn(self, cmd) # pylint: disable=protected-access - try: - i = self.expect('(?i)(?:password)', timeout=login_timeout) - if i == 0: - self.sendline(password) - i = self.expect([original_prompt, 'Login incorrect'], timeout=login_timeout) - if i: - raise pxssh.ExceptionPxssh('could not log in: password was incorrect') - except TIMEOUT: - if not password: - # There was no password prompt before TIMEOUT, and we didn't - # have a password to enter. Assume everything is OK. - pass - else: - raise pxssh.ExceptionPxssh('could not log in: did not see a password prompt') - - if not self.sync_original_prompt(sync_multiplier): - self.close() - raise pxssh.ExceptionPxssh('could not synchronize with original prompt') - - if auto_prompt_reset: - if not self.set_unique_prompt(): - self.close() - message = 'could not set shell prompt (recieved: {}, expected: {}).' - raise pxssh.ExceptionPxssh(message.format(self.before, self.PROMPT)) - return True - - -def check_keyfile(keyfile): - """ - keyfile must have the right access premissions in order to be useable. If the specified - file doesn't, create a temporary copy and set the right permissions for that. - - Returns either the ``keyfile`` (if the permissions on it are correct) or the path to a - temporary copy with the right permissions. - """ - desired_mask = stat.S_IWUSR | stat.S_IRUSR - actual_mask = os.stat(keyfile).st_mode & 0xFF - if actual_mask != desired_mask: - tmp_file = os.path.join(tempfile.gettempdir(), os.path.basename(keyfile)) - shutil.copy(keyfile, tmp_file) - os.chmod(tmp_file, desired_mask) - return tmp_file - else: # permissions on keyfile are OK - return keyfile - - -class SshShell(object): - - default_password_prompt = '[sudo] password' - max_cancel_attempts = 5 - - def __init__(self, password_prompt=None, timeout=10, telnet=False): - self.password_prompt = password_prompt if password_prompt is not None else self.default_password_prompt - self.timeout = timeout - self.telnet = telnet - self.conn = None - self.lock = threading.Lock() - self.connection_lost = False - - def login(self, host, username, password=None, keyfile=None, port=None, timeout=None): - # pylint: disable=attribute-defined-outside-init - logger.debug('Logging in {}@{}'.format(username, host)) - self.host = host - self.username = username - self.password = password - self.keyfile = check_keyfile(keyfile) if keyfile else keyfile - self.port = port - timeout = self.timeout if timeout is None else timeout - self.conn = ssh_get_shell(host, username, password, self.keyfile, port, timeout, self.telnet) - - def push_file(self, source, dest, timeout=30): - dest = '{}@{}:{}'.format(self.username, self.host, dest) - return self._scp(source, dest, timeout) - - def pull_file(self, source, dest, timeout=30): - source = '{}@{}:{}'.format(self.username, self.host, source) - return self._scp(source, dest, timeout) - - def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE): - port_string = '-p {}'.format(self.port) if self.port else '' - keyfile_string = '-i {}'.format(self.keyfile) if self.keyfile else '' - command = '{} {} {} {}@{} {}'.format(ssh, keyfile_string, port_string, self.username, self.host, command) - logger.debug(command) - if self.password: - command = _give_password(self.password, command) - return subprocess.Popen(command, stdout=stdout, stderr=stderr, shell=True) - - def reconnect(self): - self.conn = ssh_get_shell(self.host, self.username, self.password, - self.keyfile, self.port, self.timeout, self.telnet) - - def execute(self, command, timeout=None, check_exit_code=True, as_root=False, strip_colors=True): - try: - with self.lock: - if self.connection_lost: - logger.debug('Attempting to reconnect...') - self.reconnect() - self.connection_lost = False - output = self._execute_and_wait_for_prompt(command, timeout, as_root, strip_colors) - if check_exit_code: - exit_code_text = self._execute_and_wait_for_prompt('echo $?', strip_colors=strip_colors, log=False) - try: - exit_code = int(exit_code_text.split()[0]) - if exit_code: - message = 'Got exit code {}\nfrom: {}\nOUTPUT: {}' - raise DeviceError(message.format(exit_code, command, output)) - except (ValueError, IndexError): - logger.warning('Could not get exit code for "{}",\ngot: "{}"'.format(command, exit_code_text)) - return output - except EOF: - logger.error('Dropped connection detected.') - self.connection_lost = True - raise - - def logout(self): - logger.debug('Logging out {}@{}'.format(self.username, self.host)) - self.conn.logout() - - def cancel_running_command(self): - # simulate impatiently hitting ^C until command prompt appears - logger.debug('Sending ^C') - for _ in xrange(self.max_cancel_attempts): - self.conn.sendline(chr(3)) - if self.conn.prompt(0.1): - return True - return False - - def _execute_and_wait_for_prompt(self, command, timeout=None, as_root=False, strip_colors=True, log=True): - self.conn.prompt(0.1) # clear an existing prompt if there is one. - if as_root: - command = "sudo -- sh -c '{}'".format(escape_single_quotes(command)) - if log: - logger.debug(command) - self.conn.sendline(command) - index = self.conn.expect_exact([self.password_prompt, TIMEOUT], timeout=0.5) - if index == 0: - self.conn.sendline(self.password) - timed_out = self._wait_for_prompt(timeout) - output = re.sub(r' \r([^\n])', r'\1', self.conn.before) - output = process_backspaces(output) - output = re.sub(r'.*?{}'.format(re.escape(command)), '', output, 1).strip() - else: - if log: - logger.debug(command) - self.conn.sendline(command) - timed_out = self._wait_for_prompt(timeout) - # the regex removes line breaks potential introduced when writing - # command to shell. - output = re.sub(r' \r([^\n])', r'\1', self.conn.before) - output = process_backspaces(output) - command_index = output.find(command) - output = output[command_index + len(command):].strip() - if timed_out: - self.cancel_running_command() - raise TimeoutError(command, output) - if strip_colors: - output = strip_bash_colors(output) - return output - - def _wait_for_prompt(self, timeout=None): - if timeout: - return not self.conn.prompt(timeout) - else: # cannot timeout; wait forever - while not self.conn.prompt(self.timeout): - pass - return False - - def _scp(self, source, dest, timeout=30): - # NOTE: the version of scp in Ubuntu 12.04 occasionally (and bizarrely) - # fails to connect to a device if port is explicitly specified using -P - # option, even if it is the default port, 22. To minimize this problem, - # only specify -P for scp if the port is *not* the default. - port_string = '-P {}'.format(self.port) if (self.port and self.port != 22) else '' - keyfile_string = '-i {}'.format(self.keyfile) if self.keyfile else '' - command = '{} -r {} {} {} {}'.format(scp, keyfile_string, port_string, source, dest) - pass_string = '' - logger.debug(command) - if self.password: - command = _give_password(self.password, command) - try: - check_output(command, timeout=timeout, shell=True) - except subprocess.CalledProcessError as e: - raise subprocess.CalledProcessError(e.returncode, e.cmd.replace(pass_string, ''), e.output) - except TimeoutError as e: - raise TimeoutError(e.command.replace(pass_string, ''), e.output) - - -def _give_password(password, command): - if not sshpass: - raise HostError('Must have sshpass installed on the host in order to use password-based auth.') - pass_string = "sshpass -p '{}' ".format(password) - return pass_string + command - - -def _check_env(): - global ssh, scp, sshpass # pylint: disable=global-statement - if not ssh: - ssh = which('ssh') - scp = which('scp') - sshpass = which('sshpass') - if not (ssh and scp): - raise HostError('OpenSSH must be installed on the host.') - - -def process_backspaces(text): - chars = [] - for c in text: - if c == chr(8) and chars: # backspace - chars.pop() - else: - chars.append(c) - return ''.join(chars) diff --git a/wlauto/utils/types.py b/wlauto/utils/types.py index 0762941f..06fcbfef 100644 --- a/wlauto/utils/types.py +++ b/wlauto/utils/types.py @@ -33,55 +33,7 @@ from bisect import insort from collections import defaultdict from wlauto.utils.misc import isiterable, to_identifier - - -def identifier(text): - """Converts text to a valid Python identifier by replacing all - whitespace and punctuation.""" - return to_identifier(text) - - -def boolean(value): - """ - Returns bool represented by the value. This is different from - calling the builtin bool() in that it will interpret string representations. - e.g. boolean('0') and boolean('false') will both yield False. - - """ - false_strings = ['', '0', 'n', 'no', 'off'] - if isinstance(value, basestring): - value = value.lower() - if value in false_strings or 'false'.startswith(value): - return False - return bool(value) - - -def integer(value): - """Handles conversions for string respresentations of binary, octal and hex.""" - if isinstance(value, basestring): - return int(value, 0) - else: - return int(value) - - -def numeric(value): - """ - Returns the value as number (int if possible, or float otherwise), or - raises ``ValueError`` if the specified ``value`` does not have a straight - forward numeric conversion. - - """ - if isinstance(value, int): - return value - try: - fvalue = float(value) - except ValueError: - raise ValueError('Not numeric: {}'.format(value)) - if not math.isnan(fvalue) and not math.isinf(fvalue): - ivalue = int(fvalue) - if ivalue == fvalue: # yeah, yeah, I know. Whatever. This is best-effort. - return ivalue - return fvalue +from devlib.utils.types import identifier, boolean, integer, numeric, caseless_string def list_of_strs(value): @@ -251,30 +203,6 @@ def counter(name=None): return value -class caseless_string(str): - """ - Just like built-in Python string except case-insensitive on comparisons. However, the - case is preserved otherwise. - - """ - - def __eq__(self, other): - if isinstance(other, basestring): - other = other.lower() - return self.lower() == other - - def __ne__(self, other): - return not self.__eq__(other) - - def __cmp__(self, other): - if isinstance(basestring, other): - other = other.lower() - return cmp(self.lower(), other) - - def format(self, *args, **kwargs): - return caseless_string(super(caseless_string, self).format(*args, **kwargs)) - - class arguments(list): """ Represents command line arguments to be passed to a program. diff --git a/wlauto/utils/uboot.py b/wlauto/utils/uboot.py deleted file mode 100644 index c59949a0..00000000 --- a/wlauto/utils/uboot.py +++ /dev/null @@ -1,116 +0,0 @@ - -# Copyright 2014-2015 ARM Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import re -import time -import logging - -from wlauto.utils.serial_port import TIMEOUT - - -logger = logging.getLogger('U-Boot') - - -class UbootMenu(object): - """ - Allows navigating Das U-boot menu over serial (it relies on a pexpect connection). - - """ - - option_regex = re.compile(r'^\[(\d+)\]\s+([^\r]+)\r\n', re.M) - prompt_regex = re.compile(r'^([^\r\n]+):\s*', re.M) - invalid_regex = re.compile(r'Invalid input \(max (\d+)\)', re.M) - - load_delay = 1 # seconds - default_timeout = 60 # seconds - - def __init__(self, conn, start_prompt='Hit any key to stop autoboot'): - """ - :param conn: A serial connection as returned by ``pexect.spawn()``. - :param prompt: U-Boot menu prompt - :param start_prompt: The starting prompt to wait for during ``open()``. - - """ - self.conn = conn - self.conn.crlf = '\n\r' # TODO: this has *got* to be a bug in U-Boot... - self.start_prompt = start_prompt - self.options = {} - self.prompt = None - - def open(self, timeout=default_timeout): - """ - "Open" the UEFI menu by sending an interrupt on STDIN after seeing the - starting prompt (configurable upon creation of the ``UefiMenu`` object. - - """ - self.conn.expect(self.start_prompt, timeout) - self.conn.sendline('') - time.sleep(self.load_delay) - self.conn.readline() # garbage - self.conn.sendline('') - self.prompt = self.conn.readline().strip() - - def getenv(self): - output = self.enter('printenv') - result = {} - for line in output.split('\n'): - if '=' in line: - variable, value = line.split('=', 1) - result[variable.strip()] = value.strip() - return result - - def setenv(self, variable, value, force=False): - force_str = ' -f' if force else '' - if value is not None: - command = 'setenv{} {} {}'.format(force_str, variable, value) - else: - command = 'setenv{} {}'.format(force_str, variable) - return self.enter(command) - - def boot(self): - self.write_characters('boot') - - def nudge(self): - """Send a little nudge to ensure there is something to read. This is useful when you're not - sure if all out put from the serial has been read already.""" - self.enter('') - - def enter(self, value, delay=load_delay): - """Like ``select()`` except no resolution is performed -- the value is sent directly - to the serial connection.""" - # Empty the buffer first, so that only response to the input about to - # be sent will be processed by subsequent commands. - value = str(value) - self.empty_buffer() - self.write_characters(value) - self.conn.expect(self.prompt, timeout=delay) - return self.conn.before - - def write_characters(self, line): - line = line.rstrip('\r\n') - for c in line: - self.conn.send(c) - time.sleep(0.05) - self.conn.sendline('') - - def empty_buffer(self): - try: - while True: - time.sleep(0.1) - self.conn.read_nonblocking(size=1024, timeout=0.1) - except TIMEOUT: - pass - self.conn.buffer = '' - diff --git a/wlauto/utils/uefi.py b/wlauto/utils/uefi.py deleted file mode 100644 index 00c62364..00000000 --- a/wlauto/utils/uefi.py +++ /dev/null @@ -1,235 +0,0 @@ -# Copyright 2014-2015 ARM Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -import re -import time -import logging -from copy import copy - -from wlauto.exceptions import ConfigError -from wlauto.utils.serial_port import TIMEOUT -from wlauto.utils.types import boolean - - -logger = logging.getLogger('UEFI') - - -class UefiConfig(object): - - def __init__(self, config_dict): - if isinstance(config_dict, UefiConfig): - self.__dict__ = copy(config_dict.__dict__) - else: - try: - self.image_name = config_dict['image_name'] - self.image_args = config_dict['image_args'] - self.fdt_support = boolean(config_dict['fdt_support']) - except KeyError as e: - raise ConfigError('Missing mandatory parameter for UEFI entry config: "{}"'.format(e)) - self.initrd = config_dict.get('initrd') - self.fdt_path = config_dict.get('fdt_path') - if self.fdt_path and not self.fdt_support: - raise ConfigError('FDT path has been specfied for UEFI entry, when FDT support is "False"') - - -class UefiMenu(object): - """ - Allows navigating UEFI menu over serial (it relies on a pexpect connection). - - """ - - option_regex = re.compile(r'^\[(\d+)\]\s+([^\r]+)\r\n', re.M) - prompt_regex = re.compile(r'^(\S[^\r\n]+):\s*', re.M) - invalid_regex = re.compile(r'Invalid input \(max (\d+)\)', re.M) - - load_delay = 1 # seconds - default_timeout = 60 # seconds - - def __init__(self, conn, prompt='The default boot selection will start in'): - """ - :param conn: A serial connection as returned by ``pexect.spawn()``. - :param prompt: The starting prompt to wait for during ``open()``. - - """ - self.conn = conn - self.start_prompt = prompt - self.options = {} - self.prompt = None - - def open(self, timeout=default_timeout): - """ - "Open" the UEFI menu by sending an interrupt on STDIN after seeing the - starting prompt (configurable upon creation of the ``UefiMenu`` object. - - """ - self.conn.expect(self.start_prompt, timeout) - self.conn.sendline('') - time.sleep(self.load_delay) - - def create_entry(self, name, config): - """Create a new UEFI entry using the parameters. The menu is assumed - to be at the top level. Upon return, the menu will be at the top level.""" - logger.debug('Creating UEFI entry {}'.format(name)) - self.nudge() - self.select('Boot Manager') - self.select('Add Boot Device Entry') - self.select('NOR Flash') - self.enter(config.image_name) - self.enter('y' if config.fdt_support else 'n') - if config.initrd: - self.enter('y') - self.enter(config.initrd) - else: - self.enter('n') - self.enter(config.image_args) - self.enter(name) - - if config.fdt_path: - self.select('Update FDT path') - self.enter(config.fdt_path) - - self.select('Return to main menu') - - def delete_entry(self, name): - """Delete the specified UEFI entry. The menu is assumed - to be at the top level. Upon return, the menu will be at the top level.""" - logger.debug('Removing UEFI entry {}'.format(name)) - self.nudge() - self.select('Boot Manager') - self.select('Remove Boot Device Entry') - self.select(name) - self.select('Return to main menu') - - def select(self, option, timeout=default_timeout): - """ - Select the specified option from the current menu. - - :param option: Could be an ``int`` index of the option, or a string/regex to - match option text against. - :param timeout: If a non-``int`` option is specified, the option list may need - need to be parsed (if it hasn't been already), this may block - and the timeout is used to cap that , resulting in a ``TIMEOUT`` - exception. - :param delay: A fixed delay to wait after sending the input to the serial connection. - This should be set if input this action is known to result in a - long-running operation. - - """ - if isinstance(option, basestring): - option = self.get_option_index(option, timeout) - self.enter(option) - - def enter(self, value, delay=load_delay): - """Like ``select()`` except no resolution is performed -- the value is sent directly - to the serial connection.""" - # Empty the buffer first, so that only response to the input about to - # be sent will be processed by subsequent commands. - value = str(value) - self._reset() - self.write_characters(value) - # TODO: in case the value is long an complicated, things may get - # screwed up (e.g. there may be line breaks injected), additionally, - # special chars might cause regex to fail. To avoid these issues i'm - # only matching against the first 5 chars of the value. This is - # entirely arbitrary and I'll probably have to find a better way of - # doing this at some point. - self.conn.expect(value[:5], timeout=delay) - time.sleep(self.load_delay) - - def read_menu(self, timeout=default_timeout): - """Parse serial output to get the menu options and the following prompt.""" - attempting_timeout_retry = False - attempting_invalid_retry = False - while True: - index = self.conn.expect([self.option_regex, self.prompt_regex, self.invalid_regex, TIMEOUT], - timeout=timeout) - match = self.conn.match - if index == 0: # matched menu option - self.options[match.group(1)] = match.group(2) - elif index == 1: # matched prompt - self.prompt = match.group(1) - break - elif index == 2: # matched invalid selection - # We've sent an invalid input (which includes an empty line) at - # the top-level menu. To get back the menu options, it seems we - # need to enter what the error reports as the max + 1, so... - if not attempting_invalid_retry: - attempting_invalid_retry = True - val = int(match.group(1)) + 1 - self.empty_buffer() - self.enter(val) - else: # OK, that didn't work; panic! - raise RuntimeError('Could not read menu entries stuck on "{}" prompt'.format(self.prompt)) - elif index == 3: # timed out - if not attempting_timeout_retry: - attempting_timeout_retry = True - self.nudge() - else: # Didn't help. Run away! - raise RuntimeError('Did not see a valid UEFI menu.') - else: - raise AssertionError('Unexpected response waiting for UEFI menu') # should never get here - - def get_option_index(self, text, timeout=default_timeout): - """Returns the menu index of the specified option text (uses regex matching). If the option - is not in the current menu, ``LookupError`` will be raised.""" - if not self.prompt: - self.read_menu(timeout) - for k, v in self.options.iteritems(): - if re.search(text, v): - return k - raise LookupError(text) - - def has_option(self, text, timeout=default_timeout): - """Returns ``True`` if at least one of the options in the current menu has - matched (using regex) the specified text.""" - try: - self.get_option_index(text, timeout) - return True - except LookupError: - return False - - def nudge(self): - """Send a little nudge to ensure there is something to read. This is useful when you're not - sure if all out put from the serial has been read already.""" - self.enter('') - - def empty_buffer(self): - """Read everything from the serial and clear the internal pexpect buffer. This ensures - that the next ``expect()`` call will time out (unless further input will be sent to the - serial beforehand. This is used to create a "known" state and avoid unexpected matches.""" - try: - while True: - time.sleep(0.1) - self.conn.read_nonblocking(size=1024, timeout=0.1) - except TIMEOUT: - pass - self.conn.buffer = '' - - def write_characters(self, line): - """Write a single line out to serial charcter-by-character. This will ensure that nothing will - be dropped for longer lines.""" - line = line.rstrip('\r\n') - for c in line: - self.conn.send(c) - time.sleep(0.05) - self.conn.sendline('') - - def _reset(self): - self.options = {} - self.prompt = None - self.empty_buffer() - -