diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index 0e0bbf73..7321a32e 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -34,7 +34,7 @@ KIND_MAP = { Status = enum(['UNKNOWN', 'NEW', 'PENDING', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING', - 'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK']) + 'OK', 'PARTIAL', 'FAILED', 'ABORTED', 'SKIPPED']) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index d4afe8c1..d012fa3f 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -32,6 +32,7 @@ from wa.framework.configuration.core import settings, Status from wa.framework.exception import (WAError, ConfigError, TimeoutError, InstrumentError, TargetError, HostError, TargetNotRespondingError) +from wa.framework.job import Job from wa.framework.output import init_job_output from wa.framework.plugin import Artifact from wa.framework.processor import ProcessorManager @@ -155,10 +156,10 @@ class ExecutionContext(object): self.output.write_result() self.current_job = None - def set_status(self, status): + def set_status(self, status, force=False): if not self.current_job: raise RuntimeError('No jobs in progress') - self.current_job.status = Status(status) + self.current_job.set_status(status, force) def extract_results(self): self.tm.extract_results(self) @@ -391,12 +392,12 @@ class Runner(object): try: log.indent() self.do_run_job(job, context) - job.status = Status.OK + job.set_status(Status.OK) except KeyboardInterrupt: - job.status = Status.ABORTED + job.set_status(Status.ABORTED) raise except Exception as e: - job.status = Status.FAILED + job.set_status(Status.FAILED) context.add_event(e.message) if not getattr(e, 'logged', None): log.log_error(e, self.logger) @@ -410,7 +411,7 @@ class Runner(object): self.check_job(job) def do_run_job(self, job, context): - job.status = Status.RUNNING + job.set_status(Status.RUNNING) self.send(signal.JOB_STARTED) with signal.wrap('JOB_TARGET_CONFIG', self): @@ -429,15 +430,15 @@ class Runner(object): self.pm.process_job_output(context) self.pm.export_job_output(context) except Exception: - job.status = Status.PARTIAL + job.set_status(Status.PARTIAL) raise except KeyboardInterrupt: - job.status = Status.ABORTED + job.set_status(Status.ABORTED) self.logger.info('Got CTRL-C. Aborting.') raise except Exception as e: - job.status = Status.FAILED + job.set_status(Status.FAILED) if not getattr(e, 'logged', None): log.log_error(e, self.logger) e.logged = True @@ -454,19 +455,24 @@ class Runner(object): if job.retries < rc.max_retries: msg = 'Job {} iteration {} completed with status {}. retrying...' self.logger.error(msg.format(job.id, job.status, job.iteration)) + self.retry_job(job) self.context.move_failed(job) - job.retries += 1 - job.status = Status.PENDING - self.context.job_queue.insert(0, job) self.context.write_state() else: msg = 'Job {} iteration {} completed with status {}. '\ 'Max retries exceeded.' - self.logger.error(msg.format(job.id, job.status, job.iteration)) + self.logger.error(msg.format(job.id, job.iteration, job.status)) self.context.failed_jobs += 1 else: # status not in retry_on_status self.logger.info('Job completed with status {}'.format(job.status)) self.context.successful_jobs += 1 + + def retry_job(self, job): + retry_job = Job(job.spec, job.iteration, self.context) + retry_job.workload = job.workload + retry_job.retries = job.retries + 1 + retry_job.set_status(Status.PENDING) + self.context.job_queue.insert(0, retry_job) def send(self, s): signal.send(s, self, self.context) diff --git a/wa/framework/instrumentation.py b/wa/framework/instrumentation.py index 009a2446..b3ce49af 100644 --- a/wa/framework/instrumentation.py +++ b/wa/framework/instrumentation.py @@ -104,7 +104,8 @@ from collections import OrderedDict from wa.framework import signal from wa.framework.plugin import Plugin -from wa.framework.exception import WAError, TargetNotRespondingError, TimeoutError +from wa.framework.exception import (WAError, TargetNotRespondingError, TimeoutError, + WorkloadError) from wa.utils.log import log_error from wa.utils.misc import get_traceback, isiterable from wa.utils.types import identifier, enum, level @@ -250,7 +251,7 @@ def check_failures(): class ManagedCallback(object): """ - This wraps instruments' callbacks to ensure that errors do interfer + This wraps instruments' callbacks to ensure that errors do not interfer with run execution. """ @@ -270,7 +271,11 @@ class ManagedCallback(object): global failures_detected # pylint: disable=W0603 failures_detected = True log_error(e, logger) - disable(self.instrument) + context.add_event(e.message) + if isinstance(e, WorkloadError): + context.set_status('FAILED') + else: + context.set_status('PARTIAL') # Need this to keep track of callbacks, because the dispatcher only keeps diff --git a/wa/framework/job.py b/wa/framework/job.py index d3c15090..24acb69f 100644 --- a/wa/framework/job.py +++ b/wa/framework/job.py @@ -58,7 +58,7 @@ class Job(object): self.logger.info('Initializing job {}'.format(self.id)) with signal.wrap('WORKLOAD_INITIALIZED', self, context): self.workload.initialize(context) - self.status = Status.PENDING + self.set_status(Status.PENDING) context.update_job_state(self) def configure_target(self, context): @@ -96,3 +96,8 @@ class Job(object): self.logger.info('Finalizing job {}'.format(self.id)) with signal.wrap('WORKLOAD_FINALIZED', self, context): self.workload.finalize(context) + + def set_status(self, status, force=False): + status = Status(status) + if force or self.status < status: + self.status = status