1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-31 02:01:16 +00:00

Handle retry failed

This commit is contained in:
Sergei Trofimov 2017-03-09 17:39:44 +00:00
parent 547ae1c10e
commit ccdc3492e7
8 changed files with 159 additions and 88 deletions

View File

@ -20,9 +20,8 @@ from collections import OrderedDict, defaultdict
from wa.framework.exception import ConfigError, NotFoundError
from wa.framework.configuration.tree import SectionNode
from wa.utils.misc import (get_article, merge_config_values)
from wa.utils.types import (identifier, integer, boolean,
list_of_strings, toggle_set,
obj_dict)
from wa.utils.types import (identifier, integer, boolean, list_of_strings,
list_of, toggle_set, obj_dict, enum)
from wa.utils.serializer import is_pod
# Mapping for kind conversion; see docs for convert_types below
@ -32,17 +31,9 @@ KIND_MAP = {
dict: OrderedDict,
}
ITERATION_STATUS = [
'NOT_STARTED',
'RUNNING',
JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING',
'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED'])
'OK',
'NONCRITICAL',
'PARTIAL',
'FAILED',
'ABORTED',
'SKIPPED',
]
##########################
### CONFIG POINT TYPES ###
@ -716,9 +707,9 @@ class RunConfiguration(Configuration):
This setting defines what specific Device subclass will be used to interact
the connected device. Obviously, this must match your setup.
'''),
ConfigurationPoint('retry_on_status', kind=status_list,
ConfigurationPoint('retry_on_status', kind=list_of(JobStatus),
default=['FAILED', 'PARTIAL'],
allowed_values=ITERATION_STATUS,
allowed_values=JobStatus.values,
description='''
This is list of statuses on which a job will be cosidered to have failed and
will be automatically retried up to ``max_retries`` times. This defaults to
@ -736,10 +727,10 @@ class RunConfiguration(Configuration):
``"ABORTED"``
The user interupted the workload
'''),
ConfigurationPoint('max_retries', kind=int, default=3,
ConfigurationPoint('max_retries', kind=int, default=2,
description='''
The maximum number of times failed jobs will be retried before giving up. If
not set, this will default to ``3``.
not set.
.. note:: this number does not include the original attempt
'''),

View File

@ -4,10 +4,11 @@ from itertools import izip_longest, groupby, chain
from wa.framework import pluginloader
from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration,
JobGenerator, settings)
JobGenerator, JobStatus, settings)
from wa.framework.configuration.parsers import ConfigParser
from wa.framework.configuration.plugin_cache import PluginCache
from wa.framework.exception import NotFoundError
from wa.framework.job import Job
from wa.utils.types import enum
@ -29,59 +30,6 @@ class CombinedConfig(object):
'run_config': self.run_config.to_pod()}
JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING',
'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED'])
class Job(object):
@property
def id(self):
return self.spec.id
@property
def output_name(self):
return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration)
def __init__(self, spec, iteration, context):
self.logger = logging.getLogger('job')
self.spec = spec
self.iteration = iteration
self.context = context
self.status = JobStatus.NEW
self.workload = None
self.output = None
def load(self, target, loader=pluginloader):
self.logger.debug('Loading job {}'.format(self.id))
self.workload = loader.get_workload(self.spec.workload_name,
target,
**self.spec.workload_parameters)
self.workload.init_resources(self.context)
self.workload.validate()
self.status = JobStatus.LOADED
def initialize(self, context):
self.logger.info('Initializing job {}'.format(self.id))
self.status = JobStatus.PENDING
def configure_target(self, context):
self.logger.info('Configuring target for job {}'.format(self.id))
def setup(self, context):
self.logger.info('Setting up job {}'.format(self.id))
def run(self, context):
self.logger.info('Running job {}'.format(self.id))
def process_output(self, context):
self.looger.info('Processing output for job {}'.format(self.id))
def teardown(self, context):
self.logger.info('Tearing down job {}'.format(self.id))
def finalize(self, context):
self.logger.info('Finalizing job {}'.format(self.id))
class ConfigManager(object):
"""
Represents run-time state of WA. Mostly used as a container for loaded

View File

@ -147,6 +147,11 @@ class ExecutionContext(object):
self.completed_jobs.append(self.current_job)
self.current_job = None
def move_failed(self, job):
attempt = job.retries + 1
failed_name = '{}-attempt{:02}'.format(job.output_name, attempt)
self.output.move_failed(job.output_name, failed_name)
class OldExecutionContext(object):
"""
@ -439,8 +444,28 @@ class Runner(object):
def run_next_job(self, context):
job = context.start_job()
self.logger.info('Running job {}'.format(job.id))
try:
log.indent()
self.do_run_job(job, context)
except KeyboardInterrupt:
job.status = JobStatus.ABORTED
raise
except Exception as e:
job.status = JobStatus.FAILED
if not getattr(e, 'logged', None):
log.log_error(e, self.logger)
e.logged = True
finally:
self.logger.info('Completing job {}'.format(job.id))
self.send(signal.JOB_COMPLETED)
context.end_job()
log.dedent()
self.check_job(job)
def do_run_job(self, job, context):
job.status = JobStatus.RUNNING
log.indent()
self.send(signal.JOB_STARTED)
with signal.wrap('JOB_TARGET_CONFIG', self):
@ -455,7 +480,7 @@ class Runner(object):
try:
with signal.wrap('JOB_OUTPUT_PROCESSED', self):
job.run(context)
job.process_output(context)
except Exception:
job.status = JobStatus.PARTIAL
raise
@ -474,10 +499,22 @@ class Runner(object):
with signal.wrap('JOB_TEARDOWN', self):
job.teardown(context)
log.dedent()
self.logger.info('Completing job {}'.format(job.id))
self.send(signal.JOB_COMPLETED)
context.end_job()
def check_job(self, job):
rc = self.context.cm.run_config
if job.status in rc.retry_on_status:
if job.retries < rc.max_retries:
msg = 'Job {} iteration {} complted with status {}. retrying...'
self.logger.error(msg.format(job.id, job.status, job.iteration))
self.context.move_failed(job)
job.retries += 1
job.status = JobStatus.PENDING
self.context.job_queue.insert(0, job)
else:
msg = 'Job {} iteration {} completed with status {}. '\
'Max retries exceeded.'
self.logger.error(msg.format(job.id, job.status, job.iteration))
else: # status not in retry_on_status
self.logger.info('Job completed with status {}'.format(job.status))
def send(self, s):
signal.send(s, self, self.context)

57
wa/framework/job.py Normal file
View File

@ -0,0 +1,57 @@
import logging
from wa.framework import pluginloader
from wa.framework.configuration.core import JobStatus
class Job(object):
@property
def id(self):
return self.spec.id
@property
def output_name(self):
return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration)
def __init__(self, spec, iteration, context):
self.logger = logging.getLogger('job')
self.spec = spec
self.iteration = iteration
self.context = context
self.status = JobStatus.NEW
self.workload = None
self.output = None
self.retries = 0
def load(self, target, loader=pluginloader):
self.logger.debug('Loading job {}'.format(self.id))
self.workload = loader.get_workload(self.spec.workload_name,
target,
**self.spec.workload_parameters)
self.workload.init_resources(self.context)
self.workload.validate()
self.status = JobStatus.LOADED
def initialize(self, context):
self.logger.info('Initializing job {}'.format(self.id))
self.status = JobStatus.PENDING
def configure_target(self, context):
self.logger.info('Configuring target for job {}'.format(self.id))
def setup(self, context):
self.logger.info('Setting up job {}'.format(self.id))
def run(self, context):
self.logger.info('Running job {}'.format(self.id))
def process_output(self, context):
self.logger.info('Processing output for job {}'.format(self.id))
def teardown(self, context):
self.logger.info('Tearing down job {}'.format(self.id))
def finalize(self, context):
self.logger.info('Finalizing job {}'.format(self.id))

View File

@ -7,11 +7,11 @@ import uuid
from copy import copy
from datetime import timedelta
from wlauto.core.configuration.configuration import JobSpec
from wlauto.core.configuration.manager import ConfigManager
from wlauto.core.device_manager import TargetInfo
from wlauto.utils.misc import touch
from wlauto.utils.serializer import write_pod, read_pod
from wa.framework.configuration.core import JobSpec
from wa.framework.configuration.manager import ConfigManager
from wa.framework.target.info import TargetInfo
from wa.utils.misc import touch, ensure_directory_exists
from wa.utils.serializer import write_pod, read_pod
logger = logging.getLogger('output')
@ -105,6 +105,11 @@ class RunOutput(object):
def raw_config_dir(self):
return os.path.join(self.metadir, 'raw_config')
@property
def failed_dir(self):
path = os.path.join(self.basepath, '__failed')
return ensure_directory_exists(path)
def __init__(self, path):
self.basepath = path
self.info = None
@ -152,6 +157,15 @@ class RunOutput(object):
pod = read_pod(self.jobsfile)
return [JobSpec.from_pod(jp) for jp in pod['jobs']]
def move_failed(self, name, failed_name):
path = os.path.join(self.basepath, name)
failed_path = os.path.join(self.failed_dir, failed_name)
if not os.path.exists(path):
raise ValueError('Path {} does not exist'.format(path))
if os.path.exists(failed_path):
raise ValueError('Path {} already exists'.format(failed_path))
shutil.move(path, failed_path)
def init_wa_output(path, wa_state, force=False):
if os.path.exists(path):

View File

@ -26,8 +26,8 @@ from itertools import chain
from copy import copy
from wa.framework.configuration.core import settings, ConfigurationPoint as Parameter
from wa.framework.exception import (NotFoundError, PluginLoaderError, ValidationError,
ConfigError, HostError)
from wa.framework.exception import (NotFoundError, PluginLoaderError,
ValidationError, ConfigError, HostError)
from wa.utils import log
from wa.utils.misc import (ensure_directory_exists as _d, walk_modules, load_class,
merge_dicts_simple, get_article)

View File

@ -22,6 +22,8 @@ import subprocess
import colorama
from devlib import DevlibError
from wa.framework import signal
from wa.framework.exception import WAError
from wa.utils.misc import get_traceback
@ -142,7 +144,7 @@ def log_error(e, logger, critical=False):
if isinstance(e, KeyboardInterrupt):
log_func('Got CTRL-C. Aborting.')
elif isinstance(e, WAError):
elif isinstance(e, WAError) or isinstance(e, DevlibError):
log_func(e)
elif isinstance(e, subprocess.CalledProcessError):
tb = get_traceback()

View File

@ -504,12 +504,16 @@ class level(object):
def __eq__(self, other):
if isinstance(other, level):
return self.value == other.value
elif isinstance(other, basestring):
return self.name == other
else:
return self.value == other
def __ne__(self, other):
if isinstance(other, level):
return self.value != other.value
elif isinstance(other, basestring):
return self.name != other
else:
return self.value != other
@ -524,21 +528,39 @@ def enum(args, start=0):
::
MyEnum = enum(['A', 'B', 'C'])
is equivalent of::
is roughly equivalent of::
class MyEnum(object):
A = 0
B = 1
C = 2
however it also implement some specialized behaviors for comparisons and
instantiation.
"""
class Enum(object):
pass
def __new__(cls, name):
for attr_name in dir(cls):
if attr_name.startswith('__'):
continue
attr = getattr(cls, attr_name)
if name == attr:
return attr
raise ValueError('Invalid enum value: {}'.format(repr(name)))
levels = []
for i, v in enumerate(args, start):
name = string.upper(identifier(v))
setattr(Enum, name, level(v, i))
lv = level(v, i)
setattr(Enum, name, lv)
levels.append(lv)
setattr(Enum, 'values', levels)
return Enum