1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-09-02 03:12:34 +01:00

Skeleton job execution

This commit is contained in:
Sergei Trofimov
2017-03-09 14:44:26 +00:00
parent 6eb5c3681d
commit 011fd684bd
14 changed files with 714 additions and 92 deletions

View File

@@ -555,7 +555,7 @@ class MetaConfiguration(Configuration):
plugin_packages = [
'wa.commands',
'wa.workloads',
#'wa.instrumentation',
'wa.instrumentation',
#'wa.result_processors',
#'wa.managers',
'wa.framework.target.descriptor',

View File

@@ -1,4 +1,5 @@
import random
import logging
from itertools import izip_longest, groupby, chain
from wa.framework import pluginloader
@@ -6,6 +7,8 @@ from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration
JobGenerator, settings)
from wa.framework.configuration.parsers import ConfigParser
from wa.framework.configuration.plugin_cache import PluginCache
from wa.framework.exception import NotFoundError
from wa.utils.types import enum
class CombinedConfig(object):
@@ -26,33 +29,54 @@ class CombinedConfig(object):
'run_config': self.run_config.to_pod()}
class JobStatus:
PENDING = 0
RUNNING = 1
OK = 2
FAILED = 3
PARTIAL = 4
ABORTED = 5
PASSED = 6
JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING',
'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED'])
class Job(object):
@property
def id(self):
return self.spec.id
def __init__(self, spec, iteration, context):
self.logger = logging.getLogger('job')
self.spec = spec
self.iteration = iteration
self.context = context
self.status = 'new'
self.status = JobStatus.NEW
self.workload = None
self.output = None
def load(self, target, loader=pluginloader):
self.logger.debug('Loading job {}'.format(self.id))
self.workload = loader.get_workload(self.spec.workload_name,
target,
**self.spec.workload_parameters)
self.workload.init_resources(self.context)
self.workload.validate()
self.status = JobStatus.LOADED
def initialize(self, context):
self.logger.info('Initializing job {}'.format(self.id))
self.status = JobStatus.PENDING
def configure_target(self, context):
self.logger.info('Configuring target for job {}'.format(self.id))
def setup(self, context):
self.logger.info('Setting up job {}'.format(self.id))
def run(self, context):
self.logger.info('Running job {}'.format(self.id))
def process_output(self, context):
self.looger.info('Processing output for job {}'.format(self.id))
def teardown(self, context):
self.logger.info('Tearing down job {}'.format(self.id))
def finalize(self, context):
self.logger.info('Finalizing job {}'.format(self.id))
class ConfigManager(object):
"""
@@ -108,8 +132,12 @@ class ConfigManager(object):
def get_instruments(self, target):
instruments = []
for name in self.enabled_instruments:
instruments.append(self.get_plugin(name, kind='instrument',
target=target))
try:
instruments.append(self.get_plugin(name, kind='instrument',
target=target))
except NotFoundError:
msg = 'Instrument "{}" not found'
raise NotFoundError(msg.format(name))
return instruments
def finalize(self):

View File

@@ -85,27 +85,7 @@ def main():
except KeyboardInterrupt:
logging.info('Got CTRL-C. Aborting.')
sys.exit(3)
except (WAError, DevlibError) as e:
logging.critical(e)
sys.exit(1)
except subprocess.CalledProcessError as e:
tb = get_traceback()
logging.critical(tb)
command = e.cmd
if e.args:
command = '{} {}'.format(command, ' '.join(e.args))
message = 'Command \'{}\' returned non-zero exit status {}\nOUTPUT:\n{}\n'
logging.critical(message.format(command, e.returncode, e.output))
sys.exit(2)
except SyntaxError as e:
tb = get_traceback()
logging.critical(tb)
message = 'Syntax Error in {}, line {}, offset {}:'
logging.critical(message.format(e.filename, e.lineno, e.offset))
logging.critical('\t{}'.format(e.msg))
sys.exit(2)
except Exception as e: # pylint: disable=broad-except
tb = get_traceback()
logging.critical(tb)
logging.critical('{}({})'.format(e.__class__.__name__, e))
if not getattr(e, 'logged', None):
log.log_error(e, logger)
sys.exit(2)

View File

@@ -57,6 +57,8 @@ from wa.framework.exception import (WAError, ConfigError, TimeoutError,
from wa.framework.plugin import Artifact
from wa.framework.resource import ResourceResolver
from wa.framework.target.info import TargetInfo
from wa.framework.target.manager import TargetManager
from wa.utils import log
from wa.utils.misc import (ensure_directory_exists as _d,
get_traceback, format_duration)
from wa.utils.serializer import json
@@ -74,15 +76,70 @@ REBOOT_DELAY = 3
class ExecutionContext(object):
@property
def previous_job(self):
if not self.job_queue:
return None
return self.job_queue[0]
@property
def next_job(self):
if not self.completed_jobs:
return None
return self.completed_jobs[-1]
@property
def spec_changed(self):
if self.previous_job is None and self.current_job is not None: # Start of run
return True
if self.previous_job is not None and self.current_job is None: # End of run
return True
return self.current_job.spec.id != self.previous_job.spec.id
@property
def spec_will_change(self):
if self.current_job is None and self.next_job is not None: # Start of run
return True
if self.current_job is not None and self.next_job is None: # End of run
return True
return self.current_job.spec.id != self.next_job.spec.id
def __init__(self, cm, tm, output):
self.logger = logging.getLogger('ExecContext')
self.logger = logging.getLogger('context')
self.cm = cm
self.tm = tm
self.output = output
self.logger.debug('Loading resource discoverers')
self.resolver = ResourceResolver(cm)
self.resolver.load()
self.job_queue = None
self.completed_jobs = None
self.current_job = None
def start_run(self):
self.output.info.start_time = datetime.now()
self.output.write_info()
self.job_queue = copy(self.cm.jobs)
self.completed_jobs = []
def end_run(self):
self.output.info.end_time = datetime.now()
self.output.info.duration = self.output.info.end_time -\
self.output.info.start_time
self.output.write_info()
def start_job(self):
if not self.job_queue:
raise RuntimeError('No jobs to run')
self.current_job = self.job_queue.pop(0)
return self.current_job
def end_job(self):
if not self.current_job:
raise RuntimeError('No jobs in progress')
self.completed_jobs.append(self.current_job)
self.current_job = None
class OldExecutionContext(object):
@@ -147,22 +204,6 @@ class OldExecutionContext(object):
self.job_iteration_counts = defaultdict(int)
self.aborted = False
self.runner = None
if config.agenda.filepath:
self.run_artifacts.append(Artifact('agenda',
os.path.join(self.host_working_directory,
os.path.basename(config.agenda.filepath)),
'meta',
mandatory=True,
description='Agenda for this run.'))
for i, filepath in enumerate(settings.config_paths, 1):
name = 'config_{}'.format(i)
path = os.path.join(self.host_working_directory,
name + os.path.splitext(filepath)[1])
self.run_artifacts.append(Artifact(name,
path,
kind='meta',
mandatory=True,
description='Config file used for the run.'))
def initialize(self):
if not os.path.isdir(self.run_output_directory):
@@ -228,30 +269,6 @@ def _check_artifact_path(path, rootpath):
return full_path
class FakeTargetManager(object):
# TODO: this is a FAKE
def __init__(self, name, config):
self.device_name = name
self.device_config = config
from devlib import LocalLinuxTarget
self.target = LocalLinuxTarget({'unrooted': True})
def get_target_info(self):
return TargetInfo(self.target)
def validate_runtime_parameters(self, params):
pass
def merge_runtime_parameters(self, params):
pass
def init_target_manager(config):
return FakeTargetManager(config.device, config.device_config)
class Executor(object):
"""
The ``Executor``'s job is to set up the execution context and pass to a
@@ -268,7 +285,7 @@ class Executor(object):
# pylint: disable=R0915
def __init__(self):
self.logger = logging.getLogger('Executor')
self.logger = logging.getLogger('executor')
self.error_logged = False
self.warning_logged = False
pluginloader = None
@@ -297,7 +314,8 @@ class Executor(object):
output.write_config(config)
self.logger.info('Connecting to target')
target_manager = init_target_manager(config.run_config)
target_manager = TargetManager(config.run_config.device,
config.run_config.device_config)
output.write_target_info(target_manager.get_target_info())
self.logger.info('Initializing execution conetext')
@@ -312,6 +330,11 @@ class Executor(object):
instrumentation.install(instrument)
instrumentation.validate()
self.logger.info('Starting run')
runner = Runner(context)
runner.run()
def execute_postamble(self):
"""
@@ -370,6 +393,92 @@ class Runner(object):
"""
def __init__(self, context):
self.logger = logging.getLogger('runner')
self.context = context
self.output = self.context.output
self.config = self.context.cm
def run(self):
self.send(signal.RUN_STARTED)
try:
self.initialize_run()
self.send(signal.RUN_INITIALIZED)
while self.context.job_queue:
with signal.wrap('JOB_EXECUTION', self):
self.run_next_job(self.context)
except Exception as e:
if (not getattr(e, 'logged', None) and
not isinstance(e, KeyboardInterrupt)):
log.log_error(e, self.logger)
e.logged = True
raise e
finally:
self.finalize_run()
self.send(signal.RUN_COMPLETED)
def initialize_run(self):
self.logger.info('Initializing run')
self.context.start_run()
log.indent()
for job in self.context.job_queue:
job.initialize(self.context)
log.dedent()
def finalize_run(self):
self.logger.info('Finalizing run')
self.context.end_run()
def run_next_job(self, context):
job = context.start_job()
self.logger.info('Running job {}'.format(job.id))
job.status = JobStatus.RUNNING
log.indent()
self.send(signal.JOB_STARTED)
with signal.wrap('JOB_TARGET_CONFIG', self):
job.configure_target(context)
with signal.wrap('JOB_SETUP', self):
job.setup(context)
try:
with signal.wrap('JOB_EXECUTION', self):
job.run(context)
try:
with signal.wrap('JOB_OUTPUT_PROCESSED', self):
job.run(context)
except Exception:
job.status = JobStatus.PARTIAL
raise
except KeyboardInterrupt:
job.status = JobStatus.ABORTED
raise
except Exception as e:
job.status = JobStatus.FAILED
if not getattr(e, 'logged', None):
log.log_error(e, self.logger)
e.logged = True
raise e
finally:
# If setup was successfully completed, teardown must
# run even if the job failed
with signal.wrap('JOB_TEARDOWN', self):
job.teardown(context)
log.dedent()
self.logger.info('Completing job {}'.format(job.id))
self.send(signal.JOB_COMPLETED)
context.end_job()
def send(self, s):
signal.send(s, self, self.context)
def __str__(self):
return 'runner'
class RunnerJob(object):
"""

View File

@@ -102,7 +102,7 @@ import logging
import inspect
from collections import OrderedDict
import wa.framework.signal as signal
from wa.framework import signal
from wa.framework.plugin import Plugin
from wa.framework.exception import WAError, TargetNotRespondingError, TimeoutError
from wa.utils.misc import get_traceback, isiterable

View File

@@ -5,6 +5,7 @@ import string
import sys
import uuid
from copy import copy
from datetime import timedelta
from wlauto.core.configuration.configuration import JobSpec
from wlauto.core.configuration.manager import ConfigManager
@@ -25,10 +26,13 @@ class RunInfo(object):
@staticmethod
def from_pod(pod):
uid = pod.pop('uuid')
duration = pod.pop('duration')
if uid is not None:
uid = uuid.UUID(uid)
instance = RunInfo(**pod)
instance.uuid = uid
instance.duration = duration if duration is None else\
timedelta(seconds=duration)
return instance
def __init__(self, run_name=None, project=None, project_stage=None,
@@ -44,6 +48,10 @@ class RunInfo(object):
def to_pod(self):
d = copy(self.__dict__)
d['uuid'] = str(self.uuid)
if self.duration is None:
d['duration'] = self.duration
else:
d['duration'] = self.duration.total_seconds()
return d

View File

@@ -406,7 +406,7 @@ class Plugin(object):
return self.__class__.__name__
def __init__(self, **kwargs):
self.logger = logging.getLogger(self._classname)
self.logger = logging.getLogger(self.name)
self._modules = []
self.capabilities = getattr(self.__class__, 'capabilities', [])
for param in self.parameters:

View File

@@ -286,7 +286,7 @@ class ResourceResolver(object):
"""
def __init__(self, config):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = logging.getLogger('resolver')
self.getters = defaultdict(prioritylist)
self.config = config

View File

@@ -27,7 +27,7 @@ from louie import dispatcher
from wa.utils.types import prioritylist
logger = logging.getLogger('dispatcher')
logger = logging.getLogger('signal')
class Signal(object):
@@ -101,6 +101,27 @@ BEFORE_RUN_INIT = Signal('before-run-init', invert_priority=True)
SUCCESSFUL_RUN_INIT = Signal('successful-run-init')
AFTER_RUN_INIT = Signal('after-run-init')
BEFORE_JOB_TARGET_CONFIG = Signal('before-job-target-config', invert_priority=True)
SUCCESSFUL_JOB_TARGET_CONFIG = Signal('successful-job-target-config')
AFTER_JOB_TARGET_CONFIG = Signal('after-job-target-config')
BEFORE_JOB_SETUP = Signal('before-job-setup', invert_priority=True)
SUCCESSFUL_JOB_SETUP = Signal('successful-job-setup')
AFTER_JOB_SETUP = Signal('after-job-setup')
BEFORE_JOB_EXECUTION = Signal('before-job-execution', invert_priority=True)
SUCCESSFUL_JOB_EXECUTION = Signal('successful-job-execution')
AFTER_JOB_EXECUTION = Signal('after-job-execution')
BEFORE_JOB_OUTPUT_PROCESSED = Signal('before-job-output-processed',
invert_priority=True)
SUCCESSFUL_JOB_OUTPUT_PROCESSED = Signal('successful-job-output-processed')
AFTER_JOB_OUTPUT_PROCESSED = Signal('after-job-output-processed')
BEFORE_JOB_TEARDOWN = Signal('before-job-teardown', invert_priority=True)
SUCCESSFUL_JOB_TEARDOWN = Signal('successful-job-teardown')
AFTER_JOB_TEARDOWN = Signal('after-job-teardown')
BEFORE_FLASHING = Signal('before-flashing', invert_priority=True)
SUCCESSFUL_FLASHING = Signal('successful-flashing')
AFTER_FLASHING = Signal('after-flashing')
@@ -250,6 +271,7 @@ def send(signal, sender=dispatcher.Anonymous, *args, **kwargs):
The rest of the parameters will be passed on as aruments to the handler.
"""
logger.debug('Sending {} from {}'.format(signal, sender))
return dispatcher.send(signal, sender, *args, **kwargs)
@@ -266,6 +288,7 @@ def safe_send(signal, sender=dispatcher.Anonymous,
to just ``[KeyboardInterrupt]``).
"""
try:
logger.debug('Safe-sending {} from {}'.format(signal, sender))
send(singnal, sender, *args, **kwargs)
except Exception as e:
if any(isinstance(e, p) for p in propagate):
@@ -292,3 +315,17 @@ def wrap(signal_name, sender=dispatcher.Anonymous, safe=False, *args, **kwargs):
finally:
send_func(after_signal, sender, *args, **kwargs)
def wrapped(signal_name, sender=dispatcher.Anonymous, safe=False):
"""A decorator for wrapping function in signal dispatch."""
@wrapt.decorator
def signal_wrapped(wrapped, instance, args, kwargs):
func_id = repr(wrapped)
def signal_wrapper(*args, **kwargs):
with wrap(signal_name, sender, safe):
return wrapped(*args, **kwargs)
return signal_wrapper(*args, **kwargs)
return signal_wrapped