import os
import shutil
import logging
import uuid
from copy import copy
from datetime import datetime, timedelta

from wa.framework import signal, log
from wa.framework.configuration.core import merge_config_values
from wa.utils import serializer
from wa.utils.misc import enum_metaclass, ensure_directory_exists as _d
from wa.utils.types import numeric


class Status(object):

    __metaclass__ = enum_metaclass('values', return_name=True)

    values = [
        'NEW',
        'PENDING',
        'RUNNING',
        'COMPLETE',
        'OK',
        'OKISH',
        'NONCRITICAL',
        'PARTIAL',
        'FAILED',
        'ABORTED',
        'SKIPPED',
        'UNKNOWN',
    ]


class WAOutput(object):

    basename = '.wa-output'

    @classmethod
    def load(cls, source):
        if os.path.isfile(source):
            pod = serializer.load(source)
        elif os.path.isdir(source):
            pod = serializer.load(os.path.join(source, cls.basename))
        else:
            message = 'Cannot load {} from {}'
            raise ValueError(message.format(cls.__name__, source))
        return cls.from_pod(pod)

    @classmethod
    def from_pod(cls, pod):
        instance = cls(pod['output_directory'])
        instance.status = pod['status']
        instance.metrics = [Metric.from_pod(m) for m in pod['metrics']]
        instance.artifacts = [Artifact.from_pod(a) for a in pod['artifacts']]
        instance.events = [RunEvent.from_pod(e) for e in pod['events']]
        instance.classifiers = pod['classifiers']
        return instance

    def __init__(self, output_directory):
        self.logger = logging.getLogger('output')
        self.output_directory = output_directory
        self.status = Status.UNKNOWN
        self.classifiers = {}
        self.metrics = []
        self.artifacts = []
        self.events = []

    def initialize(self, overwrite=False):
        if os.path.exists(self.output_directory):
            if not overwrite:
                raise RuntimeError('"{}" already exists.'.format(self.output_directory))
            self.logger.info('Removing existing output directory.')
            shutil.rmtree(self.output_directory)
        self.logger.debug('Creating output directory {}'.format(self.output_directory))
        os.makedirs(self.output_directory)

    def add_metric(self, name, value, units=None, lower_is_better=False, classifiers=None):
        classifiers = merge_config_values(self.classifiers, classifiers or {})
        self.metrics.append(Metric(name, value, units, lower_is_better, classifiers))

    def add_artifact(self, name, path, kind, *args, **kwargs):
        path = _check_artifact_path(path, self.output_directory)
        self.artifacts.append(Artifact(name, path, kind, Artifact.RUN, *args, **kwargs))

    def get_path(self, subpath):
        return os.path.join(self.output_directory, subpath)

    def to_pod(self):
        return {
            'output_directory': self.output_directory,
            'status': self.status,
            'metrics': [m.to_pod() for m in self.metrics],
            'artifacts': [a.to_pod() for a in self.artifacts],
            'events': [e.to_pod() for e in self.events],
            'classifiers': copy(self.classifiers),
        }

    def persist(self):
        statefile = os.path.join(self.output_directory, self.basename)
        with open(statefile, 'wb') as wfh:
            serializer.dump(self, wfh)
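
# Illustrative usage sketch (not part of the module): roughly how a WAOutput is
# populated and round-tripped through the .wa-output state file. The directory
# name and metric below are hypothetical.
#
#     output = WAOutput('wa_output')
#     output.initialize(overwrite=True)
#     output.add_metric('execution_time', 12.3, units='seconds', lower_is_better=True)
#     output.persist()                       # writes wa_output/.wa-output
#     restored = WAOutput.load('wa_output')  # or pass the state file path directly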


class RunInfo(object):

    default_name_format = 'wa-run-%y%m%d-%H%M%S'

    def __init__(self, project=None, project_stage=None, name=None):
        self.uuid = uuid.uuid4()
        self.project = project
        self.project_stage = project_stage
        self.name = name or datetime.now().strftime(self.default_name_format)
        self.start_time = None
        self.end_time = None
        self.duration = None

    @staticmethod
    def from_pod(pod):
        instance = RunInfo()
        instance.uuid = uuid.UUID(pod['uuid'])
        instance.project = pod['project']
        instance.project_stage = pod['project_stage']
        instance.name = pod['name']
        instance.start_time = pod['start_time']
        instance.end_time = pod['end_time']
        instance.duration = timedelta(seconds=pod['duration'])
        return instance

    def to_pod(self):
        d = copy(self.__dict__)
        d['uuid'] = str(self.uuid)
        d['duration'] = self.duration.days * 3600 * 24 + self.duration.seconds
        return d
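
# Worked example (illustrative): to_pod() flattens the duration timedelta into
# whole seconds, and from_pod() rebuilds it, so sub-second precision is dropped.
#
#     timedelta(days=1, seconds=30, microseconds=500)
#     -> 1 * 3600 * 24 + 30 = 86430 in the pod
#     -> timedelta(seconds=86430) when loaded back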


class RunOutput(WAOutput):

    @property
    def info_directory(self):
        return _d(os.path.join(self.output_directory, '_info'))

    @property
    def config_directory(self):
        return _d(os.path.join(self.output_directory, '_config'))

    @property
    def failed_directory(self):
        return _d(os.path.join(self.output_directory, '_failed'))

    @property
    def log_file(self):
        return os.path.join(self.output_directory, 'run.log')

    @classmethod
    def from_pod(cls, pod):
        instance = WAOutput.from_pod(pod)
        instance.info = RunInfo.from_pod(pod['info'])
        instance.jobs = [JobOutput.from_pod(i) for i in pod['jobs']]
        instance.failed = [JobOutput.from_pod(i) for i in pod['failed']]
        return instance

    def __init__(self, output_directory):
        super(RunOutput, self).__init__(output_directory)
        self.logger = logging.getLogger('output')
        self.info = RunInfo()
        self.jobs = []
        self.failed = []

    def initialize(self, overwrite=False):
        super(RunOutput, self).initialize(overwrite)
        log.add_file(self.log_file)
        self.add_artifact('runlog', self.log_file, 'log')

    def create_job_output(self, id):
        outdir = os.path.join(self.output_directory, id)
        job_output = JobOutput(outdir)
        self.jobs.append(job_output)
        return job_output

    def move_failed(self, job_output):
        basename = os.path.basename(job_output.output_directory)
        # Find a destination under _failed/ that does not already exist by
        # appending an incrementing numeric suffix.
        i = 1
        dest = os.path.join(self.failed_directory, basename + '-{}'.format(i))
        while os.path.exists(dest):
            i += 1
            dest = os.path.join(self.failed_directory, basename + '-{}'.format(i))
        shutil.move(job_output.output_directory, dest)

    def to_pod(self):
        pod = super(RunOutput, self).to_pod()
        pod['info'] = self.info.to_pod()
        pod['jobs'] = [i.to_pod() for i in self.jobs]
        pod['failed'] = [i.to_pod() for i in self.failed]
        return pod
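
# Resulting on-disk layout sketch (assuming the accessors above are used; job
# directory names come from whatever ids are passed to create_job_output):
#
#     <output_directory>/
#         .wa-output        # serialized RunOutput state (persist/load)
#         run.log           # added as the 'runlog' artifact in initialize()
#         _info/            # info_directory (created on first access)
#         _config/          # config_directory
#         _failed/          # failed_directory; failed job dirs moved here with -N suffixes
#         <job id>/         # one JobOutput directory per job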


class JobOutput(WAOutput):

    def add_artifact(self, name, path, kind, *args, **kwargs):
        path = _check_artifact_path(path, self.output_directory)
        self.artifacts.append(Artifact(name, path, kind, Artifact.ITERATION, *args, **kwargs))


class Artifact(object):
    """
    This is an artifact generated during execution/post-processing of a workload.
    Unlike metrics, this represents an actual artifact, such as a file, that has
    been generated. This may be a "result", such as a trace, or it could be
    "meta data", such as logs. These are distinguished using the ``kind``
    attribute, which also helps WA decide how it should be handled. Currently
    supported kinds are:

        :log: A log file. Not part of the "results" as such, but contains
              information about the run/workload execution that may be useful
              for diagnostics/meta analysis.
        :meta: A file containing metadata. This is not part of the "results",
               but contains information that may be necessary to reproduce the
               results (contrast with ``log`` artifacts, which are *not*
               necessary).
        :data: This file contains new data not available otherwise, and should
               be considered part of the "results" generated by WA. Most traces
               would fall into this category.
        :export: An exported version of results or some other artifact. This
                 signifies that the artifact does not contain any new data that
                 is not available elsewhere, and that it may be safely discarded
                 without losing information.
        :raw: Signifies that this is a raw dump/log that is normally processed
              to extract useful information and is then discarded. In a sense,
              it is the opposite of ``export``, but in general it may also be
              discarded.

    .. note:: Whether a file is marked as ``log``/``data`` or ``raw`` depends on
              how important it is to preserve this file (e.g. when archiving)
              versus how much space it takes up. Unlike ``export`` artifacts,
              which are (almost) always ignored by other exporters as that would
              never result in data loss, ``raw`` files *may* be processed by
              exporters if they decide that the risk of losing potentially
              (though unlikely) useful data is greater than the time/space cost
              of handling the artifact (e.g. a database uploader may choose to
              ignore ``raw`` artifacts, whereas a network filer archiver may
              choose to archive them).

    .. note:: The ``kind`` parameter is intended to represent the logical
              function of a particular artifact, not its intended means of
              processing -- that is left entirely up to the result processors.

    """

    RUN = 'run'
    ITERATION = 'iteration'

    valid_kinds = ['log', 'meta', 'data', 'export', 'raw']

    @staticmethod
    def from_pod(pod):
        return Artifact(**pod)

    def __init__(self, name, path, kind, level=RUN, mandatory=False, description=None):
        """
        :param name: Name that uniquely identifies this artifact.
        :param path: The *relative* path of the artifact. Depending on the
                     ``level``, it must be relative to either the run or the
                     iteration output directory.
                     Note: this path *must* be delimited using ``/`` irrespective
                     of the operating system.
        :param kind: The type of artifact this is (e.g. log file, result, etc.);
                     this will be used as a hint to result processors. This must
                     be one of ``'log'``, ``'meta'``, ``'data'``, ``'export'``,
                     ``'raw'``.
        :param level: The level at which the artifact will be generated. Must be
                      either ``'iteration'`` or ``'run'``.
        :param mandatory: Boolean value indicating whether this artifact must be
                          present at the end of result processing for its level.
        :param description: A free-form description of what this artifact is.

        """
        if kind not in self.valid_kinds:
            raise ValueError('Invalid Artifact kind: {}; must be in {}'.format(kind, self.valid_kinds))
        self.name = name
        self.path = path.replace('/', os.sep) if path is not None else path
        self.kind = kind
        self.level = level
        self.mandatory = mandatory
        self.description = description

    def exists(self, context):
        """Returns ``True`` if artifact exists within the specified context, and
        ``False`` otherwise."""
        fullpath = os.path.join(context.output_directory, self.path)
        return os.path.exists(fullpath)

    def to_pod(self):
        return copy(self.__dict__)
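
# Illustrative sketch (hypothetical names): how a workload or instrument might
# declare an artifact it produces for a single iteration.
#
#     trace = Artifact('ftrace', 'trace.dat', 'data',
#                      level=Artifact.ITERATION,
#                      description='Raw ftrace capture for this iteration.')
#     trace.exists(context)  # True once trace.dat exists under the iteration output dir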


class RunEvent(object):
    """
    An event that occurred during a run.

    """

    @staticmethod
    def from_pod(pod):
        instance = RunEvent(pod['message'])
        instance.timestamp = pod['timestamp']
        return instance

    def __init__(self, message):
        self.timestamp = datetime.utcnow()
        self.message = message

    def to_pod(self):
        return copy(self.__dict__)

    def __str__(self):
        return '{} {}'.format(self.timestamp, self.message)

    __repr__ = __str__


class Metric(object):
    """
    This is a single metric collected from executing a workload.

    :param name: the name of the metric. Uniquely identifies the metric
                 within the results.
    :param value: The numerical value of the metric for this execution of
                  a workload. This can be either an int or a float.
    :param units: Units for the collected value. Can be None if the value
                  has no units (e.g. it's a count or a standardised score).
    :param lower_is_better: Boolean flag indicating whether lower values are
                            better than higher ones. Defaults to False.
    :param classifiers: A set of key-value pairs to further classify this
                        metric beyond the current iteration (e.g. this can be
                        used to identify sub-tests).

    """

    @staticmethod
    def from_pod(pod):
        return Metric(**pod)

    def __init__(self, name, value, units=None, lower_is_better=False, classifiers=None):
        self.name = name
        self.value = numeric(value)
        self.units = units
        self.lower_is_better = lower_is_better
        self.classifiers = classifiers or {}

    def to_pod(self):
        return copy(self.__dict__)

    def __str__(self):
        result = '{}: {}'.format(self.name, self.value)
        if self.units:
            result += ' ' + self.units
        result += ' ({})'.format('-' if self.lower_is_better else '+')
        return '<{}>'.format(result)

    __repr__ = __str__
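
# Illustrative sketch (hypothetical values): numeric() coerces the value, and
# __str__ marks the "direction" of the metric with '-' (lower is better) or '+'.
#
#     m = Metric('execution_time', '12.3', units='seconds', lower_is_better=True)
#     m.value   # 12.3 (coerced by numeric())
#     str(m)    # '<execution_time: 12.3 seconds (-)>'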


def _check_artifact_path(path, rootpath):
    if path.startswith(rootpath):
        return os.path.abspath(path)
    rootpath = os.path.abspath(rootpath)
    full_path = os.path.join(rootpath, path)
    if not os.path.isfile(full_path):
        raise ValueError('Cannot add artifact because {} does not exist.'.format(full_path))
    return full_path
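
# Behaviour sketch (illustrative paths): a path already under the output
# directory is returned as an absolute path; anything else is treated as
# relative to the output directory and must already exist on disk.
#
#     _check_artifact_path('wa_output/run.log', 'wa_output')  # -> abspath of wa_output/run.log
#     _check_artifact_path('results.csv', 'wa_output')        # -> <abspath>/wa_output/results.csv, if it exists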