diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index d274b6c0..8bd8b3e2 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -20,9 +20,8 @@ from collections import OrderedDict, defaultdict from wa.framework.exception import ConfigError, NotFoundError from wa.framework.configuration.tree import SectionNode from wa.utils.misc import (get_article, merge_config_values) -from wa.utils.types import (identifier, integer, boolean, - list_of_strings, toggle_set, - obj_dict) +from wa.utils.types import (identifier, integer, boolean, list_of_strings, + list_of, toggle_set, obj_dict, enum) from wa.utils.serializer import is_pod # Mapping for kind conversion; see docs for convert_types below @@ -32,17 +31,9 @@ KIND_MAP = { dict: OrderedDict, } -ITERATION_STATUS = [ - 'NOT_STARTED', - 'RUNNING', +JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING', + 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) - 'OK', - 'NONCRITICAL', - 'PARTIAL', - 'FAILED', - 'ABORTED', - 'SKIPPED', -] ########################## ### CONFIG POINT TYPES ### @@ -716,9 +707,9 @@ class RunConfiguration(Configuration): This setting defines what specific Device subclass will be used to interact the connected device. Obviously, this must match your setup. '''), - ConfigurationPoint('retry_on_status', kind=status_list, + ConfigurationPoint('retry_on_status', kind=list_of(JobStatus), default=['FAILED', 'PARTIAL'], - allowed_values=ITERATION_STATUS, + allowed_values=JobStatus.values, description=''' This is list of statuses on which a job will be cosidered to have failed and will be automatically retried up to ``max_retries`` times. 
This defaults to @@ -736,10 +727,10 @@ class RunConfiguration(Configuration): ``"ABORTED"`` The user interupted the workload '''), - ConfigurationPoint('max_retries', kind=int, default=3, + ConfigurationPoint('max_retries', kind=int, default=2, description=''' The maximum number of times failed jobs will be retried before giving up. If - not set, this will default to ``3``. + not set. .. note:: this number does not include the original attempt '''), diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index dd8df4e0..4cfddd53 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -4,10 +4,11 @@ from itertools import izip_longest, groupby, chain from wa.framework import pluginloader from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration, - JobGenerator, settings) + JobGenerator, JobStatus, settings) from wa.framework.configuration.parsers import ConfigParser from wa.framework.configuration.plugin_cache import PluginCache from wa.framework.exception import NotFoundError +from wa.framework.job import Job from wa.utils.types import enum @@ -29,59 +30,6 @@ class CombinedConfig(object): 'run_config': self.run_config.to_pod()} -JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING', - 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) - -class Job(object): - - @property - def id(self): - return self.spec.id - - @property - def output_name(self): - return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration) - - def __init__(self, spec, iteration, context): - self.logger = logging.getLogger('job') - self.spec = spec - self.iteration = iteration - self.context = context - self.status = JobStatus.NEW - self.workload = None - self.output = None - - def load(self, target, loader=pluginloader): - self.logger.debug('Loading job {}'.format(self.id)) - self.workload = loader.get_workload(self.spec.workload_name, - target, - **self.spec.workload_parameters) - 
def move_failed(self, job):
    """
    Move aside the output of a job attempt that is about to be retried.

    The destination name is the job's output name followed by
    ``-attemptNN``, where ``NN`` is the one-based attempt number
    (``job.retries + 1``) zero-padded to two digits. The move itself is
    delegated to ``self.output.move_failed``.

    :param job: job whose output should be moved; must expose ``retries``
                and ``output_name``.
    """
    # ``retries`` counts completed re-runs, so the attempt that just
    # finished is one higher.
    attempt_number = 1 + job.retries
    current_name = job.output_name
    archived_name = '{}-attempt{:02}'.format(current_name, attempt_number)
    self.output.move_failed(current_name, archived_name)
def check_job(self, job):
    """
    Decide whether a job that has just finished must be retried.

    ``retry_on_status`` and ``max_retries`` are read from
    ``self.context.cm.run_config`` and compared against the job:

    - status is in ``retry_on_status`` and ``job.retries`` is below
      ``max_retries``: the failure is logged, the job's output is moved
      aside with ``self.context.move_failed``, ``job.retries`` is
      incremented, the status is reset to ``JobStatus.PENDING`` and the
      job is inserted at the front of ``self.context.job_queue``.
    - status is in ``retry_on_status`` but the retries are used up: an
      error is logged and the job is left as it is.
    - any other status: the status is logged at info level.

    :param job: the completed job; must expose ``id``, ``iteration``,
                ``status`` and ``retries``.
    """
    rc = self.context.cm.run_config
    if job.status not in rc.retry_on_status:
        self.logger.info('Job completed with status {}'.format(job.status))
        return

    # The format arguments must follow the placeholder order of the
    # templates below: id, iteration, status.
    if job.retries < rc.max_retries:
        msg = 'Job {} iteration {} completed with status {}. retrying...'
        self.logger.error(msg.format(job.id, job.iteration, job.status))
        # Move the output before bumping ``retries``: the context derives
        # the attempt number for the archived name from the current count.
        self.context.move_failed(job)
        job.retries += 1
        job.status = JobStatus.PENDING
        self.context.job_queue.insert(0, job)
    else:
        msg = 'Job {} iteration {} completed with status {}. '\
              'Max retries exceeded.'
        self.logger.error(msg.format(job.id, job.iteration, job.status))
def move_failed(self, name, failed_name):
    """
    Move an entry of the run output directory into the failed directory.

    :param name: entry name, relative to ``self.basepath``, to be moved.
    :param failed_name: name the entry gets inside ``self.failed_dir``.
    :raises ValueError: if the source path does not exist, or if the
                        destination path already exists.
    """
    source = os.path.join(self.basepath, name)
    # ``failed_dir`` is looked up before any validation, exactly as the
    # checks below expect, so the lookup happens even when a check fails.
    destination = os.path.join(self.failed_dir, failed_name)

    source_present = os.path.exists(source)
    if not source_present:
        raise ValueError('Path {} does not exist'.format(source))

    destination_taken = os.path.exists(destination)
    if destination_taken:
        raise ValueError('Path {} already exists'.format(destination))

    shutil.move(source, destination)
(NotFoundError, PluginLoaderError, ValidationError, - ConfigError, HostError) +from wa.framework.exception import (NotFoundError, PluginLoaderError, + ValidationError, ConfigError, HostError) from wa.utils import log from wa.utils.misc import (ensure_directory_exists as _d, walk_modules, load_class, merge_dicts_simple, get_article) diff --git a/wa/utils/log.py b/wa/utils/log.py index 567943e5..906116e6 100644 --- a/wa/utils/log.py +++ b/wa/utils/log.py @@ -22,6 +22,8 @@ import subprocess import colorama +from devlib import DevlibError + from wa.framework import signal from wa.framework.exception import WAError from wa.utils.misc import get_traceback @@ -142,7 +144,7 @@ def log_error(e, logger, critical=False): if isinstance(e, KeyboardInterrupt): log_func('Got CTRL-C. Aborting.') - elif isinstance(e, WAError): + elif isinstance(e, WAError) or isinstance(e, DevlibError): log_func(e) elif isinstance(e, subprocess.CalledProcessError): tb = get_traceback() diff --git a/wa/utils/types.py b/wa/utils/types.py index 87071205..fed3004f 100644 --- a/wa/utils/types.py +++ b/wa/utils/types.py @@ -504,12 +504,16 @@ class level(object): def __eq__(self, other): if isinstance(other, level): return self.value == other.value + elif isinstance(other, basestring): + return self.name == other else: return self.value == other def __ne__(self, other): if isinstance(other, level): return self.value != other.value + elif isinstance(other, basestring): + return self.name != other else: return self.value != other @@ -524,21 +528,39 @@ def enum(args, start=0): :: MyEnum = enum(['A', 'B', 'C']) - is equivalent of:: + is roughly equivalent of:: class MyEnum(object): A = 0 B = 1 C = 2 + however it also implement some specialized behaviors for comparisons and + instantiation. 
+ """ class Enum(object): - pass + def __new__(cls, name): + for attr_name in dir(cls): + if attr_name.startswith('__'): + continue + + attr = getattr(cls, attr_name) + if name == attr: + return attr + + raise ValueError('Invalid enum value: {}'.format(repr(name))) + + levels = [] for i, v in enumerate(args, start): name = string.upper(identifier(v)) - setattr(Enum, name, level(v, i)) + lv = level(v, i) + setattr(Enum, name, lv) + levels.append(lv) + + setattr(Enum, 'values', levels) return Enum