1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-09-02 19:32:34 +01:00

Initial implementation of state tracking and output handling.

This commit is contained in:
Sergei Trofimov
2017-03-16 17:54:48 +00:00
parent 6e13c96d9c
commit 24402660c4
10 changed files with 443 additions and 435 deletions

View File

@@ -18,338 +18,112 @@ from copy import copy
from datetime import datetime, timedelta
from collections import OrderedDict
from wa.framework import signal, pluginloader, log
from wa.framework.plugin import Plugin
from wa.framework.output import Status
from wa.framework.resource import ResourceResolver
from wa.framework.exception import JobError
from wa.utils import counter
from wa.utils.serializer import json
from wa.utils.misc import ensure_directory_exists as _d
from wa.utils.types import caseless_string
from wa.framework.configuration.core import RunStatus
class RunInfo(object):
"""
Information about the current run, such as its unique ID, run
time, etc.
class JobActor(object):
"""
@staticmethod
def from_pod(pod):
uid = pod.pop('uuid')
duration = pod.pop('duration')
if uid is not None:
uid = uuid.UUID(uid)
instance = RunInfo(**pod)
instance.uuid = uid
instance.duration = duration if duration is None else\
timedelta(seconds=duration)
return instance
def get_config(self):
return {}
def __init__(self, run_name=None, project=None, project_stage=None,
start_time=None, end_time=None, duration=None):
self.uuid = uuid.uuid4()
self.run_name = None
self.project = None
self.project_stage = None
self.start_time = None
self.end_time = None
self.duration = None
def initialize(self, context):
pass
def run(self):
pass
def finalize(self):
pass
def restart(self):
pass
def complete(self):
pass
def to_pod(self):
d = copy(self.__dict__)
d['uuid'] = str(self.uuid)
if self.duration is None:
d['duration'] = self.duration
else:
d['duration'] = self.duration.total_seconds()
return d
class RunnerJob(object):
class RunState(object):
"""
Represents the state of a WA run.
"""
@staticmethod
def from_pod(pod):
instance = RunState()
instance.status = RunStatus(pod['status'])
instance.timestamp = pod['timestamp']
jss = [JobState.from_pod(j) for j in pod['jobs']]
instance.jobs = OrderedDict(((js.id, js.iteration), js) for js in jss)
return instance
def __init__(self):
self.jobs = OrderedDict()
self.status = RunStatus.NEW
self.timestamp = datetime.now()
def add_job(self, job):
job_state = JobState(job.id, job.label, job.iteration, job.status)
self.jobs[(job_state.id, job_state.iteration)] = job_state
def update_job(self, job):
state = self.jobs[(job.id, job.iteration)]
state.status = job.status
state.timestamp = datetime.now()
def to_pod(self):
return OrderedDict(
status=str(self.status),
timestamp=self.timestamp,
jobs=[j.to_pod() for j in self.jobs.itervalues()],
)
class JobState(object):
@staticmethod
def from_pod(pod):
instance = JobState(pod['id'], pod['label'], JobStatus(pod['status']))
instance.retries = pod['retries']
instance.iteration = pod['iteration']
instance.timestamp = pod['timestamp']
return instance
@property
def status(self):
return self.output.status
def output_name(self):
return '{}-{}-{}'.format(self.id, self.label, self.iteration)
@status.setter
def status(self, value):
self.output.status = value
@property
def should_retry(self):
return self.attempt <= self.max_retries
def __init__(self, id, actor, output, max_retries):
def __init__(self, id, label, iteration, status):
self.id = id
self.actor = actor
self.output = output
self.max_retries = max_retries
self.status = Status.NEW
self.attempt = 0
self.label = label
self.iteration = iteration
self.status = status
self.retries = 0
self.timestamp = datetime.now()
def initialize(self, context):
self.actor.initialize(context)
self.status = Status.PENDING
def run(self):
self.status = Status.RUNNING
self.attempt += 1
self.output.config = self.actor.get_config()
self.output.initialize()
self.actor.run()
self.status = Status.COMPLETE
def finalize(self):
self.actor.finalize()
def restart(self):
self.actor.restart()
def complete(self):
self.actor.complete()
__run_methods = set()
def runmethod(method):
"""
A method decorator that ensures that a method is invoked only once per run.
"""
def _method_wrapper(*args, **kwargs):
if method in __run_methods:
return
__run_methods.add(method)
ret = method(*args, **kwargs)
if ret is not None:
message = 'runmethod()\'s must return None; method "{}" returned "{}"'
raise RuntimeError(message.format(method, ret))
return _method_wrapper
def reset_runmethods():
global __run_methods
__run_methods = set()
class Runner(object):
@property
def info(self):
return self.output.info
@property
def status(self):
return self.output.status
@status.setter
def status(self, value):
self.output.status = value
@property
def jobs_pending(self):
return len(self.job_queue) > 0
@property
def current_job(self):
if self.job_queue:
return self.job_queue[0]
@property
def previous_job(self):
if self.completed_jobs:
return self.completed_jobs[-1]
@property
def next_job(self):
if len(self.job_queue) > 1:
return self.job_queue[1]
def __init__(self, output):
self.logger = logging.getLogger('runner')
self.output = output
self.context = RunContext(self)
self.status = Status.NEW
self.job_queue = []
self.completed_jobs = []
self._known_ids = set([])
def add_job(self, job_id, actor, max_retries=2):
job_id = caseless_string(job_id)
if job_id in self._known_ids:
raise JobError('Job with id "{}" already exists'.format(job_id))
output = self.output.create_job_output(job_id)
self.job_queue.append(RunnerJob(job_id, actor, output, max_retries))
self._known_ids.add(job_id)
def initialize(self):
self.logger.info('Initializing run')
self.start_time = datetime.now()
if not self.info.start_time:
self.info.start_time = self.start_time
self.info.duration = timedelta()
self.context.initialize()
for job in self.job_queue:
job.initialize(self.context)
self.persist_state()
self.logger.info('Run initialized')
def run(self):
self.status = Status.RUNNING
reset_runmethods()
signal.send(signal.RUN_STARTED, self, self.context)
self.initialize()
signal.send(signal.RUN_INITIALIZED, self, self.context)
self.run_jobs()
signal.send(signal.RUN_COMPLETED, self, self.context)
self.finalize()
signal.send(signal.RUN_FINALIZED, self, self.context)
def run_jobs(self):
try:
self.logger.info('Running jobs')
while self.jobs_pending:
self.begin_job()
log.indent()
try:
self.current_job.run()
except KeyboardInterrupt:
self.current_job.status = Status.ABORTED
signal.send(signal.JOB_ABORTED, self, self.current_job)
raise
except Exception as e:
self.current_job.status = Status.FAILED
log.log_error(e, self.logger)
signal.send(signal.JOB_FAILED, self, self.current_job)
else:
self.current_job.status = Status.COMPLETE
finally:
log.dedent()
self.complete_job()
except KeyboardInterrupt:
self.status = Status.ABORTED
while self.job_queue:
job = self.job_queue.pop(0)
job.status = RunnerJob.ABORTED
self.completed_jobs.append(job)
signal.send(signal.RUN_ABORTED, self, self)
raise
except Exception as e:
self.status = Status.FAILED
log.log_error(e, self.logger)
signal.send(signal.RUN_FAILED, self, self)
else:
self.status = Status.COMPLETE
def finalize(self):
self.logger.info('Finalizing run')
for job in self.job_queue:
job.finalize()
self.end_time = datetime.now()
self.info.end_time = self.end_time
self.info.duration += self.end_time - self.start_time
self.persist_state()
signal.send(signal.RUN_FINALIZED, self, self)
self.logger.info('Run completed')
def begin_job(self):
self.logger.info('Starting job {}'.format(self.current_job.id))
signal.send(signal.JOB_STARTED, self, self.current_job)
self.persist_state()
def complete_job(self):
if self.current_job.status == Status.FAILED:
self.output.move_failed(self.current_job.output)
if self.current_job.should_retry:
self.logger.info('Restarting job {}'.format(self.current_job.id))
self.persist_state()
self.current_job.restart()
signal.send(signal.JOB_RESTARTED, self, self.current_job)
return
self.logger.info('Completing job {}'.format(self.current_job.id))
self.current_job.complete()
self.persist_state()
signal.send(signal.JOB_COMPLETED, self, self.current_job)
job = self.job_queue.pop(0)
self.completed_jobs.append(job)
def persist_state(self):
self.output.persist()
class RunContext(object):
"""
Provides a context for instrumentation. Keeps track of things like
current workload and iteration.
"""
@property
def run_output(self):
return self.runner.output
@property
def current_job(self):
return self.runner.current_job
@property
def run_output_directory(self):
return self.run_output.output_directory
@property
def output_directory(self):
if self.runner.current_job:
return self.runner.current_job.output.output_directory
else:
return self.run_output.output_directory
@property
def info_directory(self):
return self.run_output.info_directory
@property
def config_directory(self):
return self.run_output.config_directory
@property
def failed_directory(self):
return self.run_output.failed_directory
@property
def log_file(self):
return os.path.join(self.output_directory, 'run.log')
def __init__(self, runner):
self.runner = runner
self.job = None
self.iteration = None
self.job_output = None
self.resolver = ResourceResolver()
def initialize(self):
self.resolver.load()
def get_path(self, subpath):
if self.current_job is None:
return self.run_output.get_path(subpath)
else:
return self.current_job.output.get_path(subpath)
def add_metric(self, *args, **kwargs):
if self.current_job is None:
self.run_output.add_metric(*args, **kwargs)
else:
self.current_job.output.add_metric(*args, **kwargs)
def add_artifact(self, name, path, kind, *args, **kwargs):
if self.current_job is None:
self.add_run_artifact(name, path, kind, *args, **kwargs)
else:
self.add_job_artifact(name, path, kind, *args, **kwargs)
def add_run_artifact(self, *args, **kwargs):
self.run_output.add_artifiact(*args, **kwargs)
def add_job_artifact(self, *args, **kwargs):
self.current_job.output.add_artifact(*args, **kwargs)
def get_artifact(self, name):
if self.iteration_artifacts:
for art in self.iteration_artifacts:
if art.name == name:
return art
for art in self.run_artifacts:
if art.name == name:
return art
return None
def to_pod(self):
return OrderedDict(
id=self.id,
label=self.label,
iteration=self.iteration,
status=str(self.status),
retries=0,
timestamp=self.timestamp,
)