From 24ade78c3650859507967665fe2efb151e20f9f5 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Fri, 17 Mar 2017 17:10:30 +0000 Subject: [PATCH] The old Runner can die now; RIP --- wa/framework/execution.py | 482 -------------------------------------- 1 file changed, 482 deletions(-) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 77d66a63..711c753c 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -15,27 +15,6 @@ # pylint: disable=no-member -""" -This module contains the execution logic for Workload Automation. It defines the -following actors: - - WorkloadSpec: Identifies the workload to be run and defines parameters under - which it should be executed. - - Executor: Responsible for the overall execution process. It instantiates - and/or intialises the other actors, does any necessary vaidation - and kicks off the whole process. - - Execution Context: Provides information about the current state of run - execution to instrumentation. - - RunInfo: Information about the current run. - - Runner: This executes workload specs that are passed to it. It goes through - stages of execution, emitting an appropriate signal at each step to - allow instrumentation to do its stuff. - -""" import logging import os import random @@ -65,16 +44,6 @@ from wa.utils.misc import (ensure_directory_exists as _d, merge_config_values, from wa.utils.serializer import json -# The maximum number of reboot attempts for an iteration. -MAX_REBOOT_ATTEMPTS = 3 - -# If something went wrong during device initialization, wait this -# long (in seconds) before retrying. This is necessary, as retrying -# immediately may not give the device enough time to recover to be able -# to reboot. -REBOOT_DELAY = 3 - - class ExecutionContext(object): @property @@ -415,454 +384,3 @@ class Runner(object): def __str__(self): return 'runner' - -class RunnerJob(object): - """ - Represents a single execution of a ``RunnerJobDescription``. There will be one created for each iteration - specified by ``RunnerJobDescription.number_of_iterations``. - - """ - - def __init__(self, spec, retry=0): - self.spec = spec - self.retry = retry - self.iteration = None - self.result = JobStatus(self.spec) - - -class OldRunner(object): - """ - This class is responsible for actually performing a workload automation - run. The main responsibility of this class is to emit appropriate signals - at the various stages of the run to allow things like traces an other - instrumentation to hook into the process. - - This is an abstract base class that defines each step of the run, but not - the order in which those steps are executed, which is left to the concrete - derived classes. - - """ - class _RunnerError(Exception): - """Internal runner error.""" - pass - - @property - def config(self): - return self.context.config - - @property - def current_job(self): - if self.job_queue: - return self.job_queue[0] - return None - - @property - def previous_job(self): - if self.completed_jobs: - return self.completed_jobs[-1] - return None - - @property - def next_job(self): - if self.job_queue: - if len(self.job_queue) > 1: - return self.job_queue[1] - return None - - @property - def spec_changed(self): - if self.previous_job is None and self.current_job is not None: # Start of run - return True - if self.previous_job is not None and self.current_job is None: # End of run - return True - return self.current_job.spec.id != self.previous_job.spec.id - - @property - def spec_will_change(self): - if self.current_job is None and self.next_job is not None: # Start of run - return True - if self.current_job is not None and self.next_job is None: # End of run - return True - return self.current_job.spec.id != self.next_job.spec.id - - def __init__(self, device_manager, context, result_manager): - self.device_manager = device_manager - self.device = device_manager.target - self.context = context - self.result_manager = result_manager - self.logger = logging.getLogger('Runner') - self.job_queue = [] - self.completed_jobs = [] - self._initial_reset = True - - def init_queue(self, specs): - raise NotImplementedError() - - def run(self): # pylint: disable=too-many-branches - self._send(signal.RUN_START) - with signal.wrap('RUN_INIT'): - self._initialize_run() - - try: - while self.job_queue: - try: - self._init_job() - self._run_job() - except KeyboardInterrupt: - self.current_job.result.status = JobStatus.ABORTED - raise - except Exception, e: # pylint: disable=broad-except - self.current_job.result.status = JobStatus.FAILED - self.current_job.result.add_event(e.message) - if isinstance(e, DeviceNotRespondingError): - self.logger.info('Device appears to be unresponsive.') - if self.context.reboot_policy.can_reboot and self.device.can('reset_power'): - self.logger.info('Attempting to hard-reset the device...') - try: - self.device.boot(hard=True) - self.device.connect() - except DeviceError: # hard_boot not implemented for the device. - raise e - else: - raise e - else: # not a DeviceNotRespondingError - self.logger.error(e) - finally: - self._finalize_job() - except KeyboardInterrupt: - self.logger.info('Got CTRL-C. Finalizing run... (CTRL-C again to abort).') - # Skip through the remaining jobs. - while self.job_queue: - self.context.next_job(self.current_job) - self.current_job.result.status = JobStatus.ABORTED - self._finalize_job() - except DeviceNotRespondingError: - self.logger.info('Device unresponsive and recovery not possible. Skipping the rest of the run.') - self.context.aborted = True - while self.job_queue: - self.context.next_job(self.current_job) - self.current_job.result.status = JobStatus.SKIPPED - self._finalize_job() - - instrumentation.enable_all() - self._finalize_run() - self._process_results() - - self.result_manager.finalize(self.context) - self._send(signal.RUN_END) - - def _initialize_run(self): - self.context.runner = self - self.context.run_info.start_time = datetime.utcnow() - self._connect_to_device() - self.logger.info('Initializing device') - self.device_manager.initialize(self.context) - - self.logger.info('Initializing workloads') - for workload_spec in self.context.config.workload_specs: - workload_spec.workload.initialize(self.context) - - self.context.run_info.device_properties = self.device_manager.info - self.result_manager.initialize(self.context) - - if instrumentation.check_failures(): - raise InstrumentError('Detected failure(s) during instrumentation initialization.') - - def _connect_to_device(self): - if self.context.reboot_policy.perform_initial_boot: - try: - self.device_manager.connect() - except DeviceError: # device may be offline - if self.device.can('reset_power'): - with self._signal_wrap('INITIAL_BOOT'): - self.device.boot(hard=True) - else: - raise DeviceError('Cannot connect to device for initial reboot; ' - 'and device does not support hard reset.') - else: # successfully connected - self.logger.info('\tBooting device') - with self._signal_wrap('INITIAL_BOOT'): - self._reboot_device() - else: - self.logger.info('Connecting to device') - self.device_manager.connect() - - def _init_job(self): - self.current_job.result.status = JobStatus.RUNNING - self.context.next_job(self.current_job) - - def _run_job(self): # pylint: disable=too-many-branches - spec = self.current_job.spec - if not spec.enabled: - self.logger.info('Skipping workload %s (iteration %s)', spec, self.context.current_iteration) - self.current_job.result.status = JobStatus.SKIPPED - return - - self.logger.info('Running workload %s (iteration %s)', spec, self.context.current_iteration) - if spec.flash: - if not self.context.reboot_policy.can_reboot: - raise ConfigError('Cannot flash as reboot_policy does not permit rebooting.') - if not self.device.can('flash'): - raise DeviceError('Device does not support flashing.') - self._flash_device(spec.flash) - elif not self.completed_jobs: - # Never reboot on the very fist job of a run, as we would have done - # the initial reboot if a reboot was needed. - pass - elif self.context.reboot_policy.reboot_on_each_spec and self.spec_changed: - self.logger.debug('Rebooting on spec change.') - self._reboot_device() - elif self.context.reboot_policy.reboot_on_each_iteration: - self.logger.debug('Rebooting on iteration.') - self._reboot_device() - - instrumentation.disable_all() - instrumentation.enable(spec.instrumentation) - self.device_manager.start() - - if self.spec_changed: - self._send(signal.WORKLOAD_SPEC_START) - self._send(signal.ITERATION_START) - - try: - setup_ok = False - with self._handle_errors('Setting up device parameters'): - self.device_manager.set_runtime_parameters(spec.runtime_parameters) - setup_ok = True - - if setup_ok: - with self._handle_errors('running {}'.format(spec.workload.name)): - self.current_job.result.status = JobStatus.RUNNING - self._run_workload_iteration(spec.workload) - else: - self.logger.info('\tSkipping the rest of the iterations for this spec.') - spec.enabled = False - except KeyboardInterrupt: - self._send(signal.ITERATION_END) - self._send(signal.WORKLOAD_SPEC_END) - raise - else: - self._send(signal.ITERATION_END) - if self.spec_will_change or not spec.enabled: - self._send(signal.WORKLOAD_SPEC_END) - finally: - self.device_manager.stop() - - def _finalize_job(self): - self.context.run_result.iteration_results.append(self.current_job.result) - job = self.job_queue.pop(0) - job.iteration = self.context.current_iteration - if job.result.status in self.config.retry_on_status: - if job.retry >= self.config.max_retries: - self.logger.error('Exceeded maxium number of retries. Abandoning job.') - else: - self.logger.info('Job status was {}. Retrying...'.format(job.result.status)) - retry_job = RunnerJob(job.spec, job.retry + 1) - self.job_queue.insert(0, retry_job) - self.completed_jobs.append(job) - self.context.end_job() - - def _finalize_run(self): - self.logger.info('Finalizing workloads') - for workload_spec in self.context.config.workload_specs: - workload_spec.workload.finalize(self.context) - - self.logger.info('Finalizing.') - self._send(signal.RUN_FIN) - - with self._handle_errors('Disconnecting from the device'): - self.device.disconnect() - - info = self.context.run_info - info.end_time = datetime.utcnow() - info.duration = info.end_time - info.start_time - - def _process_results(self): - self.logger.info('Processing overall results') - with self._signal_wrap('OVERALL_RESULTS_PROCESSING'): - if instrumentation.check_failures(): - self.context.run_result.non_iteration_errors = True - self.result_manager.process_run_result(self.context.run_result, self.context) - - def _run_workload_iteration(self, workload): - self.logger.info('\tSetting up') - with self._signal_wrap('WORKLOAD_SETUP'): - try: - workload.setup(self.context) - except: - self.logger.info('\tSkipping the rest of the iterations for this spec.') - self.current_job.spec.enabled = False - raise - try: - - self.logger.info('\tExecuting') - with self._handle_errors('Running workload'): - with self._signal_wrap('WORKLOAD_EXECUTION'): - workload.run(self.context) - - self.logger.info('\tProcessing result') - self._send(signal.BEFORE_WORKLOAD_RESULT_UPDATE) - try: - if self.current_job.result.status != JobStatus.FAILED: - with self._handle_errors('Processing workload result', - on_error_status=JobStatus.PARTIAL): - workload.update_result(self.context) - self._send(signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE) - - if self.current_job.result.status == JobStatus.RUNNING: - self.current_job.result.status = JobStatus.OK - finally: - self._send(signal.AFTER_WORKLOAD_RESULT_UPDATE) - - finally: - self.logger.info('\tTearing down') - with self._handle_errors('Tearing down workload', - on_error_status=JobStatus.NONCRITICAL): - with self._signal_wrap('WORKLOAD_TEARDOWN'): - workload.teardown(self.context) - self.result_manager.add_result(self.current_job.result, self.context) - - def _flash_device(self, flashing_params): - with self._signal_wrap('FLASHING'): - self.device.flash(**flashing_params) - self.device.connect() - - def _reboot_device(self): - with self._signal_wrap('BOOT'): - for reboot_attempts in xrange(MAX_REBOOT_ATTEMPTS): - if reboot_attempts: - self.logger.info('\tRetrying...') - with self._handle_errors('Rebooting device'): - self.device.boot(**self.current_job.spec.boot_parameters) - break - else: - raise DeviceError('Could not reboot device; max reboot attempts exceeded.') - self.device.connect() - - def _send(self, s): - signal.send(s, self, self.context) - - def _take_screenshot(self, filename): - if self.context.output_directory: - filepath = os.path.join(self.context.output_directory, filename) - else: - filepath = os.path.join(settings.output_directory, filename) - self.device.capture_screen(filepath) - - @contextmanager - def _handle_errors(self, action, on_error_status=JobStatus.FAILED): - try: - if action is not None: - self.logger.debug(action) - yield - except (KeyboardInterrupt, DeviceNotRespondingError): - raise - except (WAError, TimeoutError), we: - self.device.check_responsive() - if self.current_job: - self.current_job.result.status = on_error_status - self.current_job.result.add_event(str(we)) - try: - self._take_screenshot('error.png') - except Exception, e: # pylint: disable=W0703 - # We're already in error state, so the fact that taking a - # screenshot failed is not surprising... - pass - if action: - action = action[0].lower() + action[1:] - self.logger.error('Error while {}:\n\t{}'.format(action, we)) - except Exception, e: # pylint: disable=W0703 - error_text = '{}("{}")'.format(e.__class__.__name__, e) - if self.current_job: - self.current_job.result.status = on_error_status - self.current_job.result.add_event(error_text) - self.logger.error('Error while {}'.format(action)) - self.logger.error(error_text) - if isinstance(e, subprocess.CalledProcessError): - self.logger.error('Got:') - self.logger.error(e.output) - tb = get_traceback() - self.logger.error(tb) - - @contextmanager - def _signal_wrap(self, signal_name): - """Wraps the suite in before/after signals, ensuring - that after signal is always sent.""" - before_signal = getattr(signal, 'BEFORE_' + signal_name) - success_signal = getattr(signal, 'SUCCESSFUL_' + signal_name) - after_signal = getattr(signal, 'AFTER_' + signal_name) - try: - self._send(before_signal) - yield - self._send(success_signal) - finally: - self._send(after_signal) - - -class BySpecRunner(Runner): - """ - This is that "classic" implementation that executes all iterations of a workload - spec before proceeding onto the next spec. - - """ - - def init_queue(self, specs): - jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] # pylint: disable=unused-variable - self.job_queue = [j for spec_jobs in jobs for j in spec_jobs] - - -class BySectionRunner(Runner): - """ - Runs the first iteration for all benchmarks first, before proceeding to the next iteration, - i.e. A1, B1, C1, A2, B2, C2... instead of A1, A1, B1, B2, C1, C2... - - If multiple sections where specified in the agenda, this will run all specs for the first section - followed by all specs for the seciod section, etc. - - e.g. given sections X and Y, and global specs A and B, with 2 iterations, this will run - - X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2 - - """ - - def init_queue(self, specs): - jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] - self.job_queue = [j for spec_jobs in izip_longest(*jobs) for j in spec_jobs if j] - - -class ByIterationRunner(Runner): - """ - Runs the first iteration for all benchmarks first, before proceeding to the next iteration, - i.e. A1, B1, C1, A2, B2, C2... instead of A1, A1, B1, B2, C1, C2... - - If multiple sections where specified in the agenda, this will run all sections for the first global - spec first, followed by all sections for the second spec, etc. - - e.g. given sections X and Y, and global specs A and B, with 2 iterations, this will run - - X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2 - - """ - - def init_queue(self, specs): - sections = OrderedDict() - for s in specs: - if s.section_id not in sections: - sections[s.section_id] = [] - sections[s.section_id].append(s) - specs = [s for section_specs in izip_longest(*sections.values()) for s in section_specs if s] - jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] - self.job_queue = [j for spec_jobs in izip_longest(*jobs) for j in spec_jobs if j] - - -class RandomRunner(Runner): - """ - This will run specs in a random order. - - """ - - def init_queue(self, specs): - jobs = [[RunnerJob(s) for _ in xrange(s.number_of_iterations)] for s in specs] # pylint: disable=unused-variable - all_jobs = [j for spec_jobs in jobs for j in spec_jobs] - random.shuffle(all_jobs) - self.job_queue = all_jobs