From 4a001713bb72a197a5ccc7ef8b2ebe8a034ec0e2 Mon Sep 17 00:00:00 2001 From: Brendan Jackman Date: Mon, 23 Oct 2017 16:38:59 +0100 Subject: [PATCH 1/2] framework/execution: Factor out skip_job method This also fixes the missing housekeeping when skipping a job due to phones_home --- wa/framework/execution.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 4cf7c950..9cdb51b5 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -174,12 +174,15 @@ class ExecutionContext(object): self.run_state.update_job(job) self.run_output.write_state() + def skip_job(self, job): + job.status = Status.SKIPPED + self.run_state.update_job(job) + self.completed_jobs.append(job) + def skip_remaining_jobs(self): while self.job_queue: job = self.job_queue.pop(0) - job.status = Status.SKIPPED - self.run_state.update_job(job) - self.completed_jobs.append(job) + self.skip_job(job) self.write_state() def write_state(self): @@ -421,7 +424,7 @@ class Runner(object): if job.workload.phones_home and not rc.allow_phone_home: self.logger.warning('Skipping job {} ({}) due to allow_phone_home=False' .format(job.id, job.workload.name)) - job.set_status(Status.SKIPPED) + self.context.skip_job(job) return job.set_status(Status.RUNNING) From 9fa1b133dc528894d2c31e19731d2ca51dce5a90 Mon Sep 17 00:00:00 2001 From: Brendan Jackman Date: Mon, 23 Oct 2017 16:41:28 +0100 Subject: [PATCH 2/2] framework: Add bail_on_init_failure run configuration This maintains the default behaviour of bailing out immediately if any workload fails in initialize(), but adds a setting, bail_on_init_failure, to change this behaviour optionally. This can be useful where WA is being used more as a batch processor. --- wa/framework/configuration/core.py | 13 ++++++++++++ wa/framework/execution.py | 33 +++++++++++++++++++++++++++--- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index cfe5ea65..d1ad1eaf 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -779,6 +779,19 @@ class RunConfiguration(Configuration): .. note:: this number does not include the original attempt ''', ), + ConfigurationPoint( + 'bail_on_init_failure', + kind=bool, + default=True, + description=''' + When jobs fail during their main setup and run phases, WA will + continue attempting to run the remaining jobs. However, by default, + if they fail during their early initialization phase, the entire run + will end without continuing to run jobs. Setting this to ``False`` + means that WA will instead skip all the jobs from the job spec that + failed, but continue attempting to run others. + ''' + ), ConfigurationPoint( 'result_processors', kind=toggle_set, diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 9cdb51b5..d60f3165 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -31,7 +31,7 @@ from wa.framework import instrumentation, pluginloader from wa.framework.configuration.core import settings, Status from wa.framework.exception import (WAError, ConfigError, TimeoutError, InstrumentError, TargetError, HostError, - TargetNotRespondingError) + TargetNotRespondingError, WorkloadError) from wa.framework.job import Job from wa.framework.output import init_job_output from wa.framework.plugin import Artifact @@ -229,6 +229,34 @@ class ExecutionContext(object): def add_event(self, message): self.output.add_event(message) + def initialize_jobs(self): + new_queue = [] + failed_ids = [] + for job in self.job_queue: + if job.id in failed_ids: + # Don't try to initialize a job if another job with the same ID + # (i.e. same job spec) has failed - we can assume it will fail + # too. + self.skip_job(job) + continue + + try: + job.initialize(self) + except WorkloadError as e: + job.set_status(Status.FAILED) + self.add_event(e.message) + if not getattr(e, 'logged', None): + log.log_error(e, self.logger) + e.logged = True + failed_ids.append(job.id) + + if self.cm.run_config.bail_on_init_failure: + raise + else: + new_queue.append(job) + + self.job_queue = new_queue + class Executor(object): """ @@ -378,8 +406,7 @@ class Runner(object): self.context.start_run() self.pm.initialize() log.indent() - for job in self.context.job_queue: - job.initialize(self.context) + self.context.initialize_jobs() log.dedent() self.context.write_state()