1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-02-20 20:09:11 +00:00

Initial priority implementation

- Fixed up some of the signal map for instrumentation
- Changed how priorites are specified -- no longer method name prefixes
  but dedicated decorators, including an easy way of specifying a custom
  priority level (no longer need to manually connect signals)
- Updated ExecutionTimeInstrument to work with the new system
- Also removed some dead code
This commit is contained in:
Sergei Trofimov 2017-03-17 15:57:05 +00:00
parent 4287e90153
commit c5cd2b9298
8 changed files with 144 additions and 291 deletions

View File

@ -7,6 +7,7 @@ from wa.framework.exception import (ResultProcessorError, ResourceError,
from wa.framework.exception import (WAError, NotFoundError, ValidationError,
WorkloadError)
from wa.framework.exception import WorkerThreadError, PluginLoaderError
from wa.framework.instrumentation import Instrument
from wa.framework.instrumentation import (Instrument, very_slow, slow, normal, fast,
very_fast)
from wa.framework.plugin import Plugin, Parameter
from wa.framework.workload import Workload

View File

@ -189,132 +189,6 @@ class ExecutionContext(object):
classifiers=None):
self.run_output.add_artifact(name, path, kind, description, classifiers)
class OldExecutionContext(object):
"""
Provides a context for instrumentation. Keeps track of things like
current workload and iteration.
This class also provides two status members that can be used by workloads
and instrumentation to keep track of arbitrary state. ``result``
is reset on each new iteration of a workload; run_status is maintained
throughout a Workload Automation run.
"""
# These are the artifacts generated by the core framework.
default_run_artifacts = [
Artifact('runlog', 'run.log', 'log', mandatory=True,
description='The log for the entire run.'),
]
@property
def current_iteration(self):
if self.current_job:
spec_id = self.current_job.spec.id
return self.job_iteration_counts[spec_id]
else:
return None
@property
def job_status(self):
if not self.current_job:
return None
return self.current_job.result.status
@property
def workload(self):
return getattr(self.spec, 'workload', None)
@property
def spec(self):
return getattr(self.current_job, 'spec', None)
@property
def result(self):
return getattr(self.current_job, 'result', self.run_result)
def __init__(self, device_manager, config):
self.device_manager = device_manager
self.device = self.device_manager.target
self.config = config
self.reboot_policy = config.reboot_policy
self.output_directory = None
self.current_job = None
self.resolver = None
self.last_error = None
self.run_info = None
self.run_result = None
self.run_output_directory = self.config.output_directory
self.host_working_directory = self.config.meta_directory
self.iteration_artifacts = None
self.run_artifacts = copy(self.default_run_artifacts)
self.job_iteration_counts = defaultdict(int)
self.aborted = False
self.runner = None
def initialize(self):
if not os.path.isdir(self.run_output_directory):
os.makedirs(self.run_output_directory)
self.output_directory = self.run_output_directory
self.resolver = ResourceResolver(self.config)
self.run_info = RunInfo(self.config)
self.run_result = RunResult(self.run_info, self.run_output_directory)
def next_job(self, job):
"""Invoked by the runner when starting a new iteration of workload execution."""
self.current_job = job
self.job_iteration_counts[self.spec.id] += 1
if not self.aborted:
outdir_name = '_'.join(map(str, [self.spec.label, self.spec.id, self.current_iteration]))
self.output_directory = _d(os.path.join(self.run_output_directory, outdir_name))
self.iteration_artifacts = [wa for wa in self.workload.artifacts]
self.current_job.result.iteration = self.current_iteration
self.current_job.result.output_directory = self.output_directory
def end_job(self):
if self.current_job.result.status == JobStatus.ABORTED:
self.aborted = True
self.current_job = None
self.output_directory = self.run_output_directory
def add_metric(self, *args, **kwargs):
self.result.add_metric(*args, **kwargs)
def add_artifact(self, name, path, kind, *args, **kwargs):
if self.current_job is None:
self.add_run_artifact(name, path, kind, *args, **kwargs)
else:
self.add_iteration_artifact(name, path, kind, *args, **kwargs)
def add_run_artifact(self, name, path, kind, *args, **kwargs):
path = _check_artifact_path(path, self.run_output_directory)
self.run_artifacts.append(Artifact(name, path, kind, Artifact.ITERATION, *args, **kwargs))
def add_iteration_artifact(self, name, path, kind, *args, **kwargs):
path = _check_artifact_path(path, self.output_directory)
self.iteration_artifacts.append(Artifact(name, path, kind, Artifact.RUN, *args, **kwargs))
def get_artifact(self, name):
if self.iteration_artifacts:
for art in self.iteration_artifacts:
if art.name == name:
return art
for art in self.run_artifacts:
if art.name == name:
return art
return None
def _check_artifact_path(path, rootpath):
if path.startswith(rootpath):
return os.path.abspath(path)
rootpath = os.path.abspath(rootpath)
full_path = os.path.join(rootpath, path)
if not os.path.isfile(full_path):
msg = 'Cannot add artifact because {} does not exist.'
raise ValueError(msg.format(full_path))
return full_path
class Executor(object):
"""
@ -380,7 +254,10 @@ class Executor(object):
self.logger.info('Starting run')
runner = Runner(context)
signal.send(signal.RUN_STARTED, self)
runner.run()
#TODO: postamble goes here.
signal.send(signal.RUN_COMPLETED, self)
def execute_postamble(self):
"""
@ -430,7 +307,6 @@ class Runner(object):
self.config = self.context.cm
def run(self):
self.send(signal.RUN_STARTED)
try:
self.initialize_run()
self.send(signal.RUN_INITIALIZED)
@ -446,7 +322,7 @@ class Runner(object):
raise e
finally:
self.finalize_run()
self.send(signal.RUN_COMPLETED)
self.send(signal.RUN_FINALIZED)
def initialize_run(self):
self.logger.info('Initializing run')

View File

@ -105,8 +105,9 @@ from collections import OrderedDict
from wa.framework import signal
from wa.framework.plugin import Plugin
from wa.framework.exception import WAError, TargetNotRespondingError, TimeoutError
from wa.utils.log import log_error
from wa.utils.misc import get_traceback, isiterable
from wa.utils.types import identifier
from wa.utils.types import identifier, enum, level
logger = logging.getLogger('instrumentation')
@ -120,14 +121,14 @@ logger = logging.getLogger('instrumentation')
SIGNAL_MAP = OrderedDict([
# Below are "aliases" for some of the more common signals to allow
# instrumentation to have similar structure to workloads
('initialize', signal.SUCCESSFUL_RUN_INIT),
# ('setup', signal.SUCCESSFUL_WORKLOAD_SETUP),
# ('start', signal.BEFORE_WORKLOAD_EXECUTION),
# ('stop', signal.AFTER_WORKLOAD_EXECUTION),
# ('process_workload_result', signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE),
# ('update_result', signal.AFTER_WORKLOAD_RESULT_UPDATE),
# ('teardown', signal.AFTER_WORKLOAD_TEARDOWN),
# ('finalize', signal.RUN_FIN),
('initialize', signal.RUN_INITIALIZED),
('setup', signal.BEFORE_WORKLOAD_SETUP),
('start', signal.BEFORE_WORKLOAD_EXECUTION),
('stop', signal.AFTER_WORKLOAD_EXECUTION),
('process_workload_result', signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE),
('update_result', signal.AFTER_WORKLOAD_RESULT_UPDATE),
('teardown', signal.AFTER_WORKLOAD_TEARDOWN),
('finalize', signal.RUN_FINALIZED),
# ('on_run_start', signal.RUN_START),
# ('on_run_end', signal.RUN_END),
@ -171,13 +172,37 @@ SIGNAL_MAP = OrderedDict([
# ('on_warning', signal.WARNING_LOGGED),
])
PRIORITY_MAP = OrderedDict([
('very_fast_', 20),
('fast_', 10),
('normal_', 0),
('slow_', -10),
('very_slow_', -20),
])
Priority = enum(['very_slow', 'slow', 'normal', 'fast', 'very_fast'], -20, 10)
def get_priority(func):
return getattr(getattr(func, 'im_func', func),
'priority', Priority.normal)
def priority(priority):
def decorate(func):
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper.func_name = func.func_name
if priority in Priority.values:
wrapper.priority = Priority(priority)
else:
if not isinstance(priority, int):
msg = 'Invalid priorty "{}"; must be an int or one of {}'
raise ValueError(msg.format(priority, Priority.values))
wrapper.priority = level('custom', priority)
return wrapper
return decorate
very_slow = priority(Priority.very_slow)
slow = priority(Priority.slow)
normal = priority(Priority.normal)
fast = priority(Priority.fast)
very_fast = priority(Priority.very_fast)
installed = []
@ -244,18 +269,7 @@ class ManagedCallback(object):
logger.error('Error in insturment {}'.format(self.instrument.name))
global failures_detected # pylint: disable=W0603
failures_detected = True
if isinstance(e, WAError):
logger.error(e)
else:
tb = get_traceback()
logger.error(tb)
logger.error('{}({})'.format(e.__class__.__name__, e))
if not context.current_iteration:
# Error occureed outside of an iteration (most likely
# during intial setup or teardown). Since this would affect
# the rest of the run, mark the instument as broken so that
# it doesn't get re-enabled for subsequent iterations.
self.instrument.is_broken = True
log_error(e, logger)
disable(self.instrument)
@ -274,34 +288,38 @@ def install(instrument):
"""
logger.debug('Installing instrument %s.', instrument)
if is_installed(instrument):
raise ValueError('Instrument {} is already installed.'.format(instrument.name))
for attr_name in dir(instrument):
priority = 0
stripped_attr_name = attr_name
for key, value in PRIORITY_MAP.iteritems():
if attr_name.startswith(key):
stripped_attr_name = attr_name[len(key):]
priority = value
break
if stripped_attr_name in SIGNAL_MAP:
attr = getattr(instrument, attr_name)
if not callable(attr):
raise ValueError('Attribute {} not callable in {}.'.format(attr_name, instrument))
argspec = inspect.getargspec(attr)
arg_num = len(argspec.args)
# Instrument callbacks will be passed exactly two arguments: self
# (the instrument instance to which the callback is bound) and
# context. However, we also allow callbacks to capture the context
# in variable arguments (declared as "*args" in the definition).
if arg_num > 2 or (arg_num < 2 and argspec.varargs is None):
message = '{} must take exactly 2 positional arguments; {} given.'
raise ValueError(message.format(attr_name, arg_num))
logger.debug('\tConnecting %s to %s', attr.__name__, SIGNAL_MAP[stripped_attr_name])
mc = ManagedCallback(instrument, attr)
_callbacks.append(mc)
signal.connect(mc, SIGNAL_MAP[stripped_attr_name], priority=priority)
if is_installed(instrument):
msg = 'Instrument {} is already installed.'
raise ValueError(msg.format(instrument.name))
for attr_name in dir(instrument):
if attr_name not in SIGNAL_MAP:
continue
attr = getattr(instrument, attr_name)
if not callable(attr):
msg = 'Attribute {} not callable in {}.'
raise ValueError(msg.format(attr_name, instrument))
argspec = inspect.getargspec(attr)
arg_num = len(argspec.args)
# Instrument callbacks will be passed exactly two arguments: self
# (the instrument instance to which the callback is bound) and
# context. However, we also allow callbacks to capture the context
# in variable arguments (declared as "*args" in the definition).
if arg_num > 2 or (arg_num < 2 and argspec.varargs is None):
message = '{} must take exactly 2 positional arguments; {} given.'
raise ValueError(message.format(attr_name, arg_num))
priority = get_priority(attr)
logger.debug('\tConnecting %s to %s with priority %s(%d)', attr.__name__,
SIGNAL_MAP[attr_name], priority.name, priority.value)
mc = ManagedCallback(instrument, attr)
_callbacks.append(mc)
signal.connect(mc, SIGNAL_MAP[attr_name], priority=priority.value)
installed.append(instrument)
@ -385,15 +403,3 @@ class Instrument(Plugin):
self.target = target
self.is_enabled = True
self.is_broken = False
def initialize(self, context):
pass
def finalize(self, context):
pass
def __str__(self):
return self.name
def __repr__(self):
return 'Instrument({})'.format(self.name)

View File

@ -1,6 +1,6 @@
import logging
from wa.framework import pluginloader
from wa.framework import pluginloader, signal
from wa.framework.configuration.core import JobStatus
@ -38,7 +38,8 @@ class Job(object):
def initialize(self, context):
self.logger.info('Initializing job {}'.format(self.id))
self.workload.initialize(context)
with signal.wrap('WORKLOAD_INITIALIZED', self, context):
self.workload.initialize(context)
self.status = JobStatus.PENDING
context.update_job_state(self)
@ -47,21 +48,26 @@ class Job(object):
def setup(self, context):
self.logger.info('Setting up job {}'.format(self.id))
self.workload.setup(context)
with signal.wrap('WORKLOAD_SETUP', self, context):
self.workload.setup(context)
def run(self, context):
self.logger.info('Running job {}'.format(self.id))
self.workload.run(context)
with signal.wrap('WORKLOAD_EXECUTION', self, context):
self.workload.run(context)
def process_output(self, context):
self.logger.info('Processing output for job {}'.format(self.id))
self.workload.update_result(context)
with signal.wrap('WORKLOAD_RESULT_UPDATE', self, context):
self.workload.update_result(context)
def teardown(self, context):
self.logger.info('Tearing down job {}'.format(self.id))
self.workload.teardown(context)
with signal.wrap('WORKLOAD_TEARDOWN', self, context):
self.workload.teardown(context)
def finalize(self, context):
self.logger.info('Finalizing job {}'.format(self.id))
self.workload.finalize(context)
with signal.wrap('WORKLOAD_FINALIZED', self, context):
self.workload.finalize(context)

View File

@ -279,7 +279,6 @@ class PluginMeta(type):
mcs._propagate_attributes(bases, attrs, clsname)
cls = type.__new__(mcs, clsname, bases, attrs)
mcs._setup_aliases(cls)
mcs._implement_virtual(cls, bases)
return cls
@classmethod
@ -324,48 +323,6 @@ class PluginMeta(type):
alias.plugin_name = cls.name
cls.aliases.add(alias)
@classmethod
def _implement_virtual(mcs, cls, bases):
"""
This implements automatic method propagation to the bases, so
that you don't have to do something like
super(cls, self).vmname()
This also ensures that the methods that have beend identified as
"globally virtual" are executed exactly once per WA execution, even if
invoked through instances of different subclasses
"""
methods = {}
called_globals = set()
for vmname in mcs.virtual_methods:
clsmethod = getattr(cls, vmname, None)
if clsmethod:
basemethods = [getattr(b, vmname) for b in bases
if hasattr(b, vmname)]
methods[vmname] = [bm for bm in basemethods if bm != clsmethod]
methods[vmname].append(clsmethod)
def generate_method_wrapper(vname): # pylint: disable=unused-argument
# this creates a closure with the method name so that it
# does not need to be passed to the wrapper as an argument,
# leaving the wrapper to accept exactly the same set of
# arguments as the method it is wrapping.
name__ = vmname # pylint: disable=cell-var-from-loop
def wrapper(self, *args, **kwargs):
for dm in methods[name__]:
if name__ in mcs.global_virtuals:
if dm not in called_globals:
dm(self, *args, **kwargs)
called_globals.add(dm)
else:
dm(self, *args, **kwargs)
return wrapper
setattr(cls, vmname, generate_method_wrapper(vmname))
class Plugin(object):
"""
@ -444,12 +401,6 @@ class Plugin(object):
for param in self.parameters:
param.validate(self)
def initialize(self, context):
pass
def finalize(self, context):
pass
def check_artifacts(self, context, level):
"""
Make sure that all mandatory artifacts have been generated.

View File

@ -22,6 +22,7 @@ that has prioritization added to handler invocation.
import logging
from contextlib import contextmanager
import wrapt
from louie import dispatcher
from wa.utils.types import prioritylist
@ -67,21 +68,26 @@ class Signal(object):
return id(self.name)
# Signals associated with run-related events
RUN_STARTED = Signal('run-started', 'sent at the beginning of the run')
RUN_INITIALIZED = Signal('run-initialized', 'set after the run has been initialized')
RUN_ABORTED = Signal('run-aborted', 'set when the run has been aborted due to a keyboard interrupt')
RUN_FAILED = Signal('run-failed', 'set if the run has failed to complete all jobs.' )
RUN_COMPLETED = Signal('run-completed', 'set upon completion of the run (regardless of whether or not it has failed')
RUN_FINALIZED = Signal('run-finalized', 'set after the run has been finalized')
RUN_COMPLETED = Signal('run-completed', 'set upon completion of the run (regardless of whether or not it has failed')
# Signals associated with job-related events
JOB_STARTED = Signal('job-started', 'set when a a new job has been started')
JOB_ABORTED = Signal('job-aborted',
description='''
sent if a job has been aborted due to a keyboard interrupt.
.. note:: While the status of every job that has not had a chance to run
due to being interrupted will be set to "ABORTED", this signal will
only be sent for the job that was actually running at the time.
.. note:: While the status of every job that has not had a
chance to run due to being interrupted will be
set to "ABORTED", this signal will only be sent
for the job that was actually running at the
time.
''')
JOB_FAILED = Signal('job-failed', description='set if the job has failed')
@ -89,6 +95,34 @@ JOB_RESTARTED = Signal('job-restarted')
JOB_COMPLETED = Signal('job-completed')
JOB_FINALIZED = Signal('job-finalized')
# Signals associated with particular stages of workload execution
BEFORE_WORKLOAD_INITIALIZED = Signal('before-workload-initialized', invert_priority=True)
SUCCESSFUL_WORKLOAD_INITIALIZED = Signal('successful-workload-initialized')
AFTER_WORKLOAD_INITIALIZED = Signal('after-workload-initialized')
BEFORE_WORKLOAD_SETUP = Signal('before-workload-setup', invert_priority=True)
SUCCESSFUL_WORKLOAD_SETUP = Signal('successful-workload-setup')
AFTER_WORKLOAD_SETUP = Signal('after-workload-setup')
BEFORE_WORKLOAD_EXECUTION = Signal('before-workload-execution', invert_priority=True)
SUCCESSFUL_WORKLOAD_EXECUTION = Signal('successful-workload-execution')
AFTER_WORKLOAD_EXECUTION = Signal('after-workload-execution')
BEFORE_WORKLOAD_RESULT_UPDATE = Signal('before-workload-result-update', invert_priority=True)
SUCCESSFUL_WORKLOAD_RESULT_UPDATE = Signal('successful-workload-result-update')
AFTER_WORKLOAD_RESULT_UPDATE = Signal('after-workload-result-update')
BEFORE_WORKLOAD_TEARDOWN = Signal('before-workload-teardown', invert_priority=True)
SUCCESSFUL_WORKLOAD_TEARDOWN = Signal('successful-workload-teardown')
AFTER_WORKLOAD_TEARDOWN = Signal('after-workload-teardown')
BEFORE_WORKLOAD_FINALIZED = Signal('before-workload-finalized', invert_priority=True)
SUCCESSFUL_WORKLOAD_FINALIZED = Signal('successful-workload-finalized')
AFTER_WORKLOAD_FINALIZED = Signal('after-workload-finalized')
# Signals indicating exceptional conditions
ERROR_LOGGED = Signal('error-logged')
WARNING_LOGGED = Signal('warning-logged')
@ -138,26 +172,6 @@ BEFORE_TARGET_DISCONNECT = Signal('before-target-disconnect', invert_priority=Tr
SUCCESSFUL_TARGET_DISCONNECT = Signal('successful-target-disconnect')
AFTER_TARGET_DISCONNECT = Signal('after-target-disconnect')
BEFORE_WORKLOAD_SETUP = Signal(
'before-workload-setup', invert_priority=True)
SUCCESSFUL_WORKLOAD_SETUP = Signal('successful-workload-setup')
AFTER_WORKLOAD_SETUP = Signal('after-workload-setup')
BEFORE_WORKLOAD_EXECUTION = Signal(
'before-workload-execution', invert_priority=True)
SUCCESSFUL_WORKLOAD_EXECUTION = Signal('successful-workload-execution')
AFTER_WORKLOAD_EXECUTION = Signal('after-workload-execution')
BEFORE_WORKLOAD_RESULT_UPDATE = Signal(
'before-workload-result-update', invert_priority=True)
SUCCESSFUL_WORKLOAD_RESULT_UPDATE = Signal(
'successful-workload-result-update')
AFTER_WORKLOAD_RESULT_UPDATE = Signal('after-workload-result-update')
BEFORE_WORKLOAD_TEARDOWN = Signal(
'before-workload-teardown', invert_priority=True)
SUCCESSFUL_WORKLOAD_TEARDOWN = Signal('successful-workload-teardown')
AFTER_WORKLOAD_TEARDOWN = Signal('after-workload-teardown')
BEFORE_OVERALL_RESULTS_PROCESSING = Signal(
'before-overall-results-process', invert_priority=True)
@ -289,7 +303,7 @@ def safe_send(signal, sender=dispatcher.Anonymous,
"""
try:
logger.debug('Safe-sending {} from {}'.format(signal, sender))
send(singnal, sender, *args, **kwargs)
send(signal, sender, *args, **kwargs)
except Exception as e:
if any(isinstance(e, p) for p in propagate):
raise e
@ -297,9 +311,10 @@ def safe_send(signal, sender=dispatcher.Anonymous,
@contextmanager
def wrap(signal_name, sender=dispatcher.Anonymous, safe=False, *args, **kwargs):
def wrap(signal_name, sender=dispatcher.Anonymous,*args, **kwargs):
"""Wraps the suite in before/after signals, ensuring
that after signal is always sent."""
safe = kwargs.pop('safe', False)
signal_name = signal_name.upper().replace('-', '_')
send_func = safe_send if safe else send
try:

View File

@ -37,7 +37,7 @@ from devlib.exception import TargetError
from devlib.utils.android import ApkInfo
from wa import Instrument, Parameter
from wa import Instrument, Parameter, very_fast
from wa.framework import signal
from wa.framework.exception import ConfigError
from wa.utils.misc import diff_tokens, write_table, check_output, as_relative
@ -206,26 +206,22 @@ class ExecutionTimeInstrument(Instrument):
"""
priority = 15
def __init__(self, target, **kwargs):
super(ExecutionTimeInstrument, self).__init__(target, **kwargs)
self.start_time = None
self.end_time = None
def on_run_start(self, context):
signal.connect(self.get_start_time, signal.BEFORE_WORKLOAD_EXECUTION, priority=self.priority)
signal.connect(self.get_stop_time, signal.AFTER_WORKLOAD_EXECUTION, priority=self.priority)
def get_start_time(self, context):
@very_fast
def start(self, context):
self.start_time = time.time()
def get_stop_time(self, context):
@very_fast
def stop(self, context):
self.end_time = time.time()
def update_result(self, context):
execution_time = self.end_time - self.start_time
context.result.add_metric('execution_time', execution_time, 'seconds')
context.add_metric('execution_time', execution_time, 'seconds')
class ApkVersion(Instrument):

View File

@ -520,7 +520,7 @@ class level(object):
return self.value != other
def enum(args, start=0):
def enum(args, start=0, step=1):
"""
Creates a class with attributes named by the first argument.
Each attribute is a ``level`` so they behave is integers in comparisons.
@ -556,11 +556,13 @@ def enum(args, start=0):
raise ValueError('Invalid enum value: {}'.format(repr(name)))
levels = []
for i, v in enumerate(args, start):
n = start
for v in args:
name = caseless_string(identifier(v))
lv = level(v, i)
lv = level(v, n)
setattr(Enum, name, lv)
levels.append(lv)
n += step
setattr(Enum, 'values', levels)