From 24402660c4dae1a16b86285ff8d4383f6ce7d9f9 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Thu, 16 Mar 2017 17:54:48 +0000 Subject: [PATCH 1/9] Initial implementation of state tracking and output handling. --- wa/commands/run.py | 4 +- wa/framework/configuration/core.py | 6 +- wa/framework/configuration/execution.py | 2 + wa/framework/execution.py | 80 +++-- wa/framework/job.py | 16 +- wa/framework/output.py | 314 ++++++++++++++---- wa/framework/run.py | 418 ++++++------------------ wa/utils/serializer.py | 1 + wa/utils/types.py | 30 +- wa/workloads/dhrystone/__init__.py | 7 +- 10 files changed, 443 insertions(+), 435 deletions(-) diff --git a/wa/commands/run.py b/wa/commands/run.py index dc351e68..8ab6dd01 100644 --- a/wa/commands/run.py +++ b/wa/commands/run.py @@ -24,7 +24,7 @@ from wa.framework import pluginloader from wa.framework.configuration import RunConfiguration from wa.framework.configuration.parsers import AgendaParser, ConfigParser from wa.framework.execution import Executor -from wa.framework.output import init_wa_output +from wa.framework.output import init_run_output from wa.framework.version import get_wa_version from wa.framework.exception import NotFoundError, ConfigError from wa.utils import log @@ -123,7 +123,7 @@ class RunCommand(Command): output_directory = settings.default_output_directory self.logger.debug('Using output directory: {}'.format(output_directory)) try: - return init_wa_output(output_directory, config, args.force) + return init_run_output(output_directory, config, args.force) except RuntimeError as e: if 'path exists' in str(e): msg = 'Output directory "{}" exists.\nPlease specify another '\ diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index 189e87c9..4dc94169 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -24,6 +24,7 @@ from wa.utils.types import (identifier, integer, boolean, list_of_strings, list_of, toggle_set, obj_dict, enum) from wa.utils.serializer import is_pod + # Mapping for kind conversion; see docs for convert_types below KIND_MAP = { int: integer, @@ -31,7 +32,10 @@ KIND_MAP = { dict: OrderedDict, } -JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING', +RunStatus = enum(['NEW', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING', + 'COMPLETED', 'OK', 'FAILED', 'PARTIAL', 'ABORTED']) + +JobStatus = enum(['NEW', 'PENDING', 'RUNNING', 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index 4cfddd53..cde9b260 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -9,6 +9,7 @@ from wa.framework.configuration.parsers import ConfigParser from wa.framework.configuration.plugin_cache import PluginCache from wa.framework.exception import NotFoundError from wa.framework.job import Job +from wa.framework.run import JobState from wa.utils.types import enum @@ -106,6 +107,7 @@ class ConfigManager(object): job = Job(spec, i, context) job.load(context.tm.target) self._jobs.append(job) + context.run_state.add_job(job) self._jobs_generated = True diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 45b78a53..aafe4046 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -49,17 +49,18 @@ from itertools import izip_longest import wa.framework.signal as signal from wa.framework import instrumentation, pluginloader -from wa.framework.configuration.core import settings -from 
wa.framework.configuration.execution import JobStatus +from wa.framework.configuration.core import settings, RunStatus, JobStatus from wa.framework.exception import (WAError, ConfigError, TimeoutError, InstrumentError, TargetError, TargetNotRespondingError) +from wa.framework.output import init_job_output from wa.framework.plugin import Artifact from wa.framework.resource import ResourceResolver +from wa.framework.run import RunState from wa.framework.target.info import TargetInfo from wa.framework.target.manager import TargetManager from wa.utils import log -from wa.utils.misc import (ensure_directory_exists as _d, +from wa.utils.misc import (ensure_directory_exists as _d, merge_config_values, get_traceback, format_duration) from wa.utils.serializer import json @@ -105,16 +106,26 @@ class ExecutionContext(object): return self.current_job.spec.id != self.next_job.spec.id @property - def output_directory(self): + def job_output(self): if self.current_job: - return os.path.join(self.output.basepath, self.current_job.output_name) + return self.current_job.output + + @property + def output(self): + if self.current_job: + return self.job_output + return self.run_output + + @property + def output_directory(self): return self.output.basepath def __init__(self, cm, tm, output): self.logger = logging.getLogger('context') self.cm = cm self.tm = tm - self.output = output + self.run_output = output + self.run_state = output.state self.logger.debug('Loading resource discoverers') self.resolver = ResourceResolver(cm) self.resolver.load() @@ -127,31 +138,56 @@ class ExecutionContext(object): self.output.write_info() self.job_queue = copy(self.cm.jobs) self.completed_jobs = [] + self.run_state.status = RunStatus.STARTED + self.output.write_state() def end_run(self): self.output.info.end_time = datetime.now() self.output.info.duration = self.output.info.end_time -\ self.output.info.start_time self.output.write_info() + self.output.write_state() + self.output.write_result() def start_job(self): if not self.job_queue: raise RuntimeError('No jobs to run') self.current_job = self.job_queue.pop(0) - os.makedirs(self.output_directory) + self.current_job.output = init_job_output(self.run_output, self.current_job) + self.update_job_state(self.current_job) return self.current_job def end_job(self): if not self.current_job: raise RuntimeError('No jobs in progress') self.completed_jobs.append(self.current_job) + self.update_job_state(self.current_job) + self.output.write_result() self.current_job = None def move_failed(self, job): - attempt = job.retries + 1 - failed_name = '{}-attempt{:02}'.format(job.output_name, attempt) - self.output.move_failed(job.output_name, failed_name) + self.run_output.move_failed(job.output) + def update_job_state(self, job): + self.run_state.update_job(job) + self.run_output.write_state() + + def write_state(self): + self.run_output.write_state() + + def add_metric(self, name, value, units=None, lower_is_better=False, + classifiers=None): + if self.current_job: + classifiers = merge_config_values(self.current_job.classifiers, + classifiers) + self.output.add_metric(name, value, units, lower_is_better, classifiers) + + def add_artifact(self, name, path, kind, description=None, classifiers=None): + self.output.add_artifact(name, path, kind, description, classifiers) + + def add_run_artifact(self, name, path, kind, description=None, + classifiers=None): + self.run_output.add_artifact(name, path, kind, description, classifiers) class OldExecutionContext(object): """ @@ -335,6 +371,7 @@ class 
Executor(object): self.logger.info('Generating jobs') config_manager.generate_jobs(context) output.write_job_specs(config_manager.job_specs) + output.write_state() self.logger.info('Installing instrumentation') for instrument in config_manager.get_instruments(target_manager.target): @@ -345,8 +382,6 @@ class Executor(object): runner = Runner(context) runner.run() - - def execute_postamble(self): """ This happens after the run has completed. The overall results of the run are @@ -374,22 +409,6 @@ class Executor(object): self.logger.warn('There were warnings during execution.') self.logger.warn('Please see {}'.format(self.config.log_file)) - def _get_runner(self, result_manager): - if not self.config.execution_order or self.config.execution_order == 'by_iteration': - if self.config.reboot_policy == 'each_spec': - self.logger.info('each_spec reboot policy with the default by_iteration execution order is ' - 'equivalent to each_iteration policy.') - runnercls = ByIterationRunner - elif self.config.execution_order in ['classic', 'by_spec']: - runnercls = BySpecRunner - elif self.config.execution_order == 'by_section': - runnercls = BySectionRunner - elif self.config.execution_order == 'random': - runnercls = RandomRunner - else: - raise ConfigError('Unexpected execution order: {}'.format(self.config.execution_order)) - return runnercls(self.device_manager, self.context, result_manager) - def _error_signalled_callback(self): self.error_logged = True signal.disconnect(self._error_signalled_callback, signal.ERROR_LOGGED) @@ -436,6 +455,7 @@ class Runner(object): for job in self.context.job_queue: job.initialize(self.context) log.dedent() + self.context.write_state() def finalize_run(self): self.logger.info('Finalizing run') @@ -448,6 +468,7 @@ class Runner(object): try: log.indent() self.do_run_job(job, context) + job.status = JobStatus.OK except KeyboardInterrupt: job.status = JobStatus.ABORTED raise @@ -503,12 +524,13 @@ class Runner(object): rc = self.context.cm.run_config if job.status in rc.retry_on_status: if job.retries < rc.max_retries: - msg = 'Job {} iteration {} complted with status {}. retrying...' + msg = 'Job {} iteration {} completed with status {}. retrying...' self.logger.error(msg.format(job.id, job.status, job.iteration)) self.context.move_failed(job) job.retries += 1 job.status = JobStatus.PENDING self.context.job_queue.insert(0, job) + self.context.write_state() else: msg = 'Job {} iteration {} completed with status {}. '\ 'Max retries exceeded.' 
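The retry path above re-queues a failed job at the head of the job queue with its status reset to PENDING. A minimal standalone sketch of that behaviour (FakeJob and the literal status strings are illustrative stand-ins, not WA classes):

    MAX_RETRIES = 2
    RETRY_ON_STATUS = ['FAILED', 'PARTIAL']

    class FakeJob(object):
        def __init__(self, id):
            self.id = id
            self.status = 'NEW'
            self.retries = 0

    def retry_job(job_queue, job):
        # Failed output is moved aside (move_failed above) and the job is
        # re-inserted at the front of the queue so that it runs next.
        if job.status in RETRY_ON_STATUS and job.retries < MAX_RETRIES:
            job.retries += 1
            job.status = 'PENDING'
            job_queue.insert(0, job)

    queue = []
    job = FakeJob('dhrystone-1')
    job.status = 'FAILED'
    retry_job(queue, job)
    assert queue[0].status == 'PENDING' and queue[0].retries == 1
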
diff --git a/wa/framework/job.py b/wa/framework/job.py
index 598d6072..af9df1da 100644
--- a/wa/framework/job.py
+++ b/wa/framework/job.py
@@ -11,8 +11,12 @@ class Job(object):
         return self.spec.id
 
     @property
-    def output_name(self):
-        return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration)
+    def label(self):
+        return self.spec.label
+
+    @property
+    def classifiers(self):
+        return self.spec.classifiers
 
     def __init__(self, spec, iteration, context):
         self.logger = logging.getLogger('job')
@@ -31,27 +35,33 @@ class Job(object):
                                            **self.spec.workload_parameters)
         self.workload.init_resources(self.context)
         self.workload.validate()
-        self.status = JobStatus.LOADED
 
     def initialize(self, context):
         self.logger.info('Initializing job {}'.format(self.id))
+        self.workload.initialize(context)
         self.status = JobStatus.PENDING
+        context.update_job_state(self)
 
     def configure_target(self, context):
         self.logger.info('Configuring target for job {}'.format(self.id))
 
     def setup(self, context):
         self.logger.info('Setting up job {}'.format(self.id))
+        self.workload.setup(context)
 
     def run(self, context):
         self.logger.info('Running job {}'.format(self.id))
+        self.workload.run(context)
 
     def process_output(self, context):
         self.logger.info('Processing output for job {}'.format(self.id))
+        self.workload.update_result(context)
 
     def teardown(self, context):
         self.logger.info('Tearing down job {}'.format(self.id))
+        self.workload.teardown(context)
 
     def finalize(self, context):
         self.logger.info('Finalizing job {}'.format(self.id))
+        self.workload.finalize(context)
diff --git a/wa/framework/output.py b/wa/framework/output.py
index 16853539..6111ecad 100644
--- a/wa/framework/output.py
+++ b/wa/framework/output.py
@@ -7,71 +7,53 @@ import uuid
 from copy import copy
 from datetime import timedelta
 
-from wa.framework.configuration.core import JobSpec
+from wa.framework.configuration.core import JobSpec, RunStatus
 from wa.framework.configuration.manager import ConfigManager
+from wa.framework.exception import HostError
+from wa.framework.run import RunState, RunInfo
 from wa.framework.target.info import TargetInfo
 from wa.utils.misc import touch, ensure_directory_exists
 from wa.utils.serializer import write_pod, read_pod
+from wa.utils.types import enum, numeric
 
 
 logger = logging.getLogger('output')
 
 
-class RunInfo(object):
-    """
-    Information about the current run, such as its unique ID, run
-    time, etc.
+class Output(object): - """ - @staticmethod - def from_pod(pod): - uid = pod.pop('uuid') - duration = pod.pop('duration') - if uid is not None: - uid = uuid.UUID(uid) - instance = RunInfo(**pod) - instance.uuid = uid - instance.duration = duration if duration is None else\ - timedelta(seconds=duration) - return instance + @property + def resultfile(self): + return os.path.join(self.basepath, 'result.json') - def __init__(self, run_name=None, project=None, project_stage=None, - start_time=None, end_time=None, duration=None): - self.uuid = uuid.uuid4() - self.run_name = None - self.project = None - self.project_stage = None - self.start_time = None - self.end_time = None - self.duration = None + def __init__(self, path): + self.basepath = path + self.result = None - def to_pod(self): - d = copy(self.__dict__) - d['uuid'] = str(self.uuid) - if self.duration is None: - d['duration'] = self.duration - else: - d['duration'] = self.duration.total_seconds() - return d + def reload(self): + pod = read_pod(self.resultfile) + self.result = Result.from_pod(pod) + + def write_result(self): + write_pod(self.result.to_pod(), self.resultfile) + + def add_metric(self, name, value, units=None, lower_is_better=False, + classifiers=None): + self.result.add_metric(name, value, units, lower_is_better, classifiers) + + def add_artifact(self, name, path, kind, description=None, classifiers=None): + if not os.path.exists(path): + msg = 'Attempting to add non-existing artifact: {}' + raise HostError(msg.format(path)) + path = os.path.relpath(path, self.basepath) + + if isinstance(kind, basestring): + kind = ArtifactType(kind) + + self.result.add_artifact(name, path, kind, description, classifiers) -class RunState(object): - """ - Represents the state of a WA run. - - """ - @staticmethod - def from_pod(pod): - return RunState() - - def __init__(self): - pass - - def to_pod(self): - return {} - - -class RunOutput(object): +class RunOutput(Output): @property def logfile(self): @@ -111,9 +92,11 @@ class RunOutput(object): return ensure_directory_exists(path) def __init__(self, path): - self.basepath = path + super(RunOutput, self).__init__(path) self.info = None self.state = None + self.result = None + self.jobs = None if (not os.path.isfile(self.statefile) or not os.path.isfile(self.infofile)): msg = '"{}" does not exist or is not a valid WA output directory.' 
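Both run-level and job-level outputs now share the result.json handling defined in Output above. Assuming the series is applied, the Result and Metric classes added further down in this file round-trip through plain-old-data ("pod") structures:

    # Usage sketch only; Result and the pod helpers are the ones defined or
    # used in this file (wa/framework/output.py, wa/utils/serializer.py).
    from wa.framework.output import Result
    from wa.utils.serializer import write_pod, read_pod

    result = Result()
    result.add_metric('execution_time', 12.3, 'seconds', lower_is_better=True)
    write_pod(result.to_pod(), 'result.json')

    restored = Result.from_pod(read_pod('result.json'))
    assert restored.metrics[0].value == 12.3

Keeping the pod layer as plain dicts and lists is what lets write_pod/read_pod pick a serialization backend from the file extension.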
@@ -121,8 +105,10 @@
         self.reload()
 
     def reload(self):
+        super(RunOutput, self).reload()
         self.info = RunInfo.from_pod(read_pod(self.infofile))
         self.state = RunState.from_pod(read_pod(self.statefile))
+        # TODO: populate the jobs from info in the state
 
     def write_info(self):
         write_pod(self.info.to_pod(), self.infofile)
@@ -157,17 +143,215 @@
         pod = read_pod(self.jobsfile)
         return [JobSpec.from_pod(jp) for jp in pod['jobs']]
 
-    def move_failed(self, name, failed_name):
-        path = os.path.join(self.basepath, name)
+    def move_failed(self, job_output):
+        name = os.path.basename(job_output.basepath)
+        attempt = job_output.retry + 1
+        failed_name = '{}-attempt{:02}'.format(name, attempt)
         failed_path = os.path.join(self.failed_dir, failed_name)
-        if not os.path.exists(path):
-            raise ValueError('Path {} does not exist'.format(path))
         if os.path.exists(failed_path):
             raise ValueError('Path {} already exists'.format(failed_path))
-        shutil.move(path, failed_path)
+        shutil.move(job_output.basepath, failed_path)
+        job_output.basepath = failed_path
 
 
-def init_wa_output(path, wa_state, force=False):
+class JobOutput(Output):
+
+    def __init__(self, path, id, label, iteration, retry):
+        self.basepath = path
+        self.id = id
+        self.label = label
+        self.iteration = iteration
+        self.retry = retry
+        self.result = None
+        self.reload()
+
+
+class Result(object):
+
+    @staticmethod
+    def from_pod(pod):
+        instance = Result()
+        instance.metrics = [Metric.from_pod(m) for m in pod['metrics']]
+        instance.artifacts = [Artifact.from_pod(a) for a in pod['artifacts']]
+        return instance
+
+    def __init__(self):
+        self.metrics = []
+        self.artifacts = []
+
+    def add_metric(self, name, value, units=None, lower_is_better=False,
+                   classifiers=None):
+        metric = Metric(name, value, units, lower_is_better, classifiers)
+        logger.debug('Adding metric: {}'.format(metric))
+        self.metrics.append(metric)
+
+    def add_artifact(self, name, path, kind, description=None, classifiers=None):
+        artifact = Artifact(name, path, kind, description=description,
+                            classifiers=classifiers)
+        logger.debug('Adding artifact: {}'.format(artifact))
+        self.artifacts.append(artifact)
+
+    def to_pod(self):
+        return dict(
+            metrics=[m.to_pod() for m in self.metrics],
+            artifacts=[a.to_pod() for a in self.artifacts],
+        )
+
+
+ArtifactType = enum(['log', 'meta', 'data', 'export', 'raw'])
+
+
+class Artifact(object):
+    """
+    This is an artifact generated during execution/post-processing of a
+    workload. Unlike metrics, this represents an actual artifact, such as a
+    file, that gets generated. This may be a "result", such as a trace, or it
+    could be "meta data" such as logs. These are distinguished using the
+    ``kind`` attribute, which also helps WA decide how it should be handled.
+    Currently supported kinds are:
+
+        :log: A log file. Not part of "results" as such but contains
+              information about the run/workload execution that may be useful
+              for diagnostics/meta analysis.
+        :meta: A file containing metadata. This is not part of "results", but
+               contains information that may be necessary to reproduce the
+               results (contrast with ``log`` artifacts which are *not*
+               necessary).
+        :data: This file contains new data, not available otherwise and should
+               be considered part of the "results" generated by WA. Most traces
+               would fall into this category.
+        :export: Exported version of results or some other artifact. This
+                 signifies that this artifact does not contain any new data
+                 that is not available elsewhere and that it may be safely
+                 discarded without losing information.
+        :raw: Signifies that this is a raw dump/log that is normally processed
+              to extract useful information and is then discarded. In a sense,
+              it is the opposite of ``export``, but in general may also be
+              discarded.
+
+              .. note:: whether a file is marked as ``log``/``data`` or ``raw``
+                        depends on how important it is to preserve this file,
+                        e.g. when archiving, vs how much space it takes up.
+                        Unlike ``export`` artifacts which are (almost) always
+                        ignored by other exporters as that would never result
+                        in data loss, ``raw`` files *may* be processed by
+                        exporters if they decide that the risk of losing
+                        potentially (though unlikely) useful data is greater
+                        than the time/space cost of handling the artifact (e.g.
+                        a database uploader may choose to ignore ``raw``
+                        artifacts, whereas a network filer archiver may choose
+                        to archive them).
+
+    .. note:: The kind parameter is intended to represent the logical
+              function of a particular artifact, not its intended means of
+              processing -- this is left entirely up to the result
+              processors.
+
+    """
+
+    @staticmethod
+    def from_pod(pod):
+        pod['kind'] = ArtifactType(pod['kind'])
+        return Artifact(**pod)
+
+    def __init__(self, name, path, kind, description=None, classifiers=None):
+        """
+        :param name: Name that uniquely identifies this artifact.
+        :param path: The *relative* path of the artifact. Depending on the
+                     ``level``, this must be relative to either the run or the
+                     iteration output directory. Note: this path *must* be
+                     delimited using ``/`` irrespective of the operating
+                     system.
+        :param kind: The type of the artifact this is (e.g. log file, result,
+                     etc.); this will be used as a hint to result processors.
+                     This must be one of ``'log'``, ``'meta'``, ``'data'``,
+                     ``'export'``, ``'raw'``.
+        :param description: A free-form description of what this artifact is.
+        :param classifiers: A set of key-value pairs to further classify this
+                            metric beyond current iteration (e.g. this can be
+                            used to identify sub-tests).
+
+        """
+        self.name = name
+        self.path = path.replace('/', os.sep) if path is not None else path
+        try:
+            self.kind = ArtifactType(kind)
+        except ValueError:
+            msg = 'Invalid Artifact kind: {}; must be in {}'
+            raise ValueError(msg.format(kind, ArtifactType.values))
+        self.description = description
+        self.classifiers = classifiers or {}
+
+    def to_pod(self):
+        pod = copy(self.__dict__)
+        pod['kind'] = str(self.kind)
+        return pod
+
+    def __str__(self):
+        return self.path
+
+    def __repr__(self):
+        return '{} ({}): {}'.format(self.name, self.kind, self.path)
+
+
+class Metric(object):
+    """
+    This is a single metric collected from executing a workload.
+
+    :param name: the name of the metric. Uniquely identifies the metric
+                 within the results.
+    :param value: The numerical value of the metric for this execution of a
+                  workload. This can be either an int or a float.
+    :param units: Units for the collected value. Can be None if the value
+                  has no units (e.g. it's a count or a standardised score).
+    :param lower_is_better: Boolean flag indicating whether lower values are
+                            better than higher ones. Defaults to False.
+    :param classifiers: A set of key-value pairs to further classify this
+                        metric beyond current iteration (e.g. this can be used
+                        to identify sub-tests).
+
+    """
+
+    __slots__ = ['name', 'value', 'units', 'lower_is_better', 'classifiers']
+
+    @staticmethod
+    def from_pod(pod):
+        return Metric(**pod)
+
+    def __init__(self, name, value, units=None, lower_is_better=False,
+                 classifiers=None):
+        self.name = name
+        self.value = numeric(value)
+        self.units = units
+        self.lower_is_better = lower_is_better
+        self.classifiers = classifiers or {}
+
+    def to_pod(self):
+        return dict(
+            name=self.name,
+            value=self.value,
+            units=self.units,
+            lower_is_better=self.lower_is_better,
+            classifiers=self.classifiers,
+        )
+
+    def __str__(self):
+        result = '{}: {}'.format(self.name, self.value)
+        if self.units:
+            result += ' ' + self.units
+        result += ' ({})'.format('-' if self.lower_is_better else '+')
+        return result
+
+    def __repr__(self):
+        text = self.__str__()
+        if self.classifiers:
+            return '<{} {}>'.format(text, self.classifiers)
+        else:
+            return '<{}>'.format(text)
+
+
+
+def init_run_output(path, wa_state, force=False):
     if os.path.exists(path):
         if force:
             logger.info('Removing existing output directory.')
@@ -188,13 +372,20 @@
         project_stage=wa_state.run_config.project_stage,
     )
     write_pod(info.to_pod(), os.path.join(meta_dir, 'run_info.json'))
-
-    with open(os.path.join(path, '.run_state.json'), 'w') as wfh:
-        wfh.write('{}')
+    write_pod(RunState().to_pod(), os.path.join(path, '.run_state.json'))
+    write_pod(Result().to_pod(), os.path.join(path, 'result.json'))
 
     return RunOutput(path)
 
 
+def init_job_output(run_output, job):
+    output_name = '{}-{}-{}'.format(job.id, job.spec.label, job.iteration)
+    path = os.path.join(run_output.basepath, output_name)
+    ensure_directory_exists(path)
+    write_pod(Result().to_pod(), os.path.join(path, 'result.json'))
+    return JobOutput(path, job.id, job.label, job.iteration, job.retries)
+
+
 def _save_raw_config(meta_dir, state):
     raw_config_dir = os.path.join(meta_dir, 'raw_config')
     os.makedirs(raw_config_dir)
@@ -206,5 +397,3 @@
         dest_path = os.path.join(raw_config_dir, 'cfg{}-{}'.format(i, basename))
         shutil.copy(source, dest_path)
-
-
diff --git a/wa/framework/run.py b/wa/framework/run.py
index 72aacffd..b5ca5979 100644
--- a/wa/framework/run.py
+++ b/wa/framework/run.py
@@ -18,338 +18,113 @@
 from copy import copy
 from datetime import datetime, timedelta
 from collections import OrderedDict
+import uuid
 
-from wa.framework import signal, pluginloader, log
-from wa.framework.plugin import Plugin
-from wa.framework.output import Status
-from wa.framework.resource import ResourceResolver
-from wa.framework.exception import JobError
-from wa.utils import counter
-from wa.utils.serializer import json
-from wa.utils.misc import ensure_directory_exists as _d
-from wa.utils.types import caseless_string
+from wa.framework.configuration.core import RunStatus, JobStatus
 
 
-class JobActor(object):
+class RunInfo(object):
+    """
+    Information about the current run, such as its unique ID, run
+    time, etc.
 
-    def get_config(self):
-        return {}
+    """
+    @staticmethod
+    def from_pod(pod):
+        uid = pod.pop('uuid')
+        duration = pod.pop('duration')
+        if uid is not None:
+            uid = uuid.UUID(uid)
+        instance = RunInfo(**pod)
+        instance.uuid = uid
+        instance.duration = duration if duration is None else\
+                            timedelta(seconds=duration)
+        return instance
 
-    def initialize(self, context):
-        pass
-
-    def run(self):
-        pass
-
-    def finalize(self):
-        pass
-
-    def restart(self):
-        pass
-
-    def complete(self):
-        pass
+    def __init__(self, run_name=None, project=None, project_stage=None,
+                 start_time=None, end_time=None, duration=None):
+        self.uuid = uuid.uuid4()
+        self.run_name = run_name
+        self.project = project
+        self.project_stage = project_stage
+        self.start_time = start_time
+        self.end_time = end_time
+        self.duration = duration
 
+    def to_pod(self):
+        d = copy(self.__dict__)
+        d['uuid'] = str(self.uuid)
+        if self.duration is None:
+            d['duration'] = self.duration
+        else:
+            d['duration'] = self.duration.total_seconds()
+        return d
 
-class RunnerJob(object):
+class RunState(object):
+    """
+    Represents the state of a WA run.
+
+    """
+    @staticmethod
+    def from_pod(pod):
+        instance = RunState()
+        instance.status = RunStatus(pod['status'])
+        instance.timestamp = pod['timestamp']
+        jss = [JobState.from_pod(j) for j in pod['jobs']]
+        instance.jobs = OrderedDict(((js.id, js.iteration), js) for js in jss)
+        return instance
+
+    def __init__(self):
+        self.jobs = OrderedDict()
+        self.status = RunStatus.NEW
+        self.timestamp = datetime.now()
+
+    def add_job(self, job):
+        job_state = JobState(job.id, job.label, job.iteration, job.status)
+        self.jobs[(job_state.id, job_state.iteration)] = job_state
+
+    def update_job(self, job):
+        state = self.jobs[(job.id, job.iteration)]
+        state.status = job.status
+        state.timestamp = datetime.now()
+
+    def to_pod(self):
+        return OrderedDict(
+            status=str(self.status),
+            timestamp=self.timestamp,
+            jobs=[j.to_pod() for j in self.jobs.itervalues()],
+        )
+
+
+class JobState(object):
+
+    @staticmethod
+    def from_pod(pod):
+        instance = JobState(pod['id'], pod['label'], pod['iteration'],
+                            JobStatus(pod['status']))
+        instance.retries = pod['retries']
+        instance.timestamp = pod['timestamp']
+        return instance
 
     @property
-    def status(self):
-        return self.output.status
+    def output_name(self):
+        return '{}-{}-{}'.format(self.id, self.label, self.iteration)
 
-    @status.setter
-    def status(self, value):
-        self.output.status = value
-
-    @property
-    def should_retry(self):
-        return self.attempt <= self.max_retries
-
-    def __init__(self, id, actor, output, max_retries):
+    def __init__(self, id, label, iteration, status):
         self.id = id
-        self.actor = actor
-        self.output = output
-        self.max_retries = max_retries
-        self.status = Status.NEW
-        self.attempt = 0
+        self.label = label
+        self.iteration = iteration
+        self.status = status
+        self.retries = 0
+        self.timestamp = datetime.now()
 
-    def initialize(self, context):
-        self.actor.initialize(context)
-        self.status = Status.PENDING
-
-    def run(self):
-        self.status = Status.RUNNING
-        self.attempt += 1
-        self.output.config = self.actor.get_config()
-        self.output.initialize()
-        self.actor.run()
-        self.status = Status.COMPLETE
-
-    def finalize(self):
-        self.actor.finalize()
-
-    def restart(self):
-        self.actor.restart()
-
-    def complete(self):
-        self.actor.complete()
-
-
-__run_methods = set()
-
-
-def runmethod(method):
-    """
-    A method decorator that ensures that a method is invoked only once per run.
- - """ - def _method_wrapper(*args, **kwargs): - if method in __run_methods: - return - __run_methods.add(method) - ret = method(*args, **kwargs) - if ret is not None: - message = 'runmethod()\'s must return None; method "{}" returned "{}"' - raise RuntimeError(message.format(method, ret)) - return _method_wrapper - - -def reset_runmethods(): - global __run_methods - __run_methods = set() - - -class Runner(object): - - @property - def info(self): - return self.output.info - - @property - def status(self): - return self.output.status - - @status.setter - def status(self, value): - self.output.status = value - - @property - def jobs_pending(self): - return len(self.job_queue) > 0 - - @property - def current_job(self): - if self.job_queue: - return self.job_queue[0] - - @property - def previous_job(self): - if self.completed_jobs: - return self.completed_jobs[-1] - - @property - def next_job(self): - if len(self.job_queue) > 1: - return self.job_queue[1] - - def __init__(self, output): - self.logger = logging.getLogger('runner') - self.output = output - self.context = RunContext(self) - self.status = Status.NEW - self.job_queue = [] - self.completed_jobs = [] - self._known_ids = set([]) - - def add_job(self, job_id, actor, max_retries=2): - job_id = caseless_string(job_id) - if job_id in self._known_ids: - raise JobError('Job with id "{}" already exists'.format(job_id)) - output = self.output.create_job_output(job_id) - self.job_queue.append(RunnerJob(job_id, actor, output, max_retries)) - self._known_ids.add(job_id) - - def initialize(self): - self.logger.info('Initializing run') - self.start_time = datetime.now() - if not self.info.start_time: - self.info.start_time = self.start_time - self.info.duration = timedelta() - - self.context.initialize() - for job in self.job_queue: - job.initialize(self.context) - self.persist_state() - self.logger.info('Run initialized') - - def run(self): - self.status = Status.RUNNING - reset_runmethods() - signal.send(signal.RUN_STARTED, self, self.context) - self.initialize() - signal.send(signal.RUN_INITIALIZED, self, self.context) - self.run_jobs() - signal.send(signal.RUN_COMPLETED, self, self.context) - self.finalize() - signal.send(signal.RUN_FINALIZED, self, self.context) - - def run_jobs(self): - try: - self.logger.info('Running jobs') - while self.jobs_pending: - self.begin_job() - log.indent() - try: - self.current_job.run() - except KeyboardInterrupt: - self.current_job.status = Status.ABORTED - signal.send(signal.JOB_ABORTED, self, self.current_job) - raise - except Exception as e: - self.current_job.status = Status.FAILED - log.log_error(e, self.logger) - signal.send(signal.JOB_FAILED, self, self.current_job) - else: - self.current_job.status = Status.COMPLETE - finally: - log.dedent() - self.complete_job() - except KeyboardInterrupt: - self.status = Status.ABORTED - while self.job_queue: - job = self.job_queue.pop(0) - job.status = RunnerJob.ABORTED - self.completed_jobs.append(job) - signal.send(signal.RUN_ABORTED, self, self) - raise - except Exception as e: - self.status = Status.FAILED - log.log_error(e, self.logger) - signal.send(signal.RUN_FAILED, self, self) - else: - self.status = Status.COMPLETE - - def finalize(self): - self.logger.info('Finalizing run') - for job in self.job_queue: - job.finalize() - self.end_time = datetime.now() - self.info.end_time = self.end_time - self.info.duration += self.end_time - self.start_time - self.persist_state() - signal.send(signal.RUN_FINALIZED, self, self) - self.logger.info('Run completed') - - 
def begin_job(self):
-        self.logger.info('Starting job {}'.format(self.current_job.id))
-        signal.send(signal.JOB_STARTED, self, self.current_job)
-        self.persist_state()
-
-    def complete_job(self):
-        if self.current_job.status == Status.FAILED:
-            self.output.move_failed(self.current_job.output)
-            if self.current_job.should_retry:
-                self.logger.info('Restarting job {}'.format(self.current_job.id))
-                self.persist_state()
-                self.current_job.restart()
-                signal.send(signal.JOB_RESTARTED, self, self.current_job)
-                return
-
-        self.logger.info('Completing job {}'.format(self.current_job.id))
-        self.current_job.complete()
-        self.persist_state()
-        signal.send(signal.JOB_COMPLETED, self, self.current_job)
-        job = self.job_queue.pop(0)
-        self.completed_jobs.append(job)
-
-    def persist_state(self):
-        self.output.persist()
-
-
-class RunContext(object):
-    """
-    Provides a context for instrumentation. Keeps track of things like
-    current workload and iteration.
-
-    """
-
-    @property
-    def run_output(self):
-        return self.runner.output
-
-    @property
-    def current_job(self):
-        return self.runner.current_job
-
-    @property
-    def run_output_directory(self):
-        return self.run_output.output_directory
-
-    @property
-    def output_directory(self):
-        if self.runner.current_job:
-            return self.runner.current_job.output.output_directory
-        else:
-            return self.run_output.output_directory
-
-    @property
-    def info_directory(self):
-        return self.run_output.info_directory
-
-    @property
-    def config_directory(self):
-        return self.run_output.config_directory
-
-    @property
-    def failed_directory(self):
-        return self.run_output.failed_directory
-
-    @property
-    def log_file(self):
-        return os.path.join(self.output_directory, 'run.log')
-
-
-    def __init__(self, runner):
-        self.runner = runner
-        self.job = None
-        self.iteration = None
-        self.job_output = None
-        self.resolver = ResourceResolver()
-
-    def initialize(self):
-        self.resolver.load()
-
-    def get_path(self, subpath):
-        if self.current_job is None:
-            return self.run_output.get_path(subpath)
-        else:
-            return self.current_job.output.get_path(subpath)
-
-    def add_metric(self, *args, **kwargs):
-        if self.current_job is None:
-            self.run_output.add_metric(*args, **kwargs)
-        else:
-            self.current_job.output.add_metric(*args, **kwargs)
-
-    def add_artifact(self, name, path, kind, *args, **kwargs):
-        if self.current_job is None:
-            self.add_run_artifact(name, path, kind, *args, **kwargs)
-        else:
-            self.add_job_artifact(name, path, kind, *args, **kwargs)
-
-    def add_run_artifact(self, *args, **kwargs):
-        self.run_output.add_artifiact(*args, **kwargs)
-
-    def add_job_artifact(self, *args, **kwargs):
-        self.current_job.output.add_artifact(*args, **kwargs)
-
-    def get_artifact(self, name):
-        if self.iteration_artifacts:
-            for art in self.iteration_artifacts:
-                if art.name == name:
-                    return art
-        for art in self.run_artifacts:
-            if art.name == name:
-                return art
-        return None
+    def to_pod(self):
+        return OrderedDict(
+            id=self.id,
+            label=self.label,
+            iteration=self.iteration,
+            status=str(self.status),
+            retries=self.retries,
+            timestamp=self.timestamp,
+        )
diff --git a/wa/utils/serializer.py b/wa/utils/serializer.py
index 56d8f988..14c3eccf 100644
--- a/wa/utils/serializer.py
+++ b/wa/utils/serializer.py
@@ -232,6 +232,7 @@ def read_pod(source, fmt=None):
         message = 'source must be a path or an open file handle; got {}'
         raise ValueError(message.format(type(source)))
 
+
 def write_pod(pod, dest, fmt=None):
     if isinstance(dest, basestring):
         with open(dest, 'w') as wfh:
diff --git a/wa/utils/types.py b/wa/utils/types.py
index fed3004f..359ff662 100644
--- a/wa/utils/types.py
+++ b/wa/utils/types.py
@@ -78,9 +78,10 @@ def list_of_bools(value, interpret_strings=True):
     """
     Value must be iterable. All elements will be converted to ``bool``\ s.
 
-    .. note:: By default, ``boolean()`` conversion function will be used, which means that
-              strings like ``"0"`` or ``"false"`` will be interpreted as ``False``. If this
-              is undesirable, set ``interpret_strings`` to ``False``.
+    .. note:: By default, ``boolean()`` conversion function will be used, which
+              means that strings like ``"0"`` or ``"false"`` will be
+              interpreted as ``False``. If this is undesirable, set
+              ``interpret_strings`` to ``False``.
 
     """
     if not isiterable(value):
@@ -133,8 +134,8 @@ def list_or_string(value):
 
 def list_or_caseless_string(value):
     """
-    Converts the value into a list of ``caseless_string``'s. If the value is not iterable
-    a one-element list with stringified value will be returned.
+    Converts the value into a list of ``caseless_string``'s. If the value is
+    not iterable, a one-element list with the stringified value is returned.
 
     """
     if isinstance(value, basestring):
@@ -148,9 +149,10 @@ def list_or_caseless_string(value):
 
 def list_or(type_):
     """
-    Generator for "list or" types. These take either a single value or a list values
-    and return a list of the specfied ``type_`` performing the conversion on the value
-    (if a single value is specified) or each of the elemented of the specified list.
+    Generator for "list or" types. These take either a single value or a list
+    of values and return a list of the specified ``type_``, performing the
+    conversion on the value (if a single value is specified) or on each of the
+    elements of the specified list.
 
     """
     list_type = list_of(type_)
@@ -176,8 +178,8 @@ none_type = type(None)
 
 def regex(value):
     """
-    Regular expression. If value is a string, it will be complied with no flags. If you
-    want to specify flags, value must be precompiled.
+    Regular expression. If value is a string, it will be compiled with no
+    flags. If you want to specify flags, value must be precompiled.
 
     """
     if isinstance(value, regex_type):
@@ -480,13 +482,13 @@ class obj_dict(MutableMapping):
 
 class level(object):
     """
-    A level has a name and behaves like a string when printed,
-    however it also has a numeric value which is used in comparisons.
+    A level has a name and behaves like a string when printed, however it also
+    has a numeric value which is used in ordering comparisons.
 
     """
 
    def __init__(self, name, value):
-        self.name = name
+        self.name = caseless_string(name)
         self.value = value
 
     def __str__(self):
@@ -555,7 +557,7 @@ def enum(args, start=0):
 
     levels = []
     for i, v in enumerate(args, start):
-        name = string.upper(identifier(v))
+        name = caseless_string(identifier(v))
         lv = level(v, i)
         setattr(Enum, name, lv)
         levels.append(lv)
diff --git a/wa/workloads/dhrystone/__init__.py b/wa/workloads/dhrystone/__init__.py
index 9cc2d9ba..2c3805d4 100644
--- a/wa/workloads/dhrystone/__init__.py
+++ b/wa/workloads/dhrystone/__init__.py
@@ -90,14 +90,18 @@ class Dhrystone(Workload):
         outfile = os.path.join(context.output_directory, 'dhrystone.output')
         with open(outfile, 'w') as wfh:
             wfh.write(self.output)
+        context.add_artifact('dhrystone-output', outfile, 'raw', "dhrystone's stdout")
+
         score_count = 0
         dmips_count = 0
         total_score = 0
         total_dmips = 0
+
         for line in self.output.split('\n'):
             match = self.time_regex.search(line)
             if match:
-                context.add_metric('time', float(match.group('time')), 'seconds', lower_is_better=True)
+                context.add_metric('time', float(match.group('time')), 'seconds',
+                                   lower_is_better=True)
             else:
                 match = self.bm_regex.search(line)
                 if match:
@@ -114,6 +118,7 @@ class Dhrystone(Workload):
                     context.add_metric(metric, value)
                     dmips_count += 1
                     total_dmips += value
+
         context.add_metric('total DMIPS', total_dmips)
         context.add_metric('total score', total_score)

From 4287e90153f3f768657ed7b45b2190b6649d821c Mon Sep 17 00:00:00 2001
From: Sergei Trofimov
Date: Fri, 17 Mar 2017 09:36:35 +0000
Subject: [PATCH 2/9] dhrystone tidy

---
 wa/workloads/dhrystone/__init__.py | 49 ++++++++++++++++++++++--------
 1 file changed, 36 insertions(+), 13 deletions(-)

diff --git a/wa/workloads/dhrystone/__init__.py b/wa/workloads/dhrystone/__init__.py
index 2c3805d4..e452d082 100644
--- a/wa/workloads/dhrystone/__init__.py
+++ b/wa/workloads/dhrystone/__init__.py
@@ -47,19 +47,31 @@ class Dhrystone(Workload):
     parameters = [
         Parameter('duration', kind=int, default=0,
-                  description='The duration, in seconds, for which dhrystone will be executed. '
-                              'Either this or ``mloops`` should be specified but not both.'),
+                  description='''
+                  The duration, in seconds, for which dhrystone will be
+                  executed. Either this or ``mloops`` should be specified but
+                  not both.
+                  '''),
         Parameter('mloops', kind=int, default=0,
-                  description='Millions of loops to run. Either this or ``duration`` should be '
-                              'specified, but not both. If neither is specified, this will default '
-                              'to ``{}``'.format(default_mloops)),
+                  description='''
+                  Millions of loops to run. Either this or ``duration`` should
+                  be specified, but not both. If neither is specified, this
+                  will default to ``{}``.
+                  '''.format(default_mloops)),
         Parameter('threads', kind=int, default=4,
-                  description='The number of separate dhrystone "threads" that will be forked.'),
+                  description='''
+                  The number of separate dhrystone "threads" that will be forked.
+                  '''),
         Parameter('delay', kind=int, default=0,
-                  description=('The delay, in seconds, between kicking off of dhrystone '
-                               'threads (if ``threads`` > 1).')),
+                  description=('''
+                  The delay, in seconds, between kicking off of dhrystone
+                  threads (if ``threads`` > 1).
+                  ''')),
         Parameter('taskset_mask', kind=int, default=0,
-                  description='The processes spawned by sysbench will be pinned to cores as specified by this parameter'),
+                  description='''
+                  The processes spawned by dhrystone will be pinned to cores as
+                  specified by this parameter.
+                  '''),
     ]
 
     def initialize(self, context):
@@ -67,7 +79,10 @@ class Dhrystone(Workload):
         Dhrystone.target_exe = self.target.install(host_exe)
 
     def setup(self, context):
-        execution_mode = '-l {}'.format(self.mloops) if self.mloops else '-r {}'.format(self.duration)
+        if self.mloops:
+            execution_mode = '-l {}'.format(self.mloops)
+        else:
+            execution_mode = '-r {}'.format(self.duration)
         if self.taskset_mask:
             taskset_string = 'busybox taskset 0x{:x} '.format(self.taskset_mask)
         else:
@@ -76,12 +91,18 @@ class Dhrystone(Workload):
                                                    self.target_exe, execution_mode,
                                                    self.threads, self.delay)
-        self.timeout = self.duration and self.duration + self.delay * self.threads + 10 or 300
+        if self.duration:
+            self.timeout = self.duration + self.delay * self.threads + 10
+        else:
+            self.timeout = 300
+
         self.target.killall('dhrystone')
 
     def run(self, context):
         try:
-            self.output = self.target.execute(self.command, timeout=self.timeout, check_exit_code=False)
+            self.output = self.target.execute(self.command,
+                                              timeout=self.timeout,
+                                              check_exit_code=False)
         except KeyboardInterrupt:
             self.target.killall('dhrystone')
             raise
@@ -127,7 +148,9 @@ class Dhrystone(Workload):
 
     def validate(self):
         if self.mloops and self.duration:  # pylint: disable=E0203
-            raise ConfigError('mloops and duration cannot be both specified at the same time for dhrystone.')
+            msg = 'mloops and duration cannot be both specified at the '\
+                  'same time for dhrystone.'
+            raise ConfigError(msg)
         if not self.mloops and not self.duration:  # pylint: disable=E0203
             self.mloops = self.default_mloops

From c5cd2b9298d7841ac95178c6ddc8402177b50e1f Mon Sep 17 00:00:00 2001
From: Sergei Trofimov
Date: Fri, 17 Mar 2017 15:57:05 +0000
Subject: [PATCH 3/9] Initial priority implementation

- Fixed up some of the signal map for instrumentation
- Changed how priorities are specified -- no longer method name prefixes
  but dedicated decorators, including an easy way of specifying a custom
  priority level (no longer need to manually connect signals)
- Updated ExecutionTimeInstrument to work with the new system
- Also removed some dead code

---
 wa/__init__.py                      |   3 +-
 wa/framework/execution.py           | 132 +-------------------------
 wa/framework/instrumentation.py     | 140 +++++++++++++++------------
 wa/framework/job.py                 |  20 ++--
 wa/framework/plugin.py              |  49 ----------
 wa/framework/signal.py              |  67 +++++++------
 wa/instrumentation/misc/__init__.py |  16 ++--
 wa/utils/types.py                   |   8 +-
 8 files changed, 144 insertions(+), 291 deletions(-)

diff --git a/wa/__init__.py b/wa/__init__.py
index 575c5963..ab6a7cca 100644
--- a/wa/__init__.py
+++ b/wa/__init__.py
@@ -7,6 +7,7 @@ from wa.framework.exception import (ResultProcessorError, ResourceError,
 from wa.framework.exception import (WAError, NotFoundError, ValidationError,
                                     WorkloadError)
 from wa.framework.exception import WorkerThreadError, PluginLoaderError
-from wa.framework.instrumentation import Instrument
+from wa.framework.instrumentation import (Instrument, very_slow, slow, normal, fast,
+                                          very_fast)
 from wa.framework.plugin import Plugin, Parameter
 from wa.framework.workload import Workload
diff --git a/wa/framework/execution.py b/wa/framework/execution.py
index aafe4046..5697075c 100644
--- a/wa/framework/execution.py
+++ b/wa/framework/execution.py
@@ -189,132 +189,6 @@ class ExecutionContext(object):
                          classifiers=None):
         self.run_output.add_artifact(name, path, kind, description, classifiers)
 
-class OldExecutionContext(object):
-    """
-    Provides a context for instrumentation. Keeps track of things like
-    current workload and iteration.
- - This class also provides two status members that can be used by workloads - and instrumentation to keep track of arbitrary state. ``result`` - is reset on each new iteration of a workload; run_status is maintained - throughout a Workload Automation run. - - """ - - # These are the artifacts generated by the core framework. - default_run_artifacts = [ - Artifact('runlog', 'run.log', 'log', mandatory=True, - description='The log for the entire run.'), - ] - - @property - def current_iteration(self): - if self.current_job: - spec_id = self.current_job.spec.id - return self.job_iteration_counts[spec_id] - else: - return None - - @property - def job_status(self): - if not self.current_job: - return None - return self.current_job.result.status - - @property - def workload(self): - return getattr(self.spec, 'workload', None) - - @property - def spec(self): - return getattr(self.current_job, 'spec', None) - - @property - def result(self): - return getattr(self.current_job, 'result', self.run_result) - - def __init__(self, device_manager, config): - self.device_manager = device_manager - self.device = self.device_manager.target - self.config = config - self.reboot_policy = config.reboot_policy - self.output_directory = None - self.current_job = None - self.resolver = None - self.last_error = None - self.run_info = None - self.run_result = None - self.run_output_directory = self.config.output_directory - self.host_working_directory = self.config.meta_directory - self.iteration_artifacts = None - self.run_artifacts = copy(self.default_run_artifacts) - self.job_iteration_counts = defaultdict(int) - self.aborted = False - self.runner = None - - def initialize(self): - if not os.path.isdir(self.run_output_directory): - os.makedirs(self.run_output_directory) - self.output_directory = self.run_output_directory - self.resolver = ResourceResolver(self.config) - self.run_info = RunInfo(self.config) - self.run_result = RunResult(self.run_info, self.run_output_directory) - - def next_job(self, job): - """Invoked by the runner when starting a new iteration of workload execution.""" - self.current_job = job - self.job_iteration_counts[self.spec.id] += 1 - if not self.aborted: - outdir_name = '_'.join(map(str, [self.spec.label, self.spec.id, self.current_iteration])) - self.output_directory = _d(os.path.join(self.run_output_directory, outdir_name)) - self.iteration_artifacts = [wa for wa in self.workload.artifacts] - self.current_job.result.iteration = self.current_iteration - self.current_job.result.output_directory = self.output_directory - - def end_job(self): - if self.current_job.result.status == JobStatus.ABORTED: - self.aborted = True - self.current_job = None - self.output_directory = self.run_output_directory - - def add_metric(self, *args, **kwargs): - self.result.add_metric(*args, **kwargs) - - def add_artifact(self, name, path, kind, *args, **kwargs): - if self.current_job is None: - self.add_run_artifact(name, path, kind, *args, **kwargs) - else: - self.add_iteration_artifact(name, path, kind, *args, **kwargs) - - def add_run_artifact(self, name, path, kind, *args, **kwargs): - path = _check_artifact_path(path, self.run_output_directory) - self.run_artifacts.append(Artifact(name, path, kind, Artifact.ITERATION, *args, **kwargs)) - - def add_iteration_artifact(self, name, path, kind, *args, **kwargs): - path = _check_artifact_path(path, self.output_directory) - self.iteration_artifacts.append(Artifact(name, path, kind, Artifact.RUN, *args, **kwargs)) - - def get_artifact(self, name): - if 
self.iteration_artifacts: - for art in self.iteration_artifacts: - if art.name == name: - return art - for art in self.run_artifacts: - if art.name == name: - return art - return None - - -def _check_artifact_path(path, rootpath): - if path.startswith(rootpath): - return os.path.abspath(path) - rootpath = os.path.abspath(rootpath) - full_path = os.path.join(rootpath, path) - if not os.path.isfile(full_path): - msg = 'Cannot add artifact because {} does not exist.' - raise ValueError(msg.format(full_path)) - return full_path - class Executor(object): """ @@ -380,7 +254,10 @@ class Executor(object): self.logger.info('Starting run') runner = Runner(context) + signal.send(signal.RUN_STARTED, self) runner.run() + #TODO: postamble goes here. + signal.send(signal.RUN_COMPLETED, self) def execute_postamble(self): """ @@ -430,7 +307,6 @@ class Runner(object): self.config = self.context.cm def run(self): - self.send(signal.RUN_STARTED) try: self.initialize_run() self.send(signal.RUN_INITIALIZED) @@ -446,7 +322,7 @@ class Runner(object): raise e finally: self.finalize_run() - self.send(signal.RUN_COMPLETED) + self.send(signal.RUN_FINALIZED) def initialize_run(self): self.logger.info('Initializing run') diff --git a/wa/framework/instrumentation.py b/wa/framework/instrumentation.py index 69386f13..f96c0cec 100644 --- a/wa/framework/instrumentation.py +++ b/wa/framework/instrumentation.py @@ -105,8 +105,9 @@ from collections import OrderedDict from wa.framework import signal from wa.framework.plugin import Plugin from wa.framework.exception import WAError, TargetNotRespondingError, TimeoutError +from wa.utils.log import log_error from wa.utils.misc import get_traceback, isiterable -from wa.utils.types import identifier +from wa.utils.types import identifier, enum, level logger = logging.getLogger('instrumentation') @@ -120,14 +121,14 @@ logger = logging.getLogger('instrumentation') SIGNAL_MAP = OrderedDict([ # Below are "aliases" for some of the more common signals to allow # instrumentation to have similar structure to workloads - ('initialize', signal.SUCCESSFUL_RUN_INIT), - # ('setup', signal.SUCCESSFUL_WORKLOAD_SETUP), - # ('start', signal.BEFORE_WORKLOAD_EXECUTION), - # ('stop', signal.AFTER_WORKLOAD_EXECUTION), - # ('process_workload_result', signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE), - # ('update_result', signal.AFTER_WORKLOAD_RESULT_UPDATE), - # ('teardown', signal.AFTER_WORKLOAD_TEARDOWN), - # ('finalize', signal.RUN_FIN), + ('initialize', signal.RUN_INITIALIZED), + ('setup', signal.BEFORE_WORKLOAD_SETUP), + ('start', signal.BEFORE_WORKLOAD_EXECUTION), + ('stop', signal.AFTER_WORKLOAD_EXECUTION), + ('process_workload_result', signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE), + ('update_result', signal.AFTER_WORKLOAD_RESULT_UPDATE), + ('teardown', signal.AFTER_WORKLOAD_TEARDOWN), + ('finalize', signal.RUN_FINALIZED), # ('on_run_start', signal.RUN_START), # ('on_run_end', signal.RUN_END), @@ -171,13 +172,37 @@ SIGNAL_MAP = OrderedDict([ # ('on_warning', signal.WARNING_LOGGED), ]) -PRIORITY_MAP = OrderedDict([ - ('very_fast_', 20), - ('fast_', 10), - ('normal_', 0), - ('slow_', -10), - ('very_slow_', -20), -]) + +Priority = enum(['very_slow', 'slow', 'normal', 'fast', 'very_fast'], -20, 10) + + +def get_priority(func): + return getattr(getattr(func, 'im_func', func), + 'priority', Priority.normal) + + +def priority(priority): + def decorate(func): + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + wrapper.func_name = func.func_name + if priority in Priority.values: + wrapper.priority = 
Priority(priority)
+        else:
+            if not isinstance(priority, int):
+                msg = 'Invalid priority "{}"; must be an int or one of {}'
+                raise ValueError(msg.format(priority, Priority.values))
+            wrapper.priority = level('custom', priority)
+        return wrapper
+    return decorate
+
+
+very_slow = priority(Priority.very_slow)
+slow = priority(Priority.slow)
+normal = priority(Priority.normal)
+fast = priority(Priority.fast)
+very_fast = priority(Priority.very_fast)
+
 
 installed = []
 
@@ -244,18 +269,7 @@ class ManagedCallback(object):
             logger.error('Error in insturment {}'.format(self.instrument.name))
             global failures_detected  # pylint: disable=W0603
             failures_detected = True
-            if isinstance(e, WAError):
-                logger.error(e)
-            else:
-                tb = get_traceback()
-                logger.error(tb)
-                logger.error('{}({})'.format(e.__class__.__name__, e))
-            if not context.current_iteration:
-                # Error occureed outside of an iteration (most likely
-                # during intial setup or teardown). Since this would affect
-                # the rest of the run, mark the instument as broken so that
-                # it doesn't get re-enabled for subsequent iterations.
-                self.instrument.is_broken = True
+            log_error(e, logger)
             disable(self.instrument)
 
 
@@ -274,34 +288,38 @@ def install(instrument):
 
     """
     logger.debug('Installing instrument %s.', instrument)
-    if is_installed(instrument):
-        raise ValueError('Instrument {} is already installed.'.format(instrument.name))
-    for attr_name in dir(instrument):
-        priority = 0
-        stripped_attr_name = attr_name
-        for key, value in PRIORITY_MAP.iteritems():
-            if attr_name.startswith(key):
-                stripped_attr_name = attr_name[len(key):]
-                priority = value
-                break
-        if stripped_attr_name in SIGNAL_MAP:
-            attr = getattr(instrument, attr_name)
-            if not callable(attr):
-                raise ValueError('Attribute {} not callable in {}.'.format(attr_name, instrument))
-            argspec = inspect.getargspec(attr)
-            arg_num = len(argspec.args)
-            # Instrument callbacks will be passed exactly two arguments: self
-            # (the instrument instance to which the callback is bound) and
-            # context. However, we also allow callbacks to capture the context
-            # in variable arguments (declared as "*args" in the definition).
-            if arg_num > 2 or (arg_num < 2 and argspec.varargs is None):
-                message = '{} must take exactly 2 positional arguments; {} given.'
-                raise ValueError(message.format(attr_name, arg_num))
-            logger.debug('\tConnecting %s to %s', attr.__name__, SIGNAL_MAP[stripped_attr_name])
-            mc = ManagedCallback(instrument, attr)
-            _callbacks.append(mc)
-            signal.connect(mc, SIGNAL_MAP[stripped_attr_name], priority=priority)
+
+    if is_installed(instrument):
+        msg = 'Instrument {} is already installed.'
+        raise ValueError(msg.format(instrument.name))
+
+    for attr_name in dir(instrument):
+        if attr_name not in SIGNAL_MAP:
+            continue
+
+        attr = getattr(instrument, attr_name)
+
+        if not callable(attr):
+            msg = 'Attribute {} not callable in {}.'
+            raise ValueError(msg.format(attr_name, instrument))
+        argspec = inspect.getargspec(attr)
+        arg_num = len(argspec.args)
+        # Instrument callbacks will be passed exactly two arguments: self
+        # (the instrument instance to which the callback is bound) and
+        # context. However, we also allow callbacks to capture the context
+        # in variable arguments (declared as "*args" in the definition).
+        if arg_num > 2 or (arg_num < 2 and argspec.varargs is None):
+            message = '{} must take exactly 2 positional arguments; {} given.'
+ raise ValueError(message.format(attr_name, arg_num)) + + priority = get_priority(attr) + logger.debug('\tConnecting %s to %s with priority %s(%d)', attr.__name__, + SIGNAL_MAP[attr_name], priority.name, priority.value) + + mc = ManagedCallback(instrument, attr) + _callbacks.append(mc) + signal.connect(mc, SIGNAL_MAP[attr_name], priority=priority.value) + installed.append(instrument) @@ -385,15 +403,3 @@ class Instrument(Plugin): self.target = target self.is_enabled = True self.is_broken = False - - def initialize(self, context): - pass - - def finalize(self, context): - pass - - def __str__(self): - return self.name - - def __repr__(self): - return 'Instrument({})'.format(self.name) diff --git a/wa/framework/job.py b/wa/framework/job.py index af9df1da..2a0e1b20 100644 --- a/wa/framework/job.py +++ b/wa/framework/job.py @@ -1,6 +1,6 @@ import logging -from wa.framework import pluginloader +from wa.framework import pluginloader, signal from wa.framework.configuration.core import JobStatus @@ -38,7 +38,8 @@ class Job(object): def initialize(self, context): self.logger.info('Initializing job {}'.format(self.id)) - self.workload.initialize(context) + with signal.wrap('WORKLOAD_INITIALIZED', self, context): + self.workload.initialize(context) self.status = JobStatus.PENDING context.update_job_state(self) @@ -47,21 +48,26 @@ class Job(object): def setup(self, context): self.logger.info('Setting up job {}'.format(self.id)) - self.workload.setup(context) + with signal.wrap('WORKLOAD_SETUP', self, context): + self.workload.setup(context) def run(self, context): self.logger.info('Running job {}'.format(self.id)) - self.workload.run(context) + with signal.wrap('WORKLOAD_EXECUTION', self, context): + self.workload.run(context) def process_output(self, context): self.logger.info('Processing output for job {}'.format(self.id)) - self.workload.update_result(context) + with signal.wrap('WORKLOAD_RESULT_UPDATE', self, context): + self.workload.update_result(context) def teardown(self, context): self.logger.info('Tearing down job {}'.format(self.id)) - self.workload.teardown(context) + with signal.wrap('WORKLOAD_TEARDOWN', self, context): + self.workload.teardown(context) def finalize(self, context): self.logger.info('Finalizing job {}'.format(self.id)) - self.workload.finalize(context) + with signal.wrap('WORKLOAD_FINALIZED', self, context): + self.workload.finalize(context) diff --git a/wa/framework/plugin.py b/wa/framework/plugin.py index f6ef4e2e..7c8e5dba 100644 --- a/wa/framework/plugin.py +++ b/wa/framework/plugin.py @@ -279,7 +279,6 @@ class PluginMeta(type): mcs._propagate_attributes(bases, attrs, clsname) cls = type.__new__(mcs, clsname, bases, attrs) mcs._setup_aliases(cls) - mcs._implement_virtual(cls, bases) return cls @classmethod @@ -324,48 +323,6 @@ class PluginMeta(type): alias.plugin_name = cls.name cls.aliases.add(alias) - @classmethod - def _implement_virtual(mcs, cls, bases): - """ - This implements automatic method propagation to the bases, so - that you don't have to do something like - - super(cls, self).vmname() - - This also ensures that the methods that have beend identified as - "globally virtual" are executed exactly once per WA execution, even if - invoked through instances of different subclasses - - """ - methods = {} - called_globals = set() - for vmname in mcs.virtual_methods: - clsmethod = getattr(cls, vmname, None) - if clsmethod: - basemethods = [getattr(b, vmname) for b in bases - if hasattr(b, vmname)] - methods[vmname] = [bm for bm in basemethods if bm != clsmethod] 
-                methods[vmname].append(clsmethod)
-
-            def generate_method_wrapper(vname):  # pylint: disable=unused-argument
-                # this creates a closure with the method name so that it
-                # does not need to be passed to the wrapper as an argument,
-                # leaving the wrapper to accept exactly the same set of
-                # arguments as the method it is wrapping.
-                name__ = vmname  # pylint: disable=cell-var-from-loop
-
-                def wrapper(self, *args, **kwargs):
-                    for dm in methods[name__]:
-                        if name__ in mcs.global_virtuals:
-                            if dm not in called_globals:
-                                dm(self, *args, **kwargs)
-                                called_globals.add(dm)
-                        else:
-                            dm(self, *args, **kwargs)
-                return wrapper
-
-            setattr(cls, vmname, generate_method_wrapper(vmname))
-
 
 class Plugin(object):
     """
@@ -444,12 +401,6 @@ class Plugin(object):
         for param in self.parameters:
             param.validate(self)
 
-    def initialize(self, context):
-        pass
-
-    def finalize(self, context):
-        pass
-
     def check_artifacts(self, context, level):
         """
         Make sure that all mandatory artifacts have been generated.
diff --git a/wa/framework/signal.py b/wa/framework/signal.py
index 7dbfd73d..09fe7b4e 100644
--- a/wa/framework/signal.py
+++ b/wa/framework/signal.py
@@ -22,6 +22,7 @@ that has prioritization added to handler invocation.
 import logging
 from contextlib import contextmanager
 
+import wrapt
 from louie import dispatcher
 
 from wa.utils.types import prioritylist
@@ -67,21 +68,26 @@ class Signal(object):
         return id(self.name)
 
 
+# Signals associated with run-related events
 RUN_STARTED = Signal('run-started', 'sent at the beginning of the run')
 RUN_INITIALIZED = Signal('run-initialized', 'set after the run has been initialized')
 RUN_ABORTED = Signal('run-aborted', 'set when the run has been aborted due to a keyboard interrupt')
 RUN_FAILED = Signal('run-failed', 'set if the run has failed to complete all jobs.')
-RUN_COMPLETED = Signal('run-completed', 'set upon completion of the run (regardless of whether or not it has failed')
 RUN_FINALIZED = Signal('run-finalized', 'set after the run has been finalized')
+RUN_COMPLETED = Signal('run-completed', 'set upon completion of the run (regardless of whether or not it has failed)')
+
+# Signals associated with job-related events
 JOB_STARTED = Signal('job-started', 'set when a new job has been started')
 JOB_ABORTED = Signal('job-aborted',
                      description='''
                      sent if a job has been aborted due to a keyboard interrupt.
 
-                     .. note:: While the status of every job that has not had a chance to run
-                        due to being interrupted will be set to "ABORTED", this signal will
-                        only be sent for the job that was actually running at the time.
+                     .. note:: While the status of every job that has not had a
+                               chance to run due to being interrupted will be
+                               set to "ABORTED", this signal will only be sent
+                               for the job that was actually running at the
+                               time.
                     ''')
 JOB_FAILED = Signal('job-failed', description='set if the job has failed')
@@ -89,6 +95,34 @@ JOB_RESTARTED = Signal('job-restarted')
 JOB_COMPLETED = Signal('job-completed')
 JOB_FINALIZED = Signal('job-finalized')
 
+
+# Signals associated with particular stages of workload execution
+BEFORE_WORKLOAD_INITIALIZED = Signal('before-workload-initialized', invert_priority=True)
+SUCCESSFUL_WORKLOAD_INITIALIZED = Signal('successful-workload-initialized')
+AFTER_WORKLOAD_INITIALIZED = Signal('after-workload-initialized')
+
+BEFORE_WORKLOAD_SETUP = Signal('before-workload-setup', invert_priority=True)
+SUCCESSFUL_WORKLOAD_SETUP = Signal('successful-workload-setup')
+AFTER_WORKLOAD_SETUP = Signal('after-workload-setup')
+
+BEFORE_WORKLOAD_EXECUTION = Signal('before-workload-execution', invert_priority=True)
+SUCCESSFUL_WORKLOAD_EXECUTION = Signal('successful-workload-execution')
+AFTER_WORKLOAD_EXECUTION = Signal('after-workload-execution')
+
+BEFORE_WORKLOAD_RESULT_UPDATE = Signal('before-workload-result-update', invert_priority=True)
+SUCCESSFUL_WORKLOAD_RESULT_UPDATE = Signal('successful-workload-result-update')
+AFTER_WORKLOAD_RESULT_UPDATE = Signal('after-workload-result-update')
+
+BEFORE_WORKLOAD_TEARDOWN = Signal('before-workload-teardown', invert_priority=True)
+SUCCESSFUL_WORKLOAD_TEARDOWN = Signal('successful-workload-teardown')
+AFTER_WORKLOAD_TEARDOWN = Signal('after-workload-teardown')
+
+BEFORE_WORKLOAD_FINALIZED = Signal('before-workload-finalized', invert_priority=True)
+SUCCESSFUL_WORKLOAD_FINALIZED = Signal('successful-workload-finalized')
+AFTER_WORKLOAD_FINALIZED = Signal('after-workload-finalized')
+
+
+# Signals indicating exceptional conditions
 ERROR_LOGGED = Signal('error-logged')
 WARNING_LOGGED = Signal('warning-logged')
@@ -138,26 +172,6 @@ BEFORE_TARGET_DISCONNECT = Signal('before-target-disconnect', invert_priority=True)
 SUCCESSFUL_TARGET_DISCONNECT = Signal('successful-target-disconnect')
 AFTER_TARGET_DISCONNECT = Signal('after-target-disconnect')
 
-BEFORE_WORKLOAD_SETUP = Signal(
-    'before-workload-setup', invert_priority=True)
-SUCCESSFUL_WORKLOAD_SETUP = Signal('successful-workload-setup')
-AFTER_WORKLOAD_SETUP = Signal('after-workload-setup')
-
-BEFORE_WORKLOAD_EXECUTION = Signal(
-    'before-workload-execution', invert_priority=True)
-SUCCESSFUL_WORKLOAD_EXECUTION = Signal('successful-workload-execution')
-AFTER_WORKLOAD_EXECUTION = Signal('after-workload-execution')
-
-BEFORE_WORKLOAD_RESULT_UPDATE = Signal(
-    'before-workload-result-update', invert_priority=True)
-SUCCESSFUL_WORKLOAD_RESULT_UPDATE = Signal(
-    'successful-workload-result-update')
-AFTER_WORKLOAD_RESULT_UPDATE = Signal('after-workload-result-update')
-
-BEFORE_WORKLOAD_TEARDOWN = Signal(
-    'before-workload-teardown', invert_priority=True)
-SUCCESSFUL_WORKLOAD_TEARDOWN = Signal('successful-workload-teardown')
-AFTER_WORKLOAD_TEARDOWN = Signal('after-workload-teardown')
 
 BEFORE_OVERALL_RESULTS_PROCESSING = Signal(
     'before-overall-results-process', invert_priority=True)
@@ -289,7 +303,7 @@ def safe_send(signal, sender=dispatcher.Anonymous,
     """
     try:
         logger.debug('Safe-sending {} from {}'.format(signal, sender))
-        send(singnal, sender, *args, **kwargs)
+        send(signal, sender, *args, **kwargs)
     except Exception as e:
         if any(isinstance(e, p) for p in propagate):
             raise e
@@ -297,9 +311,10 @@
 
 
 @contextmanager
-def wrap(signal_name, sender=dispatcher.Anonymous, safe=False, *args, **kwargs):
+def wrap(signal_name, sender=dispatcher.Anonymous, *args, **kwargs):
"""Wraps the suite in before/after signals, ensuring that after signal is always sent.""" + safe = kwargs.pop('safe', False) signal_name = signal_name.upper().replace('-', '_') send_func = safe_send if safe else send try: diff --git a/wa/instrumentation/misc/__init__.py b/wa/instrumentation/misc/__init__.py index 634f4014..ea6fda39 100644 --- a/wa/instrumentation/misc/__init__.py +++ b/wa/instrumentation/misc/__init__.py @@ -37,7 +37,7 @@ from devlib.exception import TargetError from devlib.utils.android import ApkInfo -from wa import Instrument, Parameter +from wa import Instrument, Parameter, very_fast from wa.framework import signal from wa.framework.exception import ConfigError from wa.utils.misc import diff_tokens, write_table, check_output, as_relative @@ -206,26 +206,22 @@ class ExecutionTimeInstrument(Instrument): """ - priority = 15 - def __init__(self, target, **kwargs): super(ExecutionTimeInstrument, self).__init__(target, **kwargs) self.start_time = None self.end_time = None - def on_run_start(self, context): - signal.connect(self.get_start_time, signal.BEFORE_WORKLOAD_EXECUTION, priority=self.priority) - signal.connect(self.get_stop_time, signal.AFTER_WORKLOAD_EXECUTION, priority=self.priority) - - def get_start_time(self, context): + @very_fast + def start(self, context): self.start_time = time.time() - def get_stop_time(self, context): + @very_fast + def stop(self, context): self.end_time = time.time() def update_result(self, context): execution_time = self.end_time - self.start_time - context.result.add_metric('execution_time', execution_time, 'seconds') + context.add_metric('execution_time', execution_time, 'seconds') class ApkVersion(Instrument): diff --git a/wa/utils/types.py b/wa/utils/types.py index 359ff662..d925c14d 100644 --- a/wa/utils/types.py +++ b/wa/utils/types.py @@ -520,7 +520,7 @@ class level(object): return self.value != other -def enum(args, start=0): +def enum(args, start=0, step=1): """ Creates a class with attributes named by the first argument. Each attribute is a ``level`` so they behave is integers in comparisons. 
@@ -556,11 +556,13 @@ def enum(args, start=0): raise ValueError('Invalid enum value: {}'.format(repr(name))) levels = [] - for i, v in enumerate(args, start): + n = start + for v in args: name = caseless_string(identifier(v)) - lv = level(v, i) + lv = level(v, n) setattr(Enum, name, lv) levels.append(lv) + n += step setattr(Enum, 'values', levels) From add6dafa2d4b6b39594ae27d8e6371c77d71a373 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Fri, 17 Mar 2017 16:28:21 +0000 Subject: [PATCH 4/9] job: upped logging level of loading message to info --- wa/framework/configuration/execution.py | 3 +++ wa/framework/job.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index cde9b260..66df05ca 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -10,6 +10,7 @@ from wa.framework.configuration.plugin_cache import PluginCache from wa.framework.exception import NotFoundError from wa.framework.job import Job from wa.framework.run import JobState +from wa.utils import log from wa.utils.types import enum @@ -103,11 +104,13 @@ class ConfigManager(object): def generate_jobs(self, context): job_specs = self.jobs_config.generate_job_specs(context.tm) exec_order = self.run_config.execution_order + log.indent() for spec, i in permute_iterations(job_specs, exec_order): job = Job(spec, i, context) job.load(context.tm.target) self._jobs.append(job) context.run_state.add_job(job) + log.dedent() self._jobs_generated = True diff --git a/wa/framework/job.py b/wa/framework/job.py index 2a0e1b20..3a3ee2d4 100644 --- a/wa/framework/job.py +++ b/wa/framework/job.py @@ -29,7 +29,7 @@ class Job(object): self.retries = 0 def load(self, target, loader=pluginloader): - self.logger.debug('Loading job {}'.format(self.id)) + self.logger.info('Loading job {}'.format(self.id)) self.workload = loader.get_workload(self.spec.workload_name, target, **self.spec.workload_parameters) From 326ab827ed3fc75893b5827525a30d38c967d54f Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Fri, 17 Mar 2017 17:05:40 +0000 Subject: [PATCH 5/9] Implement Executor's postamble --- wa/framework/configuration/core.py | 4 ++-- wa/framework/execution.py | 31 +++++++++++++----------------- wa/framework/run.py | 15 +++++++++++++-- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index 4dc94169..11595e83 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -33,10 +33,10 @@ KIND_MAP = { } RunStatus = enum(['NEW', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING', - 'COMPLETED', 'OK', 'FAILED', 'PARTIAL', 'ABORTED']) + 'ABORTED', 'FAILED', 'PARTIAL', 'OK']) JobStatus = enum(['NEW', 'PENDING', 'RUNNING', - 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) + 'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK']) ########################## diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 5697075c..77d66a63 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -212,7 +212,6 @@ class Executor(object): pluginloader = None self.device_manager = None self.device = None - self.context = None def execute(self, config_manager, output): """ @@ -256,35 +255,31 @@ class Executor(object): runner = Runner(context) signal.send(signal.RUN_STARTED, self) runner.run() - #TODO: postamble goes here. 
+ self.execute_postamble(context, output) signal.send(signal.RUN_COMPLETED, self) - def execute_postamble(self): - """ - This happens after the run has completed. The overall results of the run are - summarised to the user. - - """ - result = self.context.run_result - counter = Counter() - for ir in result.iteration_results: - counter[ir.status] += 1 + def execute_postamble(self, context, output): self.logger.info('Done.') - self.logger.info('Run duration: {}'.format(format_duration(self.context.run_info.duration))) - status_summary = 'Ran a total of {} iterations: '.format(sum(self.context.job_iteration_counts.values())) + duration = format_duration(output.info.duration) + self.logger.info('Run duration: {}'.format(duration)) + num_ran = context.run_state.num_completed_jobs + status_summary = 'Ran a total of {} iterations: '.format(num_ran) + + counter = context.run_state.get_status_counts() parts = [] - for status in JobStatus.values: + for status in reversed(JobStatus.values): if status in counter: parts.append('{} {}'.format(counter[status], status)) self.logger.info(status_summary + ', '.join(parts)) - self.logger.info('Results can be found in {}'.format(self.config.output_directory)) + + self.logger.info('Results can be found in {}'.format(output.basepath)) if self.error_logged: self.logger.warn('There were errors during execution.') - self.logger.warn('Please see {}'.format(self.config.log_file)) + self.logger.warn('Please see {}'.format(output.logfile)) elif self.warning_logged: self.logger.warn('There were warnings during execution.') - self.logger.warn('Please see {}'.format(self.config.log_file)) + self.logger.warn('Please see {}'.format(output.logfile)) def _error_signalled_callback(self): self.error_logged = True diff --git a/wa/framework/run.py b/wa/framework/run.py index b5ca5979..50c35be5 100644 --- a/wa/framework/run.py +++ b/wa/framework/run.py @@ -14,11 +14,11 @@ # import uuid import logging +from collections import OrderedDict, Counter from copy import copy from datetime import datetime, timedelta -from collections import OrderedDict -from wa.framework.configuration.core import RunStatus +from wa.framework.configuration.core import RunStatus, JobStatus class RunInfo(object): @@ -73,6 +73,11 @@ class RunState(object): instance.jobs = OrderedDict(((js.id, js.iteration), js) for js in jss) return instance + @property + def num_completed_jobs(self): + return sum(1 for js in self.jobs.itervalues() + if js.status > JobStatus.SKIPPED) + def __init__(self): self.jobs = OrderedDict() self.status = RunStatus.NEW @@ -87,6 +92,12 @@ class RunState(object): state.status = job.status state.timestamp = datetime.now() + def get_status_counts(self): + counter = Counter() + for job_state in self.jobs.itervalues(): + counter[job_state.status] += 1 + return counter + def to_pod(self): return OrderedDict( status=str(self.status), From 24ade78c3650859507967665fe2efb151e20f9f5 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Fri, 17 Mar 2017 17:10:30 +0000 Subject: [PATCH 6/9] The old Runner can die now; RIP --- wa/framework/execution.py | 482 -------------------------------------- 1 file changed, 482 deletions(-) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 77d66a63..711c753c 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -15,27 +15,6 @@ # pylint: disable=no-member -""" -This module contains the execution logic for Workload Automation. 
It defines the -following actors: - - WorkloadSpec: Identifies the workload to be run and defines parameters under - which it should be executed. - - Executor: Responsible for the overall execution process. It instantiates - and/or intialises the other actors, does any necessary vaidation - and kicks off the whole process. - - Execution Context: Provides information about the current state of run - execution to instrumentation. - - RunInfo: Information about the current run. - - Runner: This executes workload specs that are passed to it. It goes through - stages of execution, emitting an appropriate signal at each step to - allow instrumentation to do its stuff. - -""" import logging import os import random @@ -65,16 +44,6 @@ from wa.utils.misc import (ensure_directory_exists as _d, merge_config_values, from wa.utils.serializer import json -# The maximum number of reboot attempts for an iteration. -MAX_REBOOT_ATTEMPTS = 3 - -# If something went wrong during device initialization, wait this -# long (in seconds) before retrying. This is necessary, as retrying -# immediately may not give the device enough time to recover to be able -# to reboot. -REBOOT_DELAY = 3 - - class ExecutionContext(object): @property @@ -415,454 +384,3 @@ class Runner(object): def __str__(self): return 'runner' - -class RunnerJob(object): - """ - Represents a single execution of a ``RunnerJobDescription``. There will be one created for each iteration - specified by ``RunnerJobDescription.number_of_iterations``. - - """ - - def __init__(self, spec, retry=0): - self.spec = spec - self.retry = retry - self.iteration = None - self.result = JobStatus(self.spec) - - -class OldRunner(object): - """ - This class is responsible for actually performing a workload automation - run. The main responsibility of this class is to emit appropriate signals - at the various stages of the run to allow things like traces an other - instrumentation to hook into the process. - - This is an abstract base class that defines each step of the run, but not - the order in which those steps are executed, which is left to the concrete - derived classes. 
- - """ - class _RunnerError(Exception): - """Internal runner error.""" - pass - - @property - def config(self): - return self.context.config - - @property - def current_job(self): - if self.job_queue: - return self.job_queue[0] - return None - - @property - def previous_job(self): - if self.completed_jobs: - return self.completed_jobs[-1] - return None - - @property - def next_job(self): - if self.job_queue: - if len(self.job_queue) > 1: - return self.job_queue[1] - return None - - @property - def spec_changed(self): - if self.previous_job is None and self.current_job is not None: # Start of run - return True - if self.previous_job is not None and self.current_job is None: # End of run - return True - return self.current_job.spec.id != self.previous_job.spec.id - - @property - def spec_will_change(self): - if self.current_job is None and self.next_job is not None: # Start of run - return True - if self.current_job is not None and self.next_job is None: # End of run - return True - return self.current_job.spec.id != self.next_job.spec.id - - def __init__(self, device_manager, context, result_manager): - self.device_manager = device_manager - self.device = device_manager.target - self.context = context - self.result_manager = result_manager - self.logger = logging.getLogger('Runner') - self.job_queue = [] - self.completed_jobs = [] - self._initial_reset = True - - def init_queue(self, specs): - raise NotImplementedError() - - def run(self): # pylint: disable=too-many-branches - self._send(signal.RUN_START) - with signal.wrap('RUN_INIT'): - self._initialize_run() - - try: - while self.job_queue: - try: - self._init_job() - self._run_job() - except KeyboardInterrupt: - self.current_job.result.status = JobStatus.ABORTED - raise - except Exception, e: # pylint: disable=broad-except - self.current_job.result.status = JobStatus.FAILED - self.current_job.result.add_event(e.message) - if isinstance(e, DeviceNotRespondingError): - self.logger.info('Device appears to be unresponsive.') - if self.context.reboot_policy.can_reboot and self.device.can('reset_power'): - self.logger.info('Attempting to hard-reset the device...') - try: - self.device.boot(hard=True) - self.device.connect() - except DeviceError: # hard_boot not implemented for the device. - raise e - else: - raise e - else: # not a DeviceNotRespondingError - self.logger.error(e) - finally: - self._finalize_job() - except KeyboardInterrupt: - self.logger.info('Got CTRL-C. Finalizing run... (CTRL-C again to abort).') - # Skip through the remaining jobs. - while self.job_queue: - self.context.next_job(self.current_job) - self.current_job.result.status = JobStatus.ABORTED - self._finalize_job() - except DeviceNotRespondingError: - self.logger.info('Device unresponsive and recovery not possible. 
Skipping the rest of the run.') - self.context.aborted = True - while self.job_queue: - self.context.next_job(self.current_job) - self.current_job.result.status = JobStatus.SKIPPED - self._finalize_job() - - instrumentation.enable_all() - self._finalize_run() - self._process_results() - - self.result_manager.finalize(self.context) - self._send(signal.RUN_END) - - def _initialize_run(self): - self.context.runner = self - self.context.run_info.start_time = datetime.utcnow() - self._connect_to_device() - self.logger.info('Initializing device') - self.device_manager.initialize(self.context) - - self.logger.info('Initializing workloads') - for workload_spec in self.context.config.workload_specs: - workload_spec.workload.initialize(self.context) - - self.context.run_info.device_properties = self.device_manager.info - self.result_manager.initialize(self.context) - - if instrumentation.check_failures(): - raise InstrumentError('Detected failure(s) during instrumentation initialization.') - - def _connect_to_device(self): - if self.context.reboot_policy.perform_initial_boot: - try: - self.device_manager.connect() - except DeviceError: # device may be offline - if self.device.can('reset_power'): - with self._signal_wrap('INITIAL_BOOT'): - self.device.boot(hard=True) - else: - raise DeviceError('Cannot connect to device for initial reboot; ' - 'and device does not support hard reset.') - else: # successfully connected - self.logger.info('\tBooting device') - with self._signal_wrap('INITIAL_BOOT'): - self._reboot_device() - else: - self.logger.info('Connecting to device') - self.device_manager.connect() - - def _init_job(self): - self.current_job.result.status = JobStatus.RUNNING - self.context.next_job(self.current_job) - - def _run_job(self): # pylint: disable=too-many-branches - spec = self.current_job.spec - if not spec.enabled: - self.logger.info('Skipping workload %s (iteration %s)', spec, self.context.current_iteration) - self.current_job.result.status = JobStatus.SKIPPED - return - - self.logger.info('Running workload %s (iteration %s)', spec, self.context.current_iteration) - if spec.flash: - if not self.context.reboot_policy.can_reboot: - raise ConfigError('Cannot flash as reboot_policy does not permit rebooting.') - if not self.device.can('flash'): - raise DeviceError('Device does not support flashing.') - self._flash_device(spec.flash) - elif not self.completed_jobs: - # Never reboot on the very fist job of a run, as we would have done - # the initial reboot if a reboot was needed. 
- pass - elif self.context.reboot_policy.reboot_on_each_spec and self.spec_changed: - self.logger.debug('Rebooting on spec change.') - self._reboot_device() - elif self.context.reboot_policy.reboot_on_each_iteration: - self.logger.debug('Rebooting on iteration.') - self._reboot_device() - - instrumentation.disable_all() - instrumentation.enable(spec.instrumentation) - self.device_manager.start() - - if self.spec_changed: - self._send(signal.WORKLOAD_SPEC_START) - self._send(signal.ITERATION_START) - - try: - setup_ok = False - with self._handle_errors('Setting up device parameters'): - self.device_manager.set_runtime_parameters(spec.runtime_parameters) - setup_ok = True - - if setup_ok: - with self._handle_errors('running {}'.format(spec.workload.name)): - self.current_job.result.status = JobStatus.RUNNING - self._run_workload_iteration(spec.workload) - else: - self.logger.info('\tSkipping the rest of the iterations for this spec.') - spec.enabled = False - except KeyboardInterrupt: - self._send(signal.ITERATION_END) - self._send(signal.WORKLOAD_SPEC_END) - raise - else: - self._send(signal.ITERATION_END) - if self.spec_will_change or not spec.enabled: - self._send(signal.WORKLOAD_SPEC_END) - finally: - self.device_manager.stop() - - def _finalize_job(self): - self.context.run_result.iteration_results.append(self.current_job.result) - job = self.job_queue.pop(0) - job.iteration = self.context.current_iteration - if job.result.status in self.config.retry_on_status: - if job.retry >= self.config.max_retries: - self.logger.error('Exceeded maxium number of retries. Abandoning job.') - else: - self.logger.info('Job status was {}. Retrying...'.format(job.result.status)) - retry_job = RunnerJob(job.spec, job.retry + 1) - self.job_queue.insert(0, retry_job) - self.completed_jobs.append(job) - self.context.end_job() - - def _finalize_run(self): - self.logger.info('Finalizing workloads') - for workload_spec in self.context.config.workload_specs: - workload_spec.workload.finalize(self.context) - - self.logger.info('Finalizing.') - self._send(signal.RUN_FIN) - - with self._handle_errors('Disconnecting from the device'): - self.device.disconnect() - - info = self.context.run_info - info.end_time = datetime.utcnow() - info.duration = info.end_time - info.start_time - - def _process_results(self): - self.logger.info('Processing overall results') - with self._signal_wrap('OVERALL_RESULTS_PROCESSING'): - if instrumentation.check_failures(): - self.context.run_result.non_iteration_errors = True - self.result_manager.process_run_result(self.context.run_result, self.context) - - def _run_workload_iteration(self, workload): - self.logger.info('\tSetting up') - with self._signal_wrap('WORKLOAD_SETUP'): - try: - workload.setup(self.context) - except: - self.logger.info('\tSkipping the rest of the iterations for this spec.') - self.current_job.spec.enabled = False - raise - try: - - self.logger.info('\tExecuting') - with self._handle_errors('Running workload'): - with self._signal_wrap('WORKLOAD_EXECUTION'): - workload.run(self.context) - - self.logger.info('\tProcessing result') - self._send(signal.BEFORE_WORKLOAD_RESULT_UPDATE) - try: - if self.current_job.result.status != JobStatus.FAILED: - with self._handle_errors('Processing workload result', - on_error_status=JobStatus.PARTIAL): - workload.update_result(self.context) - self._send(signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE) - - if self.current_job.result.status == JobStatus.RUNNING: - self.current_job.result.status = JobStatus.OK - finally: - 
self._send(signal.AFTER_WORKLOAD_RESULT_UPDATE) - - finally: - self.logger.info('\tTearing down') - with self._handle_errors('Tearing down workload', - on_error_status=JobStatus.NONCRITICAL): - with self._signal_wrap('WORKLOAD_TEARDOWN'): - workload.teardown(self.context) - self.result_manager.add_result(self.current_job.result, self.context) - - def _flash_device(self, flashing_params): - with self._signal_wrap('FLASHING'): - self.device.flash(**flashing_params) - self.device.connect() - - def _reboot_device(self): - with self._signal_wrap('BOOT'): - for reboot_attempts in xrange(MAX_REBOOT_ATTEMPTS): - if reboot_attempts: - self.logger.info('\tRetrying...') - with self._handle_errors('Rebooting device'): - self.device.boot(**self.current_job.spec.boot_parameters) - break - else: - raise DeviceError('Could not reboot device; max reboot attempts exceeded.') - self.device.connect() - - def _send(self, s): - signal.send(s, self, self.context) - - def _take_screenshot(self, filename): - if self.context.output_directory: - filepath = os.path.join(self.context.output_directory, filename) - else: - filepath = os.path.join(settings.output_directory, filename) - self.device.capture_screen(filepath) - - @contextmanager - def _handle_errors(self, action, on_error_status=JobStatus.FAILED): - try: - if action is not None: - self.logger.debug(action) - yield - except (KeyboardInterrupt, DeviceNotRespondingError): - raise - except (WAError, TimeoutError), we: - self.device.check_responsive() - if self.current_job: - self.current_job.result.status = on_error_status - self.current_job.result.add_event(str(we)) - try: - self._take_screenshot('error.png') - except Exception, e: # pylint: disable=W0703 - # We're already in error state, so the fact that taking a - # screenshot failed is not surprising... - pass - if action: - action = action[0].lower() + action[1:] - self.logger.error('Error while {}:\n\t{}'.format(action, we)) - except Exception, e: # pylint: disable=W0703 - error_text = '{}("{}")'.format(e.__class__.__name__, e) - if self.current_job: - self.current_job.result.status = on_error_status - self.current_job.result.add_event(error_text) - self.logger.error('Error while {}'.format(action)) - self.logger.error(error_text) - if isinstance(e, subprocess.CalledProcessError): - self.logger.error('Got:') - self.logger.error(e.output) - tb = get_traceback() - self.logger.error(tb) - - @contextmanager - def _signal_wrap(self, signal_name): - """Wraps the suite in before/after signals, ensuring - that after signal is always sent.""" - before_signal = getattr(signal, 'BEFORE_' + signal_name) - success_signal = getattr(signal, 'SUCCESSFUL_' + signal_name) - after_signal = getattr(signal, 'AFTER_' + signal_name) - try: - self._send(before_signal) - yield - self._send(success_signal) - finally: - self._send(after_signal) - - -class BySpecRunner(Runner): - """ - This is that "classic" implementation that executes all iterations of a workload - spec before proceeding onto the next spec. - - """ - - def init_queue(self, specs): - jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] # pylint: disable=unused-variable - self.job_queue = [j for spec_jobs in jobs for j in spec_jobs] - - -class BySectionRunner(Runner): - """ - Runs the first iteration for all benchmarks first, before proceeding to the next iteration, - i.e. A1, B1, C1, A2, B2, C2... instead of A1, A1, B1, B2, C1, C2... 
- - If multiple sections where specified in the agenda, this will run all specs for the first section - followed by all specs for the seciod section, etc. - - e.g. given sections X and Y, and global specs A and B, with 2 iterations, this will run - - X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2 - - """ - - def init_queue(self, specs): - jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] - self.job_queue = [j for spec_jobs in izip_longest(*jobs) for j in spec_jobs if j] - - -class ByIterationRunner(Runner): - """ - Runs the first iteration for all benchmarks first, before proceeding to the next iteration, - i.e. A1, B1, C1, A2, B2, C2... instead of A1, A1, B1, B2, C1, C2... - - If multiple sections where specified in the agenda, this will run all sections for the first global - spec first, followed by all sections for the second spec, etc. - - e.g. given sections X and Y, and global specs A and B, with 2 iterations, this will run - - X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2 - - """ - - def init_queue(self, specs): - sections = OrderedDict() - for s in specs: - if s.section_id not in sections: - sections[s.section_id] = [] - sections[s.section_id].append(s) - specs = [s for section_specs in izip_longest(*sections.values()) for s in section_specs if s] - jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] - self.job_queue = [j for spec_jobs in izip_longest(*jobs) for j in spec_jobs if j] - - -class RandomRunner(Runner): - """ - This will run specs in a random order. - - """ - - def init_queue(self, specs): - jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] # pylint: disable=unused-variable - all_jobs = [j for spec_jobs in jobs for j in spec_jobs] - random.shuffle(all_jobs) - self.job_queue = all_jobs From 482a936639ed1732594ee9dd9bf96dec59704478 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Fri, 17 Mar 2017 17:29:30 +0000 Subject: [PATCH 7/9] Implement job skipping on abort --- wa/framework/execution.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 711c753c..d4e1672d 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -141,6 +141,14 @@ class ExecutionContext(object): self.run_state.update_job(job) self.run_output.write_state() + def skip_remaining_jobs(self): + while self.job_queue: + job = self.job_queue.pop(0) + job.status = JobStatus.SKIPPED + self.run_state.update_job(job) + self.completed_jobs.append(job) + self.write_state() + def write_state(self): self.run_output.write_state() @@ -276,8 +284,11 @@ class Runner(object): self.send(signal.RUN_INITIALIZED) while self.context.job_queue: - with signal.wrap('JOB_EXECUTION', self): - self.run_next_job(self.context) + try: + with signal.wrap('JOB_EXECUTION', self): + self.run_next_job(self.context) + except KeyboardInterrupt: + self.context.skip_remaining_jobs() except Exception as e: if (not getattr(e, 'logged', None) and not isinstance(e, KeyboardInterrupt)): @@ -347,6 +358,7 @@ class Runner(object): raise except KeyboardInterrupt: job.status = JobStatus.ABORTED + self.logger.info('Got CTRL-C. 
Aborting.')
             raise
         except Exception as e:
             job.status = JobStatus.FAILED

From 1c4eef54d67d72b50df6029c299a9c4da45d84a5 Mon Sep 17 00:00:00 2001
From: Sergei Trofimov
Date: Mon, 20 Mar 2017 14:40:13 +0000
Subject: [PATCH 8/9] Tidying run_config config points

---
 wa/framework/configuration/core.py | 211 +++++++++++++++++------------
 1 file changed, 123 insertions(+), 88 deletions(-)

diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py
index 11595e83..8e2ed919 100644
--- a/wa/framework/configuration/core.py
+++ b/wa/framework/configuration/core.py
@@ -636,109 +636,144 @@ class RunConfiguration(Configuration):
     # Metadata is separated out because it is not loaded into the auto
     # generated config file
     meta_data = [
-        ConfigurationPoint('run_name', kind=str,
-                           description='''
-                           A string that labels the WA run that is being performed. This would typically
-                           be set in the ``config`` section of an agenda (see
-                           :ref:`configuration in an agenda `) rather than in the config file.
-
-                           .. _old-style format strings: http://docs.python.org/2/library/stdtypes.html#string-formatting-operations
-                           .. _log record attributes: http://docs.python.org/2/library/logging.html#logrecord-attributes
-                           '''),
-        ConfigurationPoint('project', kind=str,
-                           description='''
-                           A string naming the project for which data is being collected. This may be
-                           useful, e.g. when uploading data to a shared database that is populated from
-                           multiple projects.
-                           '''),
-        ConfigurationPoint('project_stage', kind=dict,
-                           description='''
-                           A dict or a string that allows adding additional identifier. This is may be
-                           useful for long-running projects.
-                           '''),
+        ConfigurationPoint(
+            'run_name',
+            kind=str,
+            description='''
+            A string that labels the WA run that is being performed. This would
+            typically be set in the ``config`` section of an agenda (see
+            :ref:`configuration in an agenda `) rather
+            than in the config file.
+            ''',
+        ),
+        ConfigurationPoint(
+            'project',
+            kind=str,
+            description='''
+            A string naming the project for which data is being collected. This
+            may be useful, e.g. when uploading data to a shared database that
+            is populated from multiple projects.
+            ''',
+        ),
+        ConfigurationPoint(
+            'project_stage',
+            kind=dict,
+            description='''
+            A dict or a string that allows adding an additional identifier.
+            This may be useful for long-running projects.
+            ''',
+        ),
     ]
     config_points = [
-        ConfigurationPoint('execution_order', kind=str, default='by_iteration',
-                           allowed_values=['by_iteration', 'by_spec', 'by_section', 'random'],
-                           description='''
-                           Defines the order in which the agenda spec will be executed. At the moment,
-                           the following execution orders are supported:
+        ConfigurationPoint(
+            'execution_order',
+            kind=str,
+            default='by_iteration',
+            allowed_values=['by_iteration', 'by_spec', 'by_section', 'random'],
+            description='''
+            Defines the order in which the agenda spec will be executed. At the
+            moment, the following execution orders are supported:
 
-                           ``"by_iteration"``
-                             The first iteration of each workload spec is executed one after the other,
-                             so all workloads are executed before proceeding on to the second iteration.
-                             E.g. A1 B1 C1 A2 C2 A3. This is the default if no order is explicitly specified.
+            ``"by_iteration"``
+                The first iteration of each workload spec is executed one after
+                the other, so all workloads are executed before proceeding on
+                to the second iteration. E.g. A1 B1 C1 A2 C2 A3. This is the
+                default if no order is explicitly specified.
-                           In case of multiple sections, this will spread them out, such that specs
-                           from the same section are further part. E.g. given sections X and Y, global
-                           specs A and B, and two iterations, this will run ::
+                In case of multiple sections, this will spread them out, such
+                that specs from the same section are further apart. E.g. given
+                sections X and Y, global specs A and B, and two iterations,
+                this will run ::
 
-                                X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2
+                    X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2
 
-                           ``"by_section"``
-                             Same as ``"by_iteration"``, however this will group specs from the same
-                             section together, so given sections X and Y, global specs A and B, and two iterations,
-                             this will run ::
+            ``"by_section"``
+                Same as ``"by_iteration"``, however this will group specs from
+                the same section together, so given sections X and Y, global
+                specs A and B, and two iterations, this will run ::
 
-                                X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2
+                    X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2
 
-                           ``"by_spec"``
-                             All iterations of the first spec are executed before moving on to the next
-                             spec. E.g. A1 A2 A3 B1 C1 C2 This may also be specified as ``"classic"``,
-                             as this was the way workloads were executed in earlier versions of WA.
+            ``"by_spec"``
+                All iterations of the first spec are executed before moving on
+                to the next spec. E.g. A1 A2 A3 B1 C1 C2. This may also be
+                specified as ``"classic"``, as this was the way workloads were
+                executed in earlier versions of WA.
 
-                           ``"random"``
-                             Execution order is entirely random.
-                           '''),
-        ConfigurationPoint('reboot_policy', kind=RebootPolicy, default='as_needed',
-                           allowed_values=RebootPolicy.valid_policies,
-                           description='''
-                           This defines when during execution of a run the Device will be rebooted. The
-                           possible values are:
+            ``"random"``
+                Execution order is entirely random.
+            ''',
+        ),
+        ConfigurationPoint(
+            'reboot_policy',
+            kind=RebootPolicy,
+            default='as_needed',
+            allowed_values=RebootPolicy.valid_policies,
+            description='''
+            This defines when during execution of a run the Device will be
+            rebooted. The possible values are:
 
-                           ``"never"``
-                              The device will never be rebooted.
-                           ``"initial"``
-                              The device will be rebooted when the execution first starts, just before
-                              executing the first workload spec.
-                           ``"each_spec"``
-                              The device will be rebooted before running a new workload spec.
-                              Note: this acts the same as each_iteration when execution order is set to by_iteration
-                           ``"each_iteration"``
-                              The device will be rebooted before each new iteration.
-                           '''),
-        ConfigurationPoint('device', kind=str, mandatory=True,
-                           description='''
-                           This setting defines what specific Device subclass will be used to interact
-                           the connected device. Obviously, this must match your setup.
-                           '''),
-        ConfigurationPoint('retry_on_status', kind=list_of(JobStatus),
-                           default=['FAILED', 'PARTIAL'],
-                           allowed_values=JobStatus.values,
-                           description='''
-                           This is list of statuses on which a job will be cosidered to have failed and
-                           will be automatically retried up to ``max_retries`` times. This defaults to
-                           ``["FAILED", "PARTIAL"]`` if not set. Possible values are:
+            ``"never"``
+                The device will never be rebooted.
 
-                           ``"OK"``
-                           This iteration has completed and no errors have been detected
+            ``"initial"``
+                The device will be rebooted when the execution first starts,
+                just before executing the first workload spec.
 
-                           ``"PARTIAL"``
-                           One or more instruments have failed (the iteration may still be running).
+            ``"each_spec"``
+                The device will be rebooted before running a new workload spec.
-                           ``"FAILED"``
-                           The workload itself has failed.
+                .. note:: this acts the same as each_iteration when execution order
+                          is set to by_iteration
-                           ``"ABORTED"``
-                           The user interupted the workload
-                           '''),
+            ``"each_iteration"``
+                The device will be rebooted before each new iteration.
+            '''),
-        ConfigurationPoint('max_retries', kind=int, default=2,
-                           description='''
-                           The maximum number of times failed jobs will be retried before giving up. If
-                           not set.
+        ConfigurationPoint(
+            'device',
+            kind=str,
+            mandatory=True,
+            description='''
+            This setting defines what specific Device subclass will be used to
+            interact with the connected device. Obviously, this must match your
+            setup.
+            ''',
+        ),
+        ConfigurationPoint(
+            'retry_on_status',
+            kind=list_of(JobStatus),
+            default=['FAILED', 'PARTIAL'],
+            allowed_values=JobStatus.values,
+            description='''
+            This is a list of statuses on which a job will be considered to
+            have failed and will be automatically retried up to
+            ``max_retries`` times. This defaults to ``["FAILED", "PARTIAL"]``
+            if not set. Possible values are:
-                           .. note:: this number does not include the original attempt
-                           '''),
+            ``"OK"``
+                This iteration has completed and no errors have been detected
+
+            ``"PARTIAL"``
+                One or more instruments have failed (the iteration may still be running).
+
+            ``"FAILED"``
+                The workload itself has failed.
+
+            ``"ABORTED"``
+                The user interrupted the workload
+            ''',
+        ),
+        ConfigurationPoint(
+            'max_retries',
+            kind=int,
+            default=2,
+            description='''
+            The maximum number of times failed jobs will be retried before
+            giving up.
+
+            .. note:: this number does not include the original attempt
+            ''',
+        ),
     ]
 
     configuration = {cp.name: cp for cp in config_points + meta_data}

From 9ba126d46d01507e797dcd0ae848e2056dce0ab6 Mon Sep 17 00:00:00 2001
From: Sergei Trofimov
Date: Mon, 20 Mar 2017 14:44:34 +0000
Subject: [PATCH 9/9] dhrystone: fixed taskset_mask description.

---
 wa/workloads/dhrystone/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/wa/workloads/dhrystone/__init__.py b/wa/workloads/dhrystone/__init__.py
index e452d082..60b66d78 100644
--- a/wa/workloads/dhrystone/__init__.py
+++ b/wa/workloads/dhrystone/__init__.py
@@ -69,7 +69,7 @@ class Dhrystone(Workload):
                   ''')),
         Parameter('taskset_mask', kind=int, default=0,
                   description='''
-                  The processes spawned by sysbench will be pinned to cores as
+                  The processes spawned by dhrystone will be pinned to cores as
                   specified by this parameter.
                   '''),
     ]
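
The RunStatus/JobStatus reordering in patch 5 is what makes the status
arithmetic above work: enum() produces level objects that compare as integers,
so any status that sorts above SKIPPED counts as completed in
RunState.num_completed_jobs, and reversed(JobStatus.values) lists the
postamble summary from OK downwards. A short illustration of these semantics,
based on the enum() implementation earlier in the series; the Severity enum is
hypothetical and serves only to demonstrate the new step parameter::

    from wa.utils.types import enum

    JobStatus = enum(['NEW', 'PENDING', 'RUNNING',
                      'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK'])

    # levels behave as integers, so "more final" statuses compare higher;
    # this is the comparison num_completed_jobs relies on
    assert JobStatus.OK > JobStatus.SKIPPED
    assert JobStatus.NEW.value == 0

    # step spaces out the underlying values, leaving room to slot in new
    # levels later without renumbering the existing ones (hypothetical enum)
    Severity = enum(['LOW', 'MEDIUM', 'HIGH'], start=10, step=10)
    assert Severity.MEDIUM.value == 20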