1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-18 20:11:20 +00:00

framework: Add serializing Job status setter

When the job status is set through ExecutionContext, the status change
should be accompanied by an update to the state file, so that the state
file accurately reflects the execution state.

As Jobs should not be aware of the output, the new method
(set_job_status) is added to ExecutionContext; it couples setting the
job status with writing to the state file.
This commit is contained in:
Jonathan Paynter 2020-07-01 10:21:39 +00:00 committed by Marc Bonnici
parent 460965363f
commit 8640f4f69a

View File

@ -141,21 +141,24 @@ class ExecutionContext(object):
self.current_job = self.job_queue.pop(0) self.current_job = self.job_queue.pop(0)
job_output = init_job_output(self.run_output, self.current_job) job_output = init_job_output(self.run_output, self.current_job)
self.current_job.set_output(job_output) self.current_job.set_output(job_output)
self.update_job_state(self.current_job)
return self.current_job return self.current_job
def end_job(self): def end_job(self):
if not self.current_job: if not self.current_job:
raise RuntimeError('No jobs in progress') raise RuntimeError('No jobs in progress')
self.completed_jobs.append(self.current_job) self.completed_jobs.append(self.current_job)
self.update_job_state(self.current_job)
self.output.write_result() self.output.write_result()
self.current_job = None self.current_job = None
def set_status(self, status, force=False): def set_status(self, status, force=False, write=True):
if not self.current_job: if not self.current_job:
raise RuntimeError('No jobs in progress') raise RuntimeError('No jobs in progress')
self.current_job.set_status(status, force) self.set_job_status(self.current_job, status, force, write)
def set_job_status(self, job, status, force=False, write=True):
job.set_status(status, force)
if write:
self.run_output.write_state()
def extract_results(self): def extract_results(self):
self.tm.extract_results(self) self.tm.extract_results(self)
@ -163,13 +166,8 @@ class ExecutionContext(object):
def move_failed(self, job): def move_failed(self, job):
self.run_output.move_failed(job.output) self.run_output.move_failed(job.output)
def update_job_state(self, job):
self.run_state.update_job(job)
self.run_output.write_state()
def skip_job(self, job): def skip_job(self, job):
job.status = Status.SKIPPED self.set_job_status(job, Status.SKIPPED, force=True)
self.run_state.update_job(job)
self.completed_jobs.append(job) self.completed_jobs.append(job)
def skip_remaining_jobs(self): def skip_remaining_jobs(self):
@ -293,7 +291,7 @@ class ExecutionContext(object):
try: try:
job.initialize(self) job.initialize(self)
except WorkloadError as e: except WorkloadError as e:
job.set_status(Status.FAILED) self.set_job_status(job, Status.FAILED, write=False)
log.log_error(e, self.logger) log.log_error(e, self.logger)
failed_ids.append(job.id) failed_ids.append(job.id)
@ -303,6 +301,7 @@ class ExecutionContext(object):
new_queue.append(job) new_queue.append(job)
self.job_queue = new_queue self.job_queue = new_queue
self.write_state()
def _load_resource_getters(self): def _load_resource_getters(self):
self.logger.debug('Loading resource discoverers') self.logger.debug('Loading resource discoverers')
@ -557,15 +556,15 @@ class Runner(object):
with signal.wrap('JOB', self, context): with signal.wrap('JOB', self, context):
context.tm.start() context.tm.start()
self.do_run_job(job, context) self.do_run_job(job, context)
job.set_status(Status.OK) context.set_job_status(job, Status.OK)
except (Exception, KeyboardInterrupt) as e: # pylint: disable=broad-except except (Exception, KeyboardInterrupt) as e: # pylint: disable=broad-except
log.log_error(e, self.logger) log.log_error(e, self.logger)
if isinstance(e, KeyboardInterrupt): if isinstance(e, KeyboardInterrupt):
context.run_interrupted = True context.run_interrupted = True
job.set_status(Status.ABORTED) context.set_job_status(job, Status.ABORTED)
raise e raise e
else: else:
job.set_status(Status.FAILED) context.set_job_status(job, Status.FAILED)
if isinstance(e, TargetNotRespondingError): if isinstance(e, TargetNotRespondingError):
raise e raise e
elif isinstance(e, TargetError): elif isinstance(e, TargetError):
@ -588,7 +587,7 @@ class Runner(object):
self.context.skip_job(job) self.context.skip_job(job)
return return
job.set_status(Status.RUNNING) context.set_job_status(job, Status.RUNNING)
self.send(signal.JOB_STARTED) self.send(signal.JOB_STARTED)
job.configure_augmentations(context, self.pm) job.configure_augmentations(context, self.pm)
@ -599,7 +598,7 @@ class Runner(object):
try: try:
job.setup(context) job.setup(context)
except Exception as e: except Exception as e:
job.set_status(Status.FAILED) context.set_job_status(job, Status.FAILED)
log.log_error(e, self.logger) log.log_error(e, self.logger)
if isinstance(e, (TargetError, TimeoutError)): if isinstance(e, (TargetError, TimeoutError)):
context.tm.verify_target_responsive(context) context.tm.verify_target_responsive(context)
@ -612,10 +611,10 @@ class Runner(object):
job.run(context) job.run(context)
except KeyboardInterrupt: except KeyboardInterrupt:
context.run_interrupted = True context.run_interrupted = True
job.set_status(Status.ABORTED) context.set_job_status(job, Status.ABORTED)
raise raise
except Exception as e: except Exception as e:
job.set_status(Status.FAILED) context.set_job_status(job, Status.FAILED)
log.log_error(e, self.logger) log.log_error(e, self.logger)
if isinstance(e, (TargetError, TimeoutError)): if isinstance(e, (TargetError, TimeoutError)):
context.tm.verify_target_responsive(context) context.tm.verify_target_responsive(context)
@ -628,7 +627,7 @@ class Runner(object):
self.pm.process_job_output(context) self.pm.process_job_output(context)
self.pm.export_job_output(context) self.pm.export_job_output(context)
except Exception as e: except Exception as e:
job.set_status(Status.PARTIAL) context.set_job_status(job, Status.PARTIAL)
if isinstance(e, (TargetError, TimeoutError)): if isinstance(e, (TargetError, TimeoutError)):
context.tm.verify_target_responsive(context) context.tm.verify_target_responsive(context)
self.context.record_ui_state('output-error') self.context.record_ui_state('output-error')
@ -636,7 +635,7 @@ class Runner(object):
except KeyboardInterrupt: except KeyboardInterrupt:
context.run_interrupted = True context.run_interrupted = True
job.set_status(Status.ABORTED) context.set_status(Status.ABORTED)
raise raise
finally: finally:
# If setup was successfully completed, teardown must # If setup was successfully completed, teardown must
@ -669,8 +668,9 @@ class Runner(object):
def retry_job(self, job): def retry_job(self, job):
retry_job = Job(job.spec, job.iteration, self.context) retry_job = Job(job.spec, job.iteration, self.context)
retry_job.workload = job.workload retry_job.workload = job.workload
retry_job.state = job.state
retry_job.retries = job.retries + 1 retry_job.retries = job.retries + 1
retry_job.set_status(Status.PENDING) self.context.set_job_status(retry_job, Status.PENDING, force=True)
self.context.job_queue.insert(0, retry_job) self.context.job_queue.insert(0, retry_job)
self.send(signal.JOB_RESTARTED) self.send(signal.JOB_RESTARTED)