mirror of
https://github.com/ARM-software/workload-automation.git
synced 2025-01-19 12:24:32 +00:00
c0049d0a86
Only run finalize() for a job if initialize has succeed. finalize() should be able to assume that initialize() has succeed, without needing to check that that file have been created, variables set, etc.
163 lines
5.8 KiB
Python
163 lines
5.8 KiB
Python
import logging
|
|
from copy import copy
|
|
from datetime import datetime
|
|
|
|
from wa.framework import pluginloader, signal, instrument
|
|
from wa.framework.configuration.core import Status
|
|
|
|
# Because of use of Enum (dynamic attrs)
|
|
# pylint: disable=no-member
|
|
|
|
class Job(object):
|
|
|
|
_workload_cache = {}
|
|
|
|
@property
|
|
def id(self):
|
|
return self.spec.id
|
|
|
|
@property
|
|
def label(self):
|
|
return self.spec.label
|
|
|
|
@property
|
|
def classifiers(self):
|
|
return self.spec.classifiers
|
|
|
|
@property
|
|
def status(self):
|
|
return self._status
|
|
|
|
@property
|
|
def has_been_initialized(self):
|
|
return self._has_been_initialized
|
|
|
|
@status.setter
|
|
def status(self, value):
|
|
self._status = value
|
|
if self.output:
|
|
self.output.status = value
|
|
|
|
def __init__(self, spec, iteration, context):
|
|
self.logger = logging.getLogger('job')
|
|
self.spec = spec
|
|
self.iteration = iteration
|
|
self.context = context
|
|
self.workload = None
|
|
self.output = None
|
|
self.run_time = None
|
|
self.retries = 0
|
|
self._has_been_initialized = False
|
|
self._status = Status.NEW
|
|
|
|
def load(self, target, loader=pluginloader):
|
|
self.logger.info('Loading job {}'.format(self))
|
|
if self.iteration == 1:
|
|
self.workload = loader.get_workload(self.spec.workload_name,
|
|
target,
|
|
**self.spec.workload_parameters)
|
|
self.workload.init_resources(self.context)
|
|
self.workload.validate()
|
|
self._workload_cache[self.id] = self.workload
|
|
else:
|
|
self.workload = self._workload_cache[self.id]
|
|
|
|
def set_output(self, output):
|
|
output.classifiers = copy(self.classifiers)
|
|
self.output = output
|
|
|
|
def initialize(self, context):
|
|
self.logger.info('Initializing job {}'.format(self))
|
|
with signal.wrap('WORKLOAD_INITIALIZED', self, context):
|
|
self.workload.logger.context = context
|
|
self.workload.initialize(context)
|
|
self.set_status(Status.PENDING)
|
|
self._has_been_initialized = True
|
|
context.update_job_state(self)
|
|
|
|
def configure_augmentations(self, context, pm):
|
|
instruments_to_enable = set()
|
|
output_processors_to_enable = set()
|
|
enabled_instruments = set(i.name for i in instrument.get_enabled())
|
|
enabled_output_processors = set(p.name for p in pm.get_enabled())
|
|
|
|
for augmentation in self.spec.augmentations.values():
|
|
augmentation_cls = context.cm.plugin_cache.get_plugin_class(augmentation)
|
|
if augmentation_cls.kind == 'instrument':
|
|
instruments_to_enable.add(augmentation)
|
|
elif augmentation_cls.kind == 'output_processor':
|
|
output_processors_to_enable.add(augmentation)
|
|
|
|
# Disable unrequired instruments
|
|
for instrument_name in enabled_instruments.difference(instruments_to_enable):
|
|
instrument.disable(instrument_name)
|
|
# Enable additional instruments
|
|
for instrument_name in instruments_to_enable.difference(enabled_instruments):
|
|
instrument.enable(instrument_name)
|
|
|
|
# Disable unrequired output_processors
|
|
for processor in enabled_output_processors.difference(output_processors_to_enable):
|
|
pm.disable(processor)
|
|
# Enable additional output_processors
|
|
for processor in output_processors_to_enable.difference(enabled_output_processors):
|
|
pm.enable(processor)
|
|
|
|
def configure_target(self, context):
|
|
self.logger.info('Configuring target for job {}'.format(self))
|
|
context.tm.commit_runtime_parameters(self.spec.runtime_parameters)
|
|
|
|
def setup(self, context):
|
|
self.logger.info('Setting up job {}'.format(self))
|
|
with signal.wrap('WORKLOAD_SETUP', self, context):
|
|
self.workload.setup(context)
|
|
|
|
def run(self, context):
|
|
self.logger.info('Running job {}'.format(self))
|
|
with signal.wrap('WORKLOAD_EXECUTION', self, context):
|
|
start_time = datetime.utcnow()
|
|
try:
|
|
self.workload.run(context)
|
|
finally:
|
|
self.run_time = datetime.utcnow() - start_time
|
|
|
|
def process_output(self, context):
|
|
if not context.tm.is_responsive:
|
|
self.logger.info('Target unresponsive; not processing job output.')
|
|
return
|
|
self.logger.info('Processing output for job {}'.format(self))
|
|
if self.status != Status.FAILED:
|
|
with signal.wrap('WORKLOAD_RESULT_EXTRACTION', self, context):
|
|
self.workload.extract_results(context)
|
|
context.extract_results()
|
|
with signal.wrap('WORKLOAD_OUTPUT_UPDATE', self, context):
|
|
self.workload.update_output(context)
|
|
|
|
def teardown(self, context):
|
|
if not context.tm.is_responsive:
|
|
self.logger.info('Target unresponsive; not tearing down.')
|
|
return
|
|
self.logger.info('Tearing down job {}'.format(self))
|
|
with signal.wrap('WORKLOAD_TEARDOWN', self, context):
|
|
self.workload.teardown(context)
|
|
|
|
def finalize(self, context):
|
|
if not self._has_been_initialized:
|
|
return
|
|
if not context.tm.is_responsive:
|
|
self.logger.info('Target unresponsive; not finalizing.')
|
|
return
|
|
self.logger.info('Finalizing job {} '.format(self))
|
|
with signal.wrap('WORKLOAD_FINALIZED', self, context):
|
|
self.workload.finalize(context)
|
|
|
|
def set_status(self, status, force=False):
|
|
status = Status(status)
|
|
if force or self.status < status:
|
|
self.status = status
|
|
|
|
def __str__(self):
|
|
return '{} ({}) [{}]'.format(self.id, self.label, self.iteration)
|
|
|
|
def __repr__(self):
|
|
return 'Job({})'.format(self)
|