From 24402660c4dae1a16b86285ff8d4383f6ce7d9f9 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Thu, 16 Mar 2017 17:54:48 +0000 Subject: [PATCH] Initial implementation of state tracking and output handling. --- wa/commands/run.py | 4 +- wa/framework/configuration/core.py | 6 +- wa/framework/configuration/execution.py | 2 + wa/framework/execution.py | 80 +++-- wa/framework/job.py | 16 +- wa/framework/output.py | 314 ++++++++++++++---- wa/framework/run.py | 418 ++++++------------------ wa/utils/serializer.py | 1 + wa/utils/types.py | 30 +- wa/workloads/dhrystone/__init__.py | 7 +- 10 files changed, 443 insertions(+), 435 deletions(-) diff --git a/wa/commands/run.py b/wa/commands/run.py index dc351e68..8ab6dd01 100644 --- a/wa/commands/run.py +++ b/wa/commands/run.py @@ -24,7 +24,7 @@ from wa.framework import pluginloader from wa.framework.configuration import RunConfiguration from wa.framework.configuration.parsers import AgendaParser, ConfigParser from wa.framework.execution import Executor -from wa.framework.output import init_wa_output +from wa.framework.output import init_run_output from wa.framework.version import get_wa_version from wa.framework.exception import NotFoundError, ConfigError from wa.utils import log @@ -123,7 +123,7 @@ class RunCommand(Command): output_directory = settings.default_output_directory self.logger.debug('Using output directory: {}'.format(output_directory)) try: - return init_wa_output(output_directory, config, args.force) + return init_run_output(output_directory, config, args.force) except RuntimeError as e: if 'path exists' in str(e): msg = 'Output directory "{}" exists.\nPlease specify another '\ diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index 189e87c9..4dc94169 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -24,6 +24,7 @@ from wa.utils.types import (identifier, integer, boolean, list_of_strings, list_of, toggle_set, obj_dict, enum) from wa.utils.serializer import is_pod + # Mapping for kind conversion; see docs for convert_types below KIND_MAP = { int: integer, @@ -31,7 +32,10 @@ KIND_MAP = { dict: OrderedDict, } -JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING', +RunStatus = enum(['NEW', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING', + 'COMPLETED', 'OK', 'FAILED', 'PARTIAL', 'ABORTED']) + +JobStatus = enum(['NEW', 'PENDING', 'RUNNING', 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index 4cfddd53..cde9b260 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -9,6 +9,7 @@ from wa.framework.configuration.parsers import ConfigParser from wa.framework.configuration.plugin_cache import PluginCache from wa.framework.exception import NotFoundError from wa.framework.job import Job +from wa.framework.run import JobState from wa.utils.types import enum @@ -106,6 +107,7 @@ class ConfigManager(object): job = Job(spec, i, context) job.load(context.tm.target) self._jobs.append(job) + context.run_state.add_job(job) self._jobs_generated = True diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 45b78a53..aafe4046 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -49,17 +49,18 @@ from itertools import izip_longest import wa.framework.signal as signal from wa.framework import instrumentation, pluginloader -from wa.framework.configuration.core import settings -from 
wa.framework.configuration.execution import JobStatus +from wa.framework.configuration.core import settings, RunStatus, JobStatus from wa.framework.exception import (WAError, ConfigError, TimeoutError, InstrumentError, TargetError, TargetNotRespondingError) +from wa.framework.output import init_job_output from wa.framework.plugin import Artifact from wa.framework.resource import ResourceResolver +from wa.framework.run import RunState from wa.framework.target.info import TargetInfo from wa.framework.target.manager import TargetManager from wa.utils import log -from wa.utils.misc import (ensure_directory_exists as _d, +from wa.utils.misc import (ensure_directory_exists as _d, merge_config_values, get_traceback, format_duration) from wa.utils.serializer import json @@ -105,16 +106,26 @@ class ExecutionContext(object): return self.current_job.spec.id != self.next_job.spec.id @property - def output_directory(self): + def job_output(self): if self.current_job: - return os.path.join(self.output.basepath, self.current_job.output_name) + return self.current_job.output + + @property + def output(self): + if self.current_job: + return self.job_output + return self.run_output + + @property + def output_directory(self): return self.output.basepath def __init__(self, cm, tm, output): self.logger = logging.getLogger('context') self.cm = cm self.tm = tm - self.output = output + self.run_output = output + self.run_state = output.state self.logger.debug('Loading resource discoverers') self.resolver = ResourceResolver(cm) self.resolver.load() @@ -127,31 +138,56 @@ class ExecutionContext(object): self.output.write_info() self.job_queue = copy(self.cm.jobs) self.completed_jobs = [] + self.run_state.status = RunStatus.STARTED + self.output.write_state() def end_run(self): self.output.info.end_time = datetime.now() self.output.info.duration = self.output.info.end_time -\ self.output.info.start_time self.output.write_info() + self.output.write_state() + self.output.write_result() def start_job(self): if not self.job_queue: raise RuntimeError('No jobs to run') self.current_job = self.job_queue.pop(0) - os.makedirs(self.output_directory) + self.current_job.output = init_job_output(self.run_output, self.current_job) + self.update_job_state(self.current_job) return self.current_job def end_job(self): if not self.current_job: raise RuntimeError('No jobs in progress') self.completed_jobs.append(self.current_job) + self.update_job_state(self.current_job) + self.output.write_result() self.current_job = None def move_failed(self, job): - attempt = job.retries + 1 - failed_name = '{}-attempt{:02}'.format(job.output_name, attempt) - self.output.move_failed(job.output_name, failed_name) + self.run_output.move_failed(job.output) + def update_job_state(self, job): + self.run_state.update_job(job) + self.run_output.write_state() + + def write_state(self): + self.run_output.write_state() + + def add_metric(self, name, value, units=None, lower_is_better=False, + classifiers=None): + if self.current_job: + classifiers = merge_config_values(self.current_job.classifiers, + classifiers) + self.output.add_metric(name, value, units, lower_is_better, classifiers) + + def add_artifact(self, name, path, kind, description=None, classifiers=None): + self.output.add_artifact(name, path, kind, description, classifiers) + + def add_run_artifact(self, name, path, kind, description=None, + classifiers=None): + self.run_output.add_artifact(name, path, kind, description, classifiers) class OldExecutionContext(object): """ @@ -335,6 +371,7 @@ class 
Executor(object):
 
         self.logger.info('Generating jobs')
         config_manager.generate_jobs(context)
         output.write_job_specs(config_manager.job_specs)
+        output.write_state()
 
         self.logger.info('Installing instrumentation')
         for instrument in config_manager.get_instruments(target_manager.target):
@@ -345,8 +382,6 @@ class Executor(object):
         runner = Runner(context)
         runner.run()
 
-
-
     def execute_postamble(self):
         """
         This happens after the run has completed. The overall results of the run are
@@ -374,22 +409,6 @@ class Executor(object):
             self.logger.warn('There were warnings during execution.')
             self.logger.warn('Please see {}'.format(self.config.log_file))
 
-    def _get_runner(self, result_manager):
-        if not self.config.execution_order or self.config.execution_order == 'by_iteration':
-            if self.config.reboot_policy == 'each_spec':
-                self.logger.info('each_spec reboot policy with the default by_iteration execution order is '
-                                 'equivalent to each_iteration policy.')
-            runnercls = ByIterationRunner
-        elif self.config.execution_order in ['classic', 'by_spec']:
-            runnercls = BySpecRunner
-        elif self.config.execution_order == 'by_section':
-            runnercls = BySectionRunner
-        elif self.config.execution_order == 'random':
-            runnercls = RandomRunner
-        else:
-            raise ConfigError('Unexpected execution order: {}'.format(self.config.execution_order))
-        return runnercls(self.device_manager, self.context, result_manager)
-
     def _error_signalled_callback(self):
         self.error_logged = True
         signal.disconnect(self._error_signalled_callback, signal.ERROR_LOGGED)
@@ -436,6 +455,7 @@ class Runner(object):
             for job in self.context.job_queue:
                 job.initialize(self.context)
         log.dedent()
+        self.context.write_state()
 
     def finalize_run(self):
         self.logger.info('Finalizing run')
@@ -448,6 +468,7 @@ class Runner(object):
         try:
             log.indent()
             self.do_run_job(job, context)
+            job.status = JobStatus.OK
         except KeyboardInterrupt:
             job.status = JobStatus.ABORTED
             raise
@@ -503,12 +524,13 @@ class Runner(object):
         rc = self.context.cm.run_config
         if job.status in rc.retry_on_status:
             if job.retries < rc.max_retries:
-                msg = 'Job {} iteration {} complted with status {}. retrying...'
-                self.logger.error(msg.format(job.id, job.status, job.iteration))
+                msg = 'Job {} iteration {} completed with status {}. Retrying...'
+                self.logger.error(msg.format(job.id, job.iteration, job.status))
                 self.context.move_failed(job)
                 job.retries += 1
                 job.status = JobStatus.PENDING
                 self.context.job_queue.insert(0, job)
+                self.context.write_state()
             else:
                 msg = 'Job {} iteration {} completed with status {}. '\
                       'Max retries exceeded.'
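For illustration, the sketch below shows how a workload might report results through the new ExecutionContext API introduced in this file. It is not part of the patch: the ``from wa import Workload`` import path, the workload class, and all names and values in it are assumed for the example.

    import os

    from wa import Workload  # import path assumed for this sketch


    class Example(Workload):  # hypothetical workload, for illustration only

        name = 'example'

        def update_result(self, context):
            # Metrics are recorded against the current job's result; the
            # job's classifiers are merged into the ones supplied here.
            context.add_metric('score', 42.0, lower_is_better=False,
                               classifiers={'subtest': 'warmup'})

            # Artifacts must exist on disk before being added; the stored
            # path is relativized against the output directory.
            outfile = os.path.join(context.output_directory, 'example.output')
            with open(outfile, 'w') as wfh:
                wfh.write('raw output goes here\n')
            context.add_artifact('example-output', outfile, 'raw',
                                 description='unprocessed benchmark output')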
diff --git a/wa/framework/job.py b/wa/framework/job.py
index 598d6072..af9df1da 100644
--- a/wa/framework/job.py
+++ b/wa/framework/job.py
@@ -11,8 +11,12 @@ class Job(object):
         return self.spec.id
 
     @property
-    def output_name(self):
-        return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration)
+    def label(self):
+        return self.spec.label
+
+    @property
+    def classifiers(self):
+        return self.spec.classifiers
 
     def __init__(self, spec, iteration, context):
         self.logger = logging.getLogger('job')
@@ -31,27 +35,33 @@ class Job(object):
                                                **self.spec.workload_parameters)
         self.workload.init_resources(self.context)
         self.workload.validate()
-        self.status = JobStatus.LOADED
 
     def initialize(self, context):
         self.logger.info('Initializing job {}'.format(self.id))
+        self.workload.initialize(context)
         self.status = JobStatus.PENDING
+        context.update_job_state(self)
 
     def configure_target(self, context):
         self.logger.info('Configuring target for job {}'.format(self.id))
 
     def setup(self, context):
         self.logger.info('Setting up job {}'.format(self.id))
+        self.workload.setup(context)
 
     def run(self, context):
         self.logger.info('Running job {}'.format(self.id))
+        self.workload.run(context)
 
     def process_output(self, context):
         self.logger.info('Processing output for job {}'.format(self.id))
+        self.workload.update_result(context)
 
     def teardown(self, context):
         self.logger.info('Tearing down job {}'.format(self.id))
+        self.workload.teardown(context)
 
     def finalize(self, context):
         self.logger.info('Finalizing job {}'.format(self.id))
+        self.workload.finalize(context)
diff --git a/wa/framework/output.py b/wa/framework/output.py
index 16853539..6111ecad 100644
--- a/wa/framework/output.py
+++ b/wa/framework/output.py
@@ -7,71 +7,53 @@ import uuid
 from copy import copy
 from datetime import timedelta
 
-from wa.framework.configuration.core import JobSpec
+from wa.framework.configuration.core import JobSpec, RunStatus
 from wa.framework.configuration.manager import ConfigManager
+from wa.framework.exception import HostError
+from wa.framework.run import RunState, RunInfo
 from wa.framework.target.info import TargetInfo
 from wa.utils.misc import touch, ensure_directory_exists
 from wa.utils.serializer import write_pod, read_pod
+from wa.utils.types import enum, numeric
 
 
 logger = logging.getLogger('output')
 
 
-class RunInfo(object):
-    """
-    Information about the current run, such as its unique ID, run
-    time, etc.
+class Output(object): - """ - @staticmethod - def from_pod(pod): - uid = pod.pop('uuid') - duration = pod.pop('duration') - if uid is not None: - uid = uuid.UUID(uid) - instance = RunInfo(**pod) - instance.uuid = uid - instance.duration = duration if duration is None else\ - timedelta(seconds=duration) - return instance + @property + def resultfile(self): + return os.path.join(self.basepath, 'result.json') - def __init__(self, run_name=None, project=None, project_stage=None, - start_time=None, end_time=None, duration=None): - self.uuid = uuid.uuid4() - self.run_name = None - self.project = None - self.project_stage = None - self.start_time = None - self.end_time = None - self.duration = None + def __init__(self, path): + self.basepath = path + self.result = None - def to_pod(self): - d = copy(self.__dict__) - d['uuid'] = str(self.uuid) - if self.duration is None: - d['duration'] = self.duration - else: - d['duration'] = self.duration.total_seconds() - return d + def reload(self): + pod = read_pod(self.resultfile) + self.result = Result.from_pod(pod) + + def write_result(self): + write_pod(self.result.to_pod(), self.resultfile) + + def add_metric(self, name, value, units=None, lower_is_better=False, + classifiers=None): + self.result.add_metric(name, value, units, lower_is_better, classifiers) + + def add_artifact(self, name, path, kind, description=None, classifiers=None): + if not os.path.exists(path): + msg = 'Attempting to add non-existing artifact: {}' + raise HostError(msg.format(path)) + path = os.path.relpath(path, self.basepath) + + if isinstance(kind, basestring): + kind = ArtifactType(kind) + + self.result.add_artifact(name, path, kind, description, classifiers) -class RunState(object): - """ - Represents the state of a WA run. - - """ - @staticmethod - def from_pod(pod): - return RunState() - - def __init__(self): - pass - - def to_pod(self): - return {} - - -class RunOutput(object): +class RunOutput(Output): @property def logfile(self): @@ -111,9 +92,11 @@ class RunOutput(object): return ensure_directory_exists(path) def __init__(self, path): - self.basepath = path + super(RunOutput, self).__init__(path) self.info = None self.state = None + self.result = None + self.jobs = None if (not os.path.isfile(self.statefile) or not os.path.isfile(self.infofile)): msg = '"{}" does not exist or is not a valid WA output directory.' 
@@ -121,8 +105,10 @@
         self.reload()
 
     def reload(self):
+        super(RunOutput, self).reload()
         self.info = RunInfo.from_pod(read_pod(self.infofile))
         self.state = RunState.from_pod(read_pod(self.statefile))
+        # TODO: populate the jobs from the info in the state
 
     def write_info(self):
         write_pod(self.info.to_pod(), self.infofile)
@@ -157,17 +143,215 @@
         pod = read_pod(self.jobsfile)
         return [JobSpec.from_pod(jp) for jp in pod['jobs']]
 
-    def move_failed(self, name, failed_name):
-        path = os.path.join(self.basepath, name)
+    def move_failed(self, job_output):
+        name = os.path.basename(job_output.basepath)
+        attempt = job_output.retry + 1
+        failed_name = '{}-attempt{:02}'.format(name, attempt)
         failed_path = os.path.join(self.failed_dir, failed_name)
-        if not os.path.exists(path):
-            raise ValueError('Path {} does not exist'.format(path))
         if os.path.exists(failed_path):
             raise ValueError('Path {} already exists'.format(failed_path))
-        shutil.move(path, failed_path)
+        shutil.move(job_output.basepath, failed_path)
+        job_output.basepath = failed_path
 
 
-def init_wa_output(path, wa_state, force=False):
+class JobOutput(Output):
+
+    def __init__(self, path, id, label, iteration, retry):
+        self.basepath = path
+        self.id = id
+        self.label = label
+        self.iteration = iteration
+        self.retry = retry
+        self.result = None
+        self.reload()
+
+
+class Result(object):
+
+    @staticmethod
+    def from_pod(pod):
+        instance = Result()
+        instance.metrics = [Metric.from_pod(m) for m in pod['metrics']]
+        instance.artifacts = [Artifact.from_pod(a) for a in pod['artifacts']]
+        return instance
+
+    def __init__(self):
+        self.metrics = []
+        self.artifacts = []
+
+    def add_metric(self, name, value, units=None, lower_is_better=False,
+                   classifiers=None):
+        metric = Metric(name, value, units, lower_is_better, classifiers)
+        logger.debug('Adding metric: {}'.format(metric))
+        self.metrics.append(metric)
+
+    def add_artifact(self, name, path, kind, description=None, classifiers=None):
+        artifact = Artifact(name, path, kind, description=description,
+                            classifiers=classifiers)
+        logger.debug('Adding artifact: {}'.format(artifact))
+        self.artifacts.append(artifact)
+
+    def to_pod(self):
+        return dict(
+            metrics=[m.to_pod() for m in self.metrics],
+            artifacts=[a.to_pod() for a in self.artifacts],
+        )
+
+
+ArtifactType = enum(['log', 'meta', 'data', 'export', 'raw'])
+
+
+class Artifact(object):
+    """
+    This is an artifact generated during execution/post-processing of a
+    workload. Unlike a metric, this is an actual artifact, such as a file,
+    that has been generated. It may be a "result", such as a trace, or
+    "metadata", such as logs. These are distinguished using the ``kind``
+    attribute, which also helps WA decide how it should be handled. Currently
+    supported kinds are:
+
+        :log: A log file. Not part of the "results" as such, but contains
+              information about the run/workload execution that may be useful
+              for diagnostics/meta analysis.
+        :meta: A file containing metadata. This is not part of the "results",
+               but contains information that may be necessary to reproduce the
+               results (contrast with ``log`` artifacts which are *not*
+               necessary).
+        :data: This file contains new data, not available otherwise, and
+               should be considered part of the "results" generated by WA.
+               Most traces would fall into this category.
+        :export: Exported version of results or some other artifact. This
+                 signifies that this artifact does not contain any new data
+                 that is not available elsewhere and that it may be safely
+                 discarded without losing information.
+        :raw: Signifies that this is a raw dump/log that is normally processed
+              to extract useful information and is then discarded. In a sense,
+              it is the opposite of ``export``, but in general may also be
+              discarded.
+
+        .. note:: Whether a file is marked as ``log``/``data`` or ``raw``
+                  depends on how important it is to preserve this file,
+                  e.g. when archiving, vs how much space it takes up.
+                  Unlike ``export`` artifacts, which are (almost) always
+                  ignored by other exporters as that would never result
+                  in data loss, ``raw`` files *may* be processed by
+                  exporters if they decide that the risk of losing
+                  potentially (though unlikely) useful data is greater
+                  than the time/space cost of handling the artifact (e.g.
+                  a database uploader may choose to ignore ``raw``
+                  artifacts, whereas a network filer archiver may choose
+                  to archive them).
+
+        .. note:: The kind parameter is intended to represent the logical
+                  function of a particular artifact, not its intended means of
+                  processing -- this is left entirely up to the result
+                  processors.
+
+    """
+
+    @staticmethod
+    def from_pod(pod):
+        pod['kind'] = ArtifactType(pod['kind'])
+        return Artifact(**pod)
+
+    def __init__(self, name, path, kind, description=None, classifiers=None):
+        """
+        :param name: Name that uniquely identifies this artifact.
+        :param path: The *relative* path of the artifact. Depending on the
+                     ``level``, this must be relative to either the run or the
+                     iteration output directory. Note: this path *must* be
+                     delimited using ``/`` irrespective of the operating system.
+        :param kind: The type of the artifact this is (e.g. log file, result,
+                     etc.); this will be used as a hint to result processors.
+                     This must be one of ``'log'``, ``'meta'``, ``'data'``,
+                     ``'export'``, ``'raw'``.
+        :param description: A free-form description of what this artifact is.
+        :param classifiers: A set of key-value pairs to further classify this
+                            metric beyond current iteration (e.g. this can be
+                            used to identify sub-tests).
+
+        """
+        self.name = name
+        self.path = path.replace('/', os.sep) if path is not None else path
+        try:
+            self.kind = ArtifactType(kind)
+        except ValueError:
+            msg = 'Invalid Artifact kind: {}'
+            raise ValueError(msg.format(kind))
+        self.description = description
+        self.classifiers = classifiers or {}
+
+    def to_pod(self):
+        pod = copy(self.__dict__)
+        pod['kind'] = str(self.kind)
+        return pod
+
+    def __str__(self):
+        return self.path
+
+    def __repr__(self):
+        return '{} ({}): {}'.format(self.name, self.kind, self.path)
+
+
+class Metric(object):
+    """
+    This is a single metric collected from executing a workload.
+
+    :param name: the name of the metric. Uniquely identifies the metric
+                 within the results.
+    :param value: The numerical value of the metric for this execution of a
+                  workload. This can be either an int or a float.
+    :param units: Units for the collected value. Can be None if the value
+                  has no units (e.g. it's a count or a standardised score).
+    :param lower_is_better: Boolean flag indicating whether lower values are
+                            better than higher ones. Defaults to False.
+    :param classifiers: A set of key-value pairs to further classify this
+                        metric beyond current iteration (e.g. this can be used
+                        to identify sub-tests).
+
+    """
+
+    __slots__ = ['name', 'value', 'units', 'lower_is_better', 'classifiers']
+
+    @staticmethod
+    def from_pod(pod):
+        return Metric(**pod)
+
+    def __init__(self, name, value, units=None, lower_is_better=False,
+                 classifiers=None):
+        self.name = name
+        self.value = numeric(value)
+        self.units = units
+        self.lower_is_better = lower_is_better
+        self.classifiers = classifiers or {}
+
+    def to_pod(self):
+        return dict(
+            name=self.name,
+            value=self.value,
+            units=self.units,
+            lower_is_better=self.lower_is_better,
+            classifiers=self.classifiers,
+        )
+
+    def __str__(self):
+        result = '{}: {}'.format(self.name, self.value)
+        if self.units:
+            result += ' ' + self.units
+        result += ' ({})'.format('-' if self.lower_is_better else '+')
+        return result
+
+    def __repr__(self):
+        text = self.__str__()
+        if self.classifiers:
+            return '<{} {}>'.format(text, self.classifiers)
+        else:
+            return '<{}>'.format(text)
+
+
+
+def init_run_output(path, wa_state, force=False):
     if os.path.exists(path):
         if force:
             logger.info('Removing existing output directory.')
@@ -188,13 +372,20 @@
                        project_stage=wa_state.run_config.project_stage,
                        )
     write_pod(info.to_pod(), os.path.join(meta_dir, 'run_info.json'))
-
-    with open(os.path.join(path, '.run_state.json'), 'w') as wfh:
-        wfh.write('{}')
+    write_pod(RunState().to_pod(), os.path.join(path, '.run_state.json'))
+    write_pod(Result().to_pod(), os.path.join(path, 'result.json'))
 
     return RunOutput(path)
 
 
+def init_job_output(run_output, job):
+    output_name = '{}-{}-{}'.format(job.id, job.spec.label, job.iteration)
+    path = os.path.join(run_output.basepath, output_name)
+    ensure_directory_exists(path)
+    write_pod(Result().to_pod(), os.path.join(path, 'result.json'))
+    return JobOutput(path, job.id, job.label, job.iteration, job.retries)
+
+
 def _save_raw_config(meta_dir, state):
     raw_config_dir = os.path.join(meta_dir, 'raw_config')
     os.makedirs(raw_config_dir)
@@ -206,5 +397,3 @@
             dest_path = os.path.join(raw_config_dir,
                                      'cfg{}-{}'.format(i, basename))
             shutil.copy(source, dest_path)
-
-
diff --git a/wa/framework/run.py b/wa/framework/run.py
index 72aacffd..b5ca5979 100644
--- a/wa/framework/run.py
+++ b/wa/framework/run.py
@@ -18,338 +18,112 @@
 from copy import copy
 from datetime import datetime, timedelta
 from collections import OrderedDict
 
-from wa.framework import signal, pluginloader, log
-from wa.framework.plugin import Plugin
-from wa.framework.output import Status
-from wa.framework.resource import ResourceResolver
-from wa.framework.exception import JobError
-from wa.utils import counter
-from wa.utils.serializer import json
-from wa.utils.misc import ensure_directory_exists as _d
-from wa.utils.types import caseless_string
+from wa.framework.configuration.core import JobStatus, RunStatus
 
 
+class RunInfo(object):
+    """
+    Information about the current run, such as its unique ID, run
+    time, etc.
 
-class JobActor(object):
+    """
+    @staticmethod
+    def from_pod(pod):
+        uid = pod.pop('uuid')
+        duration = pod.pop('duration')
+        if uid is not None:
+            uid = uuid.UUID(uid)
+        instance = RunInfo(**pod)
+        instance.uuid = uid
+        instance.duration = duration if duration is None else\
+            timedelta(seconds=duration)
+        return instance
 
-    def get_config(self):
-        return {}
+    def __init__(self, run_name=None, project=None, project_stage=None,
+                 start_time=None, end_time=None, duration=None):
+        self.uuid = uuid.uuid4()
+        self.run_name = run_name
+        self.project = project
+        self.project_stage = project_stage
+        self.start_time = start_time
+        self.end_time = end_time
+        self.duration = duration
 
-    def initialize(self, context):
-        pass
-
-    def run(self):
-        pass
-
-    def finalize(self):
-        pass
-
-    def restart(self):
-        pass
-
-    def complete(self):
-        pass
+    def to_pod(self):
+        d = copy(self.__dict__)
+        d['uuid'] = str(self.uuid)
+        if self.duration is None:
+            d['duration'] = self.duration
+        else:
+            d['duration'] = self.duration.total_seconds()
+        return d
 
 
-class RunnerJob(object):
+class RunState(object):
+    """
+    Represents the state of a WA run.
+
+    """
+    @staticmethod
+    def from_pod(pod):
+        instance = RunState()
+        instance.status = RunStatus(pod['status'])
+        instance.timestamp = pod['timestamp']
+        jss = [JobState.from_pod(j) for j in pod['jobs']]
+        instance.jobs = OrderedDict(((js.id, js.iteration), js) for js in jss)
+        return instance
+
+    def __init__(self):
+        self.jobs = OrderedDict()
+        self.status = RunStatus.NEW
+        self.timestamp = datetime.now()
+
+    def add_job(self, job):
+        job_state = JobState(job.id, job.label, job.iteration, job.status)
+        self.jobs[(job_state.id, job_state.iteration)] = job_state
+
+    def update_job(self, job):
+        state = self.jobs[(job.id, job.iteration)]
+        state.status = job.status
+        state.timestamp = datetime.now()
+
+    def to_pod(self):
+        return OrderedDict(
+            status=str(self.status),
+            timestamp=self.timestamp,
+            jobs=[j.to_pod() for j in self.jobs.itervalues()],
+        )
+
+
+class JobState(object):
+
+    @staticmethod
+    def from_pod(pod):
+        instance = JobState(pod['id'], pod['label'], pod['iteration'],
+                            JobStatus(pod['status']))
+        instance.retries = pod['retries']
+        instance.timestamp = pod['timestamp']
+        return instance
 
     @property
-    def status(self):
-        return self.output.status
+    def output_name(self):
+        return '{}-{}-{}'.format(self.id, self.label, self.iteration)
 
-    @status.setter
-    def status(self, value):
-        self.output.status = value
-
-    @property
-    def should_retry(self):
-        return self.attempt <= self.max_retries
-
-    def __init__(self, id, actor, output, max_retries):
+    def __init__(self, id, label, iteration, status):
         self.id = id
-        self.actor = actor
-        self.output = output
-        self.max_retries = max_retries
-        self.status = Status.NEW
-        self.attempt = 0
+        self.label = label
+        self.iteration = iteration
+        self.status = status
+        self.retries = 0
+        self.timestamp = datetime.now()
 
-    def initialize(self, context):
-        self.actor.initialize(context)
-        self.status = Status.PENDING
-
-    def run(self):
-        self.status = Status.RUNNING
-        self.attempt += 1
-        self.output.config = self.actor.get_config()
-        self.output.initialize()
-        self.actor.run()
-        self.status = Status.COMPLETE
-
-    def finalize(self):
-        self.actor.finalize()
-
-    def restart(self):
-        self.actor.restart()
-
-    def complete(self):
-        self.actor.complete()
-
-
-__run_methods = set()
-
-
-def runmethod(method):
-    """
-    A method decorator that ensures that a method is invoked only once per run.
- - """ - def _method_wrapper(*args, **kwargs): - if method in __run_methods: - return - __run_methods.add(method) - ret = method(*args, **kwargs) - if ret is not None: - message = 'runmethod()\'s must return None; method "{}" returned "{}"' - raise RuntimeError(message.format(method, ret)) - return _method_wrapper - - -def reset_runmethods(): - global __run_methods - __run_methods = set() - - -class Runner(object): - - @property - def info(self): - return self.output.info - - @property - def status(self): - return self.output.status - - @status.setter - def status(self, value): - self.output.status = value - - @property - def jobs_pending(self): - return len(self.job_queue) > 0 - - @property - def current_job(self): - if self.job_queue: - return self.job_queue[0] - - @property - def previous_job(self): - if self.completed_jobs: - return self.completed_jobs[-1] - - @property - def next_job(self): - if len(self.job_queue) > 1: - return self.job_queue[1] - - def __init__(self, output): - self.logger = logging.getLogger('runner') - self.output = output - self.context = RunContext(self) - self.status = Status.NEW - self.job_queue = [] - self.completed_jobs = [] - self._known_ids = set([]) - - def add_job(self, job_id, actor, max_retries=2): - job_id = caseless_string(job_id) - if job_id in self._known_ids: - raise JobError('Job with id "{}" already exists'.format(job_id)) - output = self.output.create_job_output(job_id) - self.job_queue.append(RunnerJob(job_id, actor, output, max_retries)) - self._known_ids.add(job_id) - - def initialize(self): - self.logger.info('Initializing run') - self.start_time = datetime.now() - if not self.info.start_time: - self.info.start_time = self.start_time - self.info.duration = timedelta() - - self.context.initialize() - for job in self.job_queue: - job.initialize(self.context) - self.persist_state() - self.logger.info('Run initialized') - - def run(self): - self.status = Status.RUNNING - reset_runmethods() - signal.send(signal.RUN_STARTED, self, self.context) - self.initialize() - signal.send(signal.RUN_INITIALIZED, self, self.context) - self.run_jobs() - signal.send(signal.RUN_COMPLETED, self, self.context) - self.finalize() - signal.send(signal.RUN_FINALIZED, self, self.context) - - def run_jobs(self): - try: - self.logger.info('Running jobs') - while self.jobs_pending: - self.begin_job() - log.indent() - try: - self.current_job.run() - except KeyboardInterrupt: - self.current_job.status = Status.ABORTED - signal.send(signal.JOB_ABORTED, self, self.current_job) - raise - except Exception as e: - self.current_job.status = Status.FAILED - log.log_error(e, self.logger) - signal.send(signal.JOB_FAILED, self, self.current_job) - else: - self.current_job.status = Status.COMPLETE - finally: - log.dedent() - self.complete_job() - except KeyboardInterrupt: - self.status = Status.ABORTED - while self.job_queue: - job = self.job_queue.pop(0) - job.status = RunnerJob.ABORTED - self.completed_jobs.append(job) - signal.send(signal.RUN_ABORTED, self, self) - raise - except Exception as e: - self.status = Status.FAILED - log.log_error(e, self.logger) - signal.send(signal.RUN_FAILED, self, self) - else: - self.status = Status.COMPLETE - - def finalize(self): - self.logger.info('Finalizing run') - for job in self.job_queue: - job.finalize() - self.end_time = datetime.now() - self.info.end_time = self.end_time - self.info.duration += self.end_time - self.start_time - self.persist_state() - signal.send(signal.RUN_FINALIZED, self, self) - self.logger.info('Run completed') - - 
def begin_job(self):
-        self.logger.info('Starting job {}'.format(self.current_job.id))
-        signal.send(signal.JOB_STARTED, self, self.current_job)
-        self.persist_state()
-
-    def complete_job(self):
-        if self.current_job.status == Status.FAILED:
-            self.output.move_failed(self.current_job.output)
-            if self.current_job.should_retry:
-                self.logger.info('Restarting job {}'.format(self.current_job.id))
-                self.persist_state()
-                self.current_job.restart()
-                signal.send(signal.JOB_RESTARTED, self, self.current_job)
-                return
-
-        self.logger.info('Completing job {}'.format(self.current_job.id))
-        self.current_job.complete()
-        self.persist_state()
-        signal.send(signal.JOB_COMPLETED, self, self.current_job)
-        job = self.job_queue.pop(0)
-        self.completed_jobs.append(job)
-
-    def persist_state(self):
-        self.output.persist()
-
-
-class RunContext(object):
-    """
-    Provides a context for instrumentation. Keeps track of things like
-    current workload and iteration.
-
-    """
-
-    @property
-    def run_output(self):
-        return self.runner.output
-
-    @property
-    def current_job(self):
-        return self.runner.current_job
-
-    @property
-    def run_output_directory(self):
-        return self.run_output.output_directory
-
-    @property
-    def output_directory(self):
-        if self.runner.current_job:
-            return self.runner.current_job.output.output_directory
-        else:
-            return self.run_output.output_directory
-
-    @property
-    def info_directory(self):
-        return self.run_output.info_directory
-
-    @property
-    def config_directory(self):
-        return self.run_output.config_directory
-
-    @property
-    def failed_directory(self):
-        return self.run_output.failed_directory
-
-    @property
-    def log_file(self):
-        return os.path.join(self.output_directory, 'run.log')
-
-
-    def __init__(self, runner):
-        self.runner = runner
-        self.job = None
-        self.iteration = None
-        self.job_output = None
-        self.resolver = ResourceResolver()
-
-    def initialize(self):
-        self.resolver.load()
-
-    def get_path(self, subpath):
-        if self.current_job is None:
-            return self.run_output.get_path(subpath)
-        else:
-            return self.current_job.output.get_path(subpath)
-
-    def add_metric(self, *args, **kwargs):
-        if self.current_job is None:
-            self.run_output.add_metric(*args, **kwargs)
-        else:
-            self.current_job.output.add_metric(*args, **kwargs)
-
-    def add_artifact(self, name, path, kind, *args, **kwargs):
-        if self.current_job is None:
-            self.add_run_artifact(name, path, kind, *args, **kwargs)
-        else:
-            self.add_job_artifact(name, path, kind, *args, **kwargs)
-
-    def add_run_artifact(self, *args, **kwargs):
-        self.run_output.add_artifiact(*args, **kwargs)
-
-    def add_job_artifact(self, *args, **kwargs):
-        self.current_job.output.add_artifact(*args, **kwargs)
-
-    def get_artifact(self, name):
-        if self.iteration_artifacts:
-            for art in self.iteration_artifacts:
-                if art.name == name:
-                    return art
-        for art in self.run_artifacts:
-            if art.name == name:
-                return art
-        return None
+    def to_pod(self):
+        return OrderedDict(
+            id=self.id,
+            label=self.label,
+            iteration=self.iteration,
+            status=str(self.status),
+            retries=self.retries,
+            timestamp=self.timestamp,
+        )
diff --git a/wa/utils/serializer.py b/wa/utils/serializer.py
index 56d8f988..14c3eccf 100644
--- a/wa/utils/serializer.py
+++ b/wa/utils/serializer.py
@@ -232,6 +232,7 @@ def read_pod(source, fmt=None):
         message = 'source must be a path or an open file handle; got {}'
         raise ValueError(message.format(type(source)))
 
+
 def write_pod(pod, dest, fmt=None):
     if isinstance(dest, basestring):
         with open(dest, 'w') as wfh:
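As context for the wa/utils/types.py changes below: enum attribute names become ``caseless_string``\ s rather than upper-cased strings, so a level keeps the exact name it was declared with while comparing case-insensitively. A minimal sketch follows; the ``Status`` enum here is invented for the example, and the attribute access and ``str()`` behaviour shown are the same ones the rest of this patch relies on.

    from wa.utils.types import enum

    Status = enum(['NEW', 'PENDING', 'RUNNING', 'OK'])

    # Levels behave like strings when printed, but carry a numeric value
    # that is used for ordering.
    assert str(Status.RUNNING) == 'RUNNING'
    assert Status.RUNNING.value > Status.PENDING.value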
diff --git a/wa/utils/types.py b/wa/utils/types.py
index fed3004f..359ff662 100644
--- a/wa/utils/types.py
+++ b/wa/utils/types.py
@@ -78,9 +78,10 @@ def list_of_bools(value, interpret_strings=True):
     """
     Value must be iterable. All elements will be converted to ``bool``\ s.
 
-    .. note:: By default, ``boolean()`` conversion function will be used, which means that
-              strings like ``"0"`` or ``"false"`` will be interpreted as ``False``. If this
-              is undesirable, set ``interpret_strings`` to ``False``.
+    .. note:: By default, the ``boolean()`` conversion function will be used,
+              which means that strings like ``"0"`` or ``"false"`` will be
+              interpreted as ``False``. If this is undesirable, set
+              ``interpret_strings`` to ``False``.
 
     """
     if not isiterable(value):
@@ -133,8 +134,8 @@ def list_or_string(value):
 
 def list_or_caseless_string(value):
     """
-    Converts the value into a list of ``caseless_string``'s. If the value is not iterable
-    a one-element list with stringified value will be returned.
+    Converts the value into a list of ``caseless_string``\ s. If the value is
+    not iterable, a one-element list with the stringified value is returned.
 
     """
     if isinstance(value, basestring):
@@ -148,9 +149,10 @@ def list_or_caseless_string(value):
 
 def list_or(type_):
     """
-    Generator for "list or" types. These take either a single value or a list values
-    and return a list of the specfied ``type_`` performing the conversion on the value
-    (if a single value is specified) or each of the elemented of the specified list.
+    Generator for "list or" types. These take either a single value or a list
+    of values, and return a list of the specified ``type_``, performing the
+    conversion on the value (if a single value is specified) or on each
+    element of the specified list.
 
     """
     list_type = list_of(type_)
@@ -176,8 +178,8 @@ none_type = type(None)
 
 def regex(value):
     """
-    Regular expression. If value is a string, it will be complied with no flags. If you
-    want to specify flags, value must be precompiled.
+    Regular expression. If value is a string, it will be compiled with no
+    flags. If you want to specify flags, value must be precompiled.
 
     """
     if isinstance(value, regex_type):
@@ -480,13 +482,13 @@ class obj_dict(MutableMapping):
 
 class level(object):
     """
-    A level has a name and behaves like a string when printed,
-    however it also has a numeric value which is used in comparisons.
+    A level has a name and behaves like a string when printed; it also has a
+    numeric value which is used in ordering comparisons.
""" def __init__(self, name, value): - self.name = name + self.name = caseless_string(name) self.value = value def __str__(self): @@ -555,7 +557,7 @@ def enum(args, start=0): levels = [] for i, v in enumerate(args, start): - name = string.upper(identifier(v)) + name = caseless_string(identifier(v)) lv = level(v, i) setattr(Enum, name, lv) levels.append(lv) diff --git a/wa/workloads/dhrystone/__init__.py b/wa/workloads/dhrystone/__init__.py index 9cc2d9ba..2c3805d4 100644 --- a/wa/workloads/dhrystone/__init__.py +++ b/wa/workloads/dhrystone/__init__.py @@ -90,14 +90,18 @@ class Dhrystone(Workload): outfile = os.path.join(context.output_directory, 'dhrystone.output') with open(outfile, 'w') as wfh: wfh.write(self.output) + context.add_artifact('dhrystone-output', outfile, 'raw', "dhrystone's stdout") + score_count = 0 dmips_count = 0 total_score = 0 total_dmips = 0 + for line in self.output.split('\n'): match = self.time_regex.search(line) if match: - context.add_metric('time', float(match.group('time')), 'seconds', lower_is_better=True) + context.add_metric('time', float(match.group('time')), 'seconds', + lower_is_better=True) else: match = self.bm_regex.search(line) if match: @@ -114,6 +118,7 @@ class Dhrystone(Workload): context.add_metric(metric, value) dmips_count += 1 total_dmips += value + context.add_metric('total DMIPS', total_dmips) context.add_metric('total score', total_score)