1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-19 20:34:30 +00:00
2017-05-30 12:06:36 +01:00

859 lines
35 KiB
Python

# Copyright 2013-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=E1101
import os
import sys
import re
import time
import tempfile
import shutil
import threading
import json
import xml.dom.minidom
from subprocess import CalledProcessError
from wlauto.core.extension import Parameter
from wlauto.common.resources import Executable
from wlauto.core.resource import NO_ONE
from wlauto.common.linux.device import BaseLinuxDevice, PsEntry
from wlauto.exceptions import DeviceError, WorkerThreadError, TimeoutError, DeviceNotRespondingError
from wlauto.utils.misc import convert_new_lines, ABI_MAP, commonprefix
from wlauto.utils.types import boolean, regex
from wlauto.utils.android import (adb_shell, adb_background_shell, adb_list_devices,
adb_command, AndroidProperties, ANDROID_VERSION_MAP)
SCREEN_STATE_REGEX = re.compile('(?:mPowerState|mScreenOn|Display Power: state)=([0-9]+|true|false|ON|OFF)', re.I)
SCREEN_SIZE_REGEX = re.compile(r'mUnrestrictedScreen=\(\d+,\d+\)\s+(?P<width>\d+)x(?P<height>\d+)')
class AndroidDevice(BaseLinuxDevice): # pylint: disable=W0223
"""
Device running Android OS.
"""
platform = 'android'
parameters = [
Parameter('adb_name',
description='The unique ID of the device as output by "adb devices".'),
Parameter('android_prompt', kind=regex, default=re.compile('^.*(shell|root)@.*:/\S* [#$] ', re.MULTILINE),
description='The format of matching the shell prompt in Android.'),
Parameter('working_directory', default='/sdcard/wa-working', override=True),
Parameter('binaries_directory', default='/data/local/tmp/wa-bin', override=True,
description='Location of binaries on the device.'),
Parameter('package_data_directory', default='/data/data',
description='Location of of data for an installed package (APK).'),
Parameter('external_storage_directory', default='/sdcard',
description='Mount point for external storage.'),
Parameter('connection', default='usb', allowed_values=['usb', 'ethernet'],
description='Specified the nature of adb connection.'),
Parameter('logcat_poll_period', kind=int,
description="""
If specified and is not ``0``, logcat will be polled every
``logcat_poll_period`` seconds, and buffered on the host. This
can be used if a lot of output is expected in logcat and the fixed
logcat buffer on the device is not big enough. The trade off is that
this introduces some minor runtime overhead. Not set by default.
"""),
Parameter('enable_screen_check', kind=boolean, default=False,
description="""
Specified whether the device should make sure that the screen is on
during initialization.
"""),
Parameter('swipe_to_unlock', kind=str, default=None,
allowed_values=[None, "horizontal", "vertical"],
description="""
If set a swipe of the specified direction will be performed.
This should unlock the screen.
"""),
]
default_timeout = 30
delay = 2
long_delay = 3 * delay
ready_timeout = 60
# Overwritten from Device. For documentation, see corresponding method in
# Device.
@property
def is_rooted(self):
    """
    ``True`` if privileged commands can be run on the device.

    The result is determined once, by invoking ``su`` through adb with a
    one second timeout, and is then cached in ``self._is_rooted``.
    """
    if self._is_rooted is not None:
        return self._is_rooted
    try:
        su_output = adb_shell(self.adb_name, 'su', timeout=1)
    except TimeoutError:
        # ``su`` did not return within the timeout; this is treated as the
        # device being rooted (presumably a root shell is waiting for input).
        rooted = True
    except DeviceError:
        rooted = False
    else:
        rooted = 'not found' not in su_output
    self._is_rooted = rooted
    return self._is_rooted
@property
def abi(self):
    """
    Primary ABI of the device.

    Takes the part of ``ro.product.cpu.abi`` before the first ``-`` and
    translates it through ``ABI_MAP``; if no entry lists it, the
    truncated property value itself is returned.
    """
    reported = self.getprop()['ro.product.cpu.abi']
    base_name = reported.split('-')[0]
    for canonical, aliases in ABI_MAP.iteritems():
        if base_name in aliases:
            return canonical
    return base_name
@property
def supported_abi(self):
    """
    List of ABIs supported by the device, without duplicates.

    The names are collected from ``ro.product.cpu.abi``,
    ``ro.product.cpu.abi2`` and ``ro.product.cpu.abilist`` (in that order)
    and each one is translated through ``ABI_MAP``. Names that no
    ``ABI_MAP`` entry lists are reported unchanged.
    """
    props = self.getprop()
    reported = [props['ro.product.cpu.abi']]
    if 'ro.product.cpu.abi2' in props:
        reported.append(props['ro.product.cpu.abi2'])
    if 'ro.product.cpu.abilist' in props:
        for name in props['ro.product.cpu.abilist'].split(','):
            if name not in reported:
                reported.append(name)

    mapped_result = []
    for name in reported:
        # Default to the raw name; this also keeps the lookup well defined
        # when ABI_MAP is empty.
        canonical = name
        for abi, architectures in ABI_MAP.iteritems():
            if name in architectures:
                canonical = abi
                break
        # An alias of an ABI that is already listed must not be added again
        # under its raw name.
        if canonical not in mapped_result:
            mapped_result.append(canonical)
    return mapped_result
def __init__(self, **kwargs):
    """Initialise the base device; no logcat poller exists until ``start()`` creates one."""
    super(AndroidDevice, self).__init__(**kwargs)
    # Set by start() when ``logcat_poll_period`` is configured.
    self._logcat_poller = None
def reset(self):
    """Mark the device as rebooted and not ready, then issue ``adb reboot``."""
    self._just_rebooted = True
    self._is_ready = False
    adb_command(self.adb_name, 'reboot', timeout=self.default_timeout)
def hard_reset(self):
    """Delegate the hard reset to the base class, then mark the device as rebooted and not ready."""
    super(AndroidDevice, self).hard_reset()
    self._just_rebooted = True
    self._is_ready = False
def boot(self, hard=False, **kwargs):
    """
    Restart the device.

    :param hard: If true, ``hard_reset()`` is used; otherwise ``reset()``.

    Additional keyword arguments are accepted and ignored.
    """
    restart = self.hard_reset if hard else self.reset
    restart()
def connect(self):  # NOQA pylint: disable=R0912
    """
    Wait for the device to appear in adb and to finish booting.

    Polls ``adb devices`` and then ``getprop sys.boot_completed``, sleeping
    ``self.delay`` seconds between attempts. Both polling loops share one
    iteration budget of ``ready_timeout / delay`` attempts. On success
    ``self._is_ready`` is set.

    :raises: DeviceError if the budget is exhausted, or if ``adb_name`` is
             not set and more than one device is connected.
    """
    iteration_number = 0
    max_iterations = self.ready_timeout / self.delay
    available = False
    self.logger.debug('Polling for device {}...'.format(self.adb_name))
    # Stage 1: wait for the device to be listed by adb (and not be offline).
    while iteration_number < max_iterations:
        devices = adb_list_devices()
        if self.adb_name:
            for device in devices:
                if device.name == self.adb_name and device.status != 'offline':
                    available = True
        else:  # adb_name not set
            if len(devices) == 1:
                available = True
            elif len(devices) > 1:
                raise DeviceError('More than one device is connected and adb_name is not set.')
        if available:
            break
        else:
            time.sleep(self.delay)
            iteration_number += 1
    else:
        # The loop ran out of iterations without hitting ``break``.
        raise DeviceError('Could not boot {} ({}).'.format(self.name, self.adb_name))
    # Stage 2: wait for Android to report boot completion. iteration_number
    # is deliberately not reset, so this continues the same budget. The '0'
    # prefix makes an empty getprop output parse as 0.
    while iteration_number < max_iterations:
        available = (int('0' + (adb_shell(self.adb_name, 'getprop sys.boot_completed', timeout=self.default_timeout))) == 1)
        if available:
            break
        else:
            time.sleep(self.delay)
            iteration_number += 1
    else:
        raise DeviceError('Could not boot {} ({}).'.format(self.name, self.adb_name))
    if self._just_rebooted:
        self.logger.debug('Waiting for boot to complete...')
        # On some devices, adb connection gets reset some time after booting.
        # This causes errors during execution. To prevent this, open a shell
        # session and wait for it to be killed. Once it's killed, give adb
        # enough time to restart, and then the device should be ready.
        # TODO: This is more of a work-around rather than an actual solution.
        # Need to figure out what is going on and the "proper" way of handling it.
        try:
            adb_shell(self.adb_name, '', timeout=20)
            time.sleep(5)  # give adb time to re-initialize
        except TimeoutError:
            pass  # timed out waiting for the session to be killed -- assume not going to be.
        self.logger.debug('Boot completed.')
        self._just_rebooted = False
    self._is_ready = True
def initialize(self, context):
    """
    Prepare the device for a run.

    Deploys sqlite3 (its on-device path is kept in ``self.sqlite``). On a
    rooted device the screen lock and SELinux enforcement are disabled.
    If ``enable_screen_check`` is set, the screen is turned on.
    """
    sqlite_path = self.deploy_sqlite3(context)
    self.sqlite = sqlite_path  # pylint: disable=attribute-defined-outside-init
    if self.is_rooted:
        for privileged_step in (self.disable_screen_lock, self.disable_selinux):
            privileged_step()
    if self.enable_screen_check:
        self.ensure_screen_is_on()
def disconnect(self):
    """Close the logcat poller, if one has been created."""
    poller = self._logcat_poller
    if poller:
        poller.close()
def ping(self):
    """
    Check that the device answers a trivial adb shell command.

    :raises: DeviceNotRespondingError if ``ls /`` times out or adb fails.
    """
    try:
        # May be triggered inside initialize()
        adb_shell(self.adb_name, 'ls /', timeout=10)
    except (TimeoutError, CalledProcessError):
        unresponsive = self.adb_name or self.name
        raise DeviceNotRespondingError(unresponsive)
def start(self):
    """
    Start host-side logcat polling if ``logcat_poll_period`` is set.

    Any poller left from a previous call is closed before a new one is
    created and started.
    """
    if not self.logcat_poll_period:
        return
    if self._logcat_poller:
        self._logcat_poller.close()
    self._logcat_poller = _LogcatPoller(self, self.logcat_poll_period,
                                        timeout=self.default_timeout)
    self._logcat_poller.start()
def stop(self):
    """Stop the logcat poller, if one has been created."""
    poller = self._logcat_poller
    if poller:
        poller.stop()
def get_android_version(self):
    """Look up the device's SDK version in ``ANDROID_VERSION_MAP``; ``None`` if it has no entry."""
    sdk_version = self.get_sdk_version()
    return ANDROID_VERSION_MAP.get(sdk_version)
def get_android_id(self):
    """
    Get the device's ANDROID_ID. Which is

        "A 64-bit number (as a hex string) that is randomly generated when the user
        first sets up the device and should remain constant for the lifetime of the
        user's device."

    .. note:: This will get reset on userdata erasure.

    The value is whatever follows the last ``value=`` in the output of a
    ``content query`` against the secure settings.
    """
    query = ('content query --uri content://settings/secure '
             '--projection value --where "name=\'android_id\'"')
    row = self.execute(query).strip()
    return row.rpartition('value=')[2]
def get_sdk_version(self):
    """Return ``ro.build.version.sdk`` as an ``int``, or ``None`` if it cannot be converted."""
    try:
        sdk_version = int(self.getprop('ro.build.version.sdk'))
    except (ValueError, TypeError):
        sdk_version = None
    return sdk_version
def get_installed_package_version(self, package):
    """
    Returns the version (versionName) of the specified package if it is installed
    on the device, or ``None`` otherwise.

    The version is taken from the first line of ``dumpsys package`` output
    that mentions ``versionName``.

    Added in version 2.1.4
    """
    dump = convert_new_lines(self.execute('dumpsys package {}'.format(package)))
    version_lines = (row for row in dump.split('\n') if 'versionName' in row)
    first_hit = next(version_lines, None)
    if first_hit is None:
        return None
    return first_hit.split('=', 1)[1]
def get_installed_package_abi(self, package):
    """
    Returns the primary abi of the specified package if it is installed
    on the device, or ``None`` otherwise.

    The value comes from the first ``primaryCpuAbi`` line of ``dumpsys
    package`` and is translated through ``ABI_MAP`` where possible. A
    reported value of ``null`` yields ``None``.
    """
    dump = convert_new_lines(self.execute('dumpsys package {}'.format(package)))
    reported = next((row.split('=', 1)[1]
                     for row in dump.split('\n')
                     if 'primaryCpuAbi' in row), None)
    if reported == 'null':
        return None
    for canonical, aliases in ABI_MAP.iteritems():
        if reported in aliases:
            return canonical
    return reported
def list_packages(self):
    """
    List packages installed on the device.

    Returns the whitespace-separated entries of ``pm list packages`` with
    every ``package:`` marker removed.

    Added in version 2.1.4
    """
    listing = self.execute('pm list packages')
    return listing.replace('package:', '').split()
def package_is_installed(self, package_name):
    """
    Returns ``True`` if a package with the specified name is installed on
    the device, and ``False`` otherwise.

    Added in version 2.1.4
    """
    installed = self.list_packages()
    return package_name in installed
def executable_is_installed(self, executable_name):  # pylint: disable=unused-argument,no-self-use
    """Always raises ``AttributeError`` pointing callers at ``get_binary_path``/``install_if_needed``."""
    guidance = """Instead of using is_installed, please use
``get_binary_path`` or ``install_if_needed`` instead. You should
use the path returned by these functions to then invoke the binary
please see: https://pythonhosted.org/wlauto/writing_extensions.html"""
    raise AttributeError(guidance)
def is_installed(self, name):
    """
    Report whether the Android package ``name`` is installed.

    Names that are not installed packages and contain no ``.`` are assumed
    to be binaries rather than packages, and are rejected.

    :raises: AttributeError for such binary names, directing the caller to
             ``get_binary_path`` or ``install_if_needed``.
    """
    if self.package_is_installed(name):
        return True
    if "." in name:  # assumes android package names contain a '.' and binary names do not
        return False
    raise AttributeError("""Instead of using is_installed, please use
``get_binary_path`` or ``install_if_needed`` instead. You should
use the path returned by these functions to then invoke the binary
please see: https://pythonhosted.org/wlauto/writing_extensions.html""")
def listdir(self, path, as_root=False, **kwargs):
    """
    Return the whitespace-separated entries printed by ``ls <path>`` on the device.

    :param as_root: Passed through to ``execute``.

    Additional keyword arguments are accepted and ignored.
    """
    listing = self.execute('ls {}'.format(path), as_root=as_root)
    # split() with no separator already discards surrounding whitespace.
    return listing.split()
def push_file(self, source, dest, as_root=False, timeout=default_timeout):  # pylint: disable=W0221
    """
    Copy ``source`` on the host to ``dest`` on the device with ``adb push``.

    With ``as_root`` the file is first pushed under
    ``self.file_transfer_cache`` and then copied to ``dest`` by a
    privileged ``cp``.

    Modified in version 2.1.4: added ``as_root`` parameter.

    :raises: DeviceError if a ``CalledProcessError`` is raised underneath.
    """
    self._check_ready()
    push_template = "push '{}' '{}'"
    try:
        if as_root:
            staged = self.path.join(self.file_transfer_cache, source.lstrip(self.path.sep))
            staging_dir = self.path.dirname(staged)
            self.execute('mkdir -p {}'.format(staging_dir))
            adb_command(self.adb_name, push_template.format(source, staged), timeout=timeout)
            self.execute('cp {} {}'.format(staged, dest), as_root=True)
        else:
            adb_command(self.adb_name, push_template.format(source, dest), timeout=timeout)
    except CalledProcessError as e:
        raise DeviceError(e)
def pull_file(self, source, dest, as_root=False, timeout=default_timeout):  # pylint: disable=W0221
    """
    Copy ``source`` on the device to ``dest`` on the host with ``adb pull``.

    With ``as_root`` the file is first copied by a privileged ``cp`` to a
    location under ``self.file_transfer_cache`` and pulled from there.

    Modified in version 2.1.4: added ``as_root`` parameter.

    :raises: DeviceError if a ``CalledProcessError`` is raised underneath.
    """
    self._check_ready()
    pull_template = "pull '{}' '{}'"
    try:
        if as_root:
            staged = self.path.join(self.file_transfer_cache, source.lstrip(self.path.sep))
            staging_dir = self.path.dirname(staged)
            self.execute('mkdir -p {}'.format(staging_dir))
            self.execute('cp {} {}'.format(source, staged), as_root=True)
            adb_command(self.adb_name, pull_template.format(staged, dest), timeout=timeout)
        else:
            adb_command(self.adb_name, pull_template.format(source, dest), timeout=timeout)
    except CalledProcessError as e:
        raise DeviceError(e)
def delete_file(self, filepath, as_root=False):  # pylint: disable=W0221
    """Remove ``filepath`` on the device with ``rm -rf``, optionally as root."""
    self._check_ready()
    removal = "rm -rf '{}'".format(filepath)
    adb_shell(self.adb_name, removal, as_root=as_root, timeout=self.default_timeout)
def file_exists(self, filepath):
    """Return ``True`` if ``filepath`` exists on the device (shell ``[ -e ]`` test)."""
    self._check_ready()
    probe = "if [ -e '{}' ]; then echo 1; else echo 0; fi".format(filepath)
    answer = adb_shell(self.adb_name, probe, timeout=self.default_timeout)
    return bool(int(answer))
def install(self, filepath, timeout=default_timeout, with_name=None, replace=False):  # pylint: disable=W0221
    """
    Install ``filepath`` on the device.

    Files with an ``.apk`` extension (case-insensitive) go through
    ``install_apk`` (using ``timeout`` and ``replace``); anything else is
    installed as a binary via ``install_executable`` (using ``with_name``).
    """
    _, extension = os.path.splitext(filepath)
    if extension.lower() == '.apk':
        return self.install_apk(filepath, timeout, replace)
    return self.install_executable(filepath, with_name)
def install_apk(self, filepath, timeout=default_timeout, replace=False, allow_downgrade=False, force=False):  # pylint: disable=W0221
    """
    Install an APK with ``adb install``.

    :param filepath: Host path of the APK.
    :param timeout: Timeout, in seconds, for the adb command.
    :param replace: If ``True``, pass ``-r`` to replace an existing install.
    :param allow_downgrade: If ``True``, pass ``-d`` so an older version may
                            be installed over a newer one.
    :param force: If ``True``, attempt the install even when ``filepath``
                  does not have an ``.apk`` extension.
    :returns: Whatever ``adb_command`` returns for the install.
    :raises: DeviceError if the file is not an ``.apk`` and ``force`` is not set.
    """
    self._check_ready()
    ext = os.path.splitext(filepath)[1].lower()
    if ext != '.apk' and not force:
        raise DeviceError('Can\'t install {}: unsupported format.'.format(filepath))
    flags = []
    if replace:
        flags.append('-r')  # Replace existing APK
    if allow_downgrade:
        flags.append('-d')  # Install the APK even if a newer version is already installed
    # get_sdk_version() returns None when the version cannot be determined;
    # compare explicitly rather than relying on how None orders against ints.
    sdk_version = self.get_sdk_version()
    if sdk_version is not None and sdk_version >= 23:
        flags.append('-g')  # Grant all runtime permissions
    self.logger.debug("Replace APK = {}, ADB flags = '{}'".format(replace, ' '.join(flags)))
    return adb_command(self.adb_name, "install {} '{}'".format(' '.join(flags), filepath), timeout=timeout)
def install_executable(self, filepath, with_name=None):
    """
    Installs a binary executable on device. Returns the path to the
    installed binary.

    The file is pushed into the working directory, copied into the binaries
    directory and made executable (mode 0777); the copy and chmod run as
    root when the device is rooted.

    Optionally, ``with_name`` parameter may be used to specify a different name under
    which the executable will be installed.

    Added in version 2.1.3.
    Updated in version 2.1.5 with ``with_name`` parameter.
    """
    self._ensure_binaries_directory_is_writable()
    name_on_device = with_name or os.path.basename(filepath)
    staged_path = self.path.join(self.working_directory, name_on_device)
    installed_path = self.path.join(self.binaries_directory, name_on_device)
    self.push_file(filepath, staged_path)
    follow_up = ('cp {} {}'.format(staged_path, installed_path),
                 'chmod 0777 {}'.format(installed_path))
    for shell_command in follow_up:
        self.execute(shell_command, as_root=self.is_rooted)
    return installed_path
def uninstall(self, package):
    """Remove the named package from the device with ``adb uninstall``."""
    self._check_ready()
    removal = "uninstall {}".format(package)
    adb_command(self.adb_name, removal, timeout=self.default_timeout)
def uninstall_executable(self, executable_name):
    """
    Delete a previously installed binary from the device.

    Only non-system locations are searched
    (``search_system_binaries=False``). The file is deleted as root when
    the device is rooted.

    Added in version 2.1.3.

    :raises: DeviceError if the binary cannot be found.
    """
    on_device_executable = self.get_binary_path(executable_name, search_system_binaries=False)
    if not on_device_executable:
        # Report the name that was asked for; the lookup result is empty here.
        raise DeviceError("Could not uninstall {}, binary not found".format(executable_name))
    self._ensure_binaries_directory_is_writable()
    self.delete_file(on_device_executable, as_root=self.is_rooted)
def execute(self, command, timeout=default_timeout, check_exit_code=True, background=False,
            as_root=False, busybox=False, **kwargs):
    """
    Execute the specified command on the device using adb.

    Parameters:

        :param command: The command to be executed. It should appear exactly
                        as if you were typing it into a shell.
        :param timeout: Time, in seconds, to wait for adb to return before aborting
                        and raising an error. Defaults to ``AndroidDevice.default_timeout``.
                        Not used when ``background`` is ``True``.
        :param check_exit_code: If ``True``, the return code of the command on the Device will
                                be checked and an exception will be raised if it is not 0.
                                Defaults to ``True``. Not used when ``background`` is ``True``.
        :param background: If ``True``, will execute adb in a subprocess, and will return
                           immediately, not waiting for adb to return. Defaults to ``False``
        :param busybox: If ``True``, will use busybox to execute the command. Defaults to ``False``.

                        Added in version 2.1.3

                        .. note:: The device must be rooted to be able to use some busybox features.

        :param as_root: If ``True``, will attempt to execute command in privileged mode. The device
                        must be rooted, otherwise an error will be raised. Defaults to ``False``.

                        Added in version 2.1.3

    Additional keyword arguments are accepted and ignored.

    :returns: If ``background`` parameter is set to ``True``, the subprocess object will
              be returned; otherwise, the contents of STDOUT from the device will be returned.

    :raises: DeviceError if adb timed out or if the command returned non-zero exit
             code on the device, or if attempting to execute a command in privileged mode on an
             unrooted device.

    """
    self._check_ready()
    if as_root and not self.is_rooted:
        raise DeviceError('Attempting to execute "{}" as root on unrooted device.'.format(command))
    if busybox:
        # Prefix the command with the busybox binary path.
        command = ' '.join([self.busybox, command])
    if background:
        return adb_background_shell(self.adb_name, command, as_root=as_root)
    else:
        return adb_shell(self.adb_name, command, timeout, check_exit_code, as_root)
def kick_off(self, command, as_root=None):
    """
    Like execute but closes adb session and returns immediately, leaving the command running on the
    device (this is different from execute(background=True) which keeps adb connection open and returns
    a subprocess object).

    The command is run from the working directory under busybox ``nohup``
    with a one second timeout; hitting that timeout is the expected outcome.

    :param as_root: Run as root. Defaults to ``self.is_rooted`` when ``None``.
    :raises: ValueError if the command returns before the timeout.

    Added in version 2.1.4
    """
    if as_root is None:
        as_root = self.is_rooted
    detached = 'cd {} && {} nohup {}'.format(self.working_directory, self.busybox, command)
    try:
        output = self.execute(detached, timeout=1, as_root=as_root)
    except TimeoutError:
        # Still running when adb gave up waiting: this is the success case.
        return
    raise ValueError('Background command exited before timeout; got "{}"'.format(output))
def get_pids_of(self, process_name):
    """
    Returns a list of PIDs of all processes with the specified name.

    The PID is read from the second column of each line of ``ps`` output
    that busybox ``grep`` matched. An empty list is returned if there is no
    output or the output contains ``not found``.
    """
    pipeline = 'ps | {} grep {}'.format(self.busybox, process_name)
    matched = (self.execute(pipeline, check_exit_code=False) or '').strip()
    if not matched or 'not found' in matched:
        return []
    return [int(row.split()[1]) for row in matched.split('\n')]
def ps(self, **kwargs):
    """
    Returns the list of running processes on the device. Keyword arguments may
    be used to specify simple filters for columns: only entries whose named
    attribute equals the given value for every keyword are returned.

    Added in version 2.1.4
    """
    rows = iter(convert_new_lines(self.execute('ps')).split('\n'))
    next(rows)  # header
    entries = []
    for row in rows:
        fields = row.split()
        if not fields:
            continue
        # Columns 1-4 are converted to int; the rest stay as strings.
        numeric = [int(field) for field in fields[1:5]]
        entries.append(PsEntry(*(fields[0:1] + numeric + fields[5:])))
    if not kwargs:
        return entries
    return [entry for entry in entries
            if all(getattr(entry, key) == wanted for key, wanted in kwargs.iteritems())]
def get_properties(self, context):
    """Return the base class's device properties updated with the Android-specific ones."""
    props = super(AndroidDevice, self).get_properties(context)
    android_props = self._get_android_properties(context)
    props.update(android_props)
    return props
def _get_android_properties(self, context):
props = {}
props['android_id'] = self.get_android_id()
self._update_build_properties(props)
dumpsys_host_file = os.path.join(context.host_working_directory, 'window.dumpsys')
with open(dumpsys_host_file, 'w') as wfh:
wfh.write(self.execute('dumpsys window'))
context.add_run_artifact('dumpsys_window', dumpsys_host_file, 'meta')
prop_file = os.path.join(context.host_working_directory, 'android-props.json')
with open(prop_file, 'w') as wfh:
json.dump(props, wfh)
context.add_run_artifact('android_properties', prop_file, 'export')
return props
def getprop(self, prop=None):
    """Returns parsed output of Android getprop command. If a property is
    specified, only the value for that property will be returned (looked up
    by subscripting the parsed properties). Otherwise,
    ``wlauto.utils.android.AndroidProperties`` will be returned, which is
    a dict-like object."""
    parsed = AndroidProperties(self.execute('getprop'))
    return parsed[prop] if prop else parsed
def deploy_sqlite3(self, context):
    """Resolve the ``sqlite3`` executable for this device's ABI and install it if needed; returns its on-device path."""
    sqlite_resource = Executable(NO_ONE, self.abi, 'sqlite3')
    host_file = context.resolver.get(sqlite_resource)
    return self.install_if_needed(host_file)
# Android-specific methods. These either rely on specifics of adb or other
# Android-only concepts in their interface and/or implementation.
def forward_port(self, from_port, to_port):
    """
    Set up port forwarding with ``adb forward``.

    :param from_port: First argument given to ``adb forward``.
    :param to_port: Second argument given to ``adb forward``.

    Ports should be specified using adb spec. See the "adb forward" section in "adb help".
    """
    forwarding = 'forward {} {}'.format(from_port, to_port)
    adb_command(self.adb_name, forwarding, timeout=self.default_timeout)
def dump_logcat(self, outfile, filter_spec=None):
    """
    Dump the contents of logcat, for the specified filter spec to the
    specified output file.
    See http://developer.android.com/tools/help/logcat.html

    When a logcat poller is active, its buffered log is written instead and
    ``filter_spec`` is not applied.

    :param outfile: Output file on the host into which the contents of the
                    log will be written.
    :param filter_spec: Logcat filter specification.
                        see http://developer.android.com/tools/debugging/debugging-log.html#filteringOutput
    """
    if self._logcat_poller:
        return self._logcat_poller.write_log(outfile)
    if filter_spec:
        dump_command = 'logcat -d -s {} > {}'.format(filter_spec, outfile)
    else:
        dump_command = 'logcat -d > {}'.format(outfile)
    return adb_command(self.adb_name, dump_command, timeout=self.default_timeout)
def clear_logcat(self):
    """Clear (flush) logcat log, going through the logcat poller when one is active."""
    poller = self._logcat_poller
    if poller:
        return poller.clear_buffer()
    return adb_shell(self.adb_name, 'logcat -c', timeout=self.default_timeout)
def get_screen_size(self):
    """
    Return the screen size as a ``(width, height)`` tuple of ints, parsed
    from ``dumpsys window``; ``(0, 0)`` if the size cannot be found.
    """
    found = SCREEN_SIZE_REGEX.search(self.execute('dumpsys window'))
    if not found:
        return (0, 0)
    return (int(found.group('width')), int(found.group('height')))
def perform_unlock_swipe(self):
    """
    Swipe across the screen in the direction given by ``swipe_to_unlock``.

    Both swipes start two thirds of the way down the screen. A horizontal
    swipe runs from 100 pixels off the left edge to 100 pixels off the right
    edge; a vertical swipe runs up the horizontal middle of the screen to
    the top.

    :raises: DeviceError if ``swipe_to_unlock`` is neither ``"horizontal"``
             nor ``"vertical"``.
    """
    width, height = self.get_screen_size()
    command = 'input swipe {} {} {} {}'
    swipe_height = height * 2 // 3
    if self.swipe_to_unlock == "horizontal":
        start = 100
        stop = width - start
        self.execute(command.format(start, swipe_height, stop, swipe_height))
    elif self.swipe_to_unlock == "vertical":
        # The x coordinate is the middle of the screen *width*.
        swipe_middle = width // 2
        self.execute(command.format(swipe_middle, swipe_height, swipe_middle, 0))
    else:  # Should never reach here
        raise DeviceError("Invalid swipe direction: {}".format(self.swipe_to_unlock))
def capture_screen(self, filepath):
    """
    Captures the current device screen into the specified file in a PNG format.

    The image is written to ``screen_capture.png`` in the device's working
    directory, pulled to ``filepath`` and then deleted from the device.
    """
    remote_png = self.path.join(self.working_directory, 'screen_capture.png')
    self.execute('screencap -p {}'.format(remote_png))
    self.pull_file(remote_png, filepath)
    self.delete_file(remote_png)
def capture_ui_hierarchy(self, filepath):
    """
    Captures the current view hierarchy into the specified file in a XML format.

    The dump produced by ``uiautomator dump`` is pulled to ``filepath``,
    removed from the device, and then rewritten on the host in
    pretty-printed form.
    """
    remote_xml = self.path.join(self.working_directory, 'screen_capture.xml')
    self.execute('uiautomator dump {}'.format(remote_xml))
    self.pull_file(remote_xml, filepath)
    self.delete_file(remote_xml)

    document = xml.dom.minidom.parse(filepath)
    with open(filepath, 'w') as pretty_fh:
        pretty_fh.write(document.toprettyxml())
def is_screen_on(self):
    """
    Returns ``True`` if the device screen is currently on, ``False`` otherwise.

    The state is parsed from the output of ``dumpsys power``.

    :raises: DeviceError if no screen state can be found in that output.
    """
    power_dump = self.execute('dumpsys power')
    found = SCREEN_STATE_REGEX.search(power_dump)
    if not found:
        raise DeviceError('Could not establish screen state.')
    return boolean(found.group(1))
def ensure_screen_is_on(self):
    """
    Turn the screen on if it is off.

    Sends key event 26 and, when ``swipe_to_unlock`` is configured, follows
    it with the unlock swipe. Nothing is done if the screen is already on.
    """
    if self.is_screen_on():
        return
    self.execute('input keyevent 26')
    if self.swipe_to_unlock:
        self.perform_unlock_swipe()
def disable_screen_lock(self):
    """
    Attempts to disable the screen lock on the device.

    A one-line script that runs ``self.sqlite`` against the lock settings
    database is written to a temporary host file, installed on the device
    as ``disable_screen_lock`` and executed as root.

    .. note:: This does not always work...

    Added in version 2.1.4
    """
    lockdb = '/data/system/locksettings.db'
    sqlcommand = "update locksettings set value='0' where name='screenlock.disabled';"
    script = '{} {} "{}"'.format(self.sqlite, lockdb, sqlcommand)
    # Open in text mode explicitly: the script is a text string, and the
    # default mode of NamedTemporaryFile is binary. The file has to stay
    # open (it is deleted on close) until it has been installed.
    with tempfile.NamedTemporaryFile(mode='w') as f:
        f.write(script)
        f.flush()
        on_device_executable = self.install_executable(f.name,
                                                       with_name="disable_screen_lock")
    self.execute(on_device_executable, as_root=True)
def disable_selinux(self):
    """
    Switch SELinux to permissive mode if it is currently enforcing.

    Nothing is done on API levels below 18.
    """
    # This may be invoked from initialize(), so the API level is read with a
    # raw adb_shell call instead of the standard API.
    # NOTE(review): the getenforce/setenforce calls below still go through
    # execute() -- confirm that this is intended.
    sdk_output = adb_shell(self.adb_name, 'getprop ro.build.version.sdk',
                           timeout=self.default_timeout)
    api_level = int(sdk_output.strip())
    # SELinux was added in Android 4.3 (API level 18). Trying to
    # 'getenforce' in earlier versions will produce an error.
    if api_level < 18:
        return
    if self.execute('getenforce', as_root=True).strip() == 'Enforcing':
        self.execute('setenforce 0', as_root=True)
def get_device_model(self):
    """Return the ``ro.product.device`` property, or ``None`` if looking it up raises ``KeyError``."""
    try:
        model = self.getprop(prop='ro.product.device')
    except KeyError:
        model = None
    return model
def refresh_device_files(self, file_list):
    """
    Depending on the device's Android version and root status, determine the
    appropriate method of forcing a re-index of the mediaserver cache for a given
    list of files.

    On rooted devices, and on SDK versions below 24 (or when the SDK version
    cannot be determined), a single ``MEDIA_MOUNTED`` broadcast is sent for
    the common parent path of the files. Otherwise each file is announced
    individually with ``MEDIA_SCANNER_SCAN_FILE``.

    :param file_list: On-device paths of the files to re-index.
    """
    # ``self`` is the device here, so its own attributes are used directly.
    use_mount_broadcast = self.is_rooted
    if not use_mount_broadcast:
        sdk_version = self.get_sdk_version()
        # MM and below; an unknown version is treated the same way.
        use_mount_broadcast = sdk_version is None or sdk_version < 24
    if use_mount_broadcast:
        common_path = commonprefix(file_list, sep=self.path.sep)
        self.broadcast_media_mounted(common_path, self.is_rooted)
    else:
        for f in file_list:
            self.broadcast_media_scan_file(f)
def broadcast_media_scan_file(self, filepath):
    """
    Force a re-index of the mediaserver cache for the specified file, by
    broadcasting ``MEDIA_SCANNER_SCAN_FILE`` with the file's ``file://`` URI.
    """
    broadcast = 'am broadcast -a android.intent.action.MEDIA_SCANNER_SCAN_FILE -d file://'
    self.execute('{}{}'.format(broadcast, filepath))
def broadcast_media_mounted(self, dirpath, as_root=False):
    """
    Force a re-index of the mediaserver cache for the specified directory, by
    broadcasting ``MEDIA_MOUNTED`` with the directory's ``file://`` URI.

    :param as_root: Passed through to ``execute``.
    """
    broadcast = 'am broadcast -a android.intent.action.MEDIA_MOUNTED -d file://'
    self.execute('{}{}'.format(broadcast, dirpath), as_root=as_root)
# Internal methods: do not use outside of the class.
def _update_build_properties(self, props):
try:
def strip(somestring):
return somestring.strip().replace('[', '').replace(']', '')
for line in self.execute("getprop").splitlines():
key, value = line.split(':', 1)
key = strip(key)
value = strip(value)
props[key] = value
except ValueError:
self.logger.warning('Could not parse build.prop.')
def _update_versions(self, filepath, props):
with open(filepath) as fh:
text = fh.read()
props['version'] = text
text = re.sub(r'#.*', '', text).strip()
match = re.search(r'^(Linux version .*?)\s*\((gcc version .*)\)$', text)
if match:
props['linux_version'] = match.group(1).strip()
props['gcc_version'] = match.group(2).strip()
else:
self.logger.warning('Could not parse version string.')
def _ensure_binaries_directory_is_writable(self):
matched = []
for entry in self.list_file_systems():
if self.binaries_directory.rstrip('/').startswith(entry.mount_point):
matched.append(entry)
if matched:
entry = sorted(matched, key=lambda x: len(x.mount_point))[-1]
if 'rw' not in entry.options:
self.execute('mount -o rw,remount {} {}'.format(entry.device, entry.mount_point), as_root=True)
else:
raise DeviceError('Could not find mount point for binaries directory {}'.format(self.binaries_directory))
class _LogcatPoller(threading.Thread):
join_timeout = 5
def __init__(self, device, period, timeout=None):
super(_LogcatPoller, self).__init__()
self.adb_device = device.adb_name
self.logger = device.logger
self.period = period
self.timeout = timeout
self.stop_signal = threading.Event()
self.lock = threading.RLock()
self.buffer_file = tempfile.mktemp()
self.last_poll = 0
self.daemon = True
self.exc = None
def run(self):
self.logger.debug('Starting logcat polling.')
try:
while True:
if self.stop_signal.is_set():
break
with self.lock:
current_time = time.time()
if (current_time - self.last_poll) >= self.period:
self._poll()
time.sleep(0.5)
except Exception: # pylint: disable=W0703
self.exc = WorkerThreadError(self.name, sys.exc_info())
self.logger.debug('Logcat polling stopped.')
def stop(self):
self.logger.debug('Stopping logcat polling.')
self.stop_signal.set()
self.join(self.join_timeout)
if self.is_alive():
self.logger.error('Could not join logcat poller thread.')
if self.exc:
raise self.exc # pylint: disable=E0702
def clear_buffer(self):
self.logger.debug('Clearing logcat buffer.')
with self.lock:
adb_shell(self.adb_device, 'logcat -c', timeout=self.timeout)
with open(self.buffer_file, 'w') as _: # NOQA
pass
def write_log(self, outfile):
self.logger.debug('Writing logbuffer to {}.'.format(outfile))
with self.lock:
self._poll()
if os.path.isfile(self.buffer_file):
shutil.copy(self.buffer_file, outfile)
else: # there was no logcat trace at this time
with open(outfile, 'w') as _: # NOQA
pass
def close(self):
self.logger.debug('Closing logcat poller.')
if os.path.isfile(self.buffer_file):
os.remove(self.buffer_file)
def _poll(self):
with self.lock:
self.last_poll = time.time()
adb_command(self.adb_device, 'logcat -d >> {}'.format(self.buffer_file), timeout=self.timeout)
adb_command(self.adb_device, 'logcat -c', timeout=self.timeout)
class BigLittleDevice(AndroidDevice):  # pylint: disable=W0223
    """
    Android device whose ``scheduler`` parameter defaults to ``'hmp'``.

    NOTE(review): presumably intended for ARM big.LITTLE platforms (inferred
    from the class name) -- confirm.
    """
    parameters = [
        # Overrides the inherited ``scheduler`` parameter's default.
        Parameter('scheduler', default='hmp', override=True),
    ]