diff --git a/setup.py b/setup.py index 0b9bdf04..4a9b43dd 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,7 @@ scripts = [os.path.join('scripts', s) for s in os.listdir('scripts')] params = dict( name='wlauto', - description='A framework for automating workload execution and measurment collection on ARM devices.', + description='A framework for automating workload execution and measurement collection on ARM devices.', version=get_wa_version(), packages=packages, package_data=data_files, @@ -72,13 +72,14 @@ params = dict( maintainer_email='workload-automation@arm.com', install_requires=[ 'python-dateutil', # converting between UTC and local time. - 'pexpect>=3.3', # Send/recieve to/from device + 'pexpect>=3.3', # Send/receive to/from device 'pyserial', # Serial port interface 'colorama', # Printing with colors 'pyYAML', # YAML-formatted agenda parsing 'requests', # Fetch assets over HTTP 'devlib', # Interacting with devices - 'louie' # callbacks dispatch + 'louie', # callbacks dispatch + 'wrapt', # better decorators ], extras_require={ 'other': ['jinja2', 'pandas>=0.13.1'], diff --git a/wa/__init__.py b/wa/__init__.py index 262984be..575c5963 100644 --- a/wa/__init__.py +++ b/wa/__init__.py @@ -1,11 +1,12 @@ from wa.framework import pluginloader, log, signal -from wa.framework.configuration import settings -from wa.framework.plugin import Plugin, Parameter from wa.framework.command import Command -from wa.framework.workload import Workload - -from wa.framework.exception import WAError, NotFoundError, ValidationError, WorkloadError +from wa.framework.configuration import settings from wa.framework.exception import HostError, JobError, InstrumentError, ConfigError -from wa.framework.exception import ResultProcessorError, ResourceError, CommandError, ToolError +from wa.framework.exception import (ResultProcessorError, ResourceError, + CommandError, ToolError) +from wa.framework.exception import (WAError, NotFoundError, ValidationError, + WorkloadError) from wa.framework.exception import WorkerThreadError, PluginLoaderError - +from wa.framework.instrumentation import Instrument +from wa.framework.plugin import Plugin, Parameter +from wa.framework.workload import Workload diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index 8d11ceb5..8bd8b3e2 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -20,9 +20,8 @@ from collections import OrderedDict, defaultdict from wa.framework.exception import ConfigError, NotFoundError from wa.framework.configuration.tree import SectionNode from wa.utils.misc import (get_article, merge_config_values) -from wa.utils.types import (identifier, integer, boolean, - list_of_strings, toggle_set, - obj_dict) +from wa.utils.types import (identifier, integer, boolean, list_of_strings, + list_of, toggle_set, obj_dict, enum) from wa.utils.serializer import is_pod # Mapping for kind conversion; see docs for convert_types below @@ -32,17 +31,9 @@ KIND_MAP = { dict: OrderedDict, } -ITERATION_STATUS = [ - 'NOT_STARTED', - 'RUNNING', +JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING', + 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) - 'OK', - 'NONCRITICAL', - 'PARTIAL', - 'FAILED', - 'ABORTED', - 'SKIPPED', -] ########################## ### CONFIG POINT TYPES ### @@ -555,7 +546,7 @@ class MetaConfiguration(Configuration): plugin_packages = [ 'wa.commands', 'wa.workloads', - #'wa.instrumentation', + 'wa.instrumentation', #'wa.result_processors', #'wa.managers', 'wa.framework.target.descriptor', @@ -638,8 +629,7 @@ class RunConfiguration(Configuration): name = "Run Configuration" - # Metadata is separated out because it is not loaded into the auto - # generated config file + # Metadata is separated out because it is not loaded into the auto generated config file meta_data = [ ConfigurationPoint('run_name', kind=str, description=''' @@ -717,9 +707,9 @@ class RunConfiguration(Configuration): This setting defines what specific Device subclass will be used to interact the connected device. Obviously, this must match your setup. '''), - ConfigurationPoint('retry_on_status', kind=status_list, + ConfigurationPoint('retry_on_status', kind=list_of(JobStatus), default=['FAILED', 'PARTIAL'], - allowed_values=ITERATION_STATUS, + allowed_values=JobStatus.values, description=''' This is list of statuses on which a job will be cosidered to have failed and will be automatically retried up to ``max_retries`` times. This defaults to @@ -737,10 +727,10 @@ class RunConfiguration(Configuration): ``"ABORTED"`` The user interupted the workload '''), - ConfigurationPoint('max_retries', kind=int, default=3, + ConfigurationPoint('max_retries', kind=int, default=2, description=''' The maximum number of times failed jobs will be retried before giving up. If - not set, this will default to ``3``. + not set. .. note:: this number does not include the original attempt '''), @@ -918,8 +908,7 @@ class JobSpec(Configuration): except NotFoundError: global_runtime_params = {} for source in plugin_cache.sources: - if source in global_runtime_params: - runtime_parameters[source] = global_runtime_params[source] + runtime_parameters[source] = global_runtime_params[source] # Add runtime parameters from JobSpec for source, values in self.to_merge['runtime_parameters'].iteritems(): @@ -929,7 +918,11 @@ class JobSpec(Configuration): self.runtime_parameters = target_manager.merge_runtime_parameters(runtime_parameters) def finalize(self): - self.id = "-".join([source.config['id'] for source in self._sources[1:]]) # ignore first id, "global" + self.id = "-".join([source.config['id'] + for source in self._sources[1:]]) # ignore first id, "global" + if self.label is None: + self.label = self.workload_name + # This is used to construct the list of Jobs WA will run diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index 442adf21..4cfddd53 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -1,11 +1,15 @@ import random +import logging from itertools import izip_longest, groupby, chain from wa.framework import pluginloader from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration, - JobGenerator, settings) + JobGenerator, JobStatus, settings) from wa.framework.configuration.parsers import ConfigParser from wa.framework.configuration.plugin_cache import PluginCache +from wa.framework.exception import NotFoundError +from wa.framework.job import Job +from wa.utils.types import enum class CombinedConfig(object): @@ -26,34 +30,6 @@ class CombinedConfig(object): 'run_config': self.run_config.to_pod()} -class JobStatus: - PENDING = 0 - RUNNING = 1 - OK = 2 - FAILED = 3 - PARTIAL = 4 - ABORTED = 5 - PASSED = 6 - - -class Job(object): - - def __init__(self, spec, iteration, context): - self.spec = spec - self.iteration = iteration - self.context = context - self.status = 'new' - self.workload = None - self.output = None - - def load(self, target, loader=pluginloader): - self.workload = loader.get_workload(self.spec.workload_name, - target, - **self.spec.workload_parameters) - self.workload.init_resources(self.context) - self.workload.validate() - - class ConfigManager(object): """ Represents run-time state of WA. Mostly used as a container for loaded @@ -108,8 +84,12 @@ class ConfigManager(object): def get_instruments(self, target): instruments = [] for name in self.enabled_instruments: - instruments.append(self.get_plugin(name, kind='instrument', - target=target)) + try: + instruments.append(self.get_plugin(name, kind='instrument', + target=target)) + except NotFoundError: + msg = 'Instrument "{}" not found' + raise NotFoundError(msg.format(name)) return instruments def finalize(self): diff --git a/wa/framework/configuration/parsers.py b/wa/framework/configuration/parsers.py index 70f50857..7f1a747c 100644 --- a/wa/framework/configuration/parsers.py +++ b/wa/framework/configuration/parsers.py @@ -15,10 +15,10 @@ import os -from wlauto.exceptions import ConfigError -from wlauto.utils.serializer import read_pod, SerializerSyntaxError -from wlauto.utils.types import toggle_set, counter -from wlauto.core.configuration.configuration import JobSpec +from wa.framework.configuration.core import JobSpec +from wa.framework.exception import ConfigError +from wa.utils.serializer import json, read_pod, SerializerSyntaxError +from wa.utils.types import toggle_set, counter ############### @@ -32,7 +32,6 @@ class ConfigParser(object): def load(self, state, raw, source, wrap_exceptions=True): # pylint: disable=too-many-branches try: - state.plugin_cache.add_source(source) if 'run_name' in raw: msg = '"run_name" can only be specified in the config '\ 'section of an agenda' @@ -45,19 +44,19 @@ class ConfigParser(object): # Get WA core configuration for cfg_point in state.settings.configuration.itervalues(): - value = get_aliased_param(cfg_point, raw) + value = pop_aliased_param(cfg_point, raw) if value is not None: state.settings.set(cfg_point.name, value) # Get run specific configuration for cfg_point in state.run_config.configuration.itervalues(): - value = get_aliased_param(cfg_point, raw) + value = pop_aliased_param(cfg_point, raw) if value is not None: state.run_config.set(cfg_point.name, value) # Get global job spec configuration for cfg_point in JobSpec.configuration.itervalues(): - value = get_aliased_param(cfg_point, raw) + value = pop_aliased_param(cfg_point, raw) if value is not None: state.jobs_config.set_global_value(cfg_point.name, value) @@ -159,6 +158,13 @@ class AgendaParser(object): state.jobs_config) workloads.append(workload) + if 'params' in section: + if 'runtime_params' in section: + msg = 'both "params" and "runtime_params" specified in a '\ + 'section: "{}"' + raise ConfigError(msg.format(json.dumps(section, indent=None))) + section['runtime_params'] = section.pop('params') + section = _construct_valid_entry(section, seen_sect_ids, "s", state.jobs_config) state.jobs_config.add_section(section, workloads) @@ -168,7 +174,7 @@ class AgendaParser(object): ### Helper functions ### ######################## -def get_aliased_param(cfg_point, d, default=None, pop=True): +def pop_aliased_param(cfg_point, d, default=None): """ Given a ConfigurationPoint and a dict, this function will search the dict for the ConfigurationPoint's name/aliases. If more than one is found it will raise @@ -181,10 +187,7 @@ def get_aliased_param(cfg_point, d, default=None, pop=True): if len(alias_map) > 1: raise ConfigError(DUPLICATE_ENTRY_ERROR.format(aliases)) elif alias_map: - if pop: - return d.pop(alias_map[0]) - else: - return d[alias_map[0]] + return d.pop(alias_map[0]) else: return default @@ -204,7 +207,7 @@ def _load_file(filepath, error_name): def merge_result_processors_instruments(raw): instr_config = JobSpec.configuration['instrumentation'] - instruments = toggle_set(get_aliased_param(instr_config, raw, default=[])) + instruments = toggle_set(pop_aliased_param(instr_config, raw, default=[])) result_processors = toggle_set(raw.pop('result_processors', [])) if instruments and result_processors: conflicts = instruments.conflicts_with(result_processors) @@ -247,26 +250,12 @@ def _construct_valid_entry(raw, seen_ids, prefix, jobs_config): # Validate all workload_entry for name, cfg_point in JobSpec.configuration.iteritems(): - value = get_aliased_param(cfg_point, raw) + value = pop_aliased_param(cfg_point, raw) if value is not None: value = cfg_point.kind(value) cfg_point.validate_value(name, value) workload_entry[name] = value - wk_id = workload_entry['id'] - param_names = ['workload_params', 'workload_parameters'] - if prefix == 'wk': - param_names += ['params', 'parameters'] - workload_entry["workload_parameters"] = _pop_aliased(raw, param_names, wk_id) - - param_names = ['runtime_parameters', 'runtime_params'] - if prefix == 's': - param_names += ['params', 'parameters'] - workload_entry["runtime_parameters"] = _pop_aliased(raw, param_names, wk_id) - - param_names = ['boot_parameters', 'boot_params'] - workload_entry["boot_parameters"] = _pop_aliased(raw, param_names, wk_id) - if "instrumentation" in workload_entry: jobs_config.update_enabled_instruments(workload_entry["instrumentation"]) diff --git a/wa/framework/configuration/plugin_cache.py b/wa/framework/configuration/plugin_cache.py index d8a3f8e8..bfabb97c 100644 --- a/wa/framework/configuration/plugin_cache.py +++ b/wa/framework/configuration/plugin_cache.py @@ -181,74 +181,47 @@ class PluginCache(object): :rtype: A fully merged and validated configuration in the form of a obj_dict. """ - ms = MergeState() - ms.generic_name = generic_name - ms.specific_name = specific_name - ms.generic_config = copy(self.plugin_configs[generic_name]) - ms.specific_config = copy(self.plugin_configs[specific_name]) - ms.cfg_points = self.get_plugin_parameters(specific_name) + generic_config = copy(self.plugin_configs[generic_name]) + specific_config = copy(self.plugin_configs[specific_name]) + cfg_points = self.get_plugin_parameters(specific_name) sources = self.sources + seen_specific_config = defaultdict(list) # set_value uses the 'name' attribute of the passed object in it error # messages, to ensure these messages make sense the name will have to be # changed several times during this function. final_config.name = specific_name + # pylint: disable=too-many-nested-blocks for source in sources: try: - update_config_from_source(final_config, source, ms) + if source in generic_config: + final_config.name = generic_name + for name, cfg_point in cfg_points.iteritems(): + if name in generic_config[source]: + if name in seen_specific_config: + msg = ('"{generic_name}" configuration "{config_name}" has already been ' + 'specified more specifically for {specific_name} in:\n\t\t{sources}') + msg = msg.format(generic_name=generic_name, + config_name=name, + specific_name=specific_name, + sources=", ".join(seen_specific_config[name])) + raise ConfigError(msg) + value = generic_config[source][name] + cfg_point.set_value(final_config, value, check_mandatory=False) + + if source in specific_config: + final_config.name = specific_name + for name, cfg_point in cfg_points.iteritems(): + if name in specific_config[source]: + seen_specific_config[name].append(str(source)) + value = specific_config[source][name] + cfg_point.set_value(final_config, value, check_mandatory=False) + except ConfigError as e: raise ConfigError('Error in "{}":\n\t{}'.format(source, str(e))) # Validate final configuration final_config.name = specific_name - for cfg_point in ms.cfg_points.itervalues(): + for cfg_point in cfg_points.itervalues(): cfg_point.validate(final_config) - - -class MergeState(object): - - def __init__(self): - self.generic_name = None - self.specific_name = None - self.generic_config = None - self.specific_config = None - self.cfg_points = None - self.seen_specific_config = defaultdict(list) - - -def update_config_from_source(final_config, source, state): - if source in state.generic_config: - final_config.name = state.generic_name - for name, cfg_point in state.cfg_points.iteritems(): - if name in state.generic_config[source]: - if name in state.seen_specific_config: - msg = ('"{generic_name}" configuration "{config_name}" has ' - 'already been specified more specifically for ' - '{specific_name} in:\n\t\t{sources}') - seen_sources = state.seen_specific_config[name] - msg = msg.format(generic_name=generic_name, - config_name=name, - specific_name=specific_name, - sources=", ".join(seen_sources)) - raise ConfigError(msg) - value = state.generic_config[source].pop(name) - cfg_point.set_value(final_config, value, check_mandatory=False) - - if state.generic_config[source]: - msg = 'Unexected values for {}: {}' - raise ConfigError(msg.format(state.generic_name, - state.generic_config[source])) - - if source in state.specific_config: - final_config.name = state.specific_name - for name, cfg_point in state.cfg_points.iteritems(): - if name in state.specific_config[source]: - seen_state.specific_config[name].append(str(source)) - value = state.specific_config[source].pop(name) - cfg_point.set_value(final_config, value, check_mandatory=False) - - if state.specific_config[source]: - msg = 'Unexected values for {}: {}' - raise ConfigError(msg.format(state.specific_name, - state.specific_config[source])) diff --git a/wa/framework/entrypoint.py b/wa/framework/entrypoint.py index 3e73b910..db72b687 100644 --- a/wa/framework/entrypoint.py +++ b/wa/framework/entrypoint.py @@ -85,27 +85,7 @@ def main(): except KeyboardInterrupt: logging.info('Got CTRL-C. Aborting.') sys.exit(3) - except (WAError, DevlibError) as e: - logging.critical(e) - sys.exit(1) - except subprocess.CalledProcessError as e: - tb = get_traceback() - logging.critical(tb) - command = e.cmd - if e.args: - command = '{} {}'.format(command, ' '.join(e.args)) - message = 'Command \'{}\' returned non-zero exit status {}\nOUTPUT:\n{}\n' - logging.critical(message.format(command, e.returncode, e.output)) - sys.exit(2) - except SyntaxError as e: - tb = get_traceback() - logging.critical(tb) - message = 'Syntax Error in {}, line {}, offset {}:' - logging.critical(message.format(e.filename, e.lineno, e.offset)) - logging.critical('\t{}'.format(e.msg)) - sys.exit(2) except Exception as e: # pylint: disable=broad-except - tb = get_traceback() - logging.critical(tb) - logging.critical('{}({})'.format(e.__class__.__name__, e)) + if not getattr(e, 'logged', None): + log.log_error(e, logger) sys.exit(2) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 699f6494..45b78a53 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -58,6 +58,7 @@ from wa.framework.plugin import Artifact from wa.framework.resource import ResourceResolver from wa.framework.target.info import TargetInfo from wa.framework.target.manager import TargetManager +from wa.utils import log from wa.utils.misc import (ensure_directory_exists as _d, get_traceback, format_duration) from wa.utils.serializer import json @@ -75,15 +76,81 @@ REBOOT_DELAY = 3 class ExecutionContext(object): + @property + def previous_job(self): + if not self.job_queue: + return None + return self.job_queue[0] + + @property + def next_job(self): + if not self.completed_jobs: + return None + return self.completed_jobs[-1] + + @property + def spec_changed(self): + if self.previous_job is None and self.current_job is not None: # Start of run + return True + if self.previous_job is not None and self.current_job is None: # End of run + return True + return self.current_job.spec.id != self.previous_job.spec.id + + @property + def spec_will_change(self): + if self.current_job is None and self.next_job is not None: # Start of run + return True + if self.current_job is not None and self.next_job is None: # End of run + return True + return self.current_job.spec.id != self.next_job.spec.id + + @property + def output_directory(self): + if self.current_job: + return os.path.join(self.output.basepath, self.current_job.output_name) + return self.output.basepath def __init__(self, cm, tm, output): - self.logger = logging.getLogger('ExecContext') + self.logger = logging.getLogger('context') self.cm = cm self.tm = tm self.output = output self.logger.debug('Loading resource discoverers') self.resolver = ResourceResolver(cm) self.resolver.load() + self.job_queue = None + self.completed_jobs = None + self.current_job = None + + def start_run(self): + self.output.info.start_time = datetime.now() + self.output.write_info() + self.job_queue = copy(self.cm.jobs) + self.completed_jobs = [] + + def end_run(self): + self.output.info.end_time = datetime.now() + self.output.info.duration = self.output.info.end_time -\ + self.output.info.start_time + self.output.write_info() + + def start_job(self): + if not self.job_queue: + raise RuntimeError('No jobs to run') + self.current_job = self.job_queue.pop(0) + os.makedirs(self.output_directory) + return self.current_job + + def end_job(self): + if not self.current_job: + raise RuntimeError('No jobs in progress') + self.completed_jobs.append(self.current_job) + self.current_job = None + + def move_failed(self, job): + attempt = job.retries + 1 + failed_name = '{}-attempt{:02}'.format(job.output_name, attempt) + self.output.move_failed(job.output_name, failed_name) class OldExecutionContext(object): @@ -148,22 +215,6 @@ class OldExecutionContext(object): self.job_iteration_counts = defaultdict(int) self.aborted = False self.runner = None - if config.agenda.filepath: - self.run_artifacts.append(Artifact('agenda', - os.path.join(self.host_working_directory, - os.path.basename(config.agenda.filepath)), - 'meta', - mandatory=True, - description='Agenda for this run.')) - for i, filepath in enumerate(settings.config_paths, 1): - name = 'config_{}'.format(i) - path = os.path.join(self.host_working_directory, - name + os.path.splitext(filepath)[1]) - self.run_artifacts.append(Artifact(name, - path, - kind='meta', - mandatory=True, - description='Config file used for the run.')) def initialize(self): if not os.path.isdir(self.run_output_directory): @@ -245,7 +296,7 @@ class Executor(object): # pylint: disable=R0915 def __init__(self): - self.logger = logging.getLogger('Executor') + self.logger = logging.getLogger('executor') self.error_logged = False self.warning_logged = False pluginloader = None @@ -290,6 +341,11 @@ class Executor(object): instrumentation.install(instrument) instrumentation.validate() + self.logger.info('Starting run') + runner = Runner(context) + runner.run() + + def execute_postamble(self): """ @@ -348,6 +404,124 @@ class Runner(object): """ + def __init__(self, context): + self.logger = logging.getLogger('runner') + self.context = context + self.output = self.context.output + self.config = self.context.cm + + def run(self): + self.send(signal.RUN_STARTED) + try: + self.initialize_run() + self.send(signal.RUN_INITIALIZED) + + while self.context.job_queue: + with signal.wrap('JOB_EXECUTION', self): + self.run_next_job(self.context) + except Exception as e: + if (not getattr(e, 'logged', None) and + not isinstance(e, KeyboardInterrupt)): + log.log_error(e, self.logger) + e.logged = True + raise e + finally: + self.finalize_run() + self.send(signal.RUN_COMPLETED) + + def initialize_run(self): + self.logger.info('Initializing run') + self.context.start_run() + log.indent() + for job in self.context.job_queue: + job.initialize(self.context) + log.dedent() + + def finalize_run(self): + self.logger.info('Finalizing run') + self.context.end_run() + + def run_next_job(self, context): + job = context.start_job() + self.logger.info('Running job {}'.format(job.id)) + + try: + log.indent() + self.do_run_job(job, context) + except KeyboardInterrupt: + job.status = JobStatus.ABORTED + raise + except Exception as e: + job.status = JobStatus.FAILED + if not getattr(e, 'logged', None): + log.log_error(e, self.logger) + e.logged = True + finally: + self.logger.info('Completing job {}'.format(job.id)) + self.send(signal.JOB_COMPLETED) + context.end_job() + + log.dedent() + self.check_job(job) + + def do_run_job(self, job, context): + job.status = JobStatus.RUNNING + self.send(signal.JOB_STARTED) + + with signal.wrap('JOB_TARGET_CONFIG', self): + job.configure_target(context) + + with signal.wrap('JOB_SETUP', self): + job.setup(context) + + try: + with signal.wrap('JOB_EXECUTION', self): + job.run(context) + + try: + with signal.wrap('JOB_OUTPUT_PROCESSED', self): + job.process_output(context) + except Exception: + job.status = JobStatus.PARTIAL + raise + except KeyboardInterrupt: + job.status = JobStatus.ABORTED + raise + except Exception as e: + job.status = JobStatus.FAILED + if not getattr(e, 'logged', None): + log.log_error(e, self.logger) + e.logged = True + raise e + finally: + # If setup was successfully completed, teardown must + # run even if the job failed + with signal.wrap('JOB_TEARDOWN', self): + job.teardown(context) + + def check_job(self, job): + rc = self.context.cm.run_config + if job.status in rc.retry_on_status: + if job.retries < rc.max_retries: + msg = 'Job {} iteration {} complted with status {}. retrying...' + self.logger.error(msg.format(job.id, job.status, job.iteration)) + self.context.move_failed(job) + job.retries += 1 + job.status = JobStatus.PENDING + self.context.job_queue.insert(0, job) + else: + msg = 'Job {} iteration {} completed with status {}. '\ + 'Max retries exceeded.' + self.logger.error(msg.format(job.id, job.status, job.iteration)) + else: # status not in retry_on_status + self.logger.info('Job completed with status {}'.format(job.status)) + + def send(self, s): + signal.send(s, self, self.context) + + def __str__(self): + return 'runner' + class RunnerJob(object): """ diff --git a/wa/framework/instrumentation.py b/wa/framework/instrumentation.py index f4d3e480..69386f13 100644 --- a/wa/framework/instrumentation.py +++ b/wa/framework/instrumentation.py @@ -102,7 +102,7 @@ import logging import inspect from collections import OrderedDict -import wa.framework.signal as signal +from wa.framework import signal from wa.framework.plugin import Plugin from wa.framework.exception import WAError, TargetNotRespondingError, TimeoutError from wa.utils.misc import get_traceback, isiterable diff --git a/wa/framework/job.py b/wa/framework/job.py new file mode 100644 index 00000000..598d6072 --- /dev/null +++ b/wa/framework/job.py @@ -0,0 +1,57 @@ +import logging + +from wa.framework import pluginloader +from wa.framework.configuration.core import JobStatus + + +class Job(object): + + @property + def id(self): + return self.spec.id + + @property + def output_name(self): + return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration) + + def __init__(self, spec, iteration, context): + self.logger = logging.getLogger('job') + self.spec = spec + self.iteration = iteration + self.context = context + self.status = JobStatus.NEW + self.workload = None + self.output = None + self.retries = 0 + + def load(self, target, loader=pluginloader): + self.logger.debug('Loading job {}'.format(self.id)) + self.workload = loader.get_workload(self.spec.workload_name, + target, + **self.spec.workload_parameters) + self.workload.init_resources(self.context) + self.workload.validate() + self.status = JobStatus.LOADED + + def initialize(self, context): + self.logger.info('Initializing job {}'.format(self.id)) + self.status = JobStatus.PENDING + + def configure_target(self, context): + self.logger.info('Configuring target for job {}'.format(self.id)) + + def setup(self, context): + self.logger.info('Setting up job {}'.format(self.id)) + + def run(self, context): + self.logger.info('Running job {}'.format(self.id)) + + def process_output(self, context): + self.logger.info('Processing output for job {}'.format(self.id)) + + def teardown(self, context): + self.logger.info('Tearing down job {}'.format(self.id)) + + def finalize(self, context): + self.logger.info('Finalizing job {}'.format(self.id)) + diff --git a/wa/framework/output.py b/wa/framework/output.py index 07912bb4..16853539 100644 --- a/wa/framework/output.py +++ b/wa/framework/output.py @@ -5,11 +5,12 @@ import string import sys import uuid from copy import copy +from datetime import timedelta from wa.framework.configuration.core import JobSpec from wa.framework.configuration.manager import ConfigManager from wa.framework.target.info import TargetInfo -from wa.utils.misc import touch +from wa.utils.misc import touch, ensure_directory_exists from wa.utils.serializer import write_pod, read_pod @@ -25,10 +26,13 @@ class RunInfo(object): @staticmethod def from_pod(pod): uid = pod.pop('uuid') + duration = pod.pop('duration') if uid is not None: uid = uuid.UUID(uid) instance = RunInfo(**pod) instance.uuid = uid + instance.duration = duration if duration is None else\ + timedelta(seconds=duration) return instance def __init__(self, run_name=None, project=None, project_stage=None, @@ -44,6 +48,10 @@ class RunInfo(object): def to_pod(self): d = copy(self.__dict__) d['uuid'] = str(self.uuid) + if self.duration is None: + d['duration'] = self.duration + else: + d['duration'] = self.duration.total_seconds() return d @@ -97,6 +105,11 @@ class RunOutput(object): def raw_config_dir(self): return os.path.join(self.metadir, 'raw_config') + @property + def failed_dir(self): + path = os.path.join(self.basepath, '__failed') + return ensure_directory_exists(path) + def __init__(self, path): self.basepath = path self.info = None @@ -144,6 +157,15 @@ class RunOutput(object): pod = read_pod(self.jobsfile) return [JobSpec.from_pod(jp) for jp in pod['jobs']] + def move_failed(self, name, failed_name): + path = os.path.join(self.basepath, name) + failed_path = os.path.join(self.failed_dir, failed_name) + if not os.path.exists(path): + raise ValueError('Path {} does not exist'.format(path)) + if os.path.exists(failed_path): + raise ValueError('Path {} already exists'.format(failed_path)) + shutil.move(path, failed_path) + def init_wa_output(path, wa_state, force=False): if os.path.exists(path): diff --git a/wa/framework/plugin.py b/wa/framework/plugin.py index b642ee29..f6ef4e2e 100644 --- a/wa/framework/plugin.py +++ b/wa/framework/plugin.py @@ -26,8 +26,8 @@ from itertools import chain from copy import copy from wa.framework.configuration.core import settings, ConfigurationPoint as Parameter -from wa.framework.exception import (NotFoundError, PluginLoaderError, ValidationError, - ConfigError, HostError) +from wa.framework.exception import (NotFoundError, PluginLoaderError, + ValidationError, ConfigError, HostError) from wa.utils import log from wa.utils.misc import (ensure_directory_exists as _d, walk_modules, load_class, merge_dicts_simple, get_article) @@ -406,7 +406,7 @@ class Plugin(object): return self.__class__.__name__ def __init__(self, **kwargs): - self.logger = logging.getLogger(self._classname) + self.logger = logging.getLogger(self.name) self._modules = [] self.capabilities = getattr(self.__class__, 'capabilities', []) for param in self.parameters: diff --git a/wa/framework/resource.py b/wa/framework/resource.py index e86eb830..6ef1992c 100644 --- a/wa/framework/resource.py +++ b/wa/framework/resource.py @@ -286,7 +286,7 @@ class ResourceResolver(object): """ def __init__(self, config): - self.logger = logging.getLogger(self.__class__.__name__) + self.logger = logging.getLogger('resolver') self.getters = defaultdict(prioritylist) self.config = config diff --git a/wa/framework/signal.py b/wa/framework/signal.py index dd19a5e5..7dbfd73d 100644 --- a/wa/framework/signal.py +++ b/wa/framework/signal.py @@ -27,7 +27,7 @@ from louie import dispatcher from wa.utils.types import prioritylist -logger = logging.getLogger('dispatcher') +logger = logging.getLogger('signal') class Signal(object): @@ -101,6 +101,27 @@ BEFORE_RUN_INIT = Signal('before-run-init', invert_priority=True) SUCCESSFUL_RUN_INIT = Signal('successful-run-init') AFTER_RUN_INIT = Signal('after-run-init') +BEFORE_JOB_TARGET_CONFIG = Signal('before-job-target-config', invert_priority=True) +SUCCESSFUL_JOB_TARGET_CONFIG = Signal('successful-job-target-config') +AFTER_JOB_TARGET_CONFIG = Signal('after-job-target-config') + +BEFORE_JOB_SETUP = Signal('before-job-setup', invert_priority=True) +SUCCESSFUL_JOB_SETUP = Signal('successful-job-setup') +AFTER_JOB_SETUP = Signal('after-job-setup') + +BEFORE_JOB_EXECUTION = Signal('before-job-execution', invert_priority=True) +SUCCESSFUL_JOB_EXECUTION = Signal('successful-job-execution') +AFTER_JOB_EXECUTION = Signal('after-job-execution') + +BEFORE_JOB_OUTPUT_PROCESSED = Signal('before-job-output-processed', + invert_priority=True) +SUCCESSFUL_JOB_OUTPUT_PROCESSED = Signal('successful-job-output-processed') +AFTER_JOB_OUTPUT_PROCESSED = Signal('after-job-output-processed') + +BEFORE_JOB_TEARDOWN = Signal('before-job-teardown', invert_priority=True) +SUCCESSFUL_JOB_TEARDOWN = Signal('successful-job-teardown') +AFTER_JOB_TEARDOWN = Signal('after-job-teardown') + BEFORE_FLASHING = Signal('before-flashing', invert_priority=True) SUCCESSFUL_FLASHING = Signal('successful-flashing') AFTER_FLASHING = Signal('after-flashing') @@ -250,6 +271,7 @@ def send(signal, sender=dispatcher.Anonymous, *args, **kwargs): The rest of the parameters will be passed on as aruments to the handler. """ + logger.debug('Sending {} from {}'.format(signal, sender)) return dispatcher.send(signal, sender, *args, **kwargs) @@ -266,6 +288,7 @@ def safe_send(signal, sender=dispatcher.Anonymous, to just ``[KeyboardInterrupt]``). """ try: + logger.debug('Safe-sending {} from {}'.format(signal, sender)) send(singnal, sender, *args, **kwargs) except Exception as e: if any(isinstance(e, p) for p in propagate): @@ -292,3 +315,17 @@ def wrap(signal_name, sender=dispatcher.Anonymous, safe=False, *args, **kwargs): finally: send_func(after_signal, sender, *args, **kwargs) + +def wrapped(signal_name, sender=dispatcher.Anonymous, safe=False): + """A decorator for wrapping function in signal dispatch.""" + @wrapt.decorator + def signal_wrapped(wrapped, instance, args, kwargs): + func_id = repr(wrapped) + + def signal_wrapper(*args, **kwargs): + with wrap(signal_name, sender, safe): + return wrapped(*args, **kwargs) + + return signal_wrapper(*args, **kwargs) + + return signal_wrapped diff --git a/wa/framework/target/descriptor.py b/wa/framework/target/descriptor.py index c1252289..34966367 100644 --- a/wa/framework/target/descriptor.py +++ b/wa/framework/target/descriptor.py @@ -24,35 +24,6 @@ def get_target_descriptions(loader=pluginloader): return targets.values() -def instantiate_target(tdesc, params, connect=None): - target_params = {p.name: p for p in tdesc.target_params} - platform_params = {p.name: p for p in tdesc.platform_params} - conn_params = {p.name: p for p in tdesc.conn_params} - - tp, pp, cp = {}, {}, {} - - for name, value in params.iteritems(): - if name in target_params: - tp[name] = value - elif name in platform_params: - pp[name] = value - elif name in conn_params: - cp[name] = value - else: - msg = 'Unexpected parameter for {}: {}' - raise ValueError(msg.format(tdesc.name, name)) - - tp['platform'] = (tdesc.platform or Platform)(**pp) - if cp: - tp['connection_settings'] = cp - if tdesc.connection: - tp['conn_cls'] = tdesc.connection - if connect is not None: - tp['connect'] = connect - - return tdesc.target(**tp) - - class TargetDescription(object): def __init__(self, name, source, description=None, target=None, platform=None, @@ -115,18 +86,6 @@ COMMON_TARGET_PARAMS = [ Please see ``devlab`` documentation for information on the available modules. '''), - Parameter('load_default_modules', kind=bool, default=True, - description=''' - A number of modules (e.g. for working with the cpufreq subsystem) are - loaded by default when a Target is instantiated. Setting this to - ``True`` would suppress that, ensuring that only the base Target - interface is initialized. - - You may want to set this if there is a problem with one or more default - modules on your platform (e.g. your device is unrooted and cpufreq is - not accessible to unprivileged users), or if Target initialization is - taking too long for your platform. - '''), ] COMMON_PLATFORM_PARAMS = [ diff --git a/wa/framework/target/info.py b/wa/framework/target/info.py index f3e40119..4341e155 100644 --- a/wa/framework/target/info.py +++ b/wa/framework/target/info.py @@ -1,7 +1,6 @@ from devlib import AndroidTarget from devlib.exception import TargetError from devlib.target import KernelConfig, KernelVersion, Cpuinfo -from devlib.utils.android import AndroidProperties class TargetInfo(object): @@ -22,9 +21,8 @@ class TargetInfo(object): if pod["target"] == "AndroidTarget": instance.screen_resolution = pod['screen_resolution'] - instance.prop = AndroidProperties('') - instance.prop._properties = pod['prop'] - instance.android_id = pod['android_id'] + instance.prop = pod['prop'] + instance.prop = pod['android_id'] return instance @@ -74,7 +72,7 @@ class TargetInfo(object): if self.target == "AndroidTarget": pod['screen_resolution'] = self.screen_resolution - pod['prop'] = self.prop._properties + pod['prop'] = self.prop pod['android_id'] = self.android_id return pod diff --git a/wa/framework/target/manager.py b/wa/framework/target/manager.py index 545178e6..659516d6 100644 --- a/wa/framework/target/manager.py +++ b/wa/framework/target/manager.py @@ -9,8 +9,6 @@ import sys from wa.framework import signal from wa.framework.exception import WorkerThreadError, ConfigError from wa.framework.plugin import Parameter -from wa.framework.target.descriptor import (get_target_descriptions, - instantiate_target) from wa.framework.target.info import TargetInfo from wa.framework.target.runtime_config import (SysfileValuesRuntimeConfig, HotplugRuntimeConfig, @@ -43,26 +41,54 @@ class TargetManager(object): """), ] + DEVICE_MAPPING = {'test' : {'platform_name':'generic', + 'target_name': 'android'}, + 'other': {'platform_name':'test', + 'target_name': 'linux'}, + } + runtime_config_cls = [ - # order matters - SysfileValuesRuntimeConfig, - HotplugRuntimeConfig, - CpufreqRuntimeConfig, - CpuidleRuntimeConfig, - ] + # order matters + SysfileValuesRuntimeConfig, + HotplugRuntimeConfig, + CpufreqRuntimeConfig, + CpuidleRuntimeConfig, + ] def __init__(self, name, parameters): - self.target_name = name + self.name = name self.target = None self.assistant = None + self.target_name = None self.platform_name = None self.parameters = parameters self.disconnect = parameters.get('disconnect') self.info = TargetInfo() - self._init_target() - self._init_assistant() + # Determine platform and target based on passed name + self._parse_name() + # Create target + self._get_target() + # Create an assistant to perform target specific configuration + self._get_assistant() + + ### HERE FOR TESTING, WILL BE CALLED EXTERNALLY ### + # Connect to device and retrieve details. + # self.initialize() + # self.add_parameters() + # self.validate_parameters() + # self.set_parameters() + + def initialize(self): self.runtime_configs = [cls(self.target) for cls in self.runtime_config_cls] + # if self.parameters: + # self.logger.info('Connecting to the device') + with signal.wrap('TARGET_CONNECT'): + self.target.connect() + # self.info.load(self.target) + # info_file = os.path.join(self.context.info_directory, 'target.json') + # with open(info_file, 'w') as wfh: + # json.dump(self.info.to_pod(), wfh) def finalize(self): # self.logger.info('Disconnecting from the device') @@ -82,16 +108,10 @@ class TargetManager(object): if any(parameter in name for parameter in cfg.supported_parameters): cfg.add(name, self.parameters.pop(name)) - def get_target_info(self): - return TargetInfo(self.target) - - def validate_runtime_parameters(self, params): + def validate_parameters(self): for cfg in self.runtime_configs: cfg.validate() - def merge_runtime_parameters(self, params): - pass - def set_parameters(self): for cfg in self.runtime_configs: cfg.set() @@ -100,25 +120,47 @@ class TargetManager(object): for cfg in self.runtime_configs: cfg.clear() - def _init_target(self): - target_map = {td.name: td for td in get_target_descriptions()} - if self.target_name not in target_map: - raise ValueError('Unknown Target: {}'.format(self.target_name)) - tdesc = target_map[self.target_name] - self.target = instantiate_target(tdesc, self.parameters, connect=False) - with signal.wrap('TARGET_CONNECT'): - self.target.connect() - self.target.setup() + def _parse_name(self): + # Try and get platform and target + self.name = identifier(self.name.replace('-', '_')) + if '_' in self.name: + self.platform_name, self.target_name = self.name.split('_', 1) + elif self.name in self.DEVICE_MAPPING: + self.platform_name = self.DEVICE_MAPPING[self.name]['platform_name'] + self.target_name = self.DEVICE_MAPPING[self.name]['target_name'] + else: + raise ConfigError('Unknown Device Specified {}'.format(self.name)) - def _init_assistant(self): - # Create a corresponding target and target-assistant to help with - # platformy stuff? - if self.target.os == 'android': + def _get_target(self): + # Create a corresponding target and target-assistant + if self.target_name == 'android': + self.target = AndroidTarget() + elif self.target_name == 'linux': + self.target = LinuxTarget() # pylint: disable=redefined-variable-type + elif self.target_name == 'localLinux': + self.target = LocalLinuxTarget() + else: + raise ConfigError('Unknown Target Specified {}'.format(self.target_name)) + + def _get_assistant(self): + # Create a corresponding target and target-assistant to help with platformy stuff? + if self.target_name == 'android': self.assistant = AndroidAssistant(self.target) - elif self.target.os == 'linux': + elif self.target_name in ['linux', 'localLinux']: self.assistant = LinuxAssistant(self.target) # pylint: disable=redefined-variable-type else: - raise ValueError('Unknown Target OS: {}'.format(self.target.os)) + raise ConfigError('Unknown Target Specified {}'.format(self.target_name)) + + # def validate_runtime_parameters(self, parameters): + # for name, value in parameters.iteritems(): + # self.add_parameter(name, value) + # self.validate_parameters() + + # def set_runtime_parameters(self, parameters): + # # self.clear() + # for name, value in parameters.iteritems(): + # self.add_parameter(name, value) + # self.set_parameters() class LinuxAssistant(object): diff --git a/wa/instrumentation/__init__.py b/wa/instrumentation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/wa/instrumentation/misc/__init__.py b/wa/instrumentation/misc/__init__.py new file mode 100644 index 00000000..634f4014 --- /dev/null +++ b/wa/instrumentation/misc/__init__.py @@ -0,0 +1,391 @@ +# Copyright 2013-2015 ARM Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# pylint: disable=W0613,no-member,attribute-defined-outside-init +""" + +Some "standard" instruments to collect additional info about workload execution. + +.. note:: The run() method of a Workload may perform some "boilerplate" as well as + the actual execution of the workload (e.g. it may contain UI automation + needed to start the workload). This "boilerplate" execution will also + be measured by these instruments. As such, they are not suitable for collected + precise data about specific operations. +""" +import os +import re +import logging +import time +import tarfile +from itertools import izip, izip_longest +from subprocess import CalledProcessError + +from devlib.exception import TargetError + +from devlib.utils.android import ApkInfo + +from wa import Instrument, Parameter +from wa.framework import signal +from wa.framework.exception import ConfigError +from wa.utils.misc import diff_tokens, write_table, check_output, as_relative +from wa.utils.misc import ensure_file_directory_exists as _f +from wa.utils.misc import ensure_directory_exists as _d +from wa.utils.types import list_of_strings + + +logger = logging.getLogger(__name__) + + +class SysfsExtractor(Instrument): + + name = 'sysfs_extractor' + description = """ + Collects the contest of a set of directories, before and after workload execution + and diffs the result. + + """ + + mount_command = 'mount -t tmpfs -o size={} tmpfs {}' + extract_timeout = 30 + tarname = 'sysfs.tar.gz' + DEVICE_PATH = 0 + BEFORE_PATH = 1 + AFTER_PATH = 2 + DIFF_PATH = 3 + + parameters = [ + Parameter('paths', kind=list_of_strings, mandatory=True, + description="""A list of paths to be pulled from the device. These could be directories + as well as files.""", + global_alias='sysfs_extract_dirs'), + Parameter('use_tmpfs', kind=bool, default=None, + description=""" + Specifies whether tmpfs should be used to cache sysfile trees and then pull them down + as a tarball. This is significantly faster then just copying the directory trees from + the device directly, bur requres root and may not work on all devices. Defaults to + ``True`` if the device is rooted and ``False`` if it is not. + """), + Parameter('tmpfs_mount_point', default=None, + description="""Mount point for tmpfs partition used to store snapshots of paths."""), + Parameter('tmpfs_size', default='32m', + description="""Size of the tempfs partition."""), + ] + + def initialize(self, context): + if not self.device.is_rooted and self.use_tmpfs: # pylint: disable=access-member-before-definition + raise ConfigError('use_tempfs must be False for an unrooted device.') + elif self.use_tmpfs is None: # pylint: disable=access-member-before-definition + self.use_tmpfs = self.device.is_rooted + + if self.use_tmpfs: + self.on_device_before = self.device.path.join(self.tmpfs_mount_point, 'before') + self.on_device_after = self.device.path.join(self.tmpfs_mount_point, 'after') + + if not self.device.file_exists(self.tmpfs_mount_point): + self.device.execute('mkdir -p {}'.format(self.tmpfs_mount_point), as_root=True) + self.device.execute(self.mount_command.format(self.tmpfs_size, self.tmpfs_mount_point), + as_root=True) + + def setup(self, context): + before_dirs = [ + _d(os.path.join(context.output_directory, 'before', self._local_dir(d))) + for d in self.paths + ] + after_dirs = [ + _d(os.path.join(context.output_directory, 'after', self._local_dir(d))) + for d in self.paths + ] + diff_dirs = [ + _d(os.path.join(context.output_directory, 'diff', self._local_dir(d))) + for d in self.paths + ] + self.device_and_host_paths = zip(self.paths, before_dirs, after_dirs, diff_dirs) + + if self.use_tmpfs: + for d in self.paths: + before_dir = self.device.path.join(self.on_device_before, + self.device.path.dirname(as_relative(d))) + after_dir = self.device.path.join(self.on_device_after, + self.device.path.dirname(as_relative(d))) + if self.device.file_exists(before_dir): + self.device.execute('rm -rf {}'.format(before_dir), as_root=True) + self.device.execute('mkdir -p {}'.format(before_dir), as_root=True) + if self.device.file_exists(after_dir): + self.device.execute('rm -rf {}'.format(after_dir), as_root=True) + self.device.execute('mkdir -p {}'.format(after_dir), as_root=True) + + def slow_start(self, context): + if self.use_tmpfs: + for d in self.paths: + dest_dir = self.device.path.join(self.on_device_before, as_relative(d)) + if '*' in dest_dir: + dest_dir = self.device.path.dirname(dest_dir) + self.device.execute('{} cp -Hr {} {}'.format(self.device.busybox, d, dest_dir), + as_root=True, check_exit_code=False) + else: # not rooted + for dev_dir, before_dir, _, _ in self.device_and_host_paths: + self.device.pull(dev_dir, before_dir) + + def slow_stop(self, context): + if self.use_tmpfs: + for d in self.paths: + dest_dir = self.device.path.join(self.on_device_after, as_relative(d)) + if '*' in dest_dir: + dest_dir = self.device.path.dirname(dest_dir) + self.device.execute('{} cp -Hr {} {}'.format(self.device.busybox, d, dest_dir), + as_root=True, check_exit_code=False) + else: # not using tmpfs + for dev_dir, _, after_dir, _ in self.device_and_host_paths: + self.device.pull(dev_dir, after_dir) + + def update_result(self, context): + if self.use_tmpfs: + on_device_tarball = self.device.path.join(self.device.working_directory, self.tarname) + on_host_tarball = self.device.path.join(context.output_directory, self.tarname) + self.device.execute('{} tar czf {} -C {} .'.format(self.device.busybox, + on_device_tarball, + self.tmpfs_mount_point), + as_root=True) + self.device.execute('chmod 0777 {}'.format(on_device_tarball), as_root=True) + self.device.pull(on_device_tarball, on_host_tarball) + with tarfile.open(on_host_tarball, 'r:gz') as tf: + tf.extractall(context.output_directory) + self.device.remove(on_device_tarball) + os.remove(on_host_tarball) + + for paths in self.device_and_host_paths: + after_dir = paths[self.AFTER_PATH] + dev_dir = paths[self.DEVICE_PATH].strip('*') # remove potential trailing '*' + if (not os.listdir(after_dir) and + self.device.file_exists(dev_dir) and + self.device.list_directory(dev_dir)): + self.logger.error('sysfs files were not pulled from the device.') + self.device_and_host_paths.remove(paths) # Path is removed to skip diffing it + for _, before_dir, after_dir, diff_dir in self.device_and_host_paths: + _diff_sysfs_dirs(before_dir, after_dir, diff_dir) + + def teardown(self, context): + self._one_time_setup_done = [] + + def finalize(self, context): + if self.use_tmpfs: + try: + self.device.execute('umount {}'.format(self.tmpfs_mount_point), as_root=True) + except (TargetError, CalledProcessError): + # assume a directory but not mount point + pass + self.device.execute('rm -rf {}'.format(self.tmpfs_mount_point), + as_root=True, check_exit_code=False) + + def validate(self): + if not self.tmpfs_mount_point: # pylint: disable=access-member-before-definition + self.tmpfs_mount_point = self.device.path.join(self.device.working_directory, 'temp-fs') + + def _local_dir(self, directory): + return os.path.dirname(as_relative(directory).replace(self.device.path.sep, os.sep)) + + +class ExecutionTimeInstrument(Instrument): + + name = 'execution_time' + description = """ + Measure how long it took to execute the run() methods of a Workload. + + """ + + priority = 15 + + def __init__(self, target, **kwargs): + super(ExecutionTimeInstrument, self).__init__(target, **kwargs) + self.start_time = None + self.end_time = None + + def on_run_start(self, context): + signal.connect(self.get_start_time, signal.BEFORE_WORKLOAD_EXECUTION, priority=self.priority) + signal.connect(self.get_stop_time, signal.AFTER_WORKLOAD_EXECUTION, priority=self.priority) + + def get_start_time(self, context): + self.start_time = time.time() + + def get_stop_time(self, context): + self.end_time = time.time() + + def update_result(self, context): + execution_time = self.end_time - self.start_time + context.result.add_metric('execution_time', execution_time, 'seconds') + + +class ApkVersion(Instrument): + + name = 'apk_version' + description = """ + Extracts APK versions for workloads that have them. + + """ + + def __init__(self, device, **kwargs): + super(ApkVersion, self).__init__(device, **kwargs) + self.apk_info = None + + def setup(self, context): + if hasattr(context.workload, 'apk_file'): + self.apk_info = ApkInfo(context.workload.apk_file) + else: + self.apk_info = None + + def update_result(self, context): + if self.apk_info: + context.result.add_metric(self.name, self.apk_info.version_name) + + +class InterruptStatsInstrument(Instrument): + + name = 'interrupts' + description = """ + Pulls the ``/proc/interrupts`` file before and after workload execution and diffs them + to show what interrupts occurred during that time. + + """ + + def __init__(self, device, **kwargs): + super(InterruptStatsInstrument, self).__init__(device, **kwargs) + self.before_file = None + self.after_file = None + self.diff_file = None + + def setup(self, context): + self.before_file = os.path.join(context.output_directory, 'before', 'proc', 'interrupts') + self.after_file = os.path.join(context.output_directory, 'after', 'proc', 'interrupts') + self.diff_file = os.path.join(context.output_directory, 'diff', 'proc', 'interrupts') + + def start(self, context): + with open(_f(self.before_file), 'w') as wfh: + wfh.write(self.device.execute('cat /proc/interrupts')) + + def stop(self, context): + with open(_f(self.after_file), 'w') as wfh: + wfh.write(self.device.execute('cat /proc/interrupts')) + + def update_result(self, context): + # If workload execution failed, the after_file may not have been created. + if os.path.isfile(self.after_file): + _diff_interrupt_files(self.before_file, self.after_file, _f(self.diff_file)) + + +class DynamicFrequencyInstrument(SysfsExtractor): + + name = 'cpufreq' + description = """ + Collects dynamic frequency (DVFS) settings before and after workload execution. + + """ + + tarname = 'cpufreq.tar.gz' + + parameters = [ + Parameter('paths', mandatory=False, override=True), + ] + + def setup(self, context): + self.paths = ['/sys/devices/system/cpu'] + if self.use_tmpfs: + self.paths.append('/sys/class/devfreq/*') # the '*' would cause problems for adb pull. + super(DynamicFrequencyInstrument, self).setup(context) + + def validate(self): + # temp-fs would have been set in super's validate, if not explicitly specified. + if not self.tmpfs_mount_point.endswith('-cpufreq'): # pylint: disable=access-member-before-definition + self.tmpfs_mount_point += '-cpufreq' + + +def _diff_interrupt_files(before, after, result): # pylint: disable=R0914 + output_lines = [] + with open(before) as bfh: + with open(after) as ofh: + for bline, aline in izip(bfh, ofh): + bchunks = bline.strip().split() + while True: + achunks = aline.strip().split() + if achunks[0] == bchunks[0]: + diffchunks = [''] + diffchunks.append(achunks[0]) + diffchunks.extend([diff_tokens(b, a) for b, a + in zip(bchunks[1:], achunks[1:])]) + output_lines.append(diffchunks) + break + else: # new category appeared in the after file + diffchunks = ['>'] + achunks + output_lines.append(diffchunks) + try: + aline = ofh.next() + except StopIteration: + break + + # Offset heading columns by one to allow for row labels on subsequent + # lines. + output_lines[0].insert(0, '') + + # Any "columns" that do not have headings in the first row are not actually + # columns -- they are a single column where space-spearated words got + # split. Merge them back together to prevent them from being + # column-aligned by write_table. + table_rows = [output_lines[0]] + num_cols = len(output_lines[0]) + for row in output_lines[1:]: + table_row = row[:num_cols] + table_row.append(' '.join(row[num_cols:])) + table_rows.append(table_row) + + with open(result, 'w') as wfh: + write_table(table_rows, wfh) + + +def _diff_sysfs_dirs(before, after, result): # pylint: disable=R0914 + before_files = [] + os.path.walk(before, + lambda arg, dirname, names: arg.extend([os.path.join(dirname, f) for f in names]), + before_files + ) + before_files = filter(os.path.isfile, before_files) + files = [os.path.relpath(f, before) for f in before_files] + after_files = [os.path.join(after, f) for f in files] + diff_files = [os.path.join(result, f) for f in files] + + for bfile, afile, dfile in zip(before_files, after_files, diff_files): + if not os.path.isfile(afile): + logger.debug('sysfs_diff: {} does not exist or is not a file'.format(afile)) + continue + + with open(bfile) as bfh, open(afile) as afh: # pylint: disable=C0321 + with open(_f(dfile), 'w') as dfh: + for i, (bline, aline) in enumerate(izip_longest(bfh, afh), 1): + if aline is None: + logger.debug('Lines missing from {}'.format(afile)) + break + bchunks = re.split(r'(\W+)', bline) + achunks = re.split(r'(\W+)', aline) + if len(bchunks) != len(achunks): + logger.debug('Token length mismatch in {} on line {}'.format(bfile, i)) + dfh.write('xxx ' + bline) + continue + if ((len([c for c in bchunks if c.strip()]) == len([c for c in achunks if c.strip()]) == 2) and + (bchunks[0] == achunks[0])): + # if there are only two columns and the first column is the + # same, assume it's a "header" column and do not diff it. + dchunks = [bchunks[0]] + [diff_tokens(b, a) for b, a in zip(bchunks[1:], achunks[1:])] + else: + dchunks = [diff_tokens(b, a) for b, a in zip(bchunks, achunks)] + dfh.write(''.join(dchunks)) diff --git a/wa/utils/log.py b/wa/utils/log.py index 8dbe5f20..906116e6 100644 --- a/wa/utils/log.py +++ b/wa/utils/log.py @@ -22,6 +22,8 @@ import subprocess import colorama +from devlib import DevlibError + from wa.framework import signal from wa.framework.exception import WAError from wa.utils.misc import get_traceback @@ -78,7 +80,7 @@ def set_level(level): def add_file(filepath, level=logging.DEBUG, - fmt='%(asctime)s %(levelname)-8s %(name)s: %(message)-10.10s'): + fmt='%(asctime)s %(levelname)-8s %(name)10.10s: %(message)s'): root_logger = logging.getLogger() file_handler = logging.FileHandler(filepath) file_handler.setLevel(level) @@ -142,7 +144,7 @@ def log_error(e, logger, critical=False): if isinstance(e, KeyboardInterrupt): log_func('Got CTRL-C. Aborting.') - elif isinstance(e, WAError): + elif isinstance(e, WAError) or isinstance(e, DevlibError): log_func(e) elif isinstance(e, subprocess.CalledProcessError): tb = get_traceback() diff --git a/wa/utils/serializer.py b/wa/utils/serializer.py index b2535961..56d8f988 100644 --- a/wa/utils/serializer.py +++ b/wa/utils/serializer.py @@ -126,6 +126,10 @@ class json(object): def dump(o, wfh, indent=4, *args, **kwargs): return _json.dump(o, wfh, cls=WAJSONEncoder, indent=indent, *args, **kwargs) + @staticmethod + def dumps(o, indent=4, *args, **kwargs): + return _json.dumps(o, cls=WAJSONEncoder, indent=indent, *args, **kwargs) + @staticmethod def load(fh, *args, **kwargs): try: diff --git a/wa/utils/types.py b/wa/utils/types.py index c23d2886..fed3004f 100644 --- a/wa/utils/types.py +++ b/wa/utils/types.py @@ -29,6 +29,7 @@ import os import re import math import shlex +import string from bisect import insort from collections import defaultdict, MutableMapping from copy import copy @@ -475,3 +476,91 @@ class obj_dict(MutableMapping): return self.__dict__['dict'][name] else: raise AttributeError("No such attribute: " + name) + + +class level(object): + """ + A level has a name and behaves like a string when printed, + however it also has a numeric value which is used in comparisons. + + """ + + def __init__(self, name, value): + self.name = name + self.value = value + + def __str__(self): + return self.name + + def __repr__(self): + return '{}({})'.format(self.name, self.value) + + def __cmp__(self, other): + if isinstance(other, level): + return cmp(self.value, other.value) + else: + return cmp(self.value, other) + + def __eq__(self, other): + if isinstance(other, level): + return self.value == other.value + elif isinstance(other, basestring): + return self.name == other + else: + return self.value == other + + def __ne__(self, other): + if isinstance(other, level): + return self.value != other.value + elif isinstance(other, basestring): + return self.name != other + else: + return self.value != other + + +def enum(args, start=0): + """ + Creates a class with attributes named by the first argument. + Each attribute is a ``level`` so they behave is integers in comparisons. + The value of the first attribute is specified by the second argument + (``0`` if not specified). + + :: + MyEnum = enum(['A', 'B', 'C']) + + is roughly equivalent of:: + + class MyEnum(object): + A = 0 + B = 1 + C = 2 + + however it also implement some specialized behaviors for comparisons and + instantiation. + + """ + + class Enum(object): + + def __new__(cls, name): + for attr_name in dir(cls): + if attr_name.startswith('__'): + continue + + attr = getattr(cls, attr_name) + if name == attr: + return attr + + raise ValueError('Invalid enum value: {}'.format(repr(name))) + + levels = [] + for i, v in enumerate(args, start): + name = string.upper(identifier(v)) + lv = level(v, i) + setattr(Enum, name, lv) + levels.append(lv) + + setattr(Enum, 'values', levels) + + return Enum +