diff --git a/setup.py b/setup.py index 0b9bdf04..4a9b43dd 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,7 @@ scripts = [os.path.join('scripts', s) for s in os.listdir('scripts')] params = dict( name='wlauto', - description='A framework for automating workload execution and measurment collection on ARM devices.', + description='A framework for automating workload execution and measurement collection on ARM devices.', version=get_wa_version(), packages=packages, package_data=data_files, @@ -72,13 +72,14 @@ params = dict( maintainer_email='workload-automation@arm.com', install_requires=[ 'python-dateutil', # converting between UTC and local time. - 'pexpect>=3.3', # Send/recieve to/from device + 'pexpect>=3.3', # Send/receive to/from device 'pyserial', # Serial port interface 'colorama', # Printing with colors 'pyYAML', # YAML-formatted agenda parsing 'requests', # Fetch assets over HTTP 'devlib', # Interacting with devices - 'louie' # callbacks dispatch + 'louie', # callbacks dispatch + 'wrapt', # better decorators ], extras_require={ 'other': ['jinja2', 'pandas>=0.13.1'], diff --git a/wa/__init__.py b/wa/__init__.py index 262984be..575c5963 100644 --- a/wa/__init__.py +++ b/wa/__init__.py @@ -1,11 +1,12 @@ from wa.framework import pluginloader, log, signal -from wa.framework.configuration import settings -from wa.framework.plugin import Plugin, Parameter from wa.framework.command import Command -from wa.framework.workload import Workload - -from wa.framework.exception import WAError, NotFoundError, ValidationError, WorkloadError +from wa.framework.configuration import settings from wa.framework.exception import HostError, JobError, InstrumentError, ConfigError -from wa.framework.exception import ResultProcessorError, ResourceError, CommandError, ToolError +from wa.framework.exception import (ResultProcessorError, ResourceError, + CommandError, ToolError) +from wa.framework.exception import (WAError, NotFoundError, ValidationError, + WorkloadError) from wa.framework.exception import WorkerThreadError, PluginLoaderError - +from wa.framework.instrumentation import Instrument +from wa.framework.plugin import Plugin, Parameter +from wa.framework.workload import Workload diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index c79df8b8..82237ce6 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -555,7 +555,7 @@ class MetaConfiguration(Configuration): plugin_packages = [ 'wa.commands', 'wa.workloads', - #'wa.instrumentation', + 'wa.instrumentation', #'wa.result_processors', #'wa.managers', 'wa.framework.target.descriptor', diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index 442adf21..032d9e52 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -1,4 +1,5 @@ import random +import logging from itertools import izip_longest, groupby, chain from wa.framework import pluginloader @@ -6,6 +7,8 @@ from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration JobGenerator, settings) from wa.framework.configuration.parsers import ConfigParser from wa.framework.configuration.plugin_cache import PluginCache +from wa.framework.exception import NotFoundError +from wa.utils.types import enum class CombinedConfig(object): @@ -26,33 +29,54 @@ class CombinedConfig(object): 'run_config': self.run_config.to_pod()} -class JobStatus: - PENDING = 0 - RUNNING = 1 - OK = 2 - FAILED = 3 - PARTIAL = 4 - ABORTED = 5 - PASSED = 6 - +JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING', + 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) class Job(object): + @property + def id(self): + return self.spec.id + def __init__(self, spec, iteration, context): + self.logger = logging.getLogger('job') self.spec = spec self.iteration = iteration self.context = context - self.status = 'new' + self.status = JobStatus.NEW self.workload = None self.output = None def load(self, target, loader=pluginloader): + self.logger.debug('Loading job {}'.format(self.id)) self.workload = loader.get_workload(self.spec.workload_name, target, **self.spec.workload_parameters) self.workload.init_resources(self.context) self.workload.validate() + self.status = JobStatus.LOADED + def initialize(self, context): + self.logger.info('Initializing job {}'.format(self.id)) + self.status = JobStatus.PENDING + + def configure_target(self, context): + self.logger.info('Configuring target for job {}'.format(self.id)) + + def setup(self, context): + self.logger.info('Setting up job {}'.format(self.id)) + + def run(self, context): + self.logger.info('Running job {}'.format(self.id)) + + def process_output(self, context): + self.looger.info('Processing output for job {}'.format(self.id)) + + def teardown(self, context): + self.logger.info('Tearing down job {}'.format(self.id)) + + def finalize(self, context): + self.logger.info('Finalizing job {}'.format(self.id)) class ConfigManager(object): """ @@ -108,8 +132,12 @@ class ConfigManager(object): def get_instruments(self, target): instruments = [] for name in self.enabled_instruments: - instruments.append(self.get_plugin(name, kind='instrument', - target=target)) + try: + instruments.append(self.get_plugin(name, kind='instrument', + target=target)) + except NotFoundError: + msg = 'Instrument "{}" not found' + raise NotFoundError(msg.format(name)) return instruments def finalize(self): diff --git a/wa/framework/entrypoint.py b/wa/framework/entrypoint.py index 3e73b910..db72b687 100644 --- a/wa/framework/entrypoint.py +++ b/wa/framework/entrypoint.py @@ -85,27 +85,7 @@ def main(): except KeyboardInterrupt: logging.info('Got CTRL-C. Aborting.') sys.exit(3) - except (WAError, DevlibError) as e: - logging.critical(e) - sys.exit(1) - except subprocess.CalledProcessError as e: - tb = get_traceback() - logging.critical(tb) - command = e.cmd - if e.args: - command = '{} {}'.format(command, ' '.join(e.args)) - message = 'Command \'{}\' returned non-zero exit status {}\nOUTPUT:\n{}\n' - logging.critical(message.format(command, e.returncode, e.output)) - sys.exit(2) - except SyntaxError as e: - tb = get_traceback() - logging.critical(tb) - message = 'Syntax Error in {}, line {}, offset {}:' - logging.critical(message.format(e.filename, e.lineno, e.offset)) - logging.critical('\t{}'.format(e.msg)) - sys.exit(2) except Exception as e: # pylint: disable=broad-except - tb = get_traceback() - logging.critical(tb) - logging.critical('{}({})'.format(e.__class__.__name__, e)) + if not getattr(e, 'logged', None): + log.log_error(e, logger) sys.exit(2) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index a5c79714..3b562fad 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -57,6 +57,8 @@ from wa.framework.exception import (WAError, ConfigError, TimeoutError, from wa.framework.plugin import Artifact from wa.framework.resource import ResourceResolver from wa.framework.target.info import TargetInfo +from wa.framework.target.manager import TargetManager +from wa.utils import log from wa.utils.misc import (ensure_directory_exists as _d, get_traceback, format_duration) from wa.utils.serializer import json @@ -74,15 +76,70 @@ REBOOT_DELAY = 3 class ExecutionContext(object): + @property + def previous_job(self): + if not self.job_queue: + return None + return self.job_queue[0] + + @property + def next_job(self): + if not self.completed_jobs: + return None + return self.completed_jobs[-1] + + @property + def spec_changed(self): + if self.previous_job is None and self.current_job is not None: # Start of run + return True + if self.previous_job is not None and self.current_job is None: # End of run + return True + return self.current_job.spec.id != self.previous_job.spec.id + + @property + def spec_will_change(self): + if self.current_job is None and self.next_job is not None: # Start of run + return True + if self.current_job is not None and self.next_job is None: # End of run + return True + return self.current_job.spec.id != self.next_job.spec.id + def __init__(self, cm, tm, output): - self.logger = logging.getLogger('ExecContext') + self.logger = logging.getLogger('context') self.cm = cm self.tm = tm self.output = output self.logger.debug('Loading resource discoverers') self.resolver = ResourceResolver(cm) self.resolver.load() + self.job_queue = None + self.completed_jobs = None + self.current_job = None + + def start_run(self): + self.output.info.start_time = datetime.now() + self.output.write_info() + self.job_queue = copy(self.cm.jobs) + self.completed_jobs = [] + + def end_run(self): + self.output.info.end_time = datetime.now() + self.output.info.duration = self.output.info.end_time -\ + self.output.info.start_time + self.output.write_info() + + def start_job(self): + if not self.job_queue: + raise RuntimeError('No jobs to run') + self.current_job = self.job_queue.pop(0) + return self.current_job + + def end_job(self): + if not self.current_job: + raise RuntimeError('No jobs in progress') + self.completed_jobs.append(self.current_job) + self.current_job = None class OldExecutionContext(object): @@ -147,22 +204,6 @@ class OldExecutionContext(object): self.job_iteration_counts = defaultdict(int) self.aborted = False self.runner = None - if config.agenda.filepath: - self.run_artifacts.append(Artifact('agenda', - os.path.join(self.host_working_directory, - os.path.basename(config.agenda.filepath)), - 'meta', - mandatory=True, - description='Agenda for this run.')) - for i, filepath in enumerate(settings.config_paths, 1): - name = 'config_{}'.format(i) - path = os.path.join(self.host_working_directory, - name + os.path.splitext(filepath)[1]) - self.run_artifacts.append(Artifact(name, - path, - kind='meta', - mandatory=True, - description='Config file used for the run.')) def initialize(self): if not os.path.isdir(self.run_output_directory): @@ -228,30 +269,6 @@ def _check_artifact_path(path, rootpath): return full_path -class FakeTargetManager(object): - # TODO: this is a FAKE - - def __init__(self, name, config): - self.device_name = name - self.device_config = config - - from devlib import LocalLinuxTarget - self.target = LocalLinuxTarget({'unrooted': True}) - - def get_target_info(self): - return TargetInfo(self.target) - - def validate_runtime_parameters(self, params): - pass - - def merge_runtime_parameters(self, params): - pass - - -def init_target_manager(config): - return FakeTargetManager(config.device, config.device_config) - - class Executor(object): """ The ``Executor``'s job is to set up the execution context and pass to a @@ -268,7 +285,7 @@ class Executor(object): # pylint: disable=R0915 def __init__(self): - self.logger = logging.getLogger('Executor') + self.logger = logging.getLogger('executor') self.error_logged = False self.warning_logged = False pluginloader = None @@ -297,7 +314,8 @@ class Executor(object): output.write_config(config) self.logger.info('Connecting to target') - target_manager = init_target_manager(config.run_config) + target_manager = TargetManager(config.run_config.device, + config.run_config.device_config) output.write_target_info(target_manager.get_target_info()) self.logger.info('Initializing execution conetext') @@ -312,6 +330,11 @@ class Executor(object): instrumentation.install(instrument) instrumentation.validate() + self.logger.info('Starting run') + runner = Runner(context) + runner.run() + + def execute_postamble(self): """ @@ -370,6 +393,92 @@ class Runner(object): """ + def __init__(self, context): + self.logger = logging.getLogger('runner') + self.context = context + self.output = self.context.output + self.config = self.context.cm + + def run(self): + self.send(signal.RUN_STARTED) + try: + self.initialize_run() + self.send(signal.RUN_INITIALIZED) + + while self.context.job_queue: + with signal.wrap('JOB_EXECUTION', self): + self.run_next_job(self.context) + except Exception as e: + if (not getattr(e, 'logged', None) and + not isinstance(e, KeyboardInterrupt)): + log.log_error(e, self.logger) + e.logged = True + raise e + finally: + self.finalize_run() + self.send(signal.RUN_COMPLETED) + + def initialize_run(self): + self.logger.info('Initializing run') + self.context.start_run() + log.indent() + for job in self.context.job_queue: + job.initialize(self.context) + log.dedent() + + def finalize_run(self): + self.logger.info('Finalizing run') + self.context.end_run() + + def run_next_job(self, context): + job = context.start_job() + self.logger.info('Running job {}'.format(job.id)) + job.status = JobStatus.RUNNING + log.indent() + self.send(signal.JOB_STARTED) + + with signal.wrap('JOB_TARGET_CONFIG', self): + job.configure_target(context) + + with signal.wrap('JOB_SETUP', self): + job.setup(context) + + try: + with signal.wrap('JOB_EXECUTION', self): + job.run(context) + + try: + with signal.wrap('JOB_OUTPUT_PROCESSED', self): + job.run(context) + except Exception: + job.status = JobStatus.PARTIAL + raise + except KeyboardInterrupt: + job.status = JobStatus.ABORTED + raise + except Exception as e: + job.status = JobStatus.FAILED + if not getattr(e, 'logged', None): + log.log_error(e, self.logger) + e.logged = True + raise e + finally: + # If setup was successfully completed, teardown must + # run even if the job failed + with signal.wrap('JOB_TEARDOWN', self): + job.teardown(context) + + log.dedent() + self.logger.info('Completing job {}'.format(job.id)) + self.send(signal.JOB_COMPLETED) + context.end_job() + + def send(self, s): + signal.send(s, self, self.context) + + def __str__(self): + return 'runner' + class RunnerJob(object): """ diff --git a/wa/framework/instrumentation.py b/wa/framework/instrumentation.py index f4d3e480..69386f13 100644 --- a/wa/framework/instrumentation.py +++ b/wa/framework/instrumentation.py @@ -102,7 +102,7 @@ import logging import inspect from collections import OrderedDict -import wa.framework.signal as signal +from wa.framework import signal from wa.framework.plugin import Plugin from wa.framework.exception import WAError, TargetNotRespondingError, TimeoutError from wa.utils.misc import get_traceback, isiterable diff --git a/wa/framework/output.py b/wa/framework/output.py index 77d5853e..c8d5dd4a 100644 --- a/wa/framework/output.py +++ b/wa/framework/output.py @@ -5,6 +5,7 @@ import string import sys import uuid from copy import copy +from datetime import timedelta from wlauto.core.configuration.configuration import JobSpec from wlauto.core.configuration.manager import ConfigManager @@ -25,10 +26,13 @@ class RunInfo(object): @staticmethod def from_pod(pod): uid = pod.pop('uuid') + duration = pod.pop('duration') if uid is not None: uid = uuid.UUID(uid) instance = RunInfo(**pod) instance.uuid = uid + instance.duration = duration if duration is None else\ + timedelta(seconds=duration) return instance def __init__(self, run_name=None, project=None, project_stage=None, @@ -44,6 +48,10 @@ class RunInfo(object): def to_pod(self): d = copy(self.__dict__) d['uuid'] = str(self.uuid) + if self.duration is None: + d['duration'] = self.duration + else: + d['duration'] = self.duration.total_seconds() return d diff --git a/wa/framework/plugin.py b/wa/framework/plugin.py index b642ee29..9f06de5e 100644 --- a/wa/framework/plugin.py +++ b/wa/framework/plugin.py @@ -406,7 +406,7 @@ class Plugin(object): return self.__class__.__name__ def __init__(self, **kwargs): - self.logger = logging.getLogger(self._classname) + self.logger = logging.getLogger(self.name) self._modules = [] self.capabilities = getattr(self.__class__, 'capabilities', []) for param in self.parameters: diff --git a/wa/framework/resource.py b/wa/framework/resource.py index e86eb830..6ef1992c 100644 --- a/wa/framework/resource.py +++ b/wa/framework/resource.py @@ -286,7 +286,7 @@ class ResourceResolver(object): """ def __init__(self, config): - self.logger = logging.getLogger(self.__class__.__name__) + self.logger = logging.getLogger('resolver') self.getters = defaultdict(prioritylist) self.config = config diff --git a/wa/framework/signal.py b/wa/framework/signal.py index dd19a5e5..7dbfd73d 100644 --- a/wa/framework/signal.py +++ b/wa/framework/signal.py @@ -27,7 +27,7 @@ from louie import dispatcher from wa.utils.types import prioritylist -logger = logging.getLogger('dispatcher') +logger = logging.getLogger('signal') class Signal(object): @@ -101,6 +101,27 @@ BEFORE_RUN_INIT = Signal('before-run-init', invert_priority=True) SUCCESSFUL_RUN_INIT = Signal('successful-run-init') AFTER_RUN_INIT = Signal('after-run-init') +BEFORE_JOB_TARGET_CONFIG = Signal('before-job-target-config', invert_priority=True) +SUCCESSFUL_JOB_TARGET_CONFIG = Signal('successful-job-target-config') +AFTER_JOB_TARGET_CONFIG = Signal('after-job-target-config') + +BEFORE_JOB_SETUP = Signal('before-job-setup', invert_priority=True) +SUCCESSFUL_JOB_SETUP = Signal('successful-job-setup') +AFTER_JOB_SETUP = Signal('after-job-setup') + +BEFORE_JOB_EXECUTION = Signal('before-job-execution', invert_priority=True) +SUCCESSFUL_JOB_EXECUTION = Signal('successful-job-execution') +AFTER_JOB_EXECUTION = Signal('after-job-execution') + +BEFORE_JOB_OUTPUT_PROCESSED = Signal('before-job-output-processed', + invert_priority=True) +SUCCESSFUL_JOB_OUTPUT_PROCESSED = Signal('successful-job-output-processed') +AFTER_JOB_OUTPUT_PROCESSED = Signal('after-job-output-processed') + +BEFORE_JOB_TEARDOWN = Signal('before-job-teardown', invert_priority=True) +SUCCESSFUL_JOB_TEARDOWN = Signal('successful-job-teardown') +AFTER_JOB_TEARDOWN = Signal('after-job-teardown') + BEFORE_FLASHING = Signal('before-flashing', invert_priority=True) SUCCESSFUL_FLASHING = Signal('successful-flashing') AFTER_FLASHING = Signal('after-flashing') @@ -250,6 +271,7 @@ def send(signal, sender=dispatcher.Anonymous, *args, **kwargs): The rest of the parameters will be passed on as aruments to the handler. """ + logger.debug('Sending {} from {}'.format(signal, sender)) return dispatcher.send(signal, sender, *args, **kwargs) @@ -266,6 +288,7 @@ def safe_send(signal, sender=dispatcher.Anonymous, to just ``[KeyboardInterrupt]``). """ try: + logger.debug('Safe-sending {} from {}'.format(signal, sender)) send(singnal, sender, *args, **kwargs) except Exception as e: if any(isinstance(e, p) for p in propagate): @@ -292,3 +315,17 @@ def wrap(signal_name, sender=dispatcher.Anonymous, safe=False, *args, **kwargs): finally: send_func(after_signal, sender, *args, **kwargs) + +def wrapped(signal_name, sender=dispatcher.Anonymous, safe=False): + """A decorator for wrapping function in signal dispatch.""" + @wrapt.decorator + def signal_wrapped(wrapped, instance, args, kwargs): + func_id = repr(wrapped) + + def signal_wrapper(*args, **kwargs): + with wrap(signal_name, sender, safe): + return wrapped(*args, **kwargs) + + return signal_wrapper(*args, **kwargs) + + return signal_wrapped diff --git a/wa/instrumentation/__init__.py b/wa/instrumentation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/wa/instrumentation/misc/__init__.py b/wa/instrumentation/misc/__init__.py new file mode 100644 index 00000000..634f4014 --- /dev/null +++ b/wa/instrumentation/misc/__init__.py @@ -0,0 +1,391 @@ +# Copyright 2013-2015 ARM Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# pylint: disable=W0613,no-member,attribute-defined-outside-init +""" + +Some "standard" instruments to collect additional info about workload execution. + +.. note:: The run() method of a Workload may perform some "boilerplate" as well as + the actual execution of the workload (e.g. it may contain UI automation + needed to start the workload). This "boilerplate" execution will also + be measured by these instruments. As such, they are not suitable for collected + precise data about specific operations. +""" +import os +import re +import logging +import time +import tarfile +from itertools import izip, izip_longest +from subprocess import CalledProcessError + +from devlib.exception import TargetError + +from devlib.utils.android import ApkInfo + +from wa import Instrument, Parameter +from wa.framework import signal +from wa.framework.exception import ConfigError +from wa.utils.misc import diff_tokens, write_table, check_output, as_relative +from wa.utils.misc import ensure_file_directory_exists as _f +from wa.utils.misc import ensure_directory_exists as _d +from wa.utils.types import list_of_strings + + +logger = logging.getLogger(__name__) + + +class SysfsExtractor(Instrument): + + name = 'sysfs_extractor' + description = """ + Collects the contest of a set of directories, before and after workload execution + and diffs the result. + + """ + + mount_command = 'mount -t tmpfs -o size={} tmpfs {}' + extract_timeout = 30 + tarname = 'sysfs.tar.gz' + DEVICE_PATH = 0 + BEFORE_PATH = 1 + AFTER_PATH = 2 + DIFF_PATH = 3 + + parameters = [ + Parameter('paths', kind=list_of_strings, mandatory=True, + description="""A list of paths to be pulled from the device. These could be directories + as well as files.""", + global_alias='sysfs_extract_dirs'), + Parameter('use_tmpfs', kind=bool, default=None, + description=""" + Specifies whether tmpfs should be used to cache sysfile trees and then pull them down + as a tarball. This is significantly faster then just copying the directory trees from + the device directly, bur requres root and may not work on all devices. Defaults to + ``True`` if the device is rooted and ``False`` if it is not. + """), + Parameter('tmpfs_mount_point', default=None, + description="""Mount point for tmpfs partition used to store snapshots of paths."""), + Parameter('tmpfs_size', default='32m', + description="""Size of the tempfs partition."""), + ] + + def initialize(self, context): + if not self.device.is_rooted and self.use_tmpfs: # pylint: disable=access-member-before-definition + raise ConfigError('use_tempfs must be False for an unrooted device.') + elif self.use_tmpfs is None: # pylint: disable=access-member-before-definition + self.use_tmpfs = self.device.is_rooted + + if self.use_tmpfs: + self.on_device_before = self.device.path.join(self.tmpfs_mount_point, 'before') + self.on_device_after = self.device.path.join(self.tmpfs_mount_point, 'after') + + if not self.device.file_exists(self.tmpfs_mount_point): + self.device.execute('mkdir -p {}'.format(self.tmpfs_mount_point), as_root=True) + self.device.execute(self.mount_command.format(self.tmpfs_size, self.tmpfs_mount_point), + as_root=True) + + def setup(self, context): + before_dirs = [ + _d(os.path.join(context.output_directory, 'before', self._local_dir(d))) + for d in self.paths + ] + after_dirs = [ + _d(os.path.join(context.output_directory, 'after', self._local_dir(d))) + for d in self.paths + ] + diff_dirs = [ + _d(os.path.join(context.output_directory, 'diff', self._local_dir(d))) + for d in self.paths + ] + self.device_and_host_paths = zip(self.paths, before_dirs, after_dirs, diff_dirs) + + if self.use_tmpfs: + for d in self.paths: + before_dir = self.device.path.join(self.on_device_before, + self.device.path.dirname(as_relative(d))) + after_dir = self.device.path.join(self.on_device_after, + self.device.path.dirname(as_relative(d))) + if self.device.file_exists(before_dir): + self.device.execute('rm -rf {}'.format(before_dir), as_root=True) + self.device.execute('mkdir -p {}'.format(before_dir), as_root=True) + if self.device.file_exists(after_dir): + self.device.execute('rm -rf {}'.format(after_dir), as_root=True) + self.device.execute('mkdir -p {}'.format(after_dir), as_root=True) + + def slow_start(self, context): + if self.use_tmpfs: + for d in self.paths: + dest_dir = self.device.path.join(self.on_device_before, as_relative(d)) + if '*' in dest_dir: + dest_dir = self.device.path.dirname(dest_dir) + self.device.execute('{} cp -Hr {} {}'.format(self.device.busybox, d, dest_dir), + as_root=True, check_exit_code=False) + else: # not rooted + for dev_dir, before_dir, _, _ in self.device_and_host_paths: + self.device.pull(dev_dir, before_dir) + + def slow_stop(self, context): + if self.use_tmpfs: + for d in self.paths: + dest_dir = self.device.path.join(self.on_device_after, as_relative(d)) + if '*' in dest_dir: + dest_dir = self.device.path.dirname(dest_dir) + self.device.execute('{} cp -Hr {} {}'.format(self.device.busybox, d, dest_dir), + as_root=True, check_exit_code=False) + else: # not using tmpfs + for dev_dir, _, after_dir, _ in self.device_and_host_paths: + self.device.pull(dev_dir, after_dir) + + def update_result(self, context): + if self.use_tmpfs: + on_device_tarball = self.device.path.join(self.device.working_directory, self.tarname) + on_host_tarball = self.device.path.join(context.output_directory, self.tarname) + self.device.execute('{} tar czf {} -C {} .'.format(self.device.busybox, + on_device_tarball, + self.tmpfs_mount_point), + as_root=True) + self.device.execute('chmod 0777 {}'.format(on_device_tarball), as_root=True) + self.device.pull(on_device_tarball, on_host_tarball) + with tarfile.open(on_host_tarball, 'r:gz') as tf: + tf.extractall(context.output_directory) + self.device.remove(on_device_tarball) + os.remove(on_host_tarball) + + for paths in self.device_and_host_paths: + after_dir = paths[self.AFTER_PATH] + dev_dir = paths[self.DEVICE_PATH].strip('*') # remove potential trailing '*' + if (not os.listdir(after_dir) and + self.device.file_exists(dev_dir) and + self.device.list_directory(dev_dir)): + self.logger.error('sysfs files were not pulled from the device.') + self.device_and_host_paths.remove(paths) # Path is removed to skip diffing it + for _, before_dir, after_dir, diff_dir in self.device_and_host_paths: + _diff_sysfs_dirs(before_dir, after_dir, diff_dir) + + def teardown(self, context): + self._one_time_setup_done = [] + + def finalize(self, context): + if self.use_tmpfs: + try: + self.device.execute('umount {}'.format(self.tmpfs_mount_point), as_root=True) + except (TargetError, CalledProcessError): + # assume a directory but not mount point + pass + self.device.execute('rm -rf {}'.format(self.tmpfs_mount_point), + as_root=True, check_exit_code=False) + + def validate(self): + if not self.tmpfs_mount_point: # pylint: disable=access-member-before-definition + self.tmpfs_mount_point = self.device.path.join(self.device.working_directory, 'temp-fs') + + def _local_dir(self, directory): + return os.path.dirname(as_relative(directory).replace(self.device.path.sep, os.sep)) + + +class ExecutionTimeInstrument(Instrument): + + name = 'execution_time' + description = """ + Measure how long it took to execute the run() methods of a Workload. + + """ + + priority = 15 + + def __init__(self, target, **kwargs): + super(ExecutionTimeInstrument, self).__init__(target, **kwargs) + self.start_time = None + self.end_time = None + + def on_run_start(self, context): + signal.connect(self.get_start_time, signal.BEFORE_WORKLOAD_EXECUTION, priority=self.priority) + signal.connect(self.get_stop_time, signal.AFTER_WORKLOAD_EXECUTION, priority=self.priority) + + def get_start_time(self, context): + self.start_time = time.time() + + def get_stop_time(self, context): + self.end_time = time.time() + + def update_result(self, context): + execution_time = self.end_time - self.start_time + context.result.add_metric('execution_time', execution_time, 'seconds') + + +class ApkVersion(Instrument): + + name = 'apk_version' + description = """ + Extracts APK versions for workloads that have them. + + """ + + def __init__(self, device, **kwargs): + super(ApkVersion, self).__init__(device, **kwargs) + self.apk_info = None + + def setup(self, context): + if hasattr(context.workload, 'apk_file'): + self.apk_info = ApkInfo(context.workload.apk_file) + else: + self.apk_info = None + + def update_result(self, context): + if self.apk_info: + context.result.add_metric(self.name, self.apk_info.version_name) + + +class InterruptStatsInstrument(Instrument): + + name = 'interrupts' + description = """ + Pulls the ``/proc/interrupts`` file before and after workload execution and diffs them + to show what interrupts occurred during that time. + + """ + + def __init__(self, device, **kwargs): + super(InterruptStatsInstrument, self).__init__(device, **kwargs) + self.before_file = None + self.after_file = None + self.diff_file = None + + def setup(self, context): + self.before_file = os.path.join(context.output_directory, 'before', 'proc', 'interrupts') + self.after_file = os.path.join(context.output_directory, 'after', 'proc', 'interrupts') + self.diff_file = os.path.join(context.output_directory, 'diff', 'proc', 'interrupts') + + def start(self, context): + with open(_f(self.before_file), 'w') as wfh: + wfh.write(self.device.execute('cat /proc/interrupts')) + + def stop(self, context): + with open(_f(self.after_file), 'w') as wfh: + wfh.write(self.device.execute('cat /proc/interrupts')) + + def update_result(self, context): + # If workload execution failed, the after_file may not have been created. + if os.path.isfile(self.after_file): + _diff_interrupt_files(self.before_file, self.after_file, _f(self.diff_file)) + + +class DynamicFrequencyInstrument(SysfsExtractor): + + name = 'cpufreq' + description = """ + Collects dynamic frequency (DVFS) settings before and after workload execution. + + """ + + tarname = 'cpufreq.tar.gz' + + parameters = [ + Parameter('paths', mandatory=False, override=True), + ] + + def setup(self, context): + self.paths = ['/sys/devices/system/cpu'] + if self.use_tmpfs: + self.paths.append('/sys/class/devfreq/*') # the '*' would cause problems for adb pull. + super(DynamicFrequencyInstrument, self).setup(context) + + def validate(self): + # temp-fs would have been set in super's validate, if not explicitly specified. + if not self.tmpfs_mount_point.endswith('-cpufreq'): # pylint: disable=access-member-before-definition + self.tmpfs_mount_point += '-cpufreq' + + +def _diff_interrupt_files(before, after, result): # pylint: disable=R0914 + output_lines = [] + with open(before) as bfh: + with open(after) as ofh: + for bline, aline in izip(bfh, ofh): + bchunks = bline.strip().split() + while True: + achunks = aline.strip().split() + if achunks[0] == bchunks[0]: + diffchunks = [''] + diffchunks.append(achunks[0]) + diffchunks.extend([diff_tokens(b, a) for b, a + in zip(bchunks[1:], achunks[1:])]) + output_lines.append(diffchunks) + break + else: # new category appeared in the after file + diffchunks = ['>'] + achunks + output_lines.append(diffchunks) + try: + aline = ofh.next() + except StopIteration: + break + + # Offset heading columns by one to allow for row labels on subsequent + # lines. + output_lines[0].insert(0, '') + + # Any "columns" that do not have headings in the first row are not actually + # columns -- they are a single column where space-spearated words got + # split. Merge them back together to prevent them from being + # column-aligned by write_table. + table_rows = [output_lines[0]] + num_cols = len(output_lines[0]) + for row in output_lines[1:]: + table_row = row[:num_cols] + table_row.append(' '.join(row[num_cols:])) + table_rows.append(table_row) + + with open(result, 'w') as wfh: + write_table(table_rows, wfh) + + +def _diff_sysfs_dirs(before, after, result): # pylint: disable=R0914 + before_files = [] + os.path.walk(before, + lambda arg, dirname, names: arg.extend([os.path.join(dirname, f) for f in names]), + before_files + ) + before_files = filter(os.path.isfile, before_files) + files = [os.path.relpath(f, before) for f in before_files] + after_files = [os.path.join(after, f) for f in files] + diff_files = [os.path.join(result, f) for f in files] + + for bfile, afile, dfile in zip(before_files, after_files, diff_files): + if not os.path.isfile(afile): + logger.debug('sysfs_diff: {} does not exist or is not a file'.format(afile)) + continue + + with open(bfile) as bfh, open(afile) as afh: # pylint: disable=C0321 + with open(_f(dfile), 'w') as dfh: + for i, (bline, aline) in enumerate(izip_longest(bfh, afh), 1): + if aline is None: + logger.debug('Lines missing from {}'.format(afile)) + break + bchunks = re.split(r'(\W+)', bline) + achunks = re.split(r'(\W+)', aline) + if len(bchunks) != len(achunks): + logger.debug('Token length mismatch in {} on line {}'.format(bfile, i)) + dfh.write('xxx ' + bline) + continue + if ((len([c for c in bchunks if c.strip()]) == len([c for c in achunks if c.strip()]) == 2) and + (bchunks[0] == achunks[0])): + # if there are only two columns and the first column is the + # same, assume it's a "header" column and do not diff it. + dchunks = [bchunks[0]] + [diff_tokens(b, a) for b, a in zip(bchunks[1:], achunks[1:])] + else: + dchunks = [diff_tokens(b, a) for b, a in zip(bchunks, achunks)] + dfh.write(''.join(dchunks)) diff --git a/wa/utils/types.py b/wa/utils/types.py index c23d2886..87071205 100644 --- a/wa/utils/types.py +++ b/wa/utils/types.py @@ -29,6 +29,7 @@ import os import re import math import shlex +import string from bisect import insort from collections import defaultdict, MutableMapping from copy import copy @@ -475,3 +476,69 @@ class obj_dict(MutableMapping): return self.__dict__['dict'][name] else: raise AttributeError("No such attribute: " + name) + + +class level(object): + """ + A level has a name and behaves like a string when printed, + however it also has a numeric value which is used in comparisons. + + """ + + def __init__(self, name, value): + self.name = name + self.value = value + + def __str__(self): + return self.name + + def __repr__(self): + return '{}({})'.format(self.name, self.value) + + def __cmp__(self, other): + if isinstance(other, level): + return cmp(self.value, other.value) + else: + return cmp(self.value, other) + + def __eq__(self, other): + if isinstance(other, level): + return self.value == other.value + else: + return self.value == other + + def __ne__(self, other): + if isinstance(other, level): + return self.value != other.value + else: + return self.value != other + + +def enum(args, start=0): + """ + Creates a class with attributes named by the first argument. + Each attribute is a ``level`` so they behave is integers in comparisons. + The value of the first attribute is specified by the second argument + (``0`` if not specified). + + :: + MyEnum = enum(['A', 'B', 'C']) + + is equivalent of:: + + class MyEnum(object): + A = 0 + B = 1 + C = 2 + + """ + + class Enum(object): + pass + + for i, v in enumerate(args, start): + name = string.upper(identifier(v)) + setattr(Enum, name, level(v, i)) + + return Enum +