mirror of
https://github.com/ARM-software/workload-automation.git
synced 2025-01-19 12:24:32 +00:00
512bacc1be
- Adding "classifiers" field to Metric objects. This is a dict mapping classifier names (arbitrary strings) to corresponding values for that specific metrics. This is to allow extensions to add extension-specific annotations to metric that could be handled in a generic way (e.g. by result processors). - Updating telemetry workload to add classifiers for the url and internal iteration (or "time") for a particular result. - Updating csv result processor with the option to use classifiers to add columns to results.csv (either using all classifiers found, or only for the specific ones listed).
328 lines
11 KiB
Python
328 lines
11 KiB
Python
# Copyright 2013-2015 ARM Limited
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
# pylint: disable=no-member
|
|
|
|
"""
|
|
This module defines the classes used to handle result
|
|
processing inside Workload Automation. There will be a
|
|
:class:`wlauto.core.workload.WorkloadResult` object generated for
|
|
every workload iteration executed. This object will have a list of
|
|
:class:`wlauto.core.workload.WorkloadMetric` objects. This list will be
|
|
populated by the workload itself and may also be updated by instrumentation
|
|
(e.g. to add power measurements). Once the result object has been fully
|
|
populated, it will be passed into the ``process_iteration_result`` method of
|
|
:class:`ResultProcessor`. Once the entire run has completed, a list containing
|
|
result objects from all iterations will be passed into ``process_results``
|
|
method of :class`ResultProcessor`.
|
|
|
|
Which result processors will be active is defined by the ``result_processors``
|
|
list in the ``~/.workload_automation/config.py``. Only the result_processors
|
|
who's names appear in this list will be used.
|
|
|
|
A :class:`ResultsManager` keeps track of active results processors.
|
|
|
|
"""
|
|
import logging
|
|
import traceback
|
|
from copy import copy
|
|
from contextlib import contextmanager
|
|
from datetime import datetime
|
|
|
|
from wlauto.core.extension import Extension
|
|
from wlauto.exceptions import WAError
|
|
from wlauto.utils.types import numeric
|
|
from wlauto.utils.misc import enum_metaclass
|
|
|
|
|
|
class ResultManager(object):
|
|
"""
|
|
Keeps track of result processors and passes on the results onto the individual processors.
|
|
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.logger = logging.getLogger('ResultsManager')
|
|
self.processors = []
|
|
self._bad = []
|
|
|
|
def install(self, processor):
|
|
self.logger.debug('Installing results processor %s', processor.name)
|
|
self.processors.append(processor)
|
|
|
|
def uninstall(self, processor):
|
|
if processor in self.processors:
|
|
self.logger.debug('Uninstalling results processor %s', processor.name)
|
|
self.processors.remove(processor)
|
|
else:
|
|
self.logger.warning('Attempting to uninstall results processor %s, which is not installed.',
|
|
processor.name)
|
|
|
|
def initialize(self, context):
|
|
# Errors aren't handled at this stage, because this gets executed
|
|
# before workload execution starts and we just want to propagte them
|
|
# and terminate (so that error can be corrected and WA restarted).
|
|
for processor in self.processors:
|
|
processor.initialize(context)
|
|
|
|
def add_result(self, result, context):
|
|
with self._manage_processors(context):
|
|
for processor in self.processors:
|
|
with self._handle_errors(processor):
|
|
processor.process_iteration_result(result, context)
|
|
for processor in self.processors:
|
|
with self._handle_errors(processor):
|
|
processor.export_iteration_result(result, context)
|
|
|
|
def process_run_result(self, result, context):
|
|
with self._manage_processors(context):
|
|
for processor in self.processors:
|
|
with self._handle_errors(processor):
|
|
processor.process_run_result(result, context)
|
|
for processor in self.processors:
|
|
with self._handle_errors(processor):
|
|
processor.export_run_result(result, context)
|
|
|
|
def finalize(self, context):
|
|
with self._manage_processors(context):
|
|
for processor in self.processors:
|
|
with self._handle_errors(processor):
|
|
processor.finalize(context)
|
|
|
|
def validate(self):
|
|
for processor in self.processors:
|
|
processor.validate()
|
|
|
|
@contextmanager
|
|
def _manage_processors(self, context, finalize_bad=True):
|
|
yield
|
|
for processor in self._bad:
|
|
if finalize_bad:
|
|
processor.finalize(context)
|
|
self.uninstall(processor)
|
|
self._bad = []
|
|
|
|
@contextmanager
|
|
def _handle_errors(self, processor):
|
|
try:
|
|
yield
|
|
except KeyboardInterrupt, e:
|
|
raise e
|
|
except WAError, we:
|
|
self.logger.error('"{}" result processor has encountered an error'.format(processor.name))
|
|
self.logger.error('{}("{}")'.format(we.__class__.__name__, we.message))
|
|
self._bad.append(processor)
|
|
except Exception, e: # pylint: disable=W0703
|
|
self.logger.error('"{}" result processor has encountered an error'.format(processor.name))
|
|
self.logger.error('{}("{}")'.format(e.__class__.__name__, e))
|
|
self.logger.error(traceback.format_exc())
|
|
self._bad.append(processor)
|
|
|
|
|
|
class ResultProcessor(Extension):
|
|
"""
|
|
Base class for result processors. Defines an interface that should be implemented
|
|
by the subclasses. A result processor can be used to do any kind of post-processing
|
|
of the results, from writing them out to a file, to uploading them to a database,
|
|
performing calculations, generating plots, etc.
|
|
|
|
"""
|
|
|
|
def initialize(self, context):
|
|
pass
|
|
|
|
def process_iteration_result(self, result, context):
|
|
pass
|
|
|
|
def export_iteration_result(self, result, context):
|
|
pass
|
|
|
|
def process_run_result(self, result, context):
|
|
pass
|
|
|
|
def export_run_result(self, result, context):
|
|
pass
|
|
|
|
def finalize(self, context):
|
|
pass
|
|
|
|
|
|
class RunResult(object):
|
|
"""
|
|
Contains overall results for a run.
|
|
|
|
"""
|
|
|
|
__metaclass__ = enum_metaclass('values', return_name=True)
|
|
|
|
values = [
|
|
'OK',
|
|
'OKISH',
|
|
'PARTIAL',
|
|
'FAILED',
|
|
'UNKNOWN',
|
|
]
|
|
|
|
@property
|
|
def status(self):
|
|
if not self.iteration_results or all([s.status == IterationResult.FAILED for s in self.iteration_results]):
|
|
return self.FAILED
|
|
elif any([s.status == IterationResult.FAILED for s in self.iteration_results]):
|
|
return self.PARTIAL
|
|
elif any([s.status == IterationResult.ABORTED for s in self.iteration_results]):
|
|
return self.PARTIAL
|
|
elif (any([s.status == IterationResult.PARTIAL for s in self.iteration_results]) or
|
|
self.non_iteration_errors):
|
|
return self.OKISH
|
|
elif all([s.status == IterationResult.OK for s in self.iteration_results]):
|
|
return self.OK
|
|
else:
|
|
return self.UNKNOWN # should never happen
|
|
|
|
def __init__(self, run_info, output_directory=None):
|
|
self.info = run_info
|
|
self.iteration_results = []
|
|
self.artifacts = []
|
|
self.events = []
|
|
self.non_iteration_errors = False
|
|
self.output_directory = output_directory
|
|
|
|
|
|
class RunEvent(object):
|
|
"""
|
|
An event that occured during a run.
|
|
|
|
"""
|
|
def __init__(self, message):
|
|
self.timestamp = datetime.utcnow()
|
|
self.message = message
|
|
|
|
def to_dict(self):
|
|
return copy(self.__dict__)
|
|
|
|
def __str__(self):
|
|
return '{} {}'.format(self.timestamp, self.message)
|
|
|
|
__repr__ = __str__
|
|
|
|
|
|
class IterationResult(object):
|
|
"""
|
|
Contains the result of running a single iteration of a workload. It is the
|
|
responsibility of a workload to instantiate a IterationResult, populate it,
|
|
and return it form its get_result() method.
|
|
|
|
Status explanations:
|
|
|
|
:NOT_STARTED: This iteration has not yet started.
|
|
:RUNNING: This iteration is currently running and no errors have been detected.
|
|
:OK: This iteration has completed and no errors have been detected
|
|
:PARTIAL: One or more instruments have failed (the iteration may still be running).
|
|
:FAILED: The workload itself has failed.
|
|
:ABORTED: The user interupted the workload
|
|
:SKIPPED: The iteration was skipped due to a previous failure
|
|
|
|
"""
|
|
|
|
__metaclass__ = enum_metaclass('values', return_name=True)
|
|
|
|
values = [
|
|
'NOT_STARTED',
|
|
'RUNNING',
|
|
|
|
'OK',
|
|
'NONCRITICAL',
|
|
'PARTIAL',
|
|
'FAILED',
|
|
'ABORTED',
|
|
'SKIPPED',
|
|
]
|
|
|
|
def __init__(self, spec):
|
|
self.spec = spec
|
|
self.id = spec.id
|
|
self.workload = spec.workload
|
|
self.iteration = None
|
|
self.status = self.NOT_STARTED
|
|
self.output_directory = None
|
|
self.events = []
|
|
self.metrics = []
|
|
self.artifacts = []
|
|
|
|
def add_metric(self, name, value, units=None, lower_is_better=False, classifiers=None):
|
|
self.metrics.append(Metric(name, value, units, lower_is_better, classifiers))
|
|
|
|
def has_metric(self, name):
|
|
for metric in self.metrics:
|
|
if metric.name == name:
|
|
return True
|
|
return False
|
|
|
|
def add_event(self, message):
|
|
self.events.append(RunEvent(message))
|
|
|
|
def to_dict(self):
|
|
d = copy(self.__dict__)
|
|
d['events'] = [e.to_dict() for e in self.events]
|
|
return d
|
|
|
|
def __iter__(self):
|
|
return iter(self.metrics)
|
|
|
|
def __getitem__(self, name):
|
|
for metric in self.metrics:
|
|
if metric.name == name:
|
|
return metric
|
|
raise KeyError('Metric {} not found.'.format(name))
|
|
|
|
|
|
class Metric(object):
|
|
"""
|
|
This is a single metric collected from executing a workload.
|
|
|
|
:param name: the name of the metric. Uniquely identifies the metric
|
|
within the results.
|
|
:param value: The numerical value of the metric for this execution of
|
|
a workload. This can be either an int or a float.
|
|
:param units: Units for the collected value. Can be None if the value
|
|
has no units (e.g. it's a count or a standardised score).
|
|
:param lower_is_better: Boolean flag indicating where lower values are
|
|
better than higher ones. Defaults to False.
|
|
:param classifiers: A set of key-value pairs to further classify this metric
|
|
beyond current iteration (e.g. this can be used to identify
|
|
sub-tests).
|
|
|
|
"""
|
|
|
|
def __init__(self, name, value, units=None, lower_is_better=False, classifiers=None):
|
|
self.name = name
|
|
self.value = numeric(value)
|
|
self.units = units
|
|
self.lower_is_better = lower_is_better
|
|
self.classifiers = classifiers or {}
|
|
|
|
def to_dict(self):
|
|
return self.__dict__
|
|
|
|
def __str__(self):
|
|
result = '{}: {}'.format(self.name, self.value)
|
|
if self.units:
|
|
result += ' ' + self.units
|
|
result += ' ({})'.format('-' if self.lower_is_better else '+')
|
|
return '<{}>'.format(result)
|
|
|
|
__repr__ = __str__
|
|
|