1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-02-20 20:09:11 +00:00

The old Runner can die now; RIP

This commit is contained in:
Sergei Trofimov 2017-03-17 17:10:30 +00:00
parent 326ab827ed
commit 24ade78c36

View File

@ -15,27 +15,6 @@
# pylint: disable=no-member
"""
This module contains the execution logic for Workload Automation. It defines the
following actors:
WorkloadSpec: Identifies the workload to be run and defines parameters under
which it should be executed.
Executor: Responsible for the overall execution process. It instantiates
and/or intialises the other actors, does any necessary vaidation
and kicks off the whole process.
Execution Context: Provides information about the current state of run
execution to instrumentation.
RunInfo: Information about the current run.
Runner: This executes workload specs that are passed to it. It goes through
stages of execution, emitting an appropriate signal at each step to
allow instrumentation to do its stuff.
"""
import logging
import os
import random
@ -65,16 +44,6 @@ from wa.utils.misc import (ensure_directory_exists as _d, merge_config_values,
from wa.utils.serializer import json
# The maximum number of reboot attempts for an iteration.
MAX_REBOOT_ATTEMPTS = 3
# If something went wrong during device initialization, wait this
# long (in seconds) before retrying. This is necessary, as retrying
# immediately may not give the device enough time to recover to be able
# to reboot.
REBOOT_DELAY = 3
class ExecutionContext(object):
@property
@ -415,454 +384,3 @@ class Runner(object):
def __str__(self):
return 'runner'
class RunnerJob(object):
"""
Represents a single execution of a ``RunnerJobDescription``. There will be one created for each iteration
specified by ``RunnerJobDescription.number_of_iterations``.
"""
def __init__(self, spec, retry=0):
self.spec = spec
self.retry = retry
self.iteration = None
self.result = JobStatus(self.spec)
class OldRunner(object):
"""
This class is responsible for actually performing a workload automation
run. The main responsibility of this class is to emit appropriate signals
at the various stages of the run to allow things like traces an other
instrumentation to hook into the process.
This is an abstract base class that defines each step of the run, but not
the order in which those steps are executed, which is left to the concrete
derived classes.
"""
class _RunnerError(Exception):
"""Internal runner error."""
pass
@property
def config(self):
return self.context.config
@property
def current_job(self):
if self.job_queue:
return self.job_queue[0]
return None
@property
def previous_job(self):
if self.completed_jobs:
return self.completed_jobs[-1]
return None
@property
def next_job(self):
if self.job_queue:
if len(self.job_queue) > 1:
return self.job_queue[1]
return None
@property
def spec_changed(self):
if self.previous_job is None and self.current_job is not None: # Start of run
return True
if self.previous_job is not None and self.current_job is None: # End of run
return True
return self.current_job.spec.id != self.previous_job.spec.id
@property
def spec_will_change(self):
if self.current_job is None and self.next_job is not None: # Start of run
return True
if self.current_job is not None and self.next_job is None: # End of run
return True
return self.current_job.spec.id != self.next_job.spec.id
def __init__(self, device_manager, context, result_manager):
self.device_manager = device_manager
self.device = device_manager.target
self.context = context
self.result_manager = result_manager
self.logger = logging.getLogger('Runner')
self.job_queue = []
self.completed_jobs = []
self._initial_reset = True
def init_queue(self, specs):
raise NotImplementedError()
def run(self): # pylint: disable=too-many-branches
self._send(signal.RUN_START)
with signal.wrap('RUN_INIT'):
self._initialize_run()
try:
while self.job_queue:
try:
self._init_job()
self._run_job()
except KeyboardInterrupt:
self.current_job.result.status = JobStatus.ABORTED
raise
except Exception, e: # pylint: disable=broad-except
self.current_job.result.status = JobStatus.FAILED
self.current_job.result.add_event(e.message)
if isinstance(e, DeviceNotRespondingError):
self.logger.info('Device appears to be unresponsive.')
if self.context.reboot_policy.can_reboot and self.device.can('reset_power'):
self.logger.info('Attempting to hard-reset the device...')
try:
self.device.boot(hard=True)
self.device.connect()
except DeviceError: # hard_boot not implemented for the device.
raise e
else:
raise e
else: # not a DeviceNotRespondingError
self.logger.error(e)
finally:
self._finalize_job()
except KeyboardInterrupt:
self.logger.info('Got CTRL-C. Finalizing run... (CTRL-C again to abort).')
# Skip through the remaining jobs.
while self.job_queue:
self.context.next_job(self.current_job)
self.current_job.result.status = JobStatus.ABORTED
self._finalize_job()
except DeviceNotRespondingError:
self.logger.info('Device unresponsive and recovery not possible. Skipping the rest of the run.')
self.context.aborted = True
while self.job_queue:
self.context.next_job(self.current_job)
self.current_job.result.status = JobStatus.SKIPPED
self._finalize_job()
instrumentation.enable_all()
self._finalize_run()
self._process_results()
self.result_manager.finalize(self.context)
self._send(signal.RUN_END)
def _initialize_run(self):
self.context.runner = self
self.context.run_info.start_time = datetime.utcnow()
self._connect_to_device()
self.logger.info('Initializing device')
self.device_manager.initialize(self.context)
self.logger.info('Initializing workloads')
for workload_spec in self.context.config.workload_specs:
workload_spec.workload.initialize(self.context)
self.context.run_info.device_properties = self.device_manager.info
self.result_manager.initialize(self.context)
if instrumentation.check_failures():
raise InstrumentError('Detected failure(s) during instrumentation initialization.')
def _connect_to_device(self):
if self.context.reboot_policy.perform_initial_boot:
try:
self.device_manager.connect()
except DeviceError: # device may be offline
if self.device.can('reset_power'):
with self._signal_wrap('INITIAL_BOOT'):
self.device.boot(hard=True)
else:
raise DeviceError('Cannot connect to device for initial reboot; '
'and device does not support hard reset.')
else: # successfully connected
self.logger.info('\tBooting device')
with self._signal_wrap('INITIAL_BOOT'):
self._reboot_device()
else:
self.logger.info('Connecting to device')
self.device_manager.connect()
def _init_job(self):
self.current_job.result.status = JobStatus.RUNNING
self.context.next_job(self.current_job)
def _run_job(self): # pylint: disable=too-many-branches
spec = self.current_job.spec
if not spec.enabled:
self.logger.info('Skipping workload %s (iteration %s)', spec, self.context.current_iteration)
self.current_job.result.status = JobStatus.SKIPPED
return
self.logger.info('Running workload %s (iteration %s)', spec, self.context.current_iteration)
if spec.flash:
if not self.context.reboot_policy.can_reboot:
raise ConfigError('Cannot flash as reboot_policy does not permit rebooting.')
if not self.device.can('flash'):
raise DeviceError('Device does not support flashing.')
self._flash_device(spec.flash)
elif not self.completed_jobs:
# Never reboot on the very fist job of a run, as we would have done
# the initial reboot if a reboot was needed.
pass
elif self.context.reboot_policy.reboot_on_each_spec and self.spec_changed:
self.logger.debug('Rebooting on spec change.')
self._reboot_device()
elif self.context.reboot_policy.reboot_on_each_iteration:
self.logger.debug('Rebooting on iteration.')
self._reboot_device()
instrumentation.disable_all()
instrumentation.enable(spec.instrumentation)
self.device_manager.start()
if self.spec_changed:
self._send(signal.WORKLOAD_SPEC_START)
self._send(signal.ITERATION_START)
try:
setup_ok = False
with self._handle_errors('Setting up device parameters'):
self.device_manager.set_runtime_parameters(spec.runtime_parameters)
setup_ok = True
if setup_ok:
with self._handle_errors('running {}'.format(spec.workload.name)):
self.current_job.result.status = JobStatus.RUNNING
self._run_workload_iteration(spec.workload)
else:
self.logger.info('\tSkipping the rest of the iterations for this spec.')
spec.enabled = False
except KeyboardInterrupt:
self._send(signal.ITERATION_END)
self._send(signal.WORKLOAD_SPEC_END)
raise
else:
self._send(signal.ITERATION_END)
if self.spec_will_change or not spec.enabled:
self._send(signal.WORKLOAD_SPEC_END)
finally:
self.device_manager.stop()
def _finalize_job(self):
self.context.run_result.iteration_results.append(self.current_job.result)
job = self.job_queue.pop(0)
job.iteration = self.context.current_iteration
if job.result.status in self.config.retry_on_status:
if job.retry >= self.config.max_retries:
self.logger.error('Exceeded maxium number of retries. Abandoning job.')
else:
self.logger.info('Job status was {}. Retrying...'.format(job.result.status))
retry_job = RunnerJob(job.spec, job.retry + 1)
self.job_queue.insert(0, retry_job)
self.completed_jobs.append(job)
self.context.end_job()
def _finalize_run(self):
self.logger.info('Finalizing workloads')
for workload_spec in self.context.config.workload_specs:
workload_spec.workload.finalize(self.context)
self.logger.info('Finalizing.')
self._send(signal.RUN_FIN)
with self._handle_errors('Disconnecting from the device'):
self.device.disconnect()
info = self.context.run_info
info.end_time = datetime.utcnow()
info.duration = info.end_time - info.start_time
def _process_results(self):
self.logger.info('Processing overall results')
with self._signal_wrap('OVERALL_RESULTS_PROCESSING'):
if instrumentation.check_failures():
self.context.run_result.non_iteration_errors = True
self.result_manager.process_run_result(self.context.run_result, self.context)
def _run_workload_iteration(self, workload):
self.logger.info('\tSetting up')
with self._signal_wrap('WORKLOAD_SETUP'):
try:
workload.setup(self.context)
except:
self.logger.info('\tSkipping the rest of the iterations for this spec.')
self.current_job.spec.enabled = False
raise
try:
self.logger.info('\tExecuting')
with self._handle_errors('Running workload'):
with self._signal_wrap('WORKLOAD_EXECUTION'):
workload.run(self.context)
self.logger.info('\tProcessing result')
self._send(signal.BEFORE_WORKLOAD_RESULT_UPDATE)
try:
if self.current_job.result.status != JobStatus.FAILED:
with self._handle_errors('Processing workload result',
on_error_status=JobStatus.PARTIAL):
workload.update_result(self.context)
self._send(signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE)
if self.current_job.result.status == JobStatus.RUNNING:
self.current_job.result.status = JobStatus.OK
finally:
self._send(signal.AFTER_WORKLOAD_RESULT_UPDATE)
finally:
self.logger.info('\tTearing down')
with self._handle_errors('Tearing down workload',
on_error_status=JobStatus.NONCRITICAL):
with self._signal_wrap('WORKLOAD_TEARDOWN'):
workload.teardown(self.context)
self.result_manager.add_result(self.current_job.result, self.context)
def _flash_device(self, flashing_params):
with self._signal_wrap('FLASHING'):
self.device.flash(**flashing_params)
self.device.connect()
def _reboot_device(self):
with self._signal_wrap('BOOT'):
for reboot_attempts in xrange(MAX_REBOOT_ATTEMPTS):
if reboot_attempts:
self.logger.info('\tRetrying...')
with self._handle_errors('Rebooting device'):
self.device.boot(**self.current_job.spec.boot_parameters)
break
else:
raise DeviceError('Could not reboot device; max reboot attempts exceeded.')
self.device.connect()
def _send(self, s):
signal.send(s, self, self.context)
def _take_screenshot(self, filename):
if self.context.output_directory:
filepath = os.path.join(self.context.output_directory, filename)
else:
filepath = os.path.join(settings.output_directory, filename)
self.device.capture_screen(filepath)
@contextmanager
def _handle_errors(self, action, on_error_status=JobStatus.FAILED):
try:
if action is not None:
self.logger.debug(action)
yield
except (KeyboardInterrupt, DeviceNotRespondingError):
raise
except (WAError, TimeoutError), we:
self.device.check_responsive()
if self.current_job:
self.current_job.result.status = on_error_status
self.current_job.result.add_event(str(we))
try:
self._take_screenshot('error.png')
except Exception, e: # pylint: disable=W0703
# We're already in error state, so the fact that taking a
# screenshot failed is not surprising...
pass
if action:
action = action[0].lower() + action[1:]
self.logger.error('Error while {}:\n\t{}'.format(action, we))
except Exception, e: # pylint: disable=W0703
error_text = '{}("{}")'.format(e.__class__.__name__, e)
if self.current_job:
self.current_job.result.status = on_error_status
self.current_job.result.add_event(error_text)
self.logger.error('Error while {}'.format(action))
self.logger.error(error_text)
if isinstance(e, subprocess.CalledProcessError):
self.logger.error('Got:')
self.logger.error(e.output)
tb = get_traceback()
self.logger.error(tb)
@contextmanager
def _signal_wrap(self, signal_name):
"""Wraps the suite in before/after signals, ensuring
that after signal is always sent."""
before_signal = getattr(signal, 'BEFORE_' + signal_name)
success_signal = getattr(signal, 'SUCCESSFUL_' + signal_name)
after_signal = getattr(signal, 'AFTER_' + signal_name)
try:
self._send(before_signal)
yield
self._send(success_signal)
finally:
self._send(after_signal)
class BySpecRunner(Runner):
"""
This is that "classic" implementation that executes all iterations of a workload
spec before proceeding onto the next spec.
"""
def init_queue(self, specs):
jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] # pylint: disable=unused-variable
self.job_queue = [j for spec_jobs in jobs for j in spec_jobs]
class BySectionRunner(Runner):
"""
Runs the first iteration for all benchmarks first, before proceeding to the next iteration,
i.e. A1, B1, C1, A2, B2, C2... instead of A1, A1, B1, B2, C1, C2...
If multiple sections where specified in the agenda, this will run all specs for the first section
followed by all specs for the seciod section, etc.
e.g. given sections X and Y, and global specs A and B, with 2 iterations, this will run
X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2
"""
def init_queue(self, specs):
jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs]
self.job_queue = [j for spec_jobs in izip_longest(*jobs) for j in spec_jobs if j]
class ByIterationRunner(Runner):
"""
Runs the first iteration for all benchmarks first, before proceeding to the next iteration,
i.e. A1, B1, C1, A2, B2, C2... instead of A1, A1, B1, B2, C1, C2...
If multiple sections where specified in the agenda, this will run all sections for the first global
spec first, followed by all sections for the second spec, etc.
e.g. given sections X and Y, and global specs A and B, with 2 iterations, this will run
X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2
"""
def init_queue(self, specs):
sections = OrderedDict()
for s in specs:
if s.section_id not in sections:
sections[s.section_id] = []
sections[s.section_id].append(s)
specs = [s for section_specs in izip_longest(*sections.values()) for s in section_specs if s]
jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs]
self.job_queue = [j for spec_jobs in izip_longest(*jobs) for j in spec_jobs if j]
class RandomRunner(Runner):
"""
This will run specs in a random order.
"""
def init_queue(self, specs):
jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] # pylint: disable=unused-variable
all_jobs = [j for spec_jobs in jobs for j in spec_jobs]
random.shuffle(all_jobs)
self.job_queue = all_jobs