From 0032e347fe9b6f2678f86caa9e235592b854f41e Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Mon, 27 Mar 2017 17:31:44 +0100 Subject: [PATCH] Implemeting target assistants - Workload's update_result stage has now been broken up into two parts: extract_results and update_output. This is to allow the assistant to pull output from the target in between the two stages. - Updated assistant implementations for Linux and Android targets from the exisiting code. - Extended target descriptor code to handle assistants and their parameters as well. - Updated the target manager to actually make use of the assistants. --- wa/framework/execution.py | 5 + wa/framework/instrumentation.py | 4 +- wa/framework/job.py | 7 +- wa/framework/signal.py | 15 +- wa/framework/target/assistant.py | 144 +++++++++++++++++ wa/framework/target/descriptor.py | 30 +++- wa/framework/target/manager.py | 245 ++--------------------------- wa/framework/workload.py | 12 +- wa/workloads/dhrystone/__init__.py | 3 +- 9 files changed, 219 insertions(+), 246 deletions(-) create mode 100644 wa/framework/target/assistant.py diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 12b7be8a..ebdf0145 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -138,16 +138,21 @@ class ExecutionContext(object): self.current_job = self.job_queue.pop(0) self.current_job.output = init_job_output(self.run_output, self.current_job) self.update_job_state(self.current_job) + self.tm.start() return self.current_job def end_job(self): if not self.current_job: raise RuntimeError('No jobs in progress') + self.tm.stop() self.completed_jobs.append(self.current_job) self.update_job_state(self.current_job) self.output.write_result() self.current_job = None + def extract_results(self): + self.tm.extract_results(self) + def move_failed(self, job): self.run_output.move_failed(job.output) diff --git a/wa/framework/instrumentation.py b/wa/framework/instrumentation.py index f96c0cec..3fe67de3 100644 --- a/wa/framework/instrumentation.py +++ b/wa/framework/instrumentation.py @@ -125,8 +125,8 @@ SIGNAL_MAP = OrderedDict([ ('setup', signal.BEFORE_WORKLOAD_SETUP), ('start', signal.BEFORE_WORKLOAD_EXECUTION), ('stop', signal.AFTER_WORKLOAD_EXECUTION), - ('process_workload_result', signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE), - ('update_result', signal.AFTER_WORKLOAD_RESULT_UPDATE), + ('process_workload_output', signal.SUCCESSFUL_WORKLOAD_OUTPUT_UPDATE), + ('update_result', signal.AFTER_WORKLOAD_OUTPUT_UPDATE), ('teardown', signal.AFTER_WORKLOAD_TEARDOWN), ('finalize', signal.RUN_FINALIZED), diff --git a/wa/framework/job.py b/wa/framework/job.py index 9b7bc7da..880cfff4 100644 --- a/wa/framework/job.py +++ b/wa/framework/job.py @@ -68,8 +68,11 @@ class Job(object): def process_output(self, context): self.logger.info('Processing output for job {}'.format(self.id)) - with signal.wrap('WORKLOAD_RESULT_UPDATE', self, context): - self.workload.update_result(context) + with signal.wrap('WORKLOAD_RESULT_EXTRACTION', self, context): + self.workload.extract_results(context) + context.extract_results() + with signal.wrap('WORKLOAD_OUTPUT_UPDATE', self, context): + self.workload.update_output(context) def teardown(self, context): self.logger.info('Tearing down job {}'.format(self.id)) diff --git a/wa/framework/signal.py b/wa/framework/signal.py index 09fe7b4e..20c6a0b2 100644 --- a/wa/framework/signal.py +++ b/wa/framework/signal.py @@ -97,7 +97,8 @@ JOB_FINALIZED = Signal('job-finalized') # Signals associated with particular stages of workload execution -BEFORE_WORKLOAD_INITIALIZED = Signal('before-workload-initialized', invert_priority=True) +BEFORE_WORKLOAD_INITIALIZED = Signal('before-workload-initialized', + invert_priority=True) SUCCESSFUL_WORKLOAD_INITIALIZED = Signal('successful-workload-initialized') AFTER_WORKLOAD_INITIALIZED = Signal('after-workload-initialized') @@ -109,9 +110,15 @@ BEFORE_WORKLOAD_EXECUTION = Signal('before-workload-execution', invert_priority= SUCCESSFUL_WORKLOAD_EXECUTION = Signal('successful-workload-execution') AFTER_WORKLOAD_EXECUTION = Signal('after-workload-execution') -BEFORE_WORKLOAD_RESULT_UPDATE = Signal('before-workload-result-update', invert_priority=True) -SUCCESSFUL_WORKLOAD_RESULT_UPDATE = Signal('successful-workload-result-update') -AFTER_WORKLOAD_RESULT_UPDATE = Signal('after-workload-result-update') +BEFORE_WORKLOAD_RESULT_EXTRACTION = Signal('before-workload-result-exptracton', + invert_priority=True) +SUCCESSFUL_WORKLOAD_RESULT_EXTRACTION = Signal('successful-workload-result-exptracton') +AFTER_WORKLOAD_RESULT_EXTRACTION = Signal('after-workload-result-exptracton') + +BEFORE_WORKLOAD_OUTPUT_UPDATE = Signal('before-workload-output-update', + invert_priority=True) +SUCCESSFUL_WORKLOAD_OUTPUT_UPDATE = Signal('successful-workload-output-update') +AFTER_WORKLOAD_OUTPUT_UPDATE = Signal('after-workload-output-update') BEFORE_WORKLOAD_TEARDOWN = Signal('before-workload-teardown', invert_priority=True) SUCCESSFUL_WORKLOAD_TEARDOWN = Signal('successful-workload-teardown') diff --git a/wa/framework/target/assistant.py b/wa/framework/target/assistant.py new file mode 100644 index 00000000..99411059 --- /dev/null +++ b/wa/framework/target/assistant.py @@ -0,0 +1,144 @@ +import logging +import os +import shutil +import sys +import tempfile +import threading +import time + +from wa import Parameter +from wa.framework.exception import WorkerThreadError + + +class LinuxAssistant(object): + + parameters = [] + + def __init__(self, target): + self.target = target + + def start(self): + pass + + def extract_results(self, context): + pass + + def stop(self): + pass + + +class AndroidAssistant(object): + + parameters = [ + Parameter('logcat_poll_period', kind=int, + constraint=lambda x: x > 0, + description=""" + Polling period for logcat in seconds. If not specified, + no polling will be used. + + Logcat buffer on android is of limited size and it cannot be + adjusted at run time. Depending on the amount of logging activity, + the buffer may not be enought to capture comlete trace for a + workload execution. For those situations, logcat may be polled + periodically during the course of the run and stored in a + temporary locaiton on the host. Setting the value of the poll + period enables this behavior. + """), + ] + + def __init__(self, target, logcat_poll_period=None): + self.target = target + if logcat_poll_period: + self.logcat_poller = LogcatPoller(target, logcat_poll_period) + else: + self.logcat_poller = None + + def start(self): + if self.logcat_poller: + self.logcat_poller.start() + + def stop(self): + if self.logcat_poller: + self.logcat_poller.stop() + + def extract_results(self, context): + logcat_file = os.path.join(context.output_directory, 'logcat.log') + self.dump_logcat(logcat_file) + context.add_artifact('logcat', logcat_file, kind='log') + self.clear_logcat() + + def dump_logcat(self, outfile): + if self.logcat_poller: + self.logcat_poller.write_log(outfile) + else: + self.target.dump_logcat(outfile) + + def clear_logcat(self): + if self.logcat_poller: + self.logcat_poller.clear_buffer() + + +class LogcatPoller(threading.Thread): + + def __init__(self, target, period=60, timeout=30): + super(LogcatPoller, self).__init__() + self.target = target + self.logger = logging.getLogger('logcat') + self.period = period + self.timeout = timeout + self.stop_signal = threading.Event() + self.lock = threading.Lock() + self.buffer_file = tempfile.mktemp() + self.last_poll = 0 + self.daemon = True + self.exc = None + + def start(self): + self.logger.debug('starting polling') + try: + while True: + if self.stop_signal.is_set(): + break + with self.lock: + current_time = time.time() + if (current_time - self.last_poll) >= self.period: + self.poll() + time.sleep(0.5) + except Exception: # pylint: disable=W0703 + self.exc = WorkerThreadError(self.name, sys.exc_info()) + self.logger.debug('polling stopped') + + def stop(self): + self.logger.debug('Stopping logcat polling') + self.stop_signal.set() + self.join(self.timeout) + if self.is_alive(): + self.logger.error('Could not join logcat poller thread.') + if self.exc: + raise self.exc # pylint: disable=E0702 + + def clear_buffer(self): + self.logger.debug('clearing logcat buffer') + with self.lock: + self.target.clear_logcat() + with open(self.buffer_file, 'w') as _: # NOQA + pass + + def write_log(self, outfile): + with self.lock: + self.poll() + if os.path.isfile(self.buffer_file): + shutil.copy(self.buffer_file, outfile) + else: # there was no logcat trace at this time + with open(outfile, 'w') as _: # NOQA + pass + + def close(self): + self.logger.debug('closing poller') + if os.path.isfile(self.buffer_file): + os.remove(self.buffer_file) + + def poll(self): + self.last_poll = time.time() + self.target.dump_logcat(self.buffer_file, append=True, timeout=self.timeout) + self.target.clear_logcat() diff --git a/wa/framework/target/descriptor.py b/wa/framework/target/descriptor.py index 717df67d..9e01ebac 100644 --- a/wa/framework/target/descriptor.py +++ b/wa/framework/target/descriptor.py @@ -7,6 +7,7 @@ from devlib import (LinuxTarget, AndroidTarget, LocalLinuxTarget, from wa.framework import pluginloader from wa.framework.exception import PluginLoaderError from wa.framework.plugin import Plugin, Parameter +from wa.framework.target.assistant import LinuxAssistant, AndroidAssistant from wa.utils.types import list_of_strings, list_of_ints from wa.utils.misc import isiterable @@ -28,6 +29,7 @@ def instantiate_target(tdesc, params, connect=None): target_params = {p.name: p for p in tdesc.target_params} platform_params = {p.name: p for p in tdesc.platform_params} conn_params = {p.name: p for p in tdesc.conn_params} + assistant_params = {p.name: p for p in tdesc.assistant_params} tp, pp, cp = {}, {}, {} @@ -38,6 +40,8 @@ def instantiate_target(tdesc, params, connect=None): pp[name] = value elif name in conn_params: cp[name] = value + elif name in assistant_params: + pass else: msg = 'Unexpected parameter for {}: {}' raise ValueError(msg.format(tdesc.name, name)) @@ -53,17 +57,27 @@ def instantiate_target(tdesc, params, connect=None): return tdesc.target(**tp) +def instantiate_assistant(tdesc, params, target): + assistant_params = {} + for param in tdesc.assistant_params: + if param.name in params: + assistant_params[param.name] = params[param.name] + return tdesc.assistant(target, **assistant_params) + + class TargetDescription(object): def __init__(self, name, source, description=None, target=None, platform=None, - conn=None, target_params=None, platform_params=None, - conn_params=None): + conn=None, assistant=None, target_params=None, platform_params=None, + conn_params=None, assistant_params=None): self.name = name self.source = source self.description = description self.target = target self.platform = platform self.connection = conn + self.assistant = assistant + self.assistant_params = assistant_params self._set('target_params', target_params) self._set('platform_params', platform_params) self._set('conn_params', conn_params) @@ -218,7 +232,7 @@ GEM5_PLATFORM_PARAMS = [ '''), ] -# name --> (target_class, params_list, defaults) +# name --> (target_class, params_list, defaults, assistant_class) TARGETS = { 'linux': (LinuxTarget, COMMON_TARGET_PARAMS, None), 'android': (AndroidTarget, COMMON_TARGET_PARAMS + @@ -230,6 +244,13 @@ TARGETS = { 'local': (LocalLinuxTarget, COMMON_TARGET_PARAMS, None), } +# name --> assistant +ASSISTANTS = { + 'linux': LinuxAssistant, + 'android': AndroidAssistant, + 'local': LinuxAssistant, +} + # name --> (platform_class, params_list, defaults) PLATFORMS = { 'generic': (Platform, COMMON_PLATFORM_PARAMS, None), @@ -267,6 +288,7 @@ class DefaultTargetDescriptor(TargetDescriptor): result = [] for target_name, target_tuple in TARGETS.iteritems(): target, target_params = self._get_item(target_tuple) + assistant = ASSISTANTS[target_name] for platform_name, platform_tuple in PLATFORMS.iteritems(): platform, platform_params = self._get_item(platform_tuple) @@ -274,8 +296,10 @@ class DefaultTargetDescriptor(TargetDescriptor): td = TargetDescription(name, self) td.target = target td.platform = platform + td.assistant = assistant td.target_params = target_params td.platform_params = platform_params + td.assistant_params = assistant.parameters result.append(td) return result diff --git a/wa/framework/target/manager.py b/wa/framework/target/manager.py index 043dbcb4..f1f233c9 100644 --- a/wa/framework/target/manager.py +++ b/wa/framework/target/manager.py @@ -10,7 +10,8 @@ from wa.framework import signal from wa.framework.exception import WorkerThreadError, ConfigError from wa.framework.plugin import Parameter from wa.framework.target.descriptor import (get_target_descriptions, - instantiate_target) + instantiate_target, + instantiate_assistant) from wa.framework.target.info import TargetInfo from wa.framework.target.runtime_config import (SysfileValuesRuntimeConfig, HotplugRuntimeConfig, @@ -62,7 +63,6 @@ class TargetManager(object): self.info = TargetInfo() self._init_target() - self._init_assistant() self.runtime_configs = [cls(self.target) for cls in self.runtime_config_cls] def finalize(self): @@ -83,6 +83,15 @@ class TargetManager(object): if any(parameter in name for parameter in cfg.supported_parameters): cfg.add(name, self.parameters.pop(name)) + def start(self): + self.assistant.start() + + def stop(self): + self.assistant.stop() + + def extract_results(self, context): + self.assistant.extract_results(context) + @memoized def get_target_info(self): return TargetInfo(self.target) @@ -107,238 +116,12 @@ class TargetManager(object): if self.target_name not in target_map: raise ValueError('Unknown Target: {}'.format(self.target_name)) tdesc = target_map[self.target_name] + self.target = instantiate_target(tdesc, self.parameters, connect=False) + with signal.wrap('TARGET_CONNECT'): self.target.connect() self.logger.info('Setting up target') self.target.setup() - def _init_assistant(self): - # Create a corresponding target and target-assistant to help with - # platformy stuff? - if self.target.os == 'android': - self.assistant = AndroidAssistant(self.target) - elif self.target.os == 'linux': - self.assistant = LinuxAssistant(self.target) # pylint: disable=redefined-variable-type - else: - raise ValueError('Unknown Target OS: {}'.format(self.target.os)) - - -class LinuxAssistant(object): - - name = 'linux-assistant' - - description = """ - Performs configuration, instrumentation, etc. during runs on Linux targets. - """ - - def __init__(self, target, **kwargs): - self.target = target - # parameters = [ - - # Parameter('disconnect', kind=bool, default=False, - # description=""" - # Specifies whether the target should be disconnected from - # at the end of the run. - # """), - # ] - - # runtime_config_cls = [ - # # order matters - # SysfileValuesRuntimeConfig, - # HotplugRuntimeConfig, - # CpufreqRuntimeConfig, - # CpuidleRuntimeConfig, - # ] - - # def __init__(self, target, context, **kwargs): - # # super(LinuxTargetManager, self).__init__(target, context, **kwargs) - # self.target = target - # self.context = context - # self.info = TargetInfo() - # self.runtime_configs = [cls(target) for cls in self.runtime_config_cls] - - # def __init__(self): - # # super(LinuxTargetManager, self).__init__(target, context, **kwargs) - # self.target = target - # self.info = TargetInfo() - # self.parameters = parameters - - # self.info = TargetInfo() - # self.runtime_configs = [cls(target) for cls in self.runtime_config_cls] - - # def initialize(self): - # # self.runtime_configs = [cls(self.target) for cls in self.runtime_config_cls] - # # if self.parameters: - # self.logger.info('Connecting to the device') - # with signal.wrap('TARGET_CONNECT'): - # self.target.connect() - # self.info.load(self.target) - # # info_file = os.path.join(self.context.info_directory, 'target.json') - # # with open(info_file, 'w') as wfh: - # # json.dump(self.info.to_pod(), wfh) - - # def finalize(self, runner): - # self.logger.info('Disconnecting from the device') - # if self.disconnect: - # with signal.wrap('TARGET_DISCONNECT'): - # self.target.disconnect() - - # def _add_parameters(self): - # for name, value in self.parameters.iteritems(): - # self.add_parameter(name, value) - - # def validate_runtime_parameters(self, parameters): - # self.clear() - # for name, value in parameters.iteritems(): - # self.add_parameter(name, value) - # self.validate_parameters() - - # def set_runtime_parameters(self, parameters): - # self.clear() - # for name, value in parameters.iteritems(): - # self.add_parameter(name, value) - # self.set_parameters() - - # def clear_parameters(self): - # for cfg in self.runtime_configs: - # cfg.clear() - - # def add_parameter(self, name, value): - # for cfg in self.runtime_configs: - # if name in cfg.supported_parameters: - # cfg.add(name, value) - # return - # raise ConfigError('Unexpected runtime parameter "{}".'.format(name)) - - # def validate_parameters(self): - # for cfg in self.runtime_configs: - # cfg.validate() - - # def set_parameters(self): - # for cfg in self.runtime_configs: - # cfg.set() - - -class AndroidAssistant(LinuxAssistant): - - name = 'android-assistant' - description = """ - Extends ``LinuxTargetManager`` with Android-specific operations. - """ - - parameters = [ - Parameter('logcat_poll_period', kind=int, - description=""" - If specified, logcat will cached in a temporary file on the - host every ``logcat_poll_period`` seconds. This is useful for - longer job executions, where on-device logcat buffer may not be - big enough to capture output for the entire execution. - """), - ] - - def __init__(self, target, **kwargs): - super(AndroidAssistant, self).__init__(target) - self.logcat_poll_period = kwargs.get('logcat_poll_period', None) - if self.logcat_poll_period: - self.logcat_poller = LogcatPoller(target, self.logcat_poll_period) - else: - self.logcat_poller = None - - # def __init__(self, target, context, **kwargs): - # super(AndroidAssistant, self).__init__(target, context, **kwargs) - # self.logcat_poll_period = kwargs.get('logcat_poll_period', None) - # if self.logcat_poll_period: - # self.logcat_poller = LogcatPoller(target, self.logcat_poll_period) - # else: - # self.logcat_poller = None - - # def next_job(self, job): - # super(AndroidAssistant, self).next_job(job) - # if self.logcat_poller: - # self.logcat_poller.start() - - # def job_done(self, job): - # super(AndroidAssistant, self).job_done(job) - # if self.logcat_poller: - # self.logcat_poller.stop() - # outfile = os.path.join(self.context.output_directory, 'logcat.log') - # self.logger.debug('Dumping logcat to {}'.format(outfile)) - # self.dump_logcat(outfile) - # self.clear() - - def dump_logcat(self, outfile): - if self.logcat_poller: - self.logcat_poller.write_log(outfile) - else: - self.target.dump_logcat(outfile) - - def clear_logcat(self): - if self.logcat_poller: - self.logcat_poller.clear_buffer() - - -class LogcatPoller(threading.Thread): - - def __init__(self, target, period=60, timeout=30): - super(LogcatPoller, self).__init__() - self.target = target - self.logger = logging.getLogger('logcat') - self.period = period - self.timeout = timeout - self.stop_signal = threading.Event() - self.lock = threading.Lock() - self.buffer_file = tempfile.mktemp() - self.last_poll = 0 - self.daemon = True - self.exc = None - - def start(self): - self.logger.debug('starting polling') - try: - while True: - if self.stop_signal.is_set(): - break - with self.lock: - current_time = time.time() - if (current_time - self.last_poll) >= self.period: - self.poll() - time.sleep(0.5) - except Exception: # pylint: disable=W0703 - self.exc = WorkerThreadError(self.name, sys.exc_info()) - self.logger.debug('polling stopped') - - def stop(self): - self.logger.debug('Stopping logcat polling') - self.stop_signal.set() - self.join(self.timeout) - if self.is_alive(): - self.logger.error('Could not join logcat poller thread.') - if self.exc: - raise self.exc # pylint: disable=E0702 - - def clear_buffer(self): - self.logger.debug('clearing logcat buffer') - with self.lock: - self.target.clear_logcat() - with open(self.buffer_file, 'w') as _: # NOQA - pass - - def write_log(self, outfile): - with self.lock: - self.poll() - if os.path.isfile(self.buffer_file): - shutil.copy(self.buffer_file, outfile) - else: # there was no logcat trace at this time - with open(outfile, 'w') as _: # NOQA - pass - - def close(self): - self.logger.debug('closing poller') - if os.path.isfile(self.buffer_file): - os.remove(self.buffer_file) - - def poll(self): - self.last_poll = time.time() - self.target.dump_logcat(self.buffer_file, append=True, timeout=self.timeout) - self.target.clear_logcat() + self.assistant = instantiate_assistant(tdesc, self.parameters, self.target) diff --git a/wa/framework/workload.py b/wa/framework/workload.py index 850bceca..842e9caa 100644 --- a/wa/framework/workload.py +++ b/wa/framework/workload.py @@ -66,10 +66,16 @@ class Workload(TargetedPlugin): """ pass - def update_result(self, context): + def extract_results(self, context): """ - Update the result within the specified execution context with the - metrics form this workload iteration. + Extract results on the target + """ + pass + + def update_output(self, context): + """ + Update the output within the specified execution context with the + metrics and artifacts form this workload iteration. """ pass diff --git a/wa/workloads/dhrystone/__init__.py b/wa/workloads/dhrystone/__init__.py index 106d0483..369e84bf 100644 --- a/wa/workloads/dhrystone/__init__.py +++ b/wa/workloads/dhrystone/__init__.py @@ -105,12 +105,13 @@ class Dhrystone(Workload): self.target.killall('dhrystone') raise - def update_result(self, context): + def extract_results(self, context): outfile = os.path.join(context.output_directory, 'dhrystone.output') with open(outfile, 'w') as wfh: wfh.write(self.output) context.add_artifact('dhrystone-output', outfile, 'raw', "dhrystone's stdout") + def update_output(self, context): score_count = 0 dmips_count = 0 total_score = 0