1
0
mirror of https://github.com/ARM-software/devlib.git synced 2025-04-04 00:40:04 +01:00
devlib/devlib/target.py
Javi Merino d0c71fbc86 target: warn if a module is not supported for the target
When you create a target using devlib, you may not set
load_default_modules because you want to load a specific list of
modules.  In that case, raise a warning if one of those modules failed
to load, as it is something you are not expecting.

In the general case, the standard set of modules are tried to be loaded.
We know that they are not supported for all platforms so just emit a
debug message.
2016-01-18 17:31:00 +00:00

993 lines
36 KiB
Python

import os
import re
import time
import logging
import posixpath
import subprocess
import tempfile
import threading
from collections import namedtuple
from devlib.host import LocalConnection, PACKAGE_BIN_DIRECTORY
from devlib.module import get_module
from devlib.platform import Platform
from devlib.exception import TargetError, TargetNotRespondingError, TimeoutError
from devlib.utils.ssh import SshConnection
from devlib.utils.android import AdbConnection, AndroidProperties, adb_command, adb_disconnect
from devlib.utils.misc import memoized, isiterable, convert_new_lines, merge_lists
from devlib.utils.misc import ABI_MAP, get_cpu_name, ranges_to_list, escape_double_quotes
from devlib.utils.types import integer, boolean, bitmask, identifier, caseless_string
FSTAB_ENTRY_REGEX = re.compile(r'(\S+) on (\S+) type (\S+) \((\S+)\)')
ANDROID_SCREEN_STATE_REGEX = re.compile('(?:mPowerState|mScreenOn)=([0-9]+|true|false)',
re.IGNORECASE)
ANDROID_SCREEN_RESOLUTION_REGEX = re.compile(r'mUnrestrictedScreen=\(\d+,\d+\)'
r'\s+(?P<width>\d+)x(?P<height>\d+)')
DEFAULT_SHELL_PROMPT = re.compile(r'^.*(shell|root)@.*:/\S* [#$] ',
re.MULTILINE)
class Target(object):
conn_cls = None
path = None
os = None
default_modules = [
'hotplug',
'cpufreq',
'cpuidle',
'cgroups',
'hwmon',
]
@property
def core_names(self):
return self.platform.core_names
@property
def core_clusters(self):
return self.platform.core_clusters
@property
def big_core(self):
return self.platform.big_core
@property
def little_core(self):
return self.platform.little_core
@property
def is_connected(self):
return self.conn is not None
@property
@memoized
def connected_as_root(self):
result = self.execute('id')
return 'uid=0(' in result
@property
@memoized
def is_rooted(self):
if self.connected_as_root:
return True
try:
self.execute('ls /', timeout=2, as_root=True)
return True
except (TargetError, TimeoutError):
return False
@property
@memoized
def kernel_version(self):
return KernelVersion(self.execute('uname -r -v').strip())
@property
def os_version(self): # pylint: disable=no-self-use
return {}
@property
def abi(self): # pylint: disable=no-self-use
return None
@property
@memoized
def cpuinfo(self):
return Cpuinfo(self.execute('cat /proc/cpuinfo'))
@property
@memoized
def number_of_cpus(self):
num_cpus = 0
corere = re.compile(r'^\s*cpu\d+\s*$')
output = self.execute('ls /sys/devices/system/cpu')
for entry in output.split():
if corere.match(entry):
num_cpus += 1
return num_cpus
@property
@memoized
def config(self):
try:
return KernelConfig(self.execute('zcat /proc/config.gz'))
except TargetError:
for path in ['/boot/config', '/boot/config-$(uname -r)']:
try:
return KernelConfig(self.execute('cat {}'.format(path)))
except TargetError:
pass
return KernelConfig('')
@property
@memoized
def user(self):
return self.getenv('USER')
@property
def conn(self):
if self._connections:
tid = id(threading.current_thread())
if tid not in self._connections:
self._connections[tid] = self.get_connection()
return self._connections[tid]
else:
return None
def __init__(self,
connection_settings=None,
platform=None,
working_directory=None,
executables_directory=None,
connect=True,
modules=None,
load_default_modules=True,
shell_prompt=DEFAULT_SHELL_PROMPT,
):
self.connection_settings = connection_settings or {}
self.platform = platform or Platform()
self.working_directory = working_directory
self.executables_directory = executables_directory
self.modules = modules or []
self.load_default_modules = load_default_modules
self.shell_prompt = shell_prompt
self.logger = logging.getLogger(self.__class__.__name__)
self._installed_binaries = {}
self._installed_modules = {}
self._cache = {}
self._connections = {}
self.busybox = None
if load_default_modules:
module_lists = [self.default_modules]
else:
module_lists = []
module_lists += [self.modules, self.platform.modules]
self.modules = merge_lists(*module_lists, duplicates='first')
self._update_modules('early')
if connect:
self.connect()
# connection and initialization
def connect(self, timeout=None):
self.platform.init_target_connection(self)
tid = id(threading.current_thread())
self._connections[tid] = self.get_connection(timeout=timeout)
self._resolve_paths()
self.busybox = self.get_installed('busybox')
self.platform.update_from_target(self)
self._update_modules('connected')
if self.platform.big_core and self.load_default_modules:
self._install_module(get_module('bl'))
def disconnect(self):
for conn in self._connections.itervalues():
conn.close()
self._connections = {}
def get_connection(self, timeout=None):
if self.conn_cls is None:
raise NotImplementedError('conn_cls must be set by the subclass of Target')
return self.conn_cls(timeout=timeout, **self.connection_settings) # pylint: disable=not-callable
def setup(self, executables=None):
self.execute('mkdir -p {}'.format(self.working_directory))
self.execute('mkdir -p {}'.format(self.executables_directory))
self.busybox = self.install(os.path.join(PACKAGE_BIN_DIRECTORY, self.abi, 'busybox'))
for host_exe in (executables or []): # pylint: disable=superfluous-parens
self.install(host_exe)
def reboot(self, hard=False, connect=True, timeout=180):
if hard:
if not self.has('hard_reset'):
raise TargetError('Hard reset not supported for this target.')
self.hard_reset() # pylint: disable=no-member
else:
if not self.is_connected:
message = 'Cannot reboot target becuase it is disconnected. ' +\
'Either connect() first, or specify hard=True ' +\
'(in which case, a hard_reset module must be installed)'
raise TargetError(message)
self.reset()
if self.has('boot'):
self.boot() # pylint: disable=no-member
if connect:
self.connect(timeout=timeout)
# file transfer
def push(self, source, dest, timeout=None):
return self.conn.push(source, dest, timeout=timeout)
def pull(self, source, dest, timeout=None):
return self.conn.pull(source, dest, timeout=timeout)
# execution
def execute(self, command, timeout=None, check_exit_code=True, as_root=False):
return self.conn.execute(command, timeout, check_exit_code, as_root)
def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False):
return self.conn.background(command, stdout, stderr, as_root)
def invoke(self, binary, args=None, in_directory=None, on_cpus=None,
as_root=False, timeout=30):
"""
Executes the specified binary under the specified conditions.
:binary: binary to execute. Must be present and executable on the device.
:args: arguments to be passed to the binary. The can be either a list or
a string.
:in_directory: execute the binary in the specified directory. This must
be an absolute path.
:on_cpus: taskset the binary to these CPUs. This may be a single ``int`` (in which
case, it will be interpreted as the mask), a list of ``ints``, in which
case this will be interpreted as the list of cpus, or string, which
will be interpreted as a comma-separated list of cpu ranges, e.g.
``"0,4-7"``.
:as_root: Specify whether the command should be run as root
:timeout: If the invocation does not terminate within this number of seconds,
a ``TimeoutError`` exception will be raised. Set to ``None`` if the
invocation should not timeout.
"""
command = binary
if args:
if isiterable(args):
args = ' '.join(args)
command = '{} {}'.format(command, args)
if on_cpus:
on_cpus = bitmask(on_cpus)
command = '{} taskset 0x{:x} {}'.format(self.busybox, on_cpus, command)
if in_directory:
command = 'cd {} && {}'.format(in_directory, command)
return self.execute(command, as_root=as_root, timeout=timeout)
def kick_off(self, command, as_root=False):
raise NotImplementedError()
# sysfs interaction
def read_value(self, path, kind=None):
output = self.execute('cat \'{}\''.format(path), as_root=self.is_rooted).strip() # pylint: disable=E1103
if kind:
return kind(output)
else:
return output
def read_int(self, path):
return self.read_value(path, kind=integer)
def read_bool(self, path):
return self.read_value(path, kind=boolean)
def write_value(self, path, value, verify=True):
value = str(value)
self.execute('echo {} > \'{}\''.format(value, path), check_exit_code=False, as_root=True)
if verify:
output = self.read_value(path)
if not output == value:
message = 'Could not set the value of {} to "{}" (read "{}")'.format(path, value, output)
raise TargetError(message)
def reset(self):
try:
self.execute('reboot', as_root=self.is_rooted, timeout=2)
except (TargetError, TimeoutError, subprocess.CalledProcessError):
# on some targets "reboot" doesn't return gracefully
pass
def check_responsive(self):
try:
self.conn.execute('ls /', timeout=5)
except (TimeoutError, subprocess.CalledProcessError):
raise TargetNotRespondingError(self.conn.name)
# process management
def kill(self, pid, signal=None, as_root=False):
signal_string = '-s {}'.format(signal) if signal else ''
self.execute('kill {} {}'.format(signal_string, pid), as_root=as_root)
def killall(self, process_name, signal=None, as_root=False):
for pid in self.get_pids_of(process_name):
self.kill(pid, signal=signal, as_root=as_root)
def get_pids_of(self, process_name):
raise NotImplementedError()
def ps(self, **kwargs):
raise NotImplementedError()
# files
def file_exists(self, filepath):
command = 'if [ -e \'{}\' ]; then echo 1; else echo 0; fi'
return boolean(self.execute(command.format(filepath)).strip())
def list_file_systems(self):
output = self.execute('mount')
fstab = []
for line in output.split('\n'):
line = line.strip()
if not line:
continue
match = FSTAB_ENTRY_REGEX.search(line)
if match:
fstab.append(FstabEntry(match.group(1), match.group(2),
match.group(3), match.group(4),
None, None))
else: # assume pre-M Android
fstab.append(FstabEntry(*line.split()))
return fstab
def list_directory(self, path, as_root=False):
raise NotImplementedError()
def get_workpath(self, name):
return self.path.join(self.working_directory, name)
def tempfile(self, prefix='', suffix=''):
names = tempfile._get_candidate_names() # pylint: disable=W0212
for _ in xrange(tempfile.TMP_MAX):
name = names.next()
path = self.get_workpath(prefix + name + suffix)
if not self.file_exists(path):
return path
raise IOError('No usable temporary filename found')
def remove(self, path, as_root=False):
self.execute('rm -rf {}'.format(path), as_root=as_root)
# misc
def core_cpus(self, core):
return [i for i, c in enumerate(self.core_names) if c == core]
def list_online_cpus(self, core=None):
path = self.path.join('/sys/devices/system/cpu/online')
output = self.read_value(path)
all_online = ranges_to_list(output)
if core:
cpus = self.core_cpus(core)
if not cpus:
raise ValueError(core)
return [o for o in all_online if o in cpus]
else:
return all_online
def list_offline_cpus(self):
online = self.list_online_cpus()
return [c for c in xrange(self.number_of_cpus)
if c not in online]
def getenv(self, variable):
return self.execute('echo ${}'.format(variable)).rstrip('\r\n')
def capture_screen(self, filepath):
raise NotImplementedError()
def install(self, filepath, timeout=None, with_name=None):
raise NotImplementedError()
def uninstall(self, name):
raise NotImplementedError()
def get_installed(self, name):
for path in self.getenv('PATH').split(self.path.pathsep):
try:
if name in self.list_directory(path):
return self.path.join(path, name)
except TargetError:
pass # directory does not exist or no executable premssions
if self.file_exists(self.executables_directory):
if name in self.list_directory(self.executables_directory):
return self.path.join(self.executables_directory, name)
which = get_installed
def is_installed(self, name):
return bool(self.get_installed(name))
def bin(self, name):
return self._installed_binaries.get(name, name)
def has(self, modname):
return hasattr(self, identifier(modname))
def _update_modules(self, stage):
for mod in self.modules:
if isinstance(mod, dict):
mod, params = mod.items()[0]
else:
params = {}
mod = get_module(mod)
if not mod.stage == stage:
continue
if mod.probe(self):
self._install_module(mod, **params)
else:
msg = 'Module {} is not supported by the target'.format(mod.name)
if self.load_default_modules:
self.logger.debug(msg)
else:
self.logger.warning(msg)
def _install_module(self, mod, **params):
if mod.name not in self._installed_modules:
self.logger.debug('Installing module {}'.format(mod.name))
mod.install(self, **params)
self._installed_modules[mod.name] = mod
else:
self.logger.debug('Module {} is already installed.'.format(mod.name))
def _resolve_paths(self):
raise NotImplementedError()
class LinuxTarget(Target):
conn_cls = SshConnection
path = posixpath
os = 'linux'
@property
@memoized
def abi(self):
value = self.execute('uname -m').strip()
for abi, architectures in ABI_MAP.iteritems():
if value in architectures:
result = abi
break
else:
result = value
return result
@property
@memoized
def os_version(self):
os_version = {}
try:
command = 'ls /etc/*-release /etc*-version /etc/*_release /etc/*_version 2>/dev/null'
version_files = self.execute(command, check_exit_code=False).strip().split()
for vf in version_files:
name = self.path.basename(vf)
output = self.read_value(vf)
os_version[name] = output.strip().replace('\n', ' ')
except TargetError:
raise
return os_version
def connect(self, timeout=None):
super(LinuxTarget, self).connect(timeout=timeout)
def kick_off(self, command, as_root=False):
command = 'sh -c "{}" 1>/dev/null 2>/dev/null &'.format(escape_double_quotes(command))
return self.conn.execute(command, as_root=as_root)
def get_pids_of(self, process_name):
"""Returns a list of PIDs of all processes with the specified name."""
# result should be a column of PIDs with the first row as "PID" header
result = self.execute('ps -C {} -o pid'.format(process_name), # NOQA
check_exit_code=False).strip().split()
if len(result) >= 2: # at least one row besides the header
return map(int, result[1:])
else:
return []
def ps(self, **kwargs):
command = 'ps -eo user,pid,ppid,vsize,rss,wchan,pcpu,state,fname'
lines = iter(convert_new_lines(self.execute(command)).split('\n'))
lines.next() # header
result = []
for line in lines:
parts = re.split(r'\s+', line, maxsplit=8)
if parts and parts != ['']:
result.append(PsEntry(*(parts[0:1] + map(int, parts[1:5]) + parts[5:])))
if not kwargs:
return result
else:
filtered_result = []
for entry in result:
if all(getattr(entry, k) == v for k, v in kwargs.iteritems()):
filtered_result.append(entry)
return filtered_result
def list_directory(self, path, as_root=False):
contents = self.execute('ls -1 {}'.format(path), as_root=as_root)
return [x.strip() for x in contents.split('\n') if x.strip()]
def install(self, filepath, timeout=None, with_name=None): # pylint: disable=W0221
destpath = self.path.join(self.executables_directory,
with_name and with_name or self.path.basename(filepath))
self.push(filepath, destpath)
self.execute('chmod a+x {}'.format(destpath), timeout=timeout)
self._installed_binaries[self.path.basename(destpath)] = destpath
return destpath
def uninstall(self, name):
path = self.path.join(self.executables_directory, name)
self.remove(path)
def capture_screen(self, filepath):
if not self.is_installed('scrot'):
self.logger.debug('Could not take screenshot as scrot is not installed.')
return
try:
tmpfile = self.tempfile()
self.execute('DISPLAY=:0.0 scrot {}'.format(tmpfile))
self.pull(tmpfile, filepath)
self.remove(tmpfile)
except TargetError as e:
if "Can't open X dispay." not in e.message:
raise e
message = e.message.split('OUTPUT:', 1)[1].strip() # pylint: disable=no-member
self.logger.debug('Could not take screenshot: {}'.format(message))
def _resolve_paths(self):
if self.working_directory is None:
if self.connected_as_root:
self.working_directory = '/root/devlib-target'
else:
self.working_directory = '/home/{}/devlib-target'.format(self.user)
if self.executables_directory is None:
self.executables_directory = self.path.join(self.working_directory, 'bin')
class AndroidTarget(Target):
conn_cls = AdbConnection
path = posixpath
os = 'android'
@property
@memoized
def abi(self):
return self.getprop()['ro.product.cpu.abi'].split('-')[0]
@property
@memoized
def os_version(self):
os_version = {}
for k, v in self.getprop().iteritems():
if k.startswith('ro.build.version'):
part = k.split('.')[-1]
os_version[part] = v
return os_version
@property
def adb_name(self):
return self.conn.device
@property
@memoized
def screen_resolution(self):
output = self.execute('dumpsys window')
match = ANDROID_SCREEN_RESOLUTION_REGEX.search(output)
if match:
return (int(match.group('width')),
int(match.group('height')))
else:
return (0, 0)
def reset(self, fastboot=False): # pylint: disable=arguments-differ
try:
self.execute('reboot {}'.format(fastboot and 'fastboot' or ''),
as_root=self.is_rooted, timeout=2)
except (TargetError, TimeoutError, subprocess.CalledProcessError):
# on some targets "reboot" doesn't return gracefully
pass
def connect(self, timeout=10, check_boot_completed=True): # pylint: disable=arguments-differ
start = time.time()
device = self.connection_settings.get('device')
if device and ':' in device:
# ADB does not automatically remove a network device from it's
# devices list when the connection is broken by the remote, so the
# adb connection may have gone "stale", resulting in adb blocking
# indefinitely when making calls to the device. To avoid this,
# always disconnect first.
adb_disconnect(device)
super(AndroidTarget, self).connect(timeout=timeout)
if check_boot_completed:
boot_completed = boolean(self.getprop('sys.boot_completed'))
while not boot_completed and timeout >= time.time() - start:
time.sleep(5)
boot_completed = boolean(self.getprop('sys.boot_completed'))
if not boot_completed:
raise TargetError('Connected but Android did not fully boot.')
def setup(self, executables=None):
super(AndroidTarget, self).setup(executables)
self.execute('mkdir -p {}'.format(self._file_transfer_cache))
def kick_off(self, command, as_root=False):
"""
Like execute but closes adb session and returns immediately, leaving the command running on the
device (this is different from execute(background=True) which keeps adb connection open and returns
a subprocess object).
.. note:: This relies on busybox's nohup applet and so won't work on unrooted devices.
"""
if not self.is_rooted:
raise TargetError('kick_off uses busybox\'s nohup applet and so can only be run a rooted device.')
try:
command = 'cd {} && {} nohup {}'.format(self.working_directory, self.bin('busybox'), command)
output = self.execute(command, timeout=1, as_root=as_root)
except TimeoutError:
pass
else:
raise ValueError('Background command exited before timeout; got "{}"'.format(output))
def list_directory(self, path, as_root=False):
contents = self.execute('ls {}'.format(path), as_root=as_root)
return [x.strip() for x in contents.split('\n') if x.strip()]
def install(self, filepath, timeout=None, with_name=None): # pylint: disable=W0221
ext = os.path.splitext(filepath)[1].lower()
if ext == '.apk':
return self.install_apk(filepath, timeout)
else:
return self.install_executable(filepath, with_name)
def uninstall(self, name):
if self.package_is_installed(name):
self.uninstall_package(name)
else:
self.uninstall_executable(name)
def get_pids_of(self, process_name):
result = self.execute('ps {}'.format(process_name[-15:]), check_exit_code=False).strip()
if result and 'not found' not in result:
return [int(x.split()[1]) for x in result.split('\n')[1:]]
else:
return []
def ps(self, **kwargs):
lines = iter(convert_new_lines(self.execute('ps')).split('\n'))
lines.next() # header
result = []
for line in lines:
parts = line.split()
if parts:
result.append(PsEntry(*(parts[0:1] + map(int, parts[1:5]) + parts[5:])))
if not kwargs:
return result
else:
filtered_result = []
for entry in result:
if all(getattr(entry, k) == v for k, v in kwargs.iteritems()):
filtered_result.append(entry)
return filtered_result
def capture_screen(self, filepath):
on_device_file = self.path.join(self.working_directory, 'screen_capture.png')
self.execute('screencap -p {}'.format(on_device_file))
self.pull(on_device_file, filepath)
self.remove(on_device_file)
def push(self, source, dest, as_root=False, timeout=None): # pylint: disable=arguments-differ
if not as_root:
self.conn.push(source, dest, timeout=timeout)
else:
device_tempfile = self.path.join(self._file_transfer_cache, source.lstrip(self.path.sep))
self.execute('mkdir -p {}'.format(self.path.dirname(device_tempfile)))
self.conn.push(source, device_tempfile, timeout=timeout)
self.execute('cp {} {}'.format(device_tempfile, dest), as_root=True)
def pull(self, source, dest, as_root=False, timeout=None): # pylint: disable=arguments-differ
if not as_root:
self.conn.pull(source, dest, timeout=timeout)
else:
device_tempfile = self.path.join(self._file_transfer_cache, source.lstrip(self.path.sep))
self.execute('mkdir -p {}'.format(self.path.dirname(device_tempfile)))
self.execute('cp {} {}'.format(source, device_tempfile), as_root=True)
self.conn.pull(device_tempfile, dest, timeout=timeout)
# Android-specific
def swipe_to_unlock(self):
width, height = self.screen_resolution
swipe_heigh = height * 2 // 3
start = 100
stop = width - start
command = 'input swipe {} {} {} {}'
self.execute(command.format(start, swipe_heigh, stop, swipe_heigh))
def getprop(self, prop=None):
props = AndroidProperties(self.execute('getprop'))
if prop:
return props[prop]
return props
def is_installed(self, name):
return super(AndroidTarget, self).is_installed(name) or self.package_is_installed(name)
def package_is_installed(self, package_name):
return package_name in self.list_packages()
def list_packages(self):
output = self.execute('pm list packages')
output = output.replace('package:', '')
return output.split()
def get_package_version(self, package):
output = self.execute('dumpsys package {}'.format(package))
for line in convert_new_lines(output).split('\n'):
if 'versionName' in line:
return line.split('=', 1)[1]
return None
def install_apk(self, filepath, timeout=None): # pylint: disable=W0221
ext = os.path.splitext(filepath)[1].lower()
if ext == '.apk':
return adb_command(self.adb_name, "install {}".format(filepath), timeout=timeout)
else:
raise TargetError('Can\'t install {}: unsupported format.'.format(filepath))
def install_executable(self, filepath, with_name=None):
self._ensure_executables_directory_is_writable()
executable_name = with_name or os.path.basename(filepath)
on_device_file = self.path.join(self.working_directory, executable_name)
on_device_executable = self.path.join(self.executables_directory, executable_name)
self.push(filepath, on_device_file)
if on_device_file != on_device_executable:
self.execute('cp {} {}'.format(on_device_file, on_device_executable), as_root=self.is_rooted)
self.remove(on_device_file, as_root=self.is_rooted)
self.execute('chmod 0777 {}'.format(on_device_executable), as_root=self.is_rooted)
self._installed_binaries[executable_name] = on_device_executable
return on_device_executable
def uninstall_package(self, package):
adb_command(self.adb_name, "uninstall {}".format(package), timeout=30)
def uninstall_executable(self, executable_name):
on_device_executable = self.path.join(self.executables_directory, executable_name)
self._ensure_executables_directory_is_writable()
self.remove(on_device_executable, as_root=self.is_rooted)
def dump_logcat(self, filepath, filter=None, append=False, timeout=30): # pylint: disable=redefined-builtin
op = '>>' if append == True else '>'
filtstr = ' -s {}'.format(filter) if filter else ''
command = 'logcat -d{} {} {}'.format(filtstr, op, filepath)
adb_command(self.adb_name, command, timeout=timeout)
def clear_logcat(self):
adb_command(self.adb_name, 'logcat -c', timeout=30)
def is_screen_on(self):
output = self.execute('dumpsys power')
match = ANDROID_SCREEN_STATE_REGEX.search(output)
if match:
return boolean(match.group(1))
else:
raise TargetError('Could not establish screen state.')
def ensure_screen_is_on(self):
if not self.is_screen_on():
self.execute('input keyevent 26')
def _resolve_paths(self):
if self.working_directory is None:
self.working_directory = '/data/local/tmp/devlib-target'
self._file_transfer_cache = self.path.join(self.working_directory, '.file-cache')
if self.executables_directory is None:
self.executables_directory = self.path.join(self.working_directory, 'bin')
def _ensure_executables_directory_is_writable(self):
matched = []
for entry in self.list_file_systems():
if self.executables_directory.rstrip('/').startswith(entry.mount_point):
matched.append(entry)
if matched:
entry = sorted(matched, key=lambda x: len(x.mount_point))[-1]
if 'rw' not in entry.options:
self.execute('mount -o rw,remount {} {}'.format(entry.device,
entry.mount_point),
as_root=True)
else:
message = 'Could not find mount point for executables directory {}'
raise TargetError(message.format(self.executables_directory))
FstabEntry = namedtuple('FstabEntry', ['device', 'mount_point', 'fs_type', 'options', 'dump_freq', 'pass_num'])
PsEntry = namedtuple('PsEntry', 'user pid ppid vsize rss wchan pc state name')
class Cpuinfo(object):
@property
@memoized
def architecture(self):
for section in self.sections:
if 'CPU architecture' in section:
return section['CPU architecture']
if 'architecture' in section:
return section['architecture']
@property
@memoized
def cpu_names(self):
cpu_names = []
global_name = None
for section in self.sections:
if 'processor' in section:
if 'CPU part' in section:
cpu_names.append(_get_part_name(section))
elif 'model name' in section:
cpu_names.append(_get_model_name(section))
else:
cpu_names.append(None)
elif 'CPU part' in section:
global_name = _get_part_name(section)
return [caseless_string(c or global_name) for c in cpu_names]
def __init__(self, text):
self.sections = None
self.text = None
self.parse(text)
@memoized
def get_cpu_features(self, cpuid=0):
global_features = []
for section in self.sections:
if 'processor' in section:
if int(section.get('processor')) != cpuid:
continue
if 'Features' in section:
return section.get('Features').split()
elif 'flags' in section:
return section.get('flags').split()
elif 'Features' in section:
global_features = section.get('Features').split()
elif 'flags' in section:
global_features = section.get('flags').split()
return global_features
def parse(self, text):
self.sections = []
current_section = {}
self.text = text.strip()
for line in self.text.split('\n'):
line = line.strip()
if line:
key, value = line.split(':', 1)
current_section[key.strip()] = value.strip()
else: # not line
self.sections.append(current_section)
current_section = {}
self.sections.append(current_section)
def __str__(self):
return 'CpuInfo({})'.format(self.cpu_names)
__repr__ = __str__
class KernelVersion(object):
def __init__(self, version_string):
if ' #' in version_string:
release, version = version_string.split(' #')
self.release = release
self.version = version
elif version_string.startswith('#'):
self.release = ''
self.version = version_string
else:
self.release = version_string
self.version = ''
def __str__(self):
return '{} {}'.format(self.release, self.version)
__repr__ = __str__
class KernelConfig(object):
not_set_regex = re.compile(r'# (\S+) is not set')
@staticmethod
def get_config_name(name):
name = name.upper()
if not name.startswith('CONFIG_'):
name = 'CONFIG_' + name
return name
def iteritems(self):
return self._config.iteritems()
def __init__(self, text):
self.text = text
self._config = {}
for line in text.split('\n'):
line = line.strip()
if line.startswith('#'):
match = self.not_set_regex.search(line)
if match:
self._config[match.group(1)] = 'n'
elif '=' in line:
name, value = line.split('=', 1)
self._config[name.strip()] = value.strip()
def get(self, name):
return self._config.get(self.get_config_name(name))
def like(self, name):
regex = re.compile(name, re.I)
result = {}
for k, v in self._config.iteritems():
if regex.search(k):
result[k] = v
return result
def is_enabled(self, name):
return self.get(name) == 'y'
def is_module(self, name):
return self.get(name) == 'm'
def is_not_set(self, name):
return self.get(name) == 'n'
def has(self, name):
return self.get(name) in ['m', 'y']
class LocalLinuxTarget(LinuxTarget):
conn_cls = LocalConnection
def _resolve_paths(self):
if self.working_directory is None:
self.working_directory = '/tmp'
if self.executables_directory is None:
self.executables_directory = '/tmp'
def _get_model_name(section):
name_string = section['model name']
parts = name_string.split('@')[0].strip().split()
return ' '.join([p for p in parts
if '(' not in p and p != 'CPU'])
def _get_part_name(section):
implementer = section.get('CPU implementer', '0x0')
part = section['CPU part']
variant = section.get('CPU variant', '0x0')
name = get_cpu_name(*map(integer, [implementer, part, variant]))
if name is None:
name = '{}/{}/{}'.format(implementer, part, variant)
return name