1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-31 02:01:16 +00:00

Implement output processing

- Implemented result processor infrastructure
- Corrected some status tracking issues (differed between states
  and output).
- Added "csv" and "status" result processors (these will be the default
  enabled).
This commit is contained in:
Sergei Trofimov 2017-03-20 16:24:22 +00:00
parent ff990da96c
commit 31a5e5b5fe
11 changed files with 424 additions and 38 deletions

View File

@ -1,6 +1,7 @@
from wa.framework import pluginloader, log, signal
from wa.framework.command import Command
from wa.framework.configuration import settings
from wa.framework.configuration.core import Status
from wa.framework.exception import HostError, JobError, InstrumentError, ConfigError
from wa.framework.exception import (ResultProcessorError, ResourceError,
CommandError, ToolError)
@ -10,4 +11,5 @@ from wa.framework.exception import WorkerThreadError, PluginLoaderError
from wa.framework.instrumentation import (Instrument, very_slow, slow, normal, fast,
very_fast)
from wa.framework.plugin import Plugin, Parameter
from wa.framework.processor import ResultProcessor
from wa.framework.workload import Workload

View File

@ -32,11 +32,10 @@ KIND_MAP = {
dict: OrderedDict,
}
RunStatus = enum(['NEW', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING',
'ABORTED', 'FAILED', 'PARTIAL', 'OK'])
Status = enum(['UNKNOWN', 'NEW', 'PENDING',
'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING',
'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK'])
JobStatus = enum(['NEW', 'PENDING', 'RUNNING',
'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK'])
##########################
@ -551,8 +550,7 @@ class MetaConfiguration(Configuration):
'wa.commands',
'wa.workloads',
'wa.instrumentation',
#'wa.result_processors',
#'wa.managers',
'wa.processors',
'wa.framework.target.descriptor',
'wa.framework.resource_getters',
]
@ -741,9 +739,9 @@ class RunConfiguration(Configuration):
),
ConfigurationPoint(
'retry_on_status',
kind=list_of(JobStatus),
kind=list_of(Status),
default=['FAILED', 'PARTIAL'],
allowed_values=JobStatus.values,
allowed_values=Status.values[Status.RUNNING.value:],
description='''
This is list of statuses on which a job will be cosidered to have
failed and will be automatically retried up to ``max_retries``
@ -774,6 +772,17 @@ class RunConfiguration(Configuration):
.. note:: this number does not include the original attempt
''',
),
ConfigurationPoint(
'result_processors',
kind=toggle_set,
default=['csv', 'status'],
description='''
The list of output processors to be used for this run. Output processors
post-process data generated by workloads and instruments, e.g. to
generate additional reports, format the output in a certain way, or
export the output to an exeternal location.
''',
),
]
configuration = {cp.name: cp for cp in config_points + meta_data}

View File

@ -4,7 +4,7 @@ from itertools import izip_longest, groupby, chain
from wa.framework import pluginloader
from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration,
JobGenerator, JobStatus, settings)
JobGenerator, Status, settings)
from wa.framework.configuration.parsers import ConfigParser
from wa.framework.configuration.plugin_cache import PluginCache
from wa.framework.exception import NotFoundError
@ -88,12 +88,23 @@ class ConfigManager(object):
for name in self.enabled_instruments:
try:
instruments.append(self.get_plugin(name, kind='instrument',
target=target))
target=target))
except NotFoundError:
msg = 'Instrument "{}" not found'
raise NotFoundError(msg.format(name))
return instruments
def get_processors(self):
    """
    Instantiate and return the result processors enabled for this run.

    Raises NotFoundError (with a friendlier message) if a configured
    processor name cannot be resolved by the plugin cache.
    """
    loaded = []
    for proc_name in self.run_config.result_processors:
        try:
            loaded.append(self.plugin_cache.get_plugin(proc_name,
                                                       kind='result_processor'))
        except NotFoundError:
            raise NotFoundError('Result processor "{}" not found'.format(proc_name))
    return loaded
def finalize(self):
if not self.agenda:
msg = 'Attempting to finalize config before agenda has been set'

View File

@ -28,12 +28,13 @@ from itertools import izip_longest
import wa.framework.signal as signal
from wa.framework import instrumentation, pluginloader
from wa.framework.configuration.core import settings, RunStatus, JobStatus
from wa.framework.configuration.core import settings, Status
from wa.framework.exception import (WAError, ConfigError, TimeoutError,
InstrumentError, TargetError,
TargetNotRespondingError)
from wa.framework.output import init_job_output
from wa.framework.plugin import Artifact
from wa.framework.processor import ProcessorManager
from wa.framework.resource import ResourceResolver
from wa.framework.run import RunState
from wa.framework.target.info import TargetInfo
@ -95,22 +96,35 @@ class ExecutionContext(object):
self.tm = tm
self.run_output = output
self.run_state = output.state
self.target_info = self.tm.get_target_info()
self.logger.debug('Loading resource discoverers')
self.resolver = ResourceResolver(cm)
self.resolver.load()
self.job_queue = None
self.completed_jobs = None
self.current_job = None
self.successful_jobs = 0
self.failed_jobs = 0
def start_run(self):
self.output.info.start_time = datetime.now()
self.output.write_info()
self.job_queue = copy(self.cm.jobs)
self.completed_jobs = []
self.run_state.status = RunStatus.STARTED
self.run_state.status = Status.STARTED
self.output.status = Status.STARTED
self.output.write_state()
def end_run(self):
    """
    Compute and record the overall run status from job outcomes, and
    stamp the run's end time and duration.

    Status policy: OK if every job succeeded, PARTIAL if some succeeded
    and some failed, FAILED if nothing succeeded.
    """
    if self.successful_jobs:
        if self.failed_jobs:
            # Fix: was "Satus.PARTIAL" — a NameError that would fire on
            # every partially-failed run.
            status = Status.PARTIAL
        else:
            status = Status.OK
    else:
        status = Status.FAILED
    self.run_state.status = status
    self.output.status = status
    self.output.info.end_time = datetime.now()
    self.output.info.duration = (self.output.info.end_time -
                                 self.output.info.start_time)
@ -144,7 +158,7 @@ class ExecutionContext(object):
def skip_remaining_jobs(self):
while self.job_queue:
job = self.job_queue.pop(0)
job.status = JobStatus.SKIPPED
job.status = Status.SKIPPED
self.run_state.update_job(job)
self.completed_jobs.append(job)
self.write_state()
@ -166,6 +180,9 @@ class ExecutionContext(object):
classifiers=None):
self.run_output.add_artifact(name, path, kind, description, classifiers)
def add_event(self, message):
self.output.add_event(message)
class Executor(object):
"""
@ -228,8 +245,14 @@ class Executor(object):
instrumentation.install(instrument)
instrumentation.validate()
self.logger.info('Installing result processors')
pm = ProcessorManager()
for proc in config_manager.get_processors():
pm.install(proc)
pm.validate()
self.logger.info('Starting run')
runner = Runner(context)
runner = Runner(context, pm)
signal.send(signal.RUN_STARTED, self)
runner.run()
self.execute_postamble(context, output)
@ -244,7 +267,7 @@ class Executor(object):
counter = context.run_state.get_status_counts()
parts = []
for status in reversed(JobStatus.values):
for status in reversed(Status.values):
if status in counter:
parts.append('{} {}'.format(counter[status], status))
self.logger.info(status_summary + ', '.join(parts))
@ -272,9 +295,10 @@ class Runner(object):
"""
def __init__(self, context):
def __init__(self, context, pm):
self.logger = logging.getLogger('runner')
self.context = context
self.pm = pm
self.output = self.context.output
self.config = self.context.cm
@ -290,6 +314,7 @@ class Runner(object):
except KeyboardInterrupt:
self.context.skip_remaining_jobs()
except Exception as e:
self.context.add_event(e.message)
if (not getattr(e, 'logged', None) and
not isinstance(e, KeyboardInterrupt)):
log.log_error(e, self.logger)
@ -302,6 +327,7 @@ class Runner(object):
def initialize_run(self):
self.logger.info('Initializing run')
self.context.start_run()
self.pm.initialize()
log.indent()
for job in self.context.job_queue:
job.initialize(self.context)
@ -311,6 +337,9 @@ class Runner(object):
def finalize_run(self):
self.logger.info('Finalizing run')
self.context.end_run()
self.pm.process_run_output(self.context)
self.pm.export_run_output(self.context)
self.pm.finalize()
def run_next_job(self, context):
job = context.start_job()
@ -319,12 +348,13 @@ class Runner(object):
try:
log.indent()
self.do_run_job(job, context)
job.status = JobStatus.OK
job.status = Status.OK
except KeyboardInterrupt:
job.status = JobStatus.ABORTED
job.status = Status.ABORTED
raise
except Exception as e:
job.status = JobStatus.FAILED
job.status = Status.FAILED
context.add_event(e.message)
if not getattr(e, 'logged', None):
log.log_error(e, self.logger)
e.logged = True
@ -337,7 +367,7 @@ class Runner(object):
self.check_job(job)
def do_run_job(self, job, context):
job.status = JobStatus.RUNNING
job.status = Status.RUNNING
self.send(signal.JOB_STARTED)
with signal.wrap('JOB_TARGET_CONFIG', self):
@ -353,15 +383,18 @@ class Runner(object):
try:
with signal.wrap('JOB_OUTPUT_PROCESSED', self):
job.process_output(context)
self.pm.process_job_output(context)
self.pm.export_job_output(context)
except Exception:
job.status = JobStatus.PARTIAL
job.status = Status.PARTIAL
raise
except KeyboardInterrupt:
job.status = JobStatus.ABORTED
job.status = Status.ABORTED
self.logger.info('Got CTRL-C. Aborting.')
raise
except Exception as e:
job.status = JobStatus.FAILED
job.status = Status.FAILED
if not getattr(e, 'logged', None):
log.log_error(e, self.logger)
e.logged = True
@ -380,15 +413,17 @@ class Runner(object):
self.logger.error(msg.format(job.id, job.status, job.iteration))
self.context.move_failed(job)
job.retries += 1
job.status = JobStatus.PENDING
job.status = Status.PENDING
self.context.job_queue.insert(0, job)
self.context.write_state()
else:
msg = 'Job {} iteration {} completed with status {}. '\
'Max retries exceeded.'
self.logger.error(msg.format(job.id, job.status, job.iteration))
self.context.failed_jobs += 1
else: # status not in retry_on_status
self.logger.info('Job completed with status {}'.format(job.status))
self.context.successful_jobs += 1
def send(self, s):
signal.send(s, self, self.context)

View File

@ -1,7 +1,7 @@
import logging
from wa.framework import pluginloader, signal
from wa.framework.configuration.core import JobStatus
from wa.framework.configuration.core import Status
class Job(object):
@ -18,15 +18,25 @@ class Job(object):
def classifiers(self):
return self.spec.classifiers
@property
def status(self):
return self._status
@status.setter
def status(self, value):
self._status = value
if self.output:
self.output.status = value
def __init__(self, spec, iteration, context):
self.logger = logging.getLogger('job')
self.spec = spec
self.iteration = iteration
self.context = context
self.status = JobStatus.NEW
self.workload = None
self.output = None
self.retries = 0
self._status = Status.NEW
def load(self, target, loader=pluginloader):
self.logger.info('Loading job {}'.format(self.id))
@ -40,7 +50,7 @@ class Job(object):
self.logger.info('Initializing job {}'.format(self.id))
with signal.wrap('WORKLOAD_INITIALIZED', self, context):
self.workload.initialize(context)
self.status = JobStatus.PENDING
self.status = Status.PENDING
context.update_job_state(self)
def configure_target(self, context):

View File

@ -5,10 +5,11 @@ import string
import sys
import uuid
from copy import copy
from datetime import timedelta
from datetime import datetime, timedelta
from wa.framework.configuration.core import JobSpec, RunStatus
from wa.framework.configuration.manager import ConfigManager
from wa.framework.configuration.core import JobSpec, Status
from wa.framework.configuration.execution import ConfigManager
from wa.framework.exception import HostError
from wa.framework.run import RunState, RunInfo
from wa.framework.target.info import TargetInfo
from wa.utils.misc import touch, ensure_directory_exists
@ -21,13 +22,37 @@ logger = logging.getLogger('output')
class Output(object):
kind = None
@property
def resultfile(self):
return os.path.join(self.basepath, 'result.json')
@property
def event_summary(self):
num_events = len(self.events)
if num_events:
lines = self.events[0].message.split('\n')
message = '({} event(s)): {}'
if num_events > 1 or len(lines) > 1:
message += '[...]'
return message.format(num_events, lines[0])
return ''
@property
def status(self):
if self.result is None:
return None
return self.result.status
@status.setter
def status(self, value):
self.result.status = value
def __init__(self, path):
self.basepath = path
self.result = None
self.events = []
def reload(self):
pod = read_pod(self.resultfile)
@ -36,11 +61,16 @@ class Output(object):
def write_result(self):
write_pod(self.result.to_pod(), self.resultfile)
def get_path(self, subpath):
return os.path.join(self.basepath, subpath.strip(os.sep))
def add_metric(self, name, value, units=None, lower_is_better=False,
classifiers=None):
self.result.add_metric(name, value, units, lower_is_better, classifiers)
def add_artifact(self, name, path, kind, description=None, classifiers=None):
if not os.path.exists(path):
path = self.get_path(path)
if not os.path.exists(path):
msg = 'Attempting to add non-existing artifact: {}'
raise HostError(msg.format(path))
@ -51,9 +81,14 @@ class Output(object):
self.result.add_artifact(name, path, kind, description, classifiers)
def add_event(self, message):
self.result.add_event(message)
class RunOutput(Output):
kind = 'run'
@property
def logfile(self):
return os.path.join(self.basepath, 'run.log')
@ -96,7 +131,7 @@ class RunOutput(Output):
self.info = None
self.state = None
self.result = None
self.jobs = None
self.jobs = []
if (not os.path.isfile(self.statefile) or
not os.path.isfile(self.infofile)):
msg = '"{}" does not exist or is not a valid WA output directory.'
@ -155,8 +190,10 @@ class RunOutput(Output):
class JobOutput(Output):
kind = 'job'
def __init__(self, path, id, label, iteration, retry):
self.basepath = path
super(JobOutput, self).__init__(path)
self.id = id
self.label = label
self.iteration = iteration
@ -170,13 +207,17 @@ class Result(object):
@staticmethod
def from_pod(pod):
instance = Result()
instance.status = Status(pod['status'])
instance.metrics = [Metric.from_pod(m) for m in pod['metrics']]
instance.artifacts = [Artifact.from_pod(a) for a in pod['artifacts']]
instance.events = [Event.from_pod(e) for e in pod['events']]
return instance
def __init__(self):
self.status = Status.NEW
self.metrics = []
self.artifacts = []
self.events = []
def add_metric(self, name, value, units=None, lower_is_better=False,
classifiers=None):
@ -190,10 +231,15 @@ class Result(object):
logger.debug('Adding artifact: {}'.format(artifact))
self.artifacts.append(artifact)
def add_event(self, message):
self.events.append(Event(message))
def to_pod(self):
return dict(
status=str(self.status),
metrics=[m.to_pod() for m in self.metrics],
artifacts=[a.to_pod() for a in self.artifacts],
events=[e.to_pod() for e in self.events],
)
@ -349,6 +395,43 @@ class Metric(object):
return '<{}>'.format(text)
class Event(object):
    """
    A timestamped message recording something that occurred during a run.
    """

    __slots__ = ['timestamp', 'message']

    @staticmethod
    def from_pod(pod):
        # Rebuild a serialized event, preserving its original timestamp
        # rather than stamping the deserialization time.
        event = Event(pod['message'])
        event.timestamp = pod['timestamp']
        return event

    @property
    def summary(self):
        # First line of the message; '[...]' marks elided continuation lines.
        first_line, newline, _ = self.message.partition('\n')
        return first_line + '[...]' if newline else first_line

    def __init__(self, message):
        self.timestamp = datetime.utcnow()
        self.message = message

    def to_pod(self):
        return {
            'timestamp': self.timestamp,
            'message': self.message,
        }

    def __str__(self):
        return '[{}] {}'.format(self.timestamp, self.message)

    __repr__ = __str__
def init_run_output(path, wa_state, force=False):
if os.path.exists(path):
@ -382,7 +465,10 @@ def init_job_output(run_output, job):
path = os.path.join(run_output.basepath, output_name)
ensure_directory_exists(path)
write_pod(Result().to_pod(), os.path.join(path, 'result.json'))
return JobOutput(path, job.id, job.iteration, job.label, job.retries)
job_output = JobOutput(path, job.id, job.iteration, job.label, job.retries)
job_output.status = job.status
run_output.jobs.append(job_output)
return job_output
def _save_raw_config(meta_dir, state):

87
wa/framework/processor.py Normal file
View File

@ -0,0 +1,87 @@
import logging
from wa.framework import pluginloader
from wa.framework.exception import ConfigError
from wa.framework.instrumentation import is_installed
from wa.framework.plugin import Plugin
from wa.utils.log import log_error, indent, dedent
class ResultProcessor(Plugin):
    """
    Base class for plugins that post-process run and job results, e.g. to
    generate reports or export them to an external location.
    """

    kind = 'result_processor'

    # Names of instruments that must be installed for this processor to work.
    requires = []

    def validate(self):
        super(ResultProcessor, self).validate()
        # Every required instrument must already be installed.
        for required in self.requires:
            if is_installed(required):
                continue
            msg = 'Instrument "{}" is required by {}, but is not installed.'
            raise ConfigError(msg.format(required, self.name))

    def initialize(self):
        # Hook invoked once before the run; subclasses may override.
        pass

    def finalize(self):
        # Hook invoked once after the run; subclasses may override.
        pass
class ProcessorManager(object):
    """
    Maintains the set of installed ResultProcessors and dispatches the
    processing/export callbacks to them at job and run boundaries.
    """

    def __init__(self, loader=pluginloader):
        self.loader = loader
        self.logger = logging.getLogger('processor')
        self.processors = []

    def install(self, processor):
        """Install a processor, resolving it by name via the loader if needed."""
        if not isinstance(processor, ResultProcessor):
            processor = self.loader.get_result_processor(processor)
        self.logger.debug('Installing {}'.format(processor.name))
        self.processors.append(processor)

    def validate(self):
        for proc in self.processors:
            proc.validate()

    def initialize(self):
        for proc in self.processors:
            proc.initialize()

    def finalize(self):
        for proc in self.processors:
            proc.finalize()

    def process_job_output(self, context):
        # Fix: message capitalization made consistent with the other hooks.
        self.do_for_each_proc('process_job_output', 'Processing using "{}"',
                              context.job_output, context.target_info,
                              context.run_output)

    def export_job_output(self, context):
        self.do_for_each_proc('export_job_output', 'Exporting using "{}"',
                              context.job_output, context.target_info,
                              context.run_output)

    def process_run_output(self, context):
        self.do_for_each_proc('process_run_output', 'Processing using "{}"',
                              context.run_output, context.target_info)

    def export_run_output(self, context):
        self.do_for_each_proc('export_run_output', 'Exporting using "{}"',
                              context.run_output, context.target_info)

    def do_for_each_proc(self, method_name, message, *args):
        """
        Invoke ``method_name(*args)`` on every installed processor that
        implements it. An error in one processor is logged but does not
        prevent the remaining processors from running; KeyboardInterrupt
        still aborts the whole loop.
        """
        try:
            indent()
            for proc in self.processors:
                proc_func = getattr(proc, method_name, None)
                if proc_func is None:
                    continue
                try:
                    self.logger.info(message.format(proc.name))
                    proc_func(*args)
                except KeyboardInterrupt:
                    # Fix: previously checked isinstance(e, KeyboardInterrupt)
                    # inside "except Exception", which can never match because
                    # KeyboardInterrupt derives from BaseException, not
                    # Exception. An explicit clause makes the intent real.
                    raise
                except Exception as e:  # pylint: disable=broad-except
                    log_error(e, self.logger)
        finally:
            dedent()

View File

@ -18,7 +18,7 @@ from collections import OrderedDict, Counter
from copy import copy
from datetime import datetime, timedelta
from wa.framework.configuration.core import RunStatus, JobStatus
from wa.framework.configuration.core import Status
class RunInfo(object):
@ -67,7 +67,7 @@ class RunState(object):
@staticmethod
def from_pod(pod):
instance = RunState()
instance.status = RunStatus(pod['status'])
instance.status = Status(pod['status'])
instance.timestamp = pod['timestamp']
jss = [JobState.from_pod(j) for j in pod['jobs']]
instance.jobs = OrderedDict(((js.id, js.iteration), js) for js in jss)
@ -76,11 +76,11 @@ class RunState(object):
@property
def num_completed_jobs(self):
return sum(1 for js in self.jobs.itervalues()
if js.status > JobStatus.SKIPPED)
if js.status > Status.SKIPPED)
def __init__(self):
self.jobs = OrderedDict()
self.status = RunStatus.NEW
self.status = Status.NEW
self.timestamp = datetime.now()
def add_job(self, job):
@ -110,7 +110,7 @@ class JobState(object):
@staticmethod
def from_pod(pod):
instance = JobState(pod['id'], pod['label'], JobStatus(pod['status']))
instance = JobState(pod['id'], pod['label'], Status(pod['status']))
instance.retries = pod['retries']
instance.iteration = pod['iteration']
instance.timestamp = pod['timestamp']

View File

87
wa/processors/csvproc.py Normal file
View File

@ -0,0 +1,87 @@
import csv
from wa import ResultProcessor, Parameter
from wa.framework.exception import ConfigError
from wa.utils.types import list_of_strings
class CsvReportProcessor(ResultProcessor):

    name = 'csv'
    description = """
    Creates a ``results.csv`` in the output directory containing results for
    all iterations in CSV format, each line containing a single metric.

    """

    parameters = [
        Parameter('use_all_classifiers', kind=bool, default=False,
                  global_alias='use_all_classifiers',
                  description="""
                  If set to ``True``, this will add a column for every classifier
                  that features in at least one collected metric.

                  .. note:: This cannot be ``True`` if ``extra_columns`` is set.
                  """),
        Parameter('extra_columns', kind=list_of_strings,
                  description="""
                  List of classifiers to use as columns.

                  .. note:: This cannot be set if ``use_all_classifiers`` is
                            ``True``.
                  """),
    ]

    def validate(self):
        super(CsvReportProcessor, self).validate()
        # The two column-selection mechanisms are mutually exclusive.
        if self.use_all_classifiers and self.extra_columns:
            msg = 'extra_columns cannot be specified when '\
                  'use_all_classifiers is True'
            raise ConfigError(msg)

    def initialize(self):
        # pylint: disable=attribute-defined-outside-init
        self.results_so_far = []    # job outputs accumulated over the run
        self.artifact_added = False  # ensure the artifact is registered once

    def process_job_output(self, output, target_info, run_output):
        """Accumulate this job's output and regenerate results.csv."""
        self.results_so_far.append(output)
        self._write_results(self.results_so_far, run_output)
        if not self.artifact_added:
            run_output.add_artifact('run_result_csv', 'results.csv', 'export')
            self.artifact_added = True

    def process_run_output(self, output, target_info):
        """Re-write the accumulated results at the end of the run.

        Fix: this hook was named ``process_run_result``, which
        ProcessorManager never dispatches (it calls ``process_run_output``),
        referenced a misspelled attribute (``self.rsults_so_far``), and
        appended ``output.result`` even though ``_write_results`` expects
        objects with a ``.result`` attribute.
        """
        self._write_results(self.results_so_far, output)
        if not self.artifact_added:
            output.add_artifact('run_result_csv', 'results.csv', 'export')
            self.artifact_added = True

    def _write_results(self, results, output):
        # Decide which classifier columns to emit.
        if self.use_all_classifiers:
            classifiers = set([])
            for result in results:
                for metric in result.metrics:
                    classifiers.update(metric.classifiers.keys())
            extra_columns = list(classifiers)
        elif self.extra_columns:
            extra_columns = self.extra_columns
        else:
            extra_columns = []

        outfile = output.get_path('results.csv')
        with open(outfile, 'wb') as wfh:
            writer = csv.writer(wfh)
            writer.writerow(['id', 'workload', 'iteration', 'metric', ] +
                            extra_columns + ['value', 'units'])
            # One row per metric; missing classifiers render as empty cells.
            for o in results:
                header = [o.id, o.label, o.iteration]
                for metric in o.result.metrics:
                    row = (header + [metric.name] +
                           [str(metric.classifiers.get(c, ''))
                            for c in extra_columns] +
                           [str(metric.value), metric.units or ''])
                    writer.writerow(row)

59
wa/processors/status.py Normal file
View File

@ -0,0 +1,59 @@
# Copyright 2013-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=R0201
import os
import time
from collections import Counter
from wa import ResultProcessor, Status
from wa.utils.misc import write_table
class StatusTxtReporter(ResultProcessor):

    name = 'status'
    description = """
    Outputs a txt file containing general status information about which runs
    failed and which were successful

    """

    def process_run_output(self, output, target_info):
        """Write a ``status.txt`` summary of job outcomes into the run output."""
        status_counts = Counter(job.status for job in output.jobs)

        report_path = output.get_path('status.txt')
        self.logger.info('Status available in {}'.format(report_path))
        with open(report_path, 'w') as wfh:
            wfh.write('Run name: {}\n'.format(output.info.run_name))
            wfh.write('Run status: {}\n'.format(output.status))
            wfh.write('Date: {}\n'.format(time.strftime("%c")))
            if output.events:
                wfh.write('Events:\n')
                for event in output.events:
                    wfh.write('\t{}\n'.format(event.summary))

            txt = '{}/{} iterations completed without error\n'
            wfh.write(txt.format(status_counts[Status.OK], len(output.jobs)))
            wfh.write('\n')

            # One table row per job: id, label, iteration, status, first
            # line of any recorded event.
            rows = [[str(field) for field in (job.id, job.label, job.iteration,
                                              job.status, job.event_summary)]
                    for job in output.jobs]
            write_table(rows, wfh, align='<<>><')
        output.add_artifact('run_status_summary', 'status.txt', 'export')