diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 7b99a99b..8f1a45eb 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -141,21 +141,24 @@ class ExecutionContext(object): self.current_job = self.job_queue.pop(0) job_output = init_job_output(self.run_output, self.current_job) self.current_job.set_output(job_output) - self.update_job_state(self.current_job) return self.current_job def end_job(self): if not self.current_job: raise RuntimeError('No jobs in progress') self.completed_jobs.append(self.current_job) - self.update_job_state(self.current_job) self.output.write_result() self.current_job = None - def set_status(self, status, force=False): + def set_status(self, status, force=False, write=True): if not self.current_job: raise RuntimeError('No jobs in progress') - self.current_job.set_status(status, force) + self.set_job_status(self.current_job, status, force, write) + + def set_job_status(self, job, status, force=False, write=True): + job.set_status(status, force) + if write: + self.run_output.write_state() def extract_results(self): self.tm.extract_results(self) @@ -163,13 +166,8 @@ class ExecutionContext(object): def move_failed(self, job): self.run_output.move_failed(job.output) - def update_job_state(self, job): - self.run_state.update_job(job) - self.run_output.write_state() - def skip_job(self, job): - job.status = Status.SKIPPED - self.run_state.update_job(job) + self.set_job_status(job, Status.SKIPPED, force=True) self.completed_jobs.append(job) def skip_remaining_jobs(self): @@ -293,7 +291,7 @@ class ExecutionContext(object): try: job.initialize(self) except WorkloadError as e: - job.set_status(Status.FAILED) + self.set_job_status(job, Status.FAILED, write=False) log.log_error(e, self.logger) failed_ids.append(job.id) @@ -303,6 +301,7 @@ class ExecutionContext(object): new_queue.append(job) self.job_queue = new_queue + self.write_state() def _load_resource_getters(self): self.logger.debug('Loading resource discoverers') @@ -557,15 +556,15 @@ class Runner(object): with signal.wrap('JOB', self, context): context.tm.start() self.do_run_job(job, context) - job.set_status(Status.OK) + context.set_job_status(job, Status.OK) except (Exception, KeyboardInterrupt) as e: # pylint: disable=broad-except log.log_error(e, self.logger) if isinstance(e, KeyboardInterrupt): context.run_interrupted = True - job.set_status(Status.ABORTED) + context.set_job_status(job, Status.ABORTED) raise e else: - job.set_status(Status.FAILED) + context.set_job_status(job, Status.FAILED) if isinstance(e, TargetNotRespondingError): raise e elif isinstance(e, TargetError): @@ -588,7 +587,7 @@ class Runner(object): self.context.skip_job(job) return - job.set_status(Status.RUNNING) + context.set_job_status(job, Status.RUNNING) self.send(signal.JOB_STARTED) job.configure_augmentations(context, self.pm) @@ -599,7 +598,7 @@ class Runner(object): try: job.setup(context) except Exception as e: - job.set_status(Status.FAILED) + context.set_job_status(job, Status.FAILED) log.log_error(e, self.logger) if isinstance(e, (TargetError, TimeoutError)): context.tm.verify_target_responsive(context) @@ -612,10 +611,10 @@ class Runner(object): job.run(context) except KeyboardInterrupt: context.run_interrupted = True - job.set_status(Status.ABORTED) + context.set_job_status(job, Status.ABORTED) raise except Exception as e: - job.set_status(Status.FAILED) + context.set_job_status(job, Status.FAILED) log.log_error(e, self.logger) if isinstance(e, (TargetError, TimeoutError)): context.tm.verify_target_responsive(context) @@ -628,7 +627,7 @@ class Runner(object): self.pm.process_job_output(context) self.pm.export_job_output(context) except Exception as e: - job.set_status(Status.PARTIAL) + context.set_job_status(job, Status.PARTIAL) if isinstance(e, (TargetError, TimeoutError)): context.tm.verify_target_responsive(context) self.context.record_ui_state('output-error') @@ -636,7 +635,7 @@ class Runner(object): except KeyboardInterrupt: context.run_interrupted = True - job.set_status(Status.ABORTED) + context.set_status(Status.ABORTED) raise finally: # If setup was successfully completed, teardown must @@ -669,8 +668,9 @@ class Runner(object): def retry_job(self, job): retry_job = Job(job.spec, job.iteration, self.context) retry_job.workload = job.workload + retry_job.state = job.state retry_job.retries = job.retries + 1 - retry_job.set_status(Status.PENDING) + self.context.set_job_status(retry_job, Status.PENDING, force=True) self.context.job_queue.insert(0, retry_job) self.send(signal.JOB_RESTARTED)