mirror of https://github.com/ARM-software/workload-automation.git synced 2025-03-21 18:18:41 +00:00

Merge pull request #370 from setrofim/next

Initial implementation of state tracking and output handling.
This commit is contained in:
setrofim 2017-03-20 14:45:30 +00:00 committed by GitHub
commit efaaf6b7c8
15 changed files with 774 additions and 1316 deletions

View File

@@ -7,6 +7,7 @@ from wa.framework.exception import (ResultProcessorError, ResourceError,
from wa.framework.exception import (WAError, NotFoundError, ValidationError,
WorkloadError)
from wa.framework.exception import WorkerThreadError, PluginLoaderError
from wa.framework.instrumentation import Instrument
from wa.framework.instrumentation import (Instrument, very_slow, slow, normal, fast,
very_fast)
from wa.framework.plugin import Plugin, Parameter
from wa.framework.workload import Workload

View File

@@ -24,7 +24,7 @@ from wa.framework import pluginloader
from wa.framework.configuration import RunConfiguration
from wa.framework.configuration.parsers import AgendaParser, ConfigParser
from wa.framework.execution import Executor
from wa.framework.output import init_wa_output
from wa.framework.output import init_run_output
from wa.framework.version import get_wa_version
from wa.framework.exception import NotFoundError, ConfigError
from wa.utils import log
@@ -123,7 +123,7 @@ class RunCommand(Command):
output_directory = settings.default_output_directory
self.logger.debug('Using output directory: {}'.format(output_directory))
try:
return init_wa_output(output_directory, config, args.force)
return init_run_output(output_directory, config, args.force)
except RuntimeError as e:
if 'path exists' in str(e):
msg = 'Output directory "{}" exists.\nPlease specify another '\

View File

@@ -24,6 +24,7 @@ from wa.utils.types import (identifier, integer, boolean, list_of_strings,
list_of, toggle_set, obj_dict, enum)
from wa.utils.serializer import is_pod
# Mapping for kind conversion; see docs for convert_types below
KIND_MAP = {
int: integer,
@@ -31,8 +32,11 @@ KIND_MAP = {
dict: OrderedDict,
}
JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING',
'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED'])
RunStatus = enum(['NEW', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING',
'ABORTED', 'FAILED', 'PARTIAL', 'OK'])
JobStatus = enum(['NEW', 'PENDING', 'RUNNING',
'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK'])
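(A minimal sketch of how these enum types behave, inferred from their usage in this diff; members are exposed as attributes and the defined level names appear in ``.values``, as with ``JobStatus.values`` further down. This is not the actual ``wa.utils.types.enum`` implementation.)

# Sketch inferred from usage in this diff, not the enum() implementation.
from wa.utils.types import enum

JobStatus = enum(['NEW', 'PENDING', 'RUNNING',
                  'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK'])

status = JobStatus.PENDING           # members are attributes
assert 'FAILED' in JobStatus.values  # .values lists the defined levels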
##########################
@@ -632,109 +636,144 @@ class RunConfiguration(Configuration):
# Metadata is separated out because it is not loaded into the auto
# generated config file
meta_data = [
ConfigurationPoint('run_name', kind=str,
description='''
A string that labels the WA run that is being performed. This would typically
be set in the ``config`` section of an agenda (see
:ref:`configuration in an agenda <configuration_in_agenda>`) rather than in the config file.
.. _old-style format strings: http://docs.python.org/2/library/stdtypes.html#string-formatting-operations
.. _log record attributes: http://docs.python.org/2/library/logging.html#logrecord-attributes
'''),
ConfigurationPoint('project', kind=str,
description='''
A string naming the project for which data is being collected. This may be
useful, e.g. when uploading data to a shared database that is populated from
multiple projects.
'''),
ConfigurationPoint('project_stage', kind=dict,
description='''
A dict or a string that allows adding an additional identifier. This may be
useful for long-running projects.
'''),
ConfigurationPoint(
'run_name',
kind=str,
description='''
A string that labels the WA run that is being performed. This would
typically be set in the ``config`` section of an agenda (see
:ref:`configuration in an agenda <configuration_in_agenda>`) rather
than in the config file.
''',
),
ConfigurationPoint(
'project',
kind=str,
description='''
A string naming the project for which data is being collected. This
may be useful, e.g. when uploading data to a shared database that
is populated from multiple projects.
''',
),
ConfigurationPoint(
'project_stage',
kind=dict,
description='''
A dict or a string that allows adding an additional identifier. This
may be useful for long-running projects.
''',
),
]
config_points = [
ConfigurationPoint('execution_order', kind=str, default='by_iteration',
allowed_values=['by_iteration', 'by_spec', 'by_section', 'random'],
description='''
Defines the order in which the agenda spec will be executed. At the moment,
the following execution orders are supported:
ConfigurationPoint(
'execution_order',
kind=str,
default='by_iteration',
allowed_values=['by_iteration', 'by_spec', 'by_section', 'random'],
description='''
Defines the order in which the agenda spec will be executed. At the
moment, the following execution orders are supported:
``"by_iteration"``
The first iteration of each workload spec is executed one after the other,
so all workloads are executed before proceeding on to the second iteration.
E.g. A1 B1 C1 A2 C2 A3. This is the default if no order is explicitly specified.
``"by_iteration"``
The first iteration of each workload spec is executed one after
the other, so all workloads are executed before proceeding on
to the second iteration. E.g. A1 B1 C1 A2 C2 A3. This is the
default if no order is explicitly specified.
In case of multiple sections, this will spread them out, such that specs
from the same section are further apart. E.g. given sections X and Y, global
specs A and B, and two iterations, this will run ::
In case of multiple sections, this will spread them out, such
that specs from the same section are further apart. E.g. given
sections X and Y, global specs A and B, and two iterations,
this will run ::
X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2
X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2
``"by_section"``
Same as ``"by_iteration"``, however this will group specs from the same
section together, so given sections X and Y, global specs A and B, and two iterations,
this will run ::
``"by_section"``
Same as ``"by_iteration"``, however this will group specs from
the same section together, so given sections X and Y, global
specs A and B, and two iterations, this will run ::
X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2
X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2
``"by_spec"``
All iterations of the first spec are executed before moving on to the next
spec. E.g. A1 A2 A3 B1 C1 C2. This may also be specified as ``"classic"``,
as this was the way workloads were executed in earlier versions of WA.
``"by_spec"``
All iterations of the first spec are executed before moving on
to the next spec. E.g. A1 A2 A3 B1 C1 C2. This may also be
specified as ``"classic"``, as this was the way workloads were
executed in earlier versions of WA.
``"random"``
Execution order is entirely random.
'''),
ConfigurationPoint('reboot_policy', kind=RebootPolicy, default='as_needed',
allowed_values=RebootPolicy.valid_policies,
description='''
This defines when during execution of a run the Device will be rebooted. The
possible values are:
``"random"``
Execution order is entirely random.
''',
),
ConfigurationPoint(
'reboot_policy',
kind=RebootPolicy,
default='as_needed',
allowed_values=RebootPolicy.valid_policies,
description='''
This defines when during execution of a run the Device will be
rebooted. The possible values are:
``"never"``
The device will never be rebooted.
``"initial"``
The device will be rebooted when the execution first starts, just before
executing the first workload spec.
``"each_spec"``
The device will be rebooted before running a new workload spec.
Note: this acts the same as each_iteration when execution order is set to by_iteration
``"each_iteration"``
The device will be rebooted before each new iteration.
'''),
ConfigurationPoint('device', kind=str, mandatory=True,
description='''
This setting defines what specific Device subclass will be used to interact
with the connected device. Obviously, this must match your setup.
'''),
ConfigurationPoint('retry_on_status', kind=list_of(JobStatus),
default=['FAILED', 'PARTIAL'],
allowed_values=JobStatus.values,
description='''
This is a list of statuses on which a job will be considered to have failed and
will be automatically retried up to ``max_retries`` times. This defaults to
``["FAILED", "PARTIAL"]`` if not set. Possible values are:
``"never"``
The device will never be rebooted.
``"OK"``
This iteration has completed and no errors have been detected
``"initial"``
The device will be rebooted when the execution first starts,
just before executing the first workload spec.
``"PARTIAL"``
One or more instruments have failed (the iteration may still be running).
``"each_spec"``
The device will be rebooted before running a new workload spec.
``"FAILED"``
The workload itself has failed.
.. note:: this acts the same as each_iteration when execution order
is set to by_iteration
``"ABORTED"``
The user interrupted the workload
'''),
ConfigurationPoint('max_retries', kind=int, default=2,
description='''
The maximum number of times failed jobs will be retried before giving up. If
not set, this defaults to ``2``.
``"each_iteration"``
The device will be rebooted before each new iteration.
'''),
ConfigurationPoint(
'device',
kind=str,
mandatory=True,
description='''
This setting defines what specific Device subclass will be used to
interact with the connected device. Obviously, this must match your
setup.
''',
),
ConfigurationPoint(
'retry_on_status',
kind=list_of(JobStatus),
default=['FAILED', 'PARTIAL'],
allowed_values=JobStatus.values,
description='''
This is a list of statuses on which a job will be considered to have
failed and will be automatically retried up to ``max_retries``
times. This defaults to ``["FAILED", "PARTIAL"]`` if not set.
Possible values are:
.. note:: this number does not include the original attempt
'''),
``"OK"``
This iteration has completed and no errors have been detected
``"PARTIAL"``
One or more instruments have failed (the iteration may still be running).
``"FAILED"``
The workload itself has failed.
``"ABORTED"``
The user interrupted the workload
''',
),
ConfigurationPoint(
'max_retries',
kind=int,
default=2,
description='''
The maximum number of times failed jobs will be retried before
giving up. If not set, this defaults to ``2``.
.. note:: this number does not include the original attempt
''',
),
]
configuration = {cp.name: cp for cp in config_points + meta_data}
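(To make the ``execution_order`` values described above concrete, here is a small illustrative sketch. ``by_iteration`` below is a hypothetical helper, not the actual ``permute_iterations()`` used by ``generate_jobs()``.)

# Hypothetical illustration of the "by_iteration" ordering; specs is a
# list of spec labels, e.g. ['X.A', 'Y.A', 'X.B', 'Y.B'].
def by_iteration(specs, iterations):
    order = []
    for i in range(iterations):
        order.extend('{}{}'.format(spec, i + 1) for spec in specs)
    return order

# by_iteration(['X.A', 'Y.A', 'X.B', 'Y.B'], 2) returns
# ['X.A1', 'Y.A1', 'X.B1', 'Y.B1', 'X.A2', 'Y.A2', 'X.B2', 'Y.B2'],
# matching the example in the description above.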

View File

@@ -9,6 +9,8 @@ from wa.framework.configuration.parsers import ConfigParser
from wa.framework.configuration.plugin_cache import PluginCache
from wa.framework.exception import NotFoundError
from wa.framework.job import Job
from wa.framework.run import JobState
from wa.utils import log
from wa.utils.types import enum
@@ -102,10 +104,13 @@ class ConfigManager(object):
def generate_jobs(self, context):
job_specs = self.jobs_config.generate_job_specs(context.tm)
exec_order = self.run_config.execution_order
log.indent()
for spec, i in permute_iterations(job_specs, exec_order):
job = Job(spec, i, context)
job.load(context.tm.target)
self._jobs.append(job)
context.run_state.add_job(job)
log.dedent()
self._jobs_generated = True

View File

@@ -15,27 +15,6 @@
# pylint: disable=no-member
"""
This module contains the execution logic for Workload Automation. It defines the
following actors:
WorkloadSpec: Identifies the workload to be run and defines parameters under
which it should be executed.
Executor: Responsible for the overall execution process. It instantiates
and/or initialises the other actors, does any necessary validation
and kicks off the whole process.
Execution Context: Provides information about the current state of run
execution to instrumentation.
RunInfo: Information about the current run.
Runner: This executes workload specs that are passed to it. It goes through
stages of execution, emitting an appropriate signal at each step to
allow instrumentation to do its stuff.
"""
import logging
import os
import random
@@ -49,31 +28,22 @@ from itertools import izip_longest
import wa.framework.signal as signal
from wa.framework import instrumentation, pluginloader
from wa.framework.configuration.core import settings
from wa.framework.configuration.execution import JobStatus
from wa.framework.configuration.core import settings, RunStatus, JobStatus
from wa.framework.exception import (WAError, ConfigError, TimeoutError,
InstrumentError, TargetError,
TargetNotRespondingError)
from wa.framework.output import init_job_output
from wa.framework.plugin import Artifact
from wa.framework.resource import ResourceResolver
from wa.framework.run import RunState
from wa.framework.target.info import TargetInfo
from wa.framework.target.manager import TargetManager
from wa.utils import log
from wa.utils.misc import (ensure_directory_exists as _d,
from wa.utils.misc import (ensure_directory_exists as _d, merge_config_values,
get_traceback, format_duration)
from wa.utils.serializer import json
# The maximum number of reboot attempts for an iteration.
MAX_REBOOT_ATTEMPTS = 3
# If something went wrong during device initialization, wait this
# long (in seconds) before retrying. This is necessary, as retrying
# immediately may not give the device enough time to recover to be able
# to reboot.
REBOOT_DELAY = 3
class ExecutionContext(object):
@property
@@ -105,16 +75,26 @@ class ExecutionContext(object):
return self.current_job.spec.id != self.next_job.spec.id
@property
def output_directory(self):
def job_output(self):
if self.current_job:
return os.path.join(self.output.basepath, self.current_job.output_name)
return self.current_job.output
@property
def output(self):
if self.current_job:
return self.job_output
return self.run_output
@property
def output_directory(self):
return self.output.basepath
def __init__(self, cm, tm, output):
self.logger = logging.getLogger('context')
self.cm = cm
self.tm = tm
self.output = output
self.run_output = output
self.run_state = output.state
self.logger.debug('Loading resource discoverers')
self.resolver = ResourceResolver(cm)
self.resolver.load()
@@ -127,157 +107,64 @@
self.output.write_info()
self.job_queue = copy(self.cm.jobs)
self.completed_jobs = []
self.run_state.status = RunStatus.STARTED
self.output.write_state()
def end_run(self):
self.output.info.end_time = datetime.now()
self.output.info.duration = self.output.info.end_time -\
self.output.info.start_time
self.output.write_info()
self.output.write_state()
self.output.write_result()
def start_job(self):
if not self.job_queue:
raise RuntimeError('No jobs to run')
self.current_job = self.job_queue.pop(0)
os.makedirs(self.output_directory)
self.current_job.output = init_job_output(self.run_output, self.current_job)
self.update_job_state(self.current_job)
return self.current_job
def end_job(self):
if not self.current_job:
raise RuntimeError('No jobs in progress')
self.completed_jobs.append(self.current_job)
self.update_job_state(self.current_job)
self.output.write_result()
self.current_job = None
def move_failed(self, job):
attempt = job.retries + 1
failed_name = '{}-attempt{:02}'.format(job.output_name, attempt)
self.output.move_failed(job.output_name, failed_name)
self.run_output.move_failed(job.output)
def update_job_state(self, job):
self.run_state.update_job(job)
self.run_output.write_state()
class OldExecutionContext(object):
"""
Provides a context for instrumentation. Keeps track of things like
current workload and iteration.
def skip_remaining_jobs(self):
while self.job_queue:
job = self.job_queue.pop(0)
job.status = JobStatus.SKIPPED
self.run_state.update_job(job)
self.completed_jobs.append(job)
self.write_state()
This class also provides two status members that can be used by workloads
and instrumentation to keep track of arbitrary state. ``result``
is reset on each new iteration of a workload; run_status is maintained
throughout a Workload Automation run.
def write_state(self):
self.run_output.write_state()
"""
# These are the artifacts generated by the core framework.
default_run_artifacts = [
Artifact('runlog', 'run.log', 'log', mandatory=True,
description='The log for the entire run.'),
]
@property
def current_iteration(self):
def add_metric(self, name, value, units=None, lower_is_better=False,
classifiers=None):
if self.current_job:
spec_id = self.current_job.spec.id
return self.job_iteration_counts[spec_id]
else:
return None
classifiers = merge_config_values(self.current_job.classifiers,
classifiers)
self.output.add_metric(name, value, units, lower_is_better, classifiers)
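(An illustration of the classifier merging above, assuming ``merge_config_values()`` performs a dict-style merge in which the second argument's keys take precedence; shown here with plain dicts.)

job_classifiers = {'section': 'X'}       # current_job.classifiers
metric_classifiers = {'test': 'warmup'}  # passed in by the caller
merged = dict(job_classifiers, **metric_classifiers)
# merged == {'section': 'X', 'test': 'warmup'}; this is what gets
# attached to the Metric recorded in the output.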
@property
def job_status(self):
if not self.current_job:
return None
return self.current_job.result.status
def add_artifact(self, name, path, kind, description=None, classifiers=None):
self.output.add_artifact(name, path, kind, description, classifiers)
@property
def workload(self):
return getattr(self.spec, 'workload', None)
@property
def spec(self):
return getattr(self.current_job, 'spec', None)
@property
def result(self):
return getattr(self.current_job, 'result', self.run_result)
def __init__(self, device_manager, config):
self.device_manager = device_manager
self.device = self.device_manager.target
self.config = config
self.reboot_policy = config.reboot_policy
self.output_directory = None
self.current_job = None
self.resolver = None
self.last_error = None
self.run_info = None
self.run_result = None
self.run_output_directory = self.config.output_directory
self.host_working_directory = self.config.meta_directory
self.iteration_artifacts = None
self.run_artifacts = copy(self.default_run_artifacts)
self.job_iteration_counts = defaultdict(int)
self.aborted = False
self.runner = None
def initialize(self):
if not os.path.isdir(self.run_output_directory):
os.makedirs(self.run_output_directory)
self.output_directory = self.run_output_directory
self.resolver = ResourceResolver(self.config)
self.run_info = RunInfo(self.config)
self.run_result = RunResult(self.run_info, self.run_output_directory)
def next_job(self, job):
"""Invoked by the runner when starting a new iteration of workload execution."""
self.current_job = job
self.job_iteration_counts[self.spec.id] += 1
if not self.aborted:
outdir_name = '_'.join(map(str, [self.spec.label, self.spec.id, self.current_iteration]))
self.output_directory = _d(os.path.join(self.run_output_directory, outdir_name))
self.iteration_artifacts = [wa for wa in self.workload.artifacts]
self.current_job.result.iteration = self.current_iteration
self.current_job.result.output_directory = self.output_directory
def end_job(self):
if self.current_job.result.status == JobStatus.ABORTED:
self.aborted = True
self.current_job = None
self.output_directory = self.run_output_directory
def add_metric(self, *args, **kwargs):
self.result.add_metric(*args, **kwargs)
def add_artifact(self, name, path, kind, *args, **kwargs):
if self.current_job is None:
self.add_run_artifact(name, path, kind, *args, **kwargs)
else:
self.add_iteration_artifact(name, path, kind, *args, **kwargs)
def add_run_artifact(self, name, path, kind, *args, **kwargs):
path = _check_artifact_path(path, self.run_output_directory)
self.run_artifacts.append(Artifact(name, path, kind, Artifact.ITERATION, *args, **kwargs))
def add_iteration_artifact(self, name, path, kind, *args, **kwargs):
path = _check_artifact_path(path, self.output_directory)
self.iteration_artifacts.append(Artifact(name, path, kind, Artifact.RUN, *args, **kwargs))
def get_artifact(self, name):
if self.iteration_artifacts:
for art in self.iteration_artifacts:
if art.name == name:
return art
for art in self.run_artifacts:
if art.name == name:
return art
return None
def _check_artifact_path(path, rootpath):
if path.startswith(rootpath):
return os.path.abspath(path)
rootpath = os.path.abspath(rootpath)
full_path = os.path.join(rootpath, path)
if not os.path.isfile(full_path):
msg = 'Cannot add artifact because {} does not exist.'
raise ValueError(msg.format(full_path))
return full_path
def add_run_artifact(self, name, path, kind, description=None,
classifiers=None):
self.run_output.add_artifact(name, path, kind, description, classifiers)
class Executor(object):
@@ -302,7 +189,6 @@ class Executor(object):
pluginloader = None
self.device_manager = None
self.device = None
self.context = None
def execute(self, config_manager, output):
"""
@@ -335,6 +221,7 @@
self.logger.info('Generating jobs')
config_manager.generate_jobs(context)
output.write_job_specs(config_manager.job_specs)
output.write_state()
self.logger.info('Installing instrumentation')
for instrument in config_manager.get_instruments(target_manager.target):
@@ -343,52 +230,33 @@
self.logger.info('Starting run')
runner = Runner(context)
signal.send(signal.RUN_STARTED, self)
runner.run()
self.execute_postamble(context, output)
signal.send(signal.RUN_COMPLETED, self)
def execute_postamble(self):
"""
This happens after the run has completed. The overall results of the run are
summarised to the user.
"""
result = self.context.run_result
counter = Counter()
for ir in result.iteration_results:
counter[ir.status] += 1
def execute_postamble(self, context, output):
self.logger.info('Done.')
self.logger.info('Run duration: {}'.format(format_duration(self.context.run_info.duration)))
status_summary = 'Ran a total of {} iterations: '.format(sum(self.context.job_iteration_counts.values()))
duration = format_duration(output.info.duration)
self.logger.info('Run duration: {}'.format(duration))
num_ran = context.run_state.num_completed_jobs
status_summary = 'Ran a total of {} iterations: '.format(num_ran)
counter = context.run_state.get_status_counts()
parts = []
for status in JobStatus.values:
for status in reversed(JobStatus.values):
if status in counter:
parts.append('{} {}'.format(counter[status], status))
self.logger.info(status_summary + ', '.join(parts))
self.logger.info('Results can be found in {}'.format(self.config.output_directory))
self.logger.info('Results can be found in {}'.format(output.basepath))
if self.error_logged:
self.logger.warn('There were errors during execution.')
self.logger.warn('Please see {}'.format(self.config.log_file))
self.logger.warn('Please see {}'.format(output.logfile))
elif self.warning_logged:
self.logger.warn('There were warnings during execution.')
self.logger.warn('Please see {}'.format(self.config.log_file))
def _get_runner(self, result_manager):
if not self.config.execution_order or self.config.execution_order == 'by_iteration':
if self.config.reboot_policy == 'each_spec':
self.logger.info('each_spec reboot policy with the default by_iteration execution order is '
'equivalent to each_iteration policy.')
runnercls = ByIterationRunner
elif self.config.execution_order in ['classic', 'by_spec']:
runnercls = BySpecRunner
elif self.config.execution_order == 'by_section':
runnercls = BySectionRunner
elif self.config.execution_order == 'random':
runnercls = RandomRunner
else:
raise ConfigError('Unexpected execution order: {}'.format(self.config.execution_order))
return runnercls(self.device_manager, self.context, result_manager)
self.logger.warn('Please see {}'.format(output.logfile))
def _error_signalled_callback(self):
self.error_logged = True
@@ -411,14 +279,16 @@ class Runner(object):
self.config = self.context.cm
def run(self):
self.send(signal.RUN_STARTED)
try:
self.initialize_run()
self.send(signal.RUN_INITIALIZED)
while self.context.job_queue:
with signal.wrap('JOB_EXECUTION', self):
self.run_next_job(self.context)
try:
with signal.wrap('JOB_EXECUTION', self):
self.run_next_job(self.context)
except KeyboardInterrupt:
self.context.skip_remaining_jobs()
except Exception as e:
if (not getattr(e, 'logged', None) and
not isinstance(e, KeyboardInterrupt)):
@@ -427,7 +297,7 @@
raise e
finally:
self.finalize_run()
self.send(signal.RUN_COMPLETED)
self.send(signal.RUN_FINALIZED)
def initialize_run(self):
self.logger.info('Initializing run')
@@ -436,6 +306,7 @@
for job in self.context.job_queue:
job.initialize(self.context)
log.dedent()
self.context.write_state()
def finalize_run(self):
self.logger.info('Finalizing run')
@@ -448,6 +319,7 @@
try:
log.indent()
self.do_run_job(job, context)
job.status = JobStatus.OK
except KeyboardInterrupt:
job.status = JobStatus.ABORTED
raise
@@ -486,6 +358,7 @@
raise
except KeyboardInterrupt:
job.status = JobStatus.ABORTED
self.logger.info('Got CTRL-C. Aborting.')
raise
except Exception as e:
job.status = JobStatus.FAILED
@@ -503,12 +376,13 @@
rc = self.context.cm.run_config
if job.status in rc.retry_on_status:
if job.retries < rc.max_retries:
msg = 'Job {} iteration {} complted with status {}. retrying...'
msg = 'Job {} iteration {} completed with status {}. retrying...'
self.logger.error(msg.format(job.id, job.iteration, job.status))
self.context.move_failed(job)
job.retries += 1
job.status = JobStatus.PENDING
self.context.job_queue.insert(0, job)
self.context.write_state()
else:
msg = 'Job {} iteration {} completed with status {}. '\
'Max retries exceeded.'
@@ -522,454 +396,3 @@
def __str__(self):
return 'runner'
class RunnerJob(object):
"""
Represents a single execution of a ``RunnerJobDescription``. There will be one created for each iteration
specified by ``RunnerJobDescription.number_of_iterations``.
"""
def __init__(self, spec, retry=0):
self.spec = spec
self.retry = retry
self.iteration = None
self.result = JobStatus(self.spec)
class OldRunner(object):
"""
This class is responsible for actually performing a workload automation
run. The main responsibility of this class is to emit appropriate signals
at the various stages of the run to allow things like traces and other
instrumentation to hook into the process.
This is an abstract base class that defines each step of the run, but not
the order in which those steps are executed, which is left to the concrete
derived classes.
"""
class _RunnerError(Exception):
"""Internal runner error."""
pass
@property
def config(self):
return self.context.config
@property
def current_job(self):
if self.job_queue:
return self.job_queue[0]
return None
@property
def previous_job(self):
if self.completed_jobs:
return self.completed_jobs[-1]
return None
@property
def next_job(self):
if self.job_queue:
if len(self.job_queue) > 1:
return self.job_queue[1]
return None
@property
def spec_changed(self):
if self.previous_job is None and self.current_job is not None: # Start of run
return True
if self.previous_job is not None and self.current_job is None: # End of run
return True
return self.current_job.spec.id != self.previous_job.spec.id
@property
def spec_will_change(self):
if self.current_job is None and self.next_job is not None: # Start of run
return True
if self.current_job is not None and self.next_job is None: # End of run
return True
return self.current_job.spec.id != self.next_job.spec.id
def __init__(self, device_manager, context, result_manager):
self.device_manager = device_manager
self.device = device_manager.target
self.context = context
self.result_manager = result_manager
self.logger = logging.getLogger('Runner')
self.job_queue = []
self.completed_jobs = []
self._initial_reset = True
def init_queue(self, specs):
raise NotImplementedError()
def run(self): # pylint: disable=too-many-branches
self._send(signal.RUN_START)
with signal.wrap('RUN_INIT'):
self._initialize_run()
try:
while self.job_queue:
try:
self._init_job()
self._run_job()
except KeyboardInterrupt:
self.current_job.result.status = JobStatus.ABORTED
raise
except Exception, e: # pylint: disable=broad-except
self.current_job.result.status = JobStatus.FAILED
self.current_job.result.add_event(e.message)
if isinstance(e, DeviceNotRespondingError):
self.logger.info('Device appears to be unresponsive.')
if self.context.reboot_policy.can_reboot and self.device.can('reset_power'):
self.logger.info('Attempting to hard-reset the device...')
try:
self.device.boot(hard=True)
self.device.connect()
except DeviceError: # hard_boot not implemented for the device.
raise e
else:
raise e
else: # not a DeviceNotRespondingError
self.logger.error(e)
finally:
self._finalize_job()
except KeyboardInterrupt:
self.logger.info('Got CTRL-C. Finalizing run... (CTRL-C again to abort).')
# Skip through the remaining jobs.
while self.job_queue:
self.context.next_job(self.current_job)
self.current_job.result.status = JobStatus.ABORTED
self._finalize_job()
except DeviceNotRespondingError:
self.logger.info('Device unresponsive and recovery not possible. Skipping the rest of the run.')
self.context.aborted = True
while self.job_queue:
self.context.next_job(self.current_job)
self.current_job.result.status = JobStatus.SKIPPED
self._finalize_job()
instrumentation.enable_all()
self._finalize_run()
self._process_results()
self.result_manager.finalize(self.context)
self._send(signal.RUN_END)
def _initialize_run(self):
self.context.runner = self
self.context.run_info.start_time = datetime.utcnow()
self._connect_to_device()
self.logger.info('Initializing device')
self.device_manager.initialize(self.context)
self.logger.info('Initializing workloads')
for workload_spec in self.context.config.workload_specs:
workload_spec.workload.initialize(self.context)
self.context.run_info.device_properties = self.device_manager.info
self.result_manager.initialize(self.context)
if instrumentation.check_failures():
raise InstrumentError('Detected failure(s) during instrumentation initialization.')
def _connect_to_device(self):
if self.context.reboot_policy.perform_initial_boot:
try:
self.device_manager.connect()
except DeviceError: # device may be offline
if self.device.can('reset_power'):
with self._signal_wrap('INITIAL_BOOT'):
self.device.boot(hard=True)
else:
raise DeviceError('Cannot connect to device for initial reboot; '
'and device does not support hard reset.')
else: # successfully connected
self.logger.info('\tBooting device')
with self._signal_wrap('INITIAL_BOOT'):
self._reboot_device()
else:
self.logger.info('Connecting to device')
self.device_manager.connect()
def _init_job(self):
self.current_job.result.status = JobStatus.RUNNING
self.context.next_job(self.current_job)
def _run_job(self): # pylint: disable=too-many-branches
spec = self.current_job.spec
if not spec.enabled:
self.logger.info('Skipping workload %s (iteration %s)', spec, self.context.current_iteration)
self.current_job.result.status = JobStatus.SKIPPED
return
self.logger.info('Running workload %s (iteration %s)', spec, self.context.current_iteration)
if spec.flash:
if not self.context.reboot_policy.can_reboot:
raise ConfigError('Cannot flash as reboot_policy does not permit rebooting.')
if not self.device.can('flash'):
raise DeviceError('Device does not support flashing.')
self._flash_device(spec.flash)
elif not self.completed_jobs:
# Never reboot on the very first job of a run, as we would have done
# the initial reboot if a reboot was needed.
pass
elif self.context.reboot_policy.reboot_on_each_spec and self.spec_changed:
self.logger.debug('Rebooting on spec change.')
self._reboot_device()
elif self.context.reboot_policy.reboot_on_each_iteration:
self.logger.debug('Rebooting on iteration.')
self._reboot_device()
instrumentation.disable_all()
instrumentation.enable(spec.instrumentation)
self.device_manager.start()
if self.spec_changed:
self._send(signal.WORKLOAD_SPEC_START)
self._send(signal.ITERATION_START)
try:
setup_ok = False
with self._handle_errors('Setting up device parameters'):
self.device_manager.set_runtime_parameters(spec.runtime_parameters)
setup_ok = True
if setup_ok:
with self._handle_errors('running {}'.format(spec.workload.name)):
self.current_job.result.status = JobStatus.RUNNING
self._run_workload_iteration(spec.workload)
else:
self.logger.info('\tSkipping the rest of the iterations for this spec.')
spec.enabled = False
except KeyboardInterrupt:
self._send(signal.ITERATION_END)
self._send(signal.WORKLOAD_SPEC_END)
raise
else:
self._send(signal.ITERATION_END)
if self.spec_will_change or not spec.enabled:
self._send(signal.WORKLOAD_SPEC_END)
finally:
self.device_manager.stop()
def _finalize_job(self):
self.context.run_result.iteration_results.append(self.current_job.result)
job = self.job_queue.pop(0)
job.iteration = self.context.current_iteration
if job.result.status in self.config.retry_on_status:
if job.retry >= self.config.max_retries:
self.logger.error('Exceeded maximum number of retries. Abandoning job.')
else:
self.logger.info('Job status was {}. Retrying...'.format(job.result.status))
retry_job = RunnerJob(job.spec, job.retry + 1)
self.job_queue.insert(0, retry_job)
self.completed_jobs.append(job)
self.context.end_job()
def _finalize_run(self):
self.logger.info('Finalizing workloads')
for workload_spec in self.context.config.workload_specs:
workload_spec.workload.finalize(self.context)
self.logger.info('Finalizing.')
self._send(signal.RUN_FIN)
with self._handle_errors('Disconnecting from the device'):
self.device.disconnect()
info = self.context.run_info
info.end_time = datetime.utcnow()
info.duration = info.end_time - info.start_time
def _process_results(self):
self.logger.info('Processing overall results')
with self._signal_wrap('OVERALL_RESULTS_PROCESSING'):
if instrumentation.check_failures():
self.context.run_result.non_iteration_errors = True
self.result_manager.process_run_result(self.context.run_result, self.context)
def _run_workload_iteration(self, workload):
self.logger.info('\tSetting up')
with self._signal_wrap('WORKLOAD_SETUP'):
try:
workload.setup(self.context)
except:
self.logger.info('\tSkipping the rest of the iterations for this spec.')
self.current_job.spec.enabled = False
raise
try:
self.logger.info('\tExecuting')
with self._handle_errors('Running workload'):
with self._signal_wrap('WORKLOAD_EXECUTION'):
workload.run(self.context)
self.logger.info('\tProcessing result')
self._send(signal.BEFORE_WORKLOAD_RESULT_UPDATE)
try:
if self.current_job.result.status != JobStatus.FAILED:
with self._handle_errors('Processing workload result',
on_error_status=JobStatus.PARTIAL):
workload.update_result(self.context)
self._send(signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE)
if self.current_job.result.status == JobStatus.RUNNING:
self.current_job.result.status = JobStatus.OK
finally:
self._send(signal.AFTER_WORKLOAD_RESULT_UPDATE)
finally:
self.logger.info('\tTearing down')
with self._handle_errors('Tearing down workload',
on_error_status=JobStatus.NONCRITICAL):
with self._signal_wrap('WORKLOAD_TEARDOWN'):
workload.teardown(self.context)
self.result_manager.add_result(self.current_job.result, self.context)
def _flash_device(self, flashing_params):
with self._signal_wrap('FLASHING'):
self.device.flash(**flashing_params)
self.device.connect()
def _reboot_device(self):
with self._signal_wrap('BOOT'):
for reboot_attempts in xrange(MAX_REBOOT_ATTEMPTS):
if reboot_attempts:
self.logger.info('\tRetrying...')
with self._handle_errors('Rebooting device'):
self.device.boot(**self.current_job.spec.boot_parameters)
break
else:
raise DeviceError('Could not reboot device; max reboot attempts exceeded.')
self.device.connect()
def _send(self, s):
signal.send(s, self, self.context)
def _take_screenshot(self, filename):
if self.context.output_directory:
filepath = os.path.join(self.context.output_directory, filename)
else:
filepath = os.path.join(settings.output_directory, filename)
self.device.capture_screen(filepath)
@contextmanager
def _handle_errors(self, action, on_error_status=JobStatus.FAILED):
try:
if action is not None:
self.logger.debug(action)
yield
except (KeyboardInterrupt, DeviceNotRespondingError):
raise
except (WAError, TimeoutError), we:
self.device.check_responsive()
if self.current_job:
self.current_job.result.status = on_error_status
self.current_job.result.add_event(str(we))
try:
self._take_screenshot('error.png')
except Exception, e: # pylint: disable=W0703
# We're already in error state, so the fact that taking a
# screenshot failed is not surprising...
pass
if action:
action = action[0].lower() + action[1:]
self.logger.error('Error while {}:\n\t{}'.format(action, we))
except Exception, e: # pylint: disable=W0703
error_text = '{}("{}")'.format(e.__class__.__name__, e)
if self.current_job:
self.current_job.result.status = on_error_status
self.current_job.result.add_event(error_text)
self.logger.error('Error while {}'.format(action))
self.logger.error(error_text)
if isinstance(e, subprocess.CalledProcessError):
self.logger.error('Got:')
self.logger.error(e.output)
tb = get_traceback()
self.logger.error(tb)
@contextmanager
def _signal_wrap(self, signal_name):
"""Wraps the suite in before/after signals, ensuring
that the after signal is always sent."""
before_signal = getattr(signal, 'BEFORE_' + signal_name)
success_signal = getattr(signal, 'SUCCESSFUL_' + signal_name)
after_signal = getattr(signal, 'AFTER_' + signal_name)
try:
self._send(before_signal)
yield
self._send(success_signal)
finally:
self._send(after_signal)
class BySpecRunner(Runner):
"""
This is that "classic" implementation that executes all iterations of a workload
spec before proceeding onto the next spec.
"""
def init_queue(self, specs):
jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] # pylint: disable=unused-variable
self.job_queue = [j for spec_jobs in jobs for j in spec_jobs]
class BySectionRunner(Runner):
"""
Runs the first iteration for all benchmarks first, before proceeding to the next iteration,
i.e. A1, B1, C1, A2, B2, C2... instead of A1, A2, B1, B2, C1, C2...
If multiple sections were specified in the agenda, this will run all specs for the first section
followed by all specs for the second section, etc.
e.g. given sections X and Y, and global specs A and B, with 2 iterations, this will run
X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2
"""
def init_queue(self, specs):
jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs]
self.job_queue = [j for spec_jobs in izip_longest(*jobs) for j in spec_jobs if j]
class ByIterationRunner(Runner):
"""
Runs the first iteration for all benchmarks first, before proceeding to the next iteration,
i.e. A1, B1, C1, A2, B2, C2... instead of A1, A2, B1, B2, C1, C2...
If multiple sections were specified in the agenda, this will run all sections for the first global
spec first, followed by all sections for the second spec, etc.
e.g. given sections X and Y, and global specs A and B, with 2 iterations, this will run
X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2
"""
def init_queue(self, specs):
sections = OrderedDict()
for s in specs:
if s.section_id not in sections:
sections[s.section_id] = []
sections[s.section_id].append(s)
specs = [s for section_specs in izip_longest(*sections.values()) for s in section_specs if s]
jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs]
self.job_queue = [j for spec_jobs in izip_longest(*jobs) for j in spec_jobs if j]
class RandomRunner(Runner):
"""
This will run specs in a random order.
"""
def init_queue(self, specs):
jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] # pylint: disable=unused-variable
all_jobs = [j for spec_jobs in jobs for j in spec_jobs]
random.shuffle(all_jobs)
self.job_queue = all_jobs

View File

@@ -105,8 +105,9 @@ from collections import OrderedDict
from wa.framework import signal
from wa.framework.plugin import Plugin
from wa.framework.exception import WAError, TargetNotRespondingError, TimeoutError
from wa.utils.log import log_error
from wa.utils.misc import get_traceback, isiterable
from wa.utils.types import identifier
from wa.utils.types import identifier, enum, level
logger = logging.getLogger('instrumentation')
@@ -120,14 +121,14 @@ logger = logging.getLogger('instrumentation')
SIGNAL_MAP = OrderedDict([
# Below are "aliases" for some of the more common signals to allow
# instrumentation to have similar structure to workloads
('initialize', signal.SUCCESSFUL_RUN_INIT),
# ('setup', signal.SUCCESSFUL_WORKLOAD_SETUP),
# ('start', signal.BEFORE_WORKLOAD_EXECUTION),
# ('stop', signal.AFTER_WORKLOAD_EXECUTION),
# ('process_workload_result', signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE),
# ('update_result', signal.AFTER_WORKLOAD_RESULT_UPDATE),
# ('teardown', signal.AFTER_WORKLOAD_TEARDOWN),
# ('finalize', signal.RUN_FIN),
('initialize', signal.RUN_INITIALIZED),
('setup', signal.BEFORE_WORKLOAD_SETUP),
('start', signal.BEFORE_WORKLOAD_EXECUTION),
('stop', signal.AFTER_WORKLOAD_EXECUTION),
('process_workload_result', signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE),
('update_result', signal.AFTER_WORKLOAD_RESULT_UPDATE),
('teardown', signal.AFTER_WORKLOAD_TEARDOWN),
('finalize', signal.RUN_FINALIZED),
# ('on_run_start', signal.RUN_START),
# ('on_run_end', signal.RUN_END),
@@ -171,13 +172,37 @@ SIGNAL_MAP = OrderedDict([
# ('on_warning', signal.WARNING_LOGGED),
])
PRIORITY_MAP = OrderedDict([
('very_fast_', 20),
('fast_', 10),
('normal_', 0),
('slow_', -10),
('very_slow_', -20),
])
Priority = enum(['very_slow', 'slow', 'normal', 'fast', 'very_fast'], -20, 10)
def get_priority(func):
return getattr(getattr(func, 'im_func', func),
'priority', Priority.normal)
def priority(priority):
def decorate(func):
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper.func_name = func.func_name
if priority in Priority.values:
wrapper.priority = Priority(priority)
else:
if not isinstance(priority, int):
msg = 'Invalid priority "{}"; must be an int or one of {}'
raise ValueError(msg.format(priority, Priority.values))
wrapper.priority = level('custom', priority)
return wrapper
return decorate
very_slow = priority(Priority.very_slow)
slow = priority(Priority.slow)
normal = priority(Priority.normal)
fast = priority(Priority.fast)
very_fast = priority(Priority.very_fast)
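(Together with ``SIGNAL_MAP`` above, these decorators let an instrument declare both what it reacts to, via method names, and in what order relative to other instruments, via priority. A minimal sketch follows; ``EnergyMeter`` is hypothetical and not part of this commit.)

# Hypothetical instrument: method names are looked up in SIGNAL_MAP by
# install() below; the decorators set each callback's dispatch priority.
class EnergyMeter(Instrument):

    name = 'energy-meter-sketch'

    @fast
    def start(self, context):   # connected to BEFORE_WORKLOAD_EXECUTION
        pass

    @slow
    def stop(self, context):    # connected to AFTER_WORKLOAD_EXECUTION
        pass

    @priority(15)               # custom integer priorities are also accepted
    def update_result(self, context):  # AFTER_WORKLOAD_RESULT_UPDATE
        pass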
installed = []
@@ -244,18 +269,7 @@ class ManagedCallback(object):
logger.error('Error in instrument {}'.format(self.instrument.name))
global failures_detected # pylint: disable=W0603
failures_detected = True
if isinstance(e, WAError):
logger.error(e)
else:
tb = get_traceback()
logger.error(tb)
logger.error('{}({})'.format(e.__class__.__name__, e))
if not context.current_iteration:
# Error occurred outside of an iteration (most likely
# during initial setup or teardown). Since this would affect
# the rest of the run, mark the instrument as broken so that
# it doesn't get re-enabled for subsequent iterations.
self.instrument.is_broken = True
log_error(e, logger)
disable(self.instrument)
@@ -274,34 +288,38 @@ def install(instrument):
"""
logger.debug('Installing instrument %s.', instrument)
if is_installed(instrument):
raise ValueError('Instrument {} is already installed.'.format(instrument.name))
for attr_name in dir(instrument):
priority = 0
stripped_attr_name = attr_name
for key, value in PRIORITY_MAP.iteritems():
if attr_name.startswith(key):
stripped_attr_name = attr_name[len(key):]
priority = value
break
if stripped_attr_name in SIGNAL_MAP:
attr = getattr(instrument, attr_name)
if not callable(attr):
raise ValueError('Attribute {} not callable in {}.'.format(attr_name, instrument))
argspec = inspect.getargspec(attr)
arg_num = len(argspec.args)
# Instrument callbacks will be passed exactly two arguments: self
# (the instrument instance to which the callback is bound) and
# context. However, we also allow callbacks to capture the context
# in variable arguments (declared as "*args" in the definition).
if arg_num > 2 or (arg_num < 2 and argspec.varargs is None):
message = '{} must take exactly 2 positional arguments; {} given.'
raise ValueError(message.format(attr_name, arg_num))
logger.debug('\tConnecting %s to %s', attr.__name__, SIGNAL_MAP[stripped_attr_name])
mc = ManagedCallback(instrument, attr)
_callbacks.append(mc)
signal.connect(mc, SIGNAL_MAP[stripped_attr_name], priority=priority)
if is_installed(instrument):
msg = 'Instrument {} is already installed.'
raise ValueError(msg.format(instrument.name))
for attr_name in dir(instrument):
if attr_name not in SIGNAL_MAP:
continue
attr = getattr(instrument, attr_name)
if not callable(attr):
msg = 'Attribute {} not callable in {}.'
raise ValueError(msg.format(attr_name, instrument))
argspec = inspect.getargspec(attr)
arg_num = len(argspec.args)
# Instrument callbacks will be passed exactly two arguments: self
# (the instrument instance to which the callback is bound) and
# context. However, we also allow callbacks to capture the context
# in variable arguments (declared as "*args" in the definition).
if arg_num > 2 or (arg_num < 2 and argspec.varargs is None):
message = '{} must take exactly 2 positional arguments; {} given.'
raise ValueError(message.format(attr_name, arg_num))
priority = get_priority(attr)
logger.debug('\tConnecting %s to %s with priority %s(%d)', attr.__name__,
SIGNAL_MAP[attr_name], priority.name, priority.value)
mc = ManagedCallback(instrument, attr)
_callbacks.append(mc)
signal.connect(mc, SIGNAL_MAP[attr_name], priority=priority.value)
installed.append(instrument)
@@ -385,15 +403,3 @@ class Instrument(Plugin):
self.target = target
self.is_enabled = True
self.is_broken = False
def initialize(self, context):
pass
def finalize(self, context):
pass
def __str__(self):
return self.name
def __repr__(self):
return 'Instrument({})'.format(self.name)

View File

@@ -1,6 +1,6 @@
import logging
from wa.framework import pluginloader
from wa.framework import pluginloader, signal
from wa.framework.configuration.core import JobStatus
@@ -11,8 +11,12 @@ class Job(object):
return self.spec.id
@property
def output_name(self):
return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration)
def label(self):
return self.spec.label
@property
def classifiers(self):
return self.spec.classifiers
def __init__(self, spec, iteration, context):
self.logger = logging.getLogger('job')
@@ -25,33 +29,45 @@ class Job(object):
self.retries = 0
def load(self, target, loader=pluginloader):
self.logger.debug('Loading job {}'.format(self.id))
self.logger.info('Loading job {}'.format(self.id))
self.workload = loader.get_workload(self.spec.workload_name,
target,
**self.spec.workload_parameters)
self.workload.init_resources(self.context)
self.workload.validate()
self.status = JobStatus.LOADED
def initialize(self, context):
self.logger.info('Initializing job {}'.format(self.id))
with signal.wrap('WORKLOAD_INITIALIZED', self, context):
self.workload.initialize(context)
self.status = JobStatus.PENDING
context.update_job_state(self)
def configure_target(self, context):
self.logger.info('Configuring target for job {}'.format(self.id))
def setup(self, context):
self.logger.info('Setting up job {}'.format(self.id))
with signal.wrap('WORKLOAD_SETUP', self, context):
self.workload.setup(context)
def run(self, context):
self.logger.info('Running job {}'.format(self.id))
with signal.wrap('WORKLOAD_EXECUTION', self, context):
self.workload.run(context)
def process_output(self, context):
self.logger.info('Processing output for job {}'.format(self.id))
with signal.wrap('WORKLOAD_RESULT_UPDATE', self, context):
self.workload.update_result(context)
def teardown(self, context):
self.logger.info('Tearing down job {}'.format(self.id))
with signal.wrap('WORKLOAD_TEARDOWN', self, context):
self.workload.teardown(context)
def finalize(self, context):
self.logger.info('Finalizing job {}'.format(self.id))
with signal.wrap('WORKLOAD_FINALIZED', self, context):
self.workload.finalize(context)
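(Taken together, these methods define the per-job lifecycle. Schematically, the runner drives a job through them in this order; the actual loop, error handling, and status updates live in execution.py.)

job.load(target)             # resolve and validate the workload
job.initialize(context)      # wrapped in WORKLOAD_INITIALIZED signals
job.configure_target(context)
job.setup(context)           # wrapped in WORKLOAD_SETUP signals
job.run(context)             # wrapped in WORKLOAD_EXECUTION signals
job.process_output(context)  # wrapped in WORKLOAD_RESULT_UPDATE signals
job.teardown(context)        # wrapped in WORKLOAD_TEARDOWN signals
job.finalize(context)        # wrapped in WORKLOAD_FINALIZED signals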

View File

@@ -7,71 +7,52 @@ import uuid
from copy import copy
from datetime import timedelta
from wa.framework.configuration.core import JobSpec
from wa.framework.configuration.core import JobSpec, RunStatus
from wa.framework.configuration.manager import ConfigManager
from wa.framework.run import RunState, RunInfo
from wa.framework.target.info import TargetInfo
from wa.utils.misc import touch, ensure_directory_exists
from wa.utils.serializer import write_pod, read_pod
from wa.utils.types import enum, numeric
logger = logging.getLogger('output')
class RunInfo(object):
"""
Information about the current run, such as its unique ID, run
time, etc.
class Output(object):
"""
@staticmethod
def from_pod(pod):
uid = pod.pop('uuid')
duration = pod.pop('duration')
if uid is not None:
uid = uuid.UUID(uid)
instance = RunInfo(**pod)
instance.uuid = uid
instance.duration = duration if duration is None else\
timedelta(seconds=duration)
return instance
@property
def resultfile(self):
return os.path.join(self.basepath, 'result.json')
def __init__(self, run_name=None, project=None, project_stage=None,
start_time=None, end_time=None, duration=None):
self.uuid = uuid.uuid4()
self.run_name = run_name
self.project = project
self.project_stage = project_stage
self.start_time = start_time
self.end_time = end_time
self.duration = duration
def __init__(self, path):
self.basepath = path
self.result = None
def to_pod(self):
d = copy(self.__dict__)
d['uuid'] = str(self.uuid)
if self.duration is None:
d['duration'] = self.duration
else:
d['duration'] = self.duration.total_seconds()
return d
def reload(self):
pod = read_pod(self.resultfile)
self.result = Result.from_pod(pod)
def write_result(self):
write_pod(self.result.to_pod(), self.resultfile)
def add_metric(self, name, value, units=None, lower_is_better=False,
classifiers=None):
self.result.add_metric(name, value, units, lower_is_better, classifiers)
def add_artifact(self, name, path, kind, description=None, classifiers=None):
if not os.path.exists(path):
msg = 'Attempting to add non-existing artifact: {}'
raise HostError(msg.format(path))
path = os.path.relpath(path, self.basepath)
if isinstance(kind, basestring):
kind = ArtifactType(kind)
self.result.add_artifact(name, path, kind, description, classifiers)
class RunState(object):
"""
Represents the state of a WA run.
"""
@staticmethod
def from_pod(pod):
return RunState()
def __init__(self):
pass
def to_pod(self):
return {}
class RunOutput(object):
class RunOutput(Output):
@property
def logfile(self):
@@ -111,9 +92,11 @@ class RunOutput(object):
return ensure_directory_exists(path)
def __init__(self, path):
self.basepath = path
super(RunOutput, self).__init__(path)
self.info = None
self.state = None
self.result = None
self.jobs = None
if (not os.path.isfile(self.statefile) or
not os.path.isfile(self.infofile)):
msg = '"{}" does not exist or is not a valid WA output directory.'
@@ -121,8 +104,10 @@
self.reload()
def reload(self):
super(RunOutput, self).reload()
self.info = RunInfo.from_pod(read_pod(self.infofile))
self.state = RunState.from_pod(read_pod(self.statefile))
# TODO: populate the jobs from the info in the state
def write_info(self):
write_pod(self.info.to_pod(), self.infofile)
@@ -157,17 +142,215 @@
pod = read_pod(self.jobsfile)
return [JobSpec.from_pod(jp) for jp in pod['jobs']]
def move_failed(self, name, failed_name):
path = os.path.join(self.basepath, name)
def move_failed(self, job_output):
name = os.path.basename(job_output.basepath)
attempt = job_output.retry + 1
failed_name = '{}-attempt{:02}'.format(name, attempt)
failed_path = os.path.join(self.failed_dir, failed_name)
if not os.path.exists(path):
raise ValueError('Path {} does not exist'.format(path))
if os.path.exists(failed_path):
raise ValueError('Path {} already exists'.format(failed_path))
shutil.move(path, failed_path)
shutil.move(job_output.basepath, failed_path)
job_output.basepath = failed_path
def init_wa_output(path, wa_state, force=False):
class JobOutput(Output):
def __init__(self, path, id, label, iteration, retry):
self.basepath = path
self.id = id
self.label = label
self.iteration = iteration
self.retry = retry
self.result = None
self.reload()
class Result(object):
@staticmethod
def from_pod(pod):
instance = Result()
instance.metrics = [Metric.from_pod(m) for m in pod['metrics']]
instance.artifacts = [Artifact.from_pod(a) for a in pod['artifacts']]
return instance
def __init__(self):
self.metrics = []
self.artifacts = []
def add_metric(self, name, value, units=None, lower_is_better=False,
classifiers=None):
metric = Metric(name, value, units, lower_is_better, classifiers)
logger.debug('Adding metric: {}'.format(metric))
self.metrics.append(metric)
def add_artifact(self, name, path, kind, description=None, classifiers=None):
artifact = Artifact(name, path, kind, description=description,
classifiers=classifiers)
logger.debug('Adding artifact: {}'.format(artifact))
self.artifacts.append(artifact)
def to_pod(self):
return dict(
metrics=[m.to_pod() for m in self.metrics],
artifacts=[a.to_pod() for a in self.artifacts],
)
ArtifactType = enum(['log', 'meta', 'data', 'export', 'raw'])
class Artifact(object):
"""
This is an artifact generated during execution/post-processing of a
workload. Unlike metrics, this represents an actual artifact, such as a
file, generated. This may be "result", such as trace, or it could be "meta
data" such as logs. These are distinguished using the ``kind`` attribute,
which also helps WA decide how it should be handled. Currently supported
kinds are:
:log: A log file. Not part of "results" as such but contains
information about the run/workload execution that may be useful for
diagnostics/meta analysis.
:meta: A file containing metadata. This is not part of "results", but
contains information that may be necessary to reproduce the
results (contrast with ``log`` artifacts which are *not*
necessary).
:data: This file contains new data, not available otherwise and should
be considered part of the "results" generated by WA. Most traces
would fall into this category.
:export: Exported version of results or some other artifact. This
signifies that this artifact does not contain any new data
that is not available elsewhere and that it may be safely
discarded without losing information.
:raw: Signifies that this is a raw dump/log that is normally processed
to extract useful information and is then discarded. In a sense,
it is the opposite of ``export``, but in general may also be
discarded.
.. note:: whether a file is marked as ``log``/``data`` or ``raw``
depends on how important it is to preserve this file,
e.g. when archiving, vs how much space it takes up.
Unlike ``export`` artifacts which are (almost) always
ignored by other exporters as that would never result
in data loss, ``raw`` files *may* be processed by
exporters if they decided that the risk of losing
potentially (though unlikely) useful data is greater
than the time/space cost of handling the artifact (e.g.
a database uploader may choose to ignore ``raw``
artifacts, whereas a network filer archiver may choose
to archive them).
.. note:: The kind parameter is intended to represent the logical
function of a particular artifact, not its intended means of
processing -- this is left entirely up to the result
processors.
"""
@staticmethod
def from_pod(pod):
pod['kind'] = ArtifactType(pod['kind'])
return Artifact(**pod)
def __init__(self, name, path, kind, description=None, classifiers=None):
"""
:param name: Name that uniquely identifies this artifact.
:param path: The *relative* path of the artifact. Depending on the
``level``, this must be relative to either the run or the iteration
output directory. Note: this path *must* be delimited
using ``/`` irrespective of the
operating system.
:param kind: The type of the artifact this is (e.g. log file, result,
etc.); this will be used as a hint to result processors. This
must be one of ``'log'``, ``'meta'``, ``'data'``,
``'export'``, ``'raw'``.
:param description: A free-form description of what this artifact is.
:param classifiers: A set of key-value pairs to further classify this
metric beyond current iteration (e.g. this can be
used to identify sub-tests).
"""
self.name = name
self.path = path.replace('/', os.sep) if path is not None else path
try:
self.kind = ArtifactType(kind)
except ValueError:
msg = 'Invalid Artifact kind: {}; must be in {}'
raise ValueError(msg.format(kind, ArtifactType.values))
self.description = description
self.classifiers = classifiers or {}
def to_pod(self):
pod = copy(self.__dict__)
pod['kind'] = str(self.kind)
return pod
def __str__(self):
return self.path
def __repr__(self):
return '{} ({}): {}'.format(self.name, self.kind, self.path)
class Metric(object):
"""
This is a single metric collected from executing a workload.
:param name: the name of the metric. Uniquely identifies the metric
within the results.
:param value: The numerical value of the metric for this execution of a
workload. This can be either an int or a float.
:param units: Units for the collected value. Can be None if the value
has no units (e.g. it's a count or a standardised score).
:param lower_is_better: Boolean flag indicating where lower values are
better than higher ones. Defaults to False.
:param classifiers: A set of key-value pairs to further classify this
metric beyond current iteration (e.g. this can be used
to identify sub-tests).
"""
__slots__ = ['name', 'value', 'units', 'lower_is_better', 'classifiers']
@staticmethod
def from_pod(pod):
return Metric(**pod)
def __init__(self, name, value, units=None, lower_is_better=False,
classifiers=None):
self.name = name
self.value = numeric(value)
self.units = units
self.lower_is_better = lower_is_better
self.classifiers = classifiers or {}
def to_pod(self):
return dict(
name=self.name,
value=self.value,
units=self.units,
lower_is_better=self.lower_is_better,
classifiers=self.classifiers,
)
def __str__(self):
result = '{}: {}'.format(self.name, self.value)
if self.units:
result += ' ' + self.units
result += ' ({})'.format('-' if self.lower_is_better else '+')
return result
def __repr__(self):
text = self.__str__()
if self.classifiers:
return '<{} {}>'.format(text, self.classifiers)
else:
return '<{}>'.format(text)
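As an illustration (values are made up), constructing a metric and printing it:

m = Metric('execution_time', 5.37, 'seconds', lower_is_better=True,
           classifiers={'iteration': 1})
print(str(m))   # -> execution_time: 5.37 seconds (-)
print(repr(m))  # -> <execution_time: 5.37 seconds (-) {'iteration': 1}>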
def init_run_output(path, wa_state, force=False):
if os.path.exists(path):
if force:
logger.info('Removing existing output directory.')
@ -188,13 +371,20 @@ def init_wa_output(path, wa_state, force=False):
project_stage=wa_state.run_config.project_stage,
)
write_pod(info.to_pod(), os.path.join(meta_dir, 'run_info.json'))
with open(os.path.join(path, '.run_state.json'), 'w') as wfh:
wfh.write('{}')
write_pod(RunState().to_pod(), os.path.join(path, '.run_state.json'))
write_pod(Result().to_pod(), os.path.join(path, 'result.json'))
return RunOutput(path)
def init_job_output(run_output, job):
output_name = '{}-{}-{}'.format(job.id, job.spec.label, job.iteration)
path = os.path.join(run_output.basepath, output_name)
ensure_directory_exists(path)
write_pod(Result().to_pod(), os.path.join(path, 'result.json'))
return JobOutput(path, job.id, job.iteration, job.label, job.retries)
def _save_raw_config(meta_dir, state):
raw_config_dir = os.path.join(meta_dir, 'raw_config')
os.makedirs(raw_config_dir)
@ -206,5 +396,3 @@ def _save_raw_config(meta_dir, state):
dest_path = os.path.join(raw_config_dir, 'cfg{}-{}'.format(i, basename))
shutil.copy(source, dest_path)
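Taken together, the two helpers above would be driven along these lines (a sketch only; ``config`` and ``jobs`` stand in for state produced elsewhere by the framework):

run_output = init_run_output('./wa_output', config, force=True)
for job in jobs:
    job_output = init_job_output(run_output, job)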

View File

@ -279,7 +279,6 @@ class PluginMeta(type):
mcs._propagate_attributes(bases, attrs, clsname)
cls = type.__new__(mcs, clsname, bases, attrs)
mcs._setup_aliases(cls)
mcs._implement_virtual(cls, bases)
return cls
@classmethod
@ -324,48 +323,6 @@ class PluginMeta(type):
alias.plugin_name = cls.name
cls.aliases.add(alias)
@classmethod
def _implement_virtual(mcs, cls, bases):
"""
This implements automatic method propagation to the bases, so
that you don't have to do something like
super(cls, self).vmname()
This also ensures that the methods that have been identified as
"globally virtual" are executed exactly once per WA execution, even if
invoked through instances of different subclasses.
methods = {}
called_globals = set()
for vmname in mcs.virtual_methods:
clsmethod = getattr(cls, vmname, None)
if clsmethod:
basemethods = [getattr(b, vmname) for b in bases
if hasattr(b, vmname)]
methods[vmname] = [bm for bm in basemethods if bm != clsmethod]
methods[vmname].append(clsmethod)
def generate_method_wrapper(vname): # pylint: disable=unused-argument
# this creates a closure with the method name so that it
# does not need to be passed to the wrapper as an argument,
# leaving the wrapper to accept exactly the same set of
# arguments as the method it is wrapping.
name__ = vmname # pylint: disable=cell-var-from-loop
def wrapper(self, *args, **kwargs):
for dm in methods[name__]:
if name__ in mcs.global_virtuals:
if dm not in called_globals:
dm(self, *args, **kwargs)
called_globals.add(dm)
else:
dm(self, *args, **kwargs)
return wrapper
setattr(cls, vmname, generate_method_wrapper(vmname))
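To make the removed mechanism concrete, here is a small illustrative sketch of the behaviour it provided (plain classes are used; in WA this applied to ``Plugin`` subclasses via ``PluginMeta``):

class Base(object):
    def initialize(self, context):
        print('Base.initialize')

class Derived(Base):
    def initialize(self, context):
        print('Derived.initialize')

# Under PluginMeta, Derived().initialize(None) would print
# 'Base.initialize' followed by 'Derived.initialize' -- base
# implementations run first, with no explicit super() calls.
# A method listed in global_virtuals would additionally run at
# most once per WA execution, across all subclass instances.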
class Plugin(object):
"""
@ -444,12 +401,6 @@ class Plugin(object):
for param in self.parameters:
param.validate(self)
def initialize(self, context):
pass
def finalize(self, context):
pass
def check_artifacts(self, context, level):
"""
Make sure that all mandatory artifacts have been generated.

View File

@ -14,342 +14,127 @@
#
import uuid
import logging
from collections import OrderedDict, Counter
from copy import copy
from datetime import datetime, timedelta
from collections import OrderedDict
from wa.framework import signal, pluginloader, log
from wa.framework.plugin import Plugin
from wa.framework.output import Status
from wa.framework.resource import ResourceResolver
from wa.framework.exception import JobError
from wa.utils import counter
from wa.utils.serializer import json
from wa.utils.misc import ensure_directory_exists as _d
from wa.utils.types import caseless_string
from wa.framework.configuration.core import RunStatus, JobStatus
class RunInfo(object):
"""
Information about the current run, such as its unique ID, run
time, etc.
class JobActor(object):
"""
@staticmethod
def from_pod(pod):
uid = pod.pop('uuid')
duration = pod.pop('duration')
if uid is not None:
uid = uuid.UUID(uid)
instance = RunInfo(**pod)
instance.uuid = uid
instance.duration = duration if duration is None else\
timedelta(seconds=duration)
return instance
def get_config(self):
return {}
def __init__(self, run_name=None, project=None, project_stage=None,
start_time=None, end_time=None, duration=None):
self.uuid = uuid.uuid4()
self.run_name = run_name
self.project = project
self.project_stage = project_stage
self.start_time = start_time
self.end_time = end_time
self.duration = duration
def initialize(self, context):
pass
def run(self):
pass
def finalize(self):
pass
def restart(self):
pass
def complete(self):
pass
def to_pod(self):
d = copy(self.__dict__)
d['uuid'] = str(self.uuid)
if self.duration is None:
d['duration'] = self.duration
else:
d['duration'] = self.duration.total_seconds()
return d
class RunnerJob(object):
class RunState(object):
"""
Represents the state of a WA run.
"""
@staticmethod
def from_pod(pod):
instance = RunState()
instance.status = RunStatus(pod['status'])
instance.timestamp = pod['timestamp']
jss = [JobState.from_pod(j) for j in pod['jobs']]
instance.jobs = OrderedDict(((js.id, js.iteration), js) for js in jss)
return instance
@property
def status(self):
return self.output.status
def num_completed_jobs(self):
return sum(1 for js in self.jobs.itervalues()
if js.status > JobStatus.SKIPPED)
@status.setter
def status(self, value):
self.output.status = value
def __init__(self):
self.jobs = OrderedDict()
self.status = RunStatus.NEW
self.timestamp = datetime.now()
def add_job(self, job):
job_state = JobState(job.id, job.label, job.iteration, job.status)
self.jobs[(job_state.id, job_state.iteration)] = job_state
def update_job(self, job):
state = self.jobs[(job.id, job.iteration)]
state.status = job.status
state.timestamp = datetime.now()
def get_status_counts(self):
counter = Counter()
for job_state in self.jobs.itervalues():
counter[job_state.status] += 1
return counter
def to_pod(self):
return OrderedDict(
status=str(self.status),
timestamp=self.timestamp,
jobs=[j.to_pod() for j in self.jobs.itervalues()],
)
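The intended round trip, sketched (this assumes the enum type can be constructed from a level's name, as ``from_pod`` implies):

state = RunState()
state.status = RunStatus.RUNNING

pod = state.to_pod()               # plain-old-data, ready for write_pod()
restored = RunState.from_pod(pod)  # status is rebuilt as a RunStatus level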
class JobState(object):
@staticmethod
def from_pod(pod):
instance = JobState(pod['id'], pod['label'], pod['iteration'],
JobStatus(pod['status']))
instance.retries = pod['retries']
instance.timestamp = pod['timestamp']
return instance
@property
def should_retry(self):
return self.attempt <= self.max_retries
def output_name(self):
return '{}-{}-{}'.format(self.id, self.label, self.iteration)
def __init__(self, id, actor, output, max_retries):
def __init__(self, id, label, iteration, status):
self.id = id
self.actor = actor
self.output = output
self.max_retries = max_retries
self.status = Status.NEW
self.attempt = 0
self.label = label
self.iteration = iteration
self.status = status
self.retries = 0
self.timestamp = datetime.now()
def initialize(self, context):
self.actor.initialize(context)
self.status = Status.PENDING
def run(self):
self.status = Status.RUNNING
self.attempt += 1
self.output.config = self.actor.get_config()
self.output.initialize()
self.actor.run()
self.status = Status.COMPLETE
def finalize(self):
self.actor.finalize()
def restart(self):
self.actor.restart()
def complete(self):
self.actor.complete()
__run_methods = set()
def runmethod(method):
"""
A method decorator that ensures that a method is invoked only once per run.
"""
def _method_wrapper(*args, **kwargs):
if method in __run_methods:
return
__run_methods.add(method)
ret = method(*args, **kwargs)
if ret is not None:
message = 'runmethod()s must return None; method "{}" returned "{}"'
raise RuntimeError(message.format(method, ret))
return _method_wrapper
def reset_runmethods():
global __run_methods
__run_methods = set()
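Usage would have looked roughly like this (``notify_run_started`` and ``ctx`` are hypothetical):

@runmethod
def notify_run_started(context):
    signal.send(signal.RUN_STARTED, context)

notify_run_started(ctx)  # runs and sends the signal
notify_run_started(ctx)  # silently skipped: already invoked this run
reset_runmethods()       # forget history, e.g. before a new run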
class Runner(object):
@property
def info(self):
return self.output.info
@property
def status(self):
return self.output.status
@status.setter
def status(self, value):
self.output.status = value
@property
def jobs_pending(self):
return len(self.job_queue) > 0
@property
def current_job(self):
if self.job_queue:
return self.job_queue[0]
@property
def previous_job(self):
if self.completed_jobs:
return self.completed_jobs[-1]
@property
def next_job(self):
if len(self.job_queue) > 1:
return self.job_queue[1]
def __init__(self, output):
self.logger = logging.getLogger('runner')
self.output = output
self.context = RunContext(self)
self.status = Status.NEW
self.job_queue = []
self.completed_jobs = []
self._known_ids = set([])
def add_job(self, job_id, actor, max_retries=2):
job_id = caseless_string(job_id)
if job_id in self._known_ids:
raise JobError('Job with id "{}" already exists'.format(job_id))
output = self.output.create_job_output(job_id)
self.job_queue.append(RunnerJob(job_id, actor, output, max_retries))
self._known_ids.add(job_id)
def initialize(self):
self.logger.info('Initializing run')
self.start_time = datetime.now()
if not self.info.start_time:
self.info.start_time = self.start_time
self.info.duration = timedelta()
self.context.initialize()
for job in self.job_queue:
job.initialize(self.context)
self.persist_state()
self.logger.info('Run initialized')
def run(self):
self.status = Status.RUNNING
reset_runmethods()
signal.send(signal.RUN_STARTED, self, self.context)
self.initialize()
signal.send(signal.RUN_INITIALIZED, self, self.context)
self.run_jobs()
signal.send(signal.RUN_COMPLETED, self, self.context)
self.finalize()
signal.send(signal.RUN_FINALIZED, self, self.context)
def run_jobs(self):
try:
self.logger.info('Running jobs')
while self.jobs_pending:
self.begin_job()
log.indent()
try:
self.current_job.run()
except KeyboardInterrupt:
self.current_job.status = Status.ABORTED
signal.send(signal.JOB_ABORTED, self, self.current_job)
raise
except Exception as e:
self.current_job.status = Status.FAILED
log.log_error(e, self.logger)
signal.send(signal.JOB_FAILED, self, self.current_job)
else:
self.current_job.status = Status.COMPLETE
finally:
log.dedent()
self.complete_job()
except KeyboardInterrupt:
self.status = Status.ABORTED
while self.job_queue:
job = self.job_queue.pop(0)
job.status = Status.ABORTED
self.completed_jobs.append(job)
signal.send(signal.RUN_ABORTED, self, self)
raise
except Exception as e:
self.status = Status.FAILED
log.log_error(e, self.logger)
signal.send(signal.RUN_FAILED, self, self)
else:
self.status = Status.COMPLETE
def finalize(self):
self.logger.info('Finalizing run')
for job in self.job_queue:
job.finalize()
self.end_time = datetime.now()
self.info.end_time = self.end_time
self.info.duration += self.end_time - self.start_time
self.persist_state()
signal.send(signal.RUN_FINALIZED, self, self)
self.logger.info('Run completed')
def begin_job(self):
self.logger.info('Starting job {}'.format(self.current_job.id))
signal.send(signal.JOB_STARTED, self, self.current_job)
self.persist_state()
def complete_job(self):
if self.current_job.status == Status.FAILED:
self.output.move_failed(self.current_job.output)
if self.current_job.should_retry:
self.logger.info('Restarting job {}'.format(self.current_job.id))
self.persist_state()
self.current_job.restart()
signal.send(signal.JOB_RESTARTED, self, self.current_job)
return
self.logger.info('Completing job {}'.format(self.current_job.id))
self.current_job.complete()
self.persist_state()
signal.send(signal.JOB_COMPLETED, self, self.current_job)
job = self.job_queue.pop(0)
self.completed_jobs.append(job)
def persist_state(self):
self.output.persist()
class RunContext(object):
"""
Provides a context for instrumentation. Keeps track of things like
current workload and iteration.
"""
@property
def run_output(self):
return self.runner.output
@property
def current_job(self):
return self.runner.current_job
@property
def run_output_directory(self):
return self.run_output.output_directory
@property
def output_directory(self):
if self.runner.current_job:
return self.runner.current_job.output.output_directory
else:
return self.run_output.output_directory
@property
def info_directory(self):
return self.run_output.info_directory
@property
def config_directory(self):
return self.run_output.config_directory
@property
def failed_directory(self):
return self.run_output.failed_directory
@property
def log_file(self):
return os.path.join(self.output_directory, 'run.log')
def __init__(self, runner):
self.runner = runner
self.job = None
self.iteration = None
self.job_output = None
self.resolver = ResourceResolver()
def initialize(self):
self.resolver.load()
def get_path(self, subpath):
if self.current_job is None:
return self.run_output.get_path(subpath)
else:
return self.current_job.output.get_path(subpath)
def add_metric(self, *args, **kwargs):
if self.current_job is None:
self.run_output.add_metric(*args, **kwargs)
else:
self.current_job.output.add_metric(*args, **kwargs)
def add_artifact(self, name, path, kind, *args, **kwargs):
if self.current_job is None:
self.add_run_artifact(name, path, kind, *args, **kwargs)
else:
self.add_job_artifact(name, path, kind, *args, **kwargs)
def add_run_artifact(self, *args, **kwargs):
self.run_output.add_artifact(*args, **kwargs)
def add_job_artifact(self, *args, **kwargs):
self.current_job.output.add_artifact(*args, **kwargs)
def get_artifact(self, name):
if self.iteration_artifacts:
for art in self.iteration_artifacts:
if art.name == name:
return art
for art in self.run_artifacts:
if art.name == name:
return art
return None
def to_pod(self):
return OrderedDict(
id=self.id,
label=self.label,
iteration=self.iteration,
status=str(self.status),
retries=self.retries,
timestamp=self.timestamp,
)

View File

@ -22,6 +22,7 @@ that has prioritization added to handler invocation.
import logging
from contextlib import contextmanager
import wrapt
from louie import dispatcher
from wa.utils.types import prioritylist
@ -67,21 +68,26 @@ class Signal(object):
return id(self.name)
# Signals associated with run-related events
RUN_STARTED = Signal('run-started', 'sent at the beginning of the run')
RUN_INITIALIZED = Signal('run-initialized', 'set after the run has been initialized')
RUN_ABORTED = Signal('run-aborted', 'set when the run has been aborted due to a keyboard interrupt')
RUN_FAILED = Signal('run-failed', 'set if the run has failed to complete all jobs.')
RUN_COMPLETED = Signal('run-completed', 'set upon completion of the run (regardless of whether or not it has failed)')
RUN_FINALIZED = Signal('run-finalized', 'set after the run has been finalized')
RUN_COMPLETED = Signal('run-completed', 'set upon completion of the run (regardless of whether or not it has failed)')
# Signals associated with job-related events
JOB_STARTED = Signal('job-started', 'set when a new job has been started')
JOB_ABORTED = Signal('job-aborted',
description='''
sent if a job has been aborted due to a keyboard interrupt.
.. note:: While the status of every job that has not had a chance to run
due to being interrupted will be set to "ABORTED", this signal will
only be sent for the job that was actually running at the time.
.. note:: While the status of every job that has not had a
chance to run due to being interrupted will be
set to "ABORTED", this signal will only be sent
for the job that was actually running at the
time.
''')
JOB_FAILED = Signal('job-failed', description='set if the job has failed')
@ -89,6 +95,34 @@ JOB_RESTARTED = Signal('job-restarted')
JOB_COMPLETED = Signal('job-completed')
JOB_FINALIZED = Signal('job-finalized')
# Signals associated with particular stages of workload execution
BEFORE_WORKLOAD_INITIALIZED = Signal('before-workload-initialized', invert_priority=True)
SUCCESSFUL_WORKLOAD_INITIALIZED = Signal('successful-workload-initialized')
AFTER_WORKLOAD_INITIALIZED = Signal('after-workload-initialized')
BEFORE_WORKLOAD_SETUP = Signal('before-workload-setup', invert_priority=True)
SUCCESSFUL_WORKLOAD_SETUP = Signal('successful-workload-setup')
AFTER_WORKLOAD_SETUP = Signal('after-workload-setup')
BEFORE_WORKLOAD_EXECUTION = Signal('before-workload-execution', invert_priority=True)
SUCCESSFUL_WORKLOAD_EXECUTION = Signal('successful-workload-execution')
AFTER_WORKLOAD_EXECUTION = Signal('after-workload-execution')
BEFORE_WORKLOAD_RESULT_UPDATE = Signal('before-workload-result-update', invert_priority=True)
SUCCESSFUL_WORKLOAD_RESULT_UPDATE = Signal('successful-workload-result-update')
AFTER_WORKLOAD_RESULT_UPDATE = Signal('after-workload-result-update')
BEFORE_WORKLOAD_TEARDOWN = Signal('before-workload-teardown', invert_priority=True)
SUCCESSFUL_WORKLOAD_TEARDOWN = Signal('successful-workload-teardown')
AFTER_WORKLOAD_TEARDOWN = Signal('after-workload-teardown')
BEFORE_WORKLOAD_FINALIZED = Signal('before-workload-finalized', invert_priority=True)
SUCCESSFUL_WORKLOAD_FINALIZED = Signal('successful-workload-finalized')
AFTER_WORKLOAD_FINALIZED = Signal('after-workload-finalized')
# Signals indicating exceptional conditions
ERROR_LOGGED = Signal('error-logged')
WARNING_LOGGED = Signal('warning-logged')
@ -138,26 +172,6 @@ BEFORE_TARGET_DISCONNECT = Signal('before-target-disconnect', invert_priority=Tr
SUCCESSFUL_TARGET_DISCONNECT = Signal('successful-target-disconnect')
AFTER_TARGET_DISCONNECT = Signal('after-target-disconnect')
BEFORE_WORKLOAD_SETUP = Signal(
'before-workload-setup', invert_priority=True)
SUCCESSFUL_WORKLOAD_SETUP = Signal('successful-workload-setup')
AFTER_WORKLOAD_SETUP = Signal('after-workload-setup')
BEFORE_WORKLOAD_EXECUTION = Signal(
'before-workload-execution', invert_priority=True)
SUCCESSFUL_WORKLOAD_EXECUTION = Signal('successful-workload-execution')
AFTER_WORKLOAD_EXECUTION = Signal('after-workload-execution')
BEFORE_WORKLOAD_RESULT_UPDATE = Signal(
'before-workload-result-update', invert_priority=True)
SUCCESSFUL_WORKLOAD_RESULT_UPDATE = Signal(
'successful-workload-result-update')
AFTER_WORKLOAD_RESULT_UPDATE = Signal('after-workload-result-update')
BEFORE_WORKLOAD_TEARDOWN = Signal(
'before-workload-teardown', invert_priority=True)
SUCCESSFUL_WORKLOAD_TEARDOWN = Signal('successful-workload-teardown')
AFTER_WORKLOAD_TEARDOWN = Signal('after-workload-teardown')
BEFORE_OVERALL_RESULTS_PROCESSING = Signal(
'before-overall-results-process', invert_priority=True)
@ -289,7 +303,7 @@ def safe_send(signal, sender=dispatcher.Anonymous,
"""
try:
logger.debug('Safe-sending {} from {}'.format(signal, sender))
send(singnal, sender, *args, **kwargs)
send(signal, sender, *args, **kwargs)
except Exception as e:
if any(isinstance(e, p) for p in propagate):
raise e
@ -297,9 +311,10 @@ def safe_send(signal, sender=dispatcher.Anonymous,
@contextmanager
def wrap(signal_name, sender=dispatcher.Anonymous, safe=False, *args, **kwargs):
def wrap(signal_name, sender=dispatcher.Anonymous, *args, **kwargs):
"""Wraps the suite in before/after signals, ensuring
that after signal is always sent."""
safe = kwargs.pop('safe', False)
signal_name = signal_name.upper().replace('-', '_')
send_func = safe_send if safe else send
try:
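For context, a hypothetical use of ``wrap()``, assuming it resolves the ``BEFORE_``/``AFTER_`` signal pair from the transformed name:

# Sends BEFORE_WORKLOAD_SETUP on entry and AFTER_WORKLOAD_SETUP on exit;
# the "after" signal is sent even if the body raises. With safe=True,
# handler exceptions are routed through safe_send instead of send.
with wrap('workload-setup', sender=self):
    workload.setup(context)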

View File

@ -37,7 +37,7 @@ from devlib.exception import TargetError
from devlib.utils.android import ApkInfo
from wa import Instrument, Parameter
from wa import Instrument, Parameter, very_fast
from wa.framework import signal
from wa.framework.exception import ConfigError
from wa.utils.misc import diff_tokens, write_table, check_output, as_relative
@ -206,26 +206,22 @@ class ExecutionTimeInstrument(Instrument):
"""
priority = 15
def __init__(self, target, **kwargs):
super(ExecutionTimeInstrument, self).__init__(target, **kwargs)
self.start_time = None
self.end_time = None
def on_run_start(self, context):
signal.connect(self.get_start_time, signal.BEFORE_WORKLOAD_EXECUTION, priority=self.priority)
signal.connect(self.get_stop_time, signal.AFTER_WORKLOAD_EXECUTION, priority=self.priority)
def get_start_time(self, context):
@very_fast
def start(self, context):
self.start_time = time.time()
def get_stop_time(self, context):
@very_fast
def stop(self, context):
self.end_time = time.time()
def update_result(self, context):
execution_time = self.end_time - self.start_time
context.result.add_metric('execution_time', execution_time, 'seconds')
context.add_metric('execution_time', execution_time, 'seconds')
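The new decorator-based style generalizes to any instrument; a minimal hypothetical example (``read_counter`` is a stand-in, not a real API):

class HypotheticalCounter(Instrument):

    name = 'hypothetical-counter'

    @very_fast
    def start(self, context):
        # @very_fast schedules this close to workload execution,
        # keeping the measurement window tight.
        self.before = read_counter()

    @very_fast
    def stop(self, context):
        self.after = read_counter()

    def update_result(self, context):
        context.add_metric('count-delta', self.after - self.before)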
class ApkVersion(Instrument):

View File

@ -232,6 +232,7 @@ def read_pod(source, fmt=None):
message = 'source must be a path or an open file handle; got {}'
raise ValueError(message.format(type(source)))
def write_pod(pod, dest, fmt=None):
if isinstance(dest, basestring):
with open(dest, 'w') as wfh:
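A typical round trip through the pod helpers might look like this sketch (the format is assumed to be inferred from the ``.json`` extension when ``fmt`` is not given):

write_pod(state.to_pod(), 'run_state.json')
restored = RunState.from_pod(read_pod('run_state.json'))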

View File

@ -78,9 +78,10 @@ def list_of_bools(value, interpret_strings=True):
"""
Value must be iterable. All elements will be converted to ``bool``\ s.
.. note:: By default, ``boolean()`` conversion function will be used, which means that
strings like ``"0"`` or ``"false"`` will be interpreted as ``False``. If this
is undesirable, set ``interpret_strings`` to ``False``.
.. note:: By default, ``boolean()`` conversion function will be used, which
means that strings like ``"0"`` or ``"false"`` will be
interpreted as ``False``. If this is undesirable, set
``interpret_strings`` to ``False``.
"""
if not isiterable(value):
@ -133,8 +134,8 @@ def list_or_string(value):
def list_or_caseless_string(value):
"""
Converts the value into a list of ``caseless_string``'s. If the value is not iterable
a one-element list with stringified value will be returned.
Converts the value into a list of ``caseless_string``\ s. If the value is
not iterable, a one-element list with the stringified value will be returned.
"""
if isinstance(value, basestring):
@ -148,9 +149,10 @@ def list_or_caseless_string(value):
def list_or(type_):
"""
Generator for "list or" types. These take either a single value or a list values
and return a list of the specfied ``type_`` performing the conversion on the value
(if a single value is specified) or each of the elemented of the specified list.
Generator for "list or" types. These take either a single value or a list
values and return a list of the specfied ``type_`` performing the
conversion on the value (if a single value is specified) or each of the
elemented of the specified list.
"""
list_type = list_of(type_)
@ -176,8 +178,8 @@ none_type = type(None)
def regex(value):
"""
Regular expression. If value is a string, it will be complied with no flags. If you
want to specify flags, value must be precompiled.
Regular expression. If value is a string, it will be compiled with no
flags. If you want to specify flags, value must be precompiled.
"""
if isinstance(value, regex_type):
@ -480,13 +482,13 @@ class obj_dict(MutableMapping):
class level(object):
"""
A level has a name and behaves like a string when printed,
however it also has a numeric value which is used in comparisons.
A level has a name and behaves like a string when printed, however it also
has a numeric value which is used in ordering comparisons.
"""
def __init__(self, name, value):
self.name = name
self.name = caseless_string(name)
self.value = value
def __str__(self):
@ -518,7 +520,7 @@ class level(object):
return self.value != other
def enum(args, start=0):
def enum(args, start=0, step=1):
"""
Creates a class with attributes named by the first argument.
Each attribute is a ``level``, so they behave as integers in comparisons.
@ -554,11 +556,13 @@ def enum(args, start=0):
raise ValueError('Invalid enum value: {}'.format(repr(name)))
levels = []
for i, v in enumerate(args, start):
name = string.upper(identifier(v))
lv = level(v, i)
n = start
for v in args:
name = caseless_string(identifier(v))
lv = level(v, n)
setattr(Enum, name, lv)
levels.append(lv)
n += step
setattr(Enum, 'values', levels)
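A quick sketch of the extended signature in use (attribute names follow the declared strings, as produced by ``identifier``):

Priority = enum(['low', 'normal', 'high'], start=10, step=10)

print(Priority.low.value)     # 10
print(Priority.normal.value)  # 20
print(str(Priority.high))     # 'high' -- levels print as their name
print(Priority.values)        # all three levels, in declaration order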

View File

@ -47,19 +47,31 @@ class Dhrystone(Workload):
parameters = [
Parameter('duration', kind=int, default=0,
description='The duration, in seconds, for which dhrystone will be executed. '
'Either this or ``mloops`` should be specified but not both.'),
description='''
The duration, in seconds, for which dhrystone will be
executed. Either this or ``mloops`` should be specified but
not both.
'''),
Parameter('mloops', kind=int, default=0,
description='Millions of loops to run. Either this or ``duration`` should be '
'specified, but not both. If neither is specified, this will default '
'to ``{}``'.format(default_mloops)),
description='''
Millions of loops to run. Either this or ``duration`` should
be specified, but not both. If neither is specified, this
will default to ``{}``.
'''.format(default_mloops)),
Parameter('threads', kind=int, default=4,
description='The number of separate dhrystone "threads" that will be forked.'),
description='''
The number of separate dhrystone "threads" that will be forked.
'''),
Parameter('delay', kind=int, default=0,
description=('The delay, in seconds, between kicking off of dhrystone '
'threads (if ``threads`` > 1).')),
description=('''
The delay, in seconds, between kicking off dhrystone
threads (if ``threads`` > 1).
''')),
Parameter('taskset_mask', kind=int, default=0,
description='The processes spawned by sysbench will be pinned to cores as specified by this parameter'),
description='''
The processes spawned by dhrystone will be pinned to cores as
specified by this parameter.
'''),
]
def initialize(self, context):
@ -67,7 +79,10 @@ class Dhrystone(Workload):
Dhrystone.target_exe = self.target.install(host_exe)
def setup(self, context):
execution_mode = '-l {}'.format(self.mloops) if self.mloops else '-r {}'.format(self.duration)
if self.mloops:
execution_mode = '-l {}'.format(self.mloops)
else:
execution_mode = '-r {}'.format(self.duration)
if self.taskset_mask:
taskset_string = 'busybox taskset 0x{:x} '.format(self.taskset_mask)
else:
@ -76,12 +91,18 @@ class Dhrystone(Workload):
self.target_exe,
execution_mode,
self.threads, self.delay)
self.timeout = self.duration and self.duration + self.delay * self.threads + 10 or 300
if self.duration:
self.timeout = self.duration + self.delay * self.threads + 10
else:
self.timeout = 300
self.target.killall('dhrystone')
def run(self, context):
try:
self.output = self.target.execute(self.command, timeout=self.timeout, check_exit_code=False)
self.output = self.target.execute(self.command,
timeout=self.timeout,
check_exit_code=False)
except KeyboardInterrupt:
self.target.killall('dhrystone')
raise
@ -90,14 +111,18 @@ class Dhrystone(Workload):
outfile = os.path.join(context.output_directory, 'dhrystone.output')
with open(outfile, 'w') as wfh:
wfh.write(self.output)
context.add_artifact('dhrystone-output', outfile, 'raw', "dhrystone's stdout")
score_count = 0
dmips_count = 0
total_score = 0
total_dmips = 0
for line in self.output.split('\n'):
match = self.time_regex.search(line)
if match:
context.add_metric('time', float(match.group('time')), 'seconds', lower_is_better=True)
context.add_metric('time', float(match.group('time')), 'seconds',
lower_is_better=True)
else:
match = self.bm_regex.search(line)
if match:
@ -114,6 +139,7 @@ class Dhrystone(Workload):
context.add_metric(metric, value)
dmips_count += 1
total_dmips += value
context.add_metric('total DMIPS', total_dmips)
context.add_metric('total score', total_score)
@ -122,7 +148,9 @@ class Dhrystone(Workload):
def validate(self):
if self.mloops and self.duration: # pylint: disable=E0203
raise ConfigError('mloops and duration cannot be both specified at the same time for dhrystone.')
msg = 'mloops and duration cannot be both specified at the '\
'same time for dhrystone.'
raise ConfigError(msg)
if not self.mloops and not self.duration: # pylint: disable=E0203
self.mloops = self.default_mloops