From 8640f4f69a6423530a1c0a6cc27fdaefc592cc14 Mon Sep 17 00:00:00 2001 From: Jonathan Paynter Date: Wed, 1 Jul 2020 10:21:39 +0000 Subject: [PATCH] framework: Add serializing Job status setter When setting the job status through ExecutionContext, this change should be accompanied by an update to the state file, so that the state file accurately reflects execution state. As Jobs should not be aware of the output, this method is added to ExecutionContext, and couples setting job state with writing to the state file. --- wa/framework/execution.py | 42 +++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 7b99a99b..8f1a45eb 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -141,21 +141,24 @@ class ExecutionContext(object): self.current_job = self.job_queue.pop(0) job_output = init_job_output(self.run_output, self.current_job) self.current_job.set_output(job_output) - self.update_job_state(self.current_job) return self.current_job def end_job(self): if not self.current_job: raise RuntimeError('No jobs in progress') self.completed_jobs.append(self.current_job) - self.update_job_state(self.current_job) self.output.write_result() self.current_job = None - def set_status(self, status, force=False): + def set_status(self, status, force=False, write=True): if not self.current_job: raise RuntimeError('No jobs in progress') - self.current_job.set_status(status, force) + self.set_job_status(self.current_job, status, force, write) + + def set_job_status(self, job, status, force=False, write=True): + job.set_status(status, force) + if write: + self.run_output.write_state() def extract_results(self): self.tm.extract_results(self) @@ -163,13 +166,8 @@ class ExecutionContext(object): def move_failed(self, job): self.run_output.move_failed(job.output) - def update_job_state(self, job): - self.run_state.update_job(job) - self.run_output.write_state() - def skip_job(self, job): - job.status = Status.SKIPPED - self.run_state.update_job(job) + self.set_job_status(job, Status.SKIPPED, force=True) self.completed_jobs.append(job) def skip_remaining_jobs(self): @@ -293,7 +291,7 @@ class ExecutionContext(object): try: job.initialize(self) except WorkloadError as e: - job.set_status(Status.FAILED) + self.set_job_status(job, Status.FAILED, write=False) log.log_error(e, self.logger) failed_ids.append(job.id) @@ -303,6 +301,7 @@ class ExecutionContext(object): new_queue.append(job) self.job_queue = new_queue + self.write_state() def _load_resource_getters(self): self.logger.debug('Loading resource discoverers') @@ -557,15 +556,15 @@ class Runner(object): with signal.wrap('JOB', self, context): context.tm.start() self.do_run_job(job, context) - job.set_status(Status.OK) + context.set_job_status(job, Status.OK) except (Exception, KeyboardInterrupt) as e: # pylint: disable=broad-except log.log_error(e, self.logger) if isinstance(e, KeyboardInterrupt): context.run_interrupted = True - job.set_status(Status.ABORTED) + context.set_job_status(job, Status.ABORTED) raise e else: - job.set_status(Status.FAILED) + context.set_job_status(job, Status.FAILED) if isinstance(e, TargetNotRespondingError): raise e elif isinstance(e, TargetError): @@ -588,7 +587,7 @@ class Runner(object): self.context.skip_job(job) return - job.set_status(Status.RUNNING) + context.set_job_status(job, Status.RUNNING) self.send(signal.JOB_STARTED) job.configure_augmentations(context, self.pm) @@ -599,7 +598,7 @@ class Runner(object): try: job.setup(context) except Exception as e: - job.set_status(Status.FAILED) + context.set_job_status(job, Status.FAILED) log.log_error(e, self.logger) if isinstance(e, (TargetError, TimeoutError)): context.tm.verify_target_responsive(context) @@ -612,10 +611,10 @@ class Runner(object): job.run(context) except KeyboardInterrupt: context.run_interrupted = True - job.set_status(Status.ABORTED) + context.set_job_status(job, Status.ABORTED) raise except Exception as e: - job.set_status(Status.FAILED) + context.set_job_status(job, Status.FAILED) log.log_error(e, self.logger) if isinstance(e, (TargetError, TimeoutError)): context.tm.verify_target_responsive(context) @@ -628,7 +627,7 @@ class Runner(object): self.pm.process_job_output(context) self.pm.export_job_output(context) except Exception as e: - job.set_status(Status.PARTIAL) + context.set_job_status(job, Status.PARTIAL) if isinstance(e, (TargetError, TimeoutError)): context.tm.verify_target_responsive(context) self.context.record_ui_state('output-error') @@ -636,7 +635,7 @@ class Runner(object): except KeyboardInterrupt: context.run_interrupted = True - job.set_status(Status.ABORTED) + context.set_status(Status.ABORTED) raise finally: # If setup was successfully completed, teardown must @@ -669,8 +668,9 @@ class Runner(object): def retry_job(self, job): retry_job = Job(job.spec, job.iteration, self.context) retry_job.workload = job.workload + retry_job.state = job.state retry_job.retries = job.retries + 1 - retry_job.set_status(Status.PENDING) + self.context.set_job_status(retry_job, Status.PENDING, force=True) self.context.job_queue.insert(0, retry_job) self.send(signal.JOB_RESTARTED)