1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-02-20 20:09:11 +00:00

Implemeting target assistants

- Workload's update_result stage has now been broken up into two parts:
  extract_results and update_output. This is to allow the assistant to
  pull output from the target in between the two stages.
- Updated assistant implementations for Linux and Android targets from
  the exisiting code.
- Extended target descriptor code to handle assistants and their
  parameters as well.
- Updated  the target manager to actually make use of the assistants.
This commit is contained in:
Sergei Trofimov 2017-03-27 17:31:44 +01:00
parent 044aef2535
commit 0032e347fe
9 changed files with 219 additions and 246 deletions

View File

@ -138,16 +138,21 @@ class ExecutionContext(object):
self.current_job = self.job_queue.pop(0)
self.current_job.output = init_job_output(self.run_output, self.current_job)
self.update_job_state(self.current_job)
self.tm.start()
return self.current_job
def end_job(self):
if not self.current_job:
raise RuntimeError('No jobs in progress')
self.tm.stop()
self.completed_jobs.append(self.current_job)
self.update_job_state(self.current_job)
self.output.write_result()
self.current_job = None
def extract_results(self):
self.tm.extract_results(self)
def move_failed(self, job):
self.run_output.move_failed(job.output)

View File

@ -125,8 +125,8 @@ SIGNAL_MAP = OrderedDict([
('setup', signal.BEFORE_WORKLOAD_SETUP),
('start', signal.BEFORE_WORKLOAD_EXECUTION),
('stop', signal.AFTER_WORKLOAD_EXECUTION),
('process_workload_result', signal.SUCCESSFUL_WORKLOAD_RESULT_UPDATE),
('update_result', signal.AFTER_WORKLOAD_RESULT_UPDATE),
('process_workload_output', signal.SUCCESSFUL_WORKLOAD_OUTPUT_UPDATE),
('update_result', signal.AFTER_WORKLOAD_OUTPUT_UPDATE),
('teardown', signal.AFTER_WORKLOAD_TEARDOWN),
('finalize', signal.RUN_FINALIZED),

View File

@ -68,8 +68,11 @@ class Job(object):
def process_output(self, context):
self.logger.info('Processing output for job {}'.format(self.id))
with signal.wrap('WORKLOAD_RESULT_UPDATE', self, context):
self.workload.update_result(context)
with signal.wrap('WORKLOAD_RESULT_EXTRACTION', self, context):
self.workload.extract_results(context)
context.extract_results()
with signal.wrap('WORKLOAD_OUTPUT_UPDATE', self, context):
self.workload.update_output(context)
def teardown(self, context):
self.logger.info('Tearing down job {}'.format(self.id))

View File

@ -97,7 +97,8 @@ JOB_FINALIZED = Signal('job-finalized')
# Signals associated with particular stages of workload execution
BEFORE_WORKLOAD_INITIALIZED = Signal('before-workload-initialized', invert_priority=True)
BEFORE_WORKLOAD_INITIALIZED = Signal('before-workload-initialized',
invert_priority=True)
SUCCESSFUL_WORKLOAD_INITIALIZED = Signal('successful-workload-initialized')
AFTER_WORKLOAD_INITIALIZED = Signal('after-workload-initialized')
@ -109,9 +110,15 @@ BEFORE_WORKLOAD_EXECUTION = Signal('before-workload-execution', invert_priority=
SUCCESSFUL_WORKLOAD_EXECUTION = Signal('successful-workload-execution')
AFTER_WORKLOAD_EXECUTION = Signal('after-workload-execution')
BEFORE_WORKLOAD_RESULT_UPDATE = Signal('before-workload-result-update', invert_priority=True)
SUCCESSFUL_WORKLOAD_RESULT_UPDATE = Signal('successful-workload-result-update')
AFTER_WORKLOAD_RESULT_UPDATE = Signal('after-workload-result-update')
BEFORE_WORKLOAD_RESULT_EXTRACTION = Signal('before-workload-result-exptracton',
invert_priority=True)
SUCCESSFUL_WORKLOAD_RESULT_EXTRACTION = Signal('successful-workload-result-exptracton')
AFTER_WORKLOAD_RESULT_EXTRACTION = Signal('after-workload-result-exptracton')
BEFORE_WORKLOAD_OUTPUT_UPDATE = Signal('before-workload-output-update',
invert_priority=True)
SUCCESSFUL_WORKLOAD_OUTPUT_UPDATE = Signal('successful-workload-output-update')
AFTER_WORKLOAD_OUTPUT_UPDATE = Signal('after-workload-output-update')
BEFORE_WORKLOAD_TEARDOWN = Signal('before-workload-teardown', invert_priority=True)
SUCCESSFUL_WORKLOAD_TEARDOWN = Signal('successful-workload-teardown')

View File

@ -0,0 +1,144 @@
import logging
import os
import shutil
import sys
import tempfile
import threading
import time
from wa import Parameter
from wa.framework.exception import WorkerThreadError
class LinuxAssistant(object):
parameters = []
def __init__(self, target):
self.target = target
def start(self):
pass
def extract_results(self, context):
pass
def stop(self):
pass
class AndroidAssistant(object):
parameters = [
Parameter('logcat_poll_period', kind=int,
constraint=lambda x: x > 0,
description="""
Polling period for logcat in seconds. If not specified,
no polling will be used.
Logcat buffer on android is of limited size and it cannot be
adjusted at run time. Depending on the amount of logging activity,
the buffer may not be enought to capture comlete trace for a
workload execution. For those situations, logcat may be polled
periodically during the course of the run and stored in a
temporary locaiton on the host. Setting the value of the poll
period enables this behavior.
"""),
]
def __init__(self, target, logcat_poll_period=None):
self.target = target
if logcat_poll_period:
self.logcat_poller = LogcatPoller(target, logcat_poll_period)
else:
self.logcat_poller = None
def start(self):
if self.logcat_poller:
self.logcat_poller.start()
def stop(self):
if self.logcat_poller:
self.logcat_poller.stop()
def extract_results(self, context):
logcat_file = os.path.join(context.output_directory, 'logcat.log')
self.dump_logcat(logcat_file)
context.add_artifact('logcat', logcat_file, kind='log')
self.clear_logcat()
def dump_logcat(self, outfile):
if self.logcat_poller:
self.logcat_poller.write_log(outfile)
else:
self.target.dump_logcat(outfile)
def clear_logcat(self):
if self.logcat_poller:
self.logcat_poller.clear_buffer()
class LogcatPoller(threading.Thread):
def __init__(self, target, period=60, timeout=30):
super(LogcatPoller, self).__init__()
self.target = target
self.logger = logging.getLogger('logcat')
self.period = period
self.timeout = timeout
self.stop_signal = threading.Event()
self.lock = threading.Lock()
self.buffer_file = tempfile.mktemp()
self.last_poll = 0
self.daemon = True
self.exc = None
def start(self):
self.logger.debug('starting polling')
try:
while True:
if self.stop_signal.is_set():
break
with self.lock:
current_time = time.time()
if (current_time - self.last_poll) >= self.period:
self.poll()
time.sleep(0.5)
except Exception: # pylint: disable=W0703
self.exc = WorkerThreadError(self.name, sys.exc_info())
self.logger.debug('polling stopped')
def stop(self):
self.logger.debug('Stopping logcat polling')
self.stop_signal.set()
self.join(self.timeout)
if self.is_alive():
self.logger.error('Could not join logcat poller thread.')
if self.exc:
raise self.exc # pylint: disable=E0702
def clear_buffer(self):
self.logger.debug('clearing logcat buffer')
with self.lock:
self.target.clear_logcat()
with open(self.buffer_file, 'w') as _: # NOQA
pass
def write_log(self, outfile):
with self.lock:
self.poll()
if os.path.isfile(self.buffer_file):
shutil.copy(self.buffer_file, outfile)
else: # there was no logcat trace at this time
with open(outfile, 'w') as _: # NOQA
pass
def close(self):
self.logger.debug('closing poller')
if os.path.isfile(self.buffer_file):
os.remove(self.buffer_file)
def poll(self):
self.last_poll = time.time()
self.target.dump_logcat(self.buffer_file, append=True, timeout=self.timeout)
self.target.clear_logcat()

View File

@ -7,6 +7,7 @@ from devlib import (LinuxTarget, AndroidTarget, LocalLinuxTarget,
from wa.framework import pluginloader
from wa.framework.exception import PluginLoaderError
from wa.framework.plugin import Plugin, Parameter
from wa.framework.target.assistant import LinuxAssistant, AndroidAssistant
from wa.utils.types import list_of_strings, list_of_ints
from wa.utils.misc import isiterable
@ -28,6 +29,7 @@ def instantiate_target(tdesc, params, connect=None):
target_params = {p.name: p for p in tdesc.target_params}
platform_params = {p.name: p for p in tdesc.platform_params}
conn_params = {p.name: p for p in tdesc.conn_params}
assistant_params = {p.name: p for p in tdesc.assistant_params}
tp, pp, cp = {}, {}, {}
@ -38,6 +40,8 @@ def instantiate_target(tdesc, params, connect=None):
pp[name] = value
elif name in conn_params:
cp[name] = value
elif name in assistant_params:
pass
else:
msg = 'Unexpected parameter for {}: {}'
raise ValueError(msg.format(tdesc.name, name))
@ -53,17 +57,27 @@ def instantiate_target(tdesc, params, connect=None):
return tdesc.target(**tp)
def instantiate_assistant(tdesc, params, target):
assistant_params = {}
for param in tdesc.assistant_params:
if param.name in params:
assistant_params[param.name] = params[param.name]
return tdesc.assistant(target, **assistant_params)
class TargetDescription(object):
def __init__(self, name, source, description=None, target=None, platform=None,
conn=None, target_params=None, platform_params=None,
conn_params=None):
conn=None, assistant=None, target_params=None, platform_params=None,
conn_params=None, assistant_params=None):
self.name = name
self.source = source
self.description = description
self.target = target
self.platform = platform
self.connection = conn
self.assistant = assistant
self.assistant_params = assistant_params
self._set('target_params', target_params)
self._set('platform_params', platform_params)
self._set('conn_params', conn_params)
@ -218,7 +232,7 @@ GEM5_PLATFORM_PARAMS = [
'''),
]
# name --> (target_class, params_list, defaults)
# name --> (target_class, params_list, defaults, assistant_class)
TARGETS = {
'linux': (LinuxTarget, COMMON_TARGET_PARAMS, None),
'android': (AndroidTarget, COMMON_TARGET_PARAMS +
@ -230,6 +244,13 @@ TARGETS = {
'local': (LocalLinuxTarget, COMMON_TARGET_PARAMS, None),
}
# name --> assistant
ASSISTANTS = {
'linux': LinuxAssistant,
'android': AndroidAssistant,
'local': LinuxAssistant,
}
# name --> (platform_class, params_list, defaults)
PLATFORMS = {
'generic': (Platform, COMMON_PLATFORM_PARAMS, None),
@ -267,6 +288,7 @@ class DefaultTargetDescriptor(TargetDescriptor):
result = []
for target_name, target_tuple in TARGETS.iteritems():
target, target_params = self._get_item(target_tuple)
assistant = ASSISTANTS[target_name]
for platform_name, platform_tuple in PLATFORMS.iteritems():
platform, platform_params = self._get_item(platform_tuple)
@ -274,8 +296,10 @@ class DefaultTargetDescriptor(TargetDescriptor):
td = TargetDescription(name, self)
td.target = target
td.platform = platform
td.assistant = assistant
td.target_params = target_params
td.platform_params = platform_params
td.assistant_params = assistant.parameters
result.append(td)
return result

View File

@ -10,7 +10,8 @@ from wa.framework import signal
from wa.framework.exception import WorkerThreadError, ConfigError
from wa.framework.plugin import Parameter
from wa.framework.target.descriptor import (get_target_descriptions,
instantiate_target)
instantiate_target,
instantiate_assistant)
from wa.framework.target.info import TargetInfo
from wa.framework.target.runtime_config import (SysfileValuesRuntimeConfig,
HotplugRuntimeConfig,
@ -62,7 +63,6 @@ class TargetManager(object):
self.info = TargetInfo()
self._init_target()
self._init_assistant()
self.runtime_configs = [cls(self.target) for cls in self.runtime_config_cls]
def finalize(self):
@ -83,6 +83,15 @@ class TargetManager(object):
if any(parameter in name for parameter in cfg.supported_parameters):
cfg.add(name, self.parameters.pop(name))
def start(self):
self.assistant.start()
def stop(self):
self.assistant.stop()
def extract_results(self, context):
self.assistant.extract_results(context)
@memoized
def get_target_info(self):
return TargetInfo(self.target)
@ -107,238 +116,12 @@ class TargetManager(object):
if self.target_name not in target_map:
raise ValueError('Unknown Target: {}'.format(self.target_name))
tdesc = target_map[self.target_name]
self.target = instantiate_target(tdesc, self.parameters, connect=False)
with signal.wrap('TARGET_CONNECT'):
self.target.connect()
self.logger.info('Setting up target')
self.target.setup()
def _init_assistant(self):
# Create a corresponding target and target-assistant to help with
# platformy stuff?
if self.target.os == 'android':
self.assistant = AndroidAssistant(self.target)
elif self.target.os == 'linux':
self.assistant = LinuxAssistant(self.target) # pylint: disable=redefined-variable-type
else:
raise ValueError('Unknown Target OS: {}'.format(self.target.os))
class LinuxAssistant(object):
name = 'linux-assistant'
description = """
Performs configuration, instrumentation, etc. during runs on Linux targets.
"""
def __init__(self, target, **kwargs):
self.target = target
# parameters = [
# Parameter('disconnect', kind=bool, default=False,
# description="""
# Specifies whether the target should be disconnected from
# at the end of the run.
# """),
# ]
# runtime_config_cls = [
# # order matters
# SysfileValuesRuntimeConfig,
# HotplugRuntimeConfig,
# CpufreqRuntimeConfig,
# CpuidleRuntimeConfig,
# ]
# def __init__(self, target, context, **kwargs):
# # super(LinuxTargetManager, self).__init__(target, context, **kwargs)
# self.target = target
# self.context = context
# self.info = TargetInfo()
# self.runtime_configs = [cls(target) for cls in self.runtime_config_cls]
# def __init__(self):
# # super(LinuxTargetManager, self).__init__(target, context, **kwargs)
# self.target = target
# self.info = TargetInfo()
# self.parameters = parameters
# self.info = TargetInfo()
# self.runtime_configs = [cls(target) for cls in self.runtime_config_cls]
# def initialize(self):
# # self.runtime_configs = [cls(self.target) for cls in self.runtime_config_cls]
# # if self.parameters:
# self.logger.info('Connecting to the device')
# with signal.wrap('TARGET_CONNECT'):
# self.target.connect()
# self.info.load(self.target)
# # info_file = os.path.join(self.context.info_directory, 'target.json')
# # with open(info_file, 'w') as wfh:
# # json.dump(self.info.to_pod(), wfh)
# def finalize(self, runner):
# self.logger.info('Disconnecting from the device')
# if self.disconnect:
# with signal.wrap('TARGET_DISCONNECT'):
# self.target.disconnect()
# def _add_parameters(self):
# for name, value in self.parameters.iteritems():
# self.add_parameter(name, value)
# def validate_runtime_parameters(self, parameters):
# self.clear()
# for name, value in parameters.iteritems():
# self.add_parameter(name, value)
# self.validate_parameters()
# def set_runtime_parameters(self, parameters):
# self.clear()
# for name, value in parameters.iteritems():
# self.add_parameter(name, value)
# self.set_parameters()
# def clear_parameters(self):
# for cfg in self.runtime_configs:
# cfg.clear()
# def add_parameter(self, name, value):
# for cfg in self.runtime_configs:
# if name in cfg.supported_parameters:
# cfg.add(name, value)
# return
# raise ConfigError('Unexpected runtime parameter "{}".'.format(name))
# def validate_parameters(self):
# for cfg in self.runtime_configs:
# cfg.validate()
# def set_parameters(self):
# for cfg in self.runtime_configs:
# cfg.set()
class AndroidAssistant(LinuxAssistant):
name = 'android-assistant'
description = """
Extends ``LinuxTargetManager`` with Android-specific operations.
"""
parameters = [
Parameter('logcat_poll_period', kind=int,
description="""
If specified, logcat will cached in a temporary file on the
host every ``logcat_poll_period`` seconds. This is useful for
longer job executions, where on-device logcat buffer may not be
big enough to capture output for the entire execution.
"""),
]
def __init__(self, target, **kwargs):
super(AndroidAssistant, self).__init__(target)
self.logcat_poll_period = kwargs.get('logcat_poll_period', None)
if self.logcat_poll_period:
self.logcat_poller = LogcatPoller(target, self.logcat_poll_period)
else:
self.logcat_poller = None
# def __init__(self, target, context, **kwargs):
# super(AndroidAssistant, self).__init__(target, context, **kwargs)
# self.logcat_poll_period = kwargs.get('logcat_poll_period', None)
# if self.logcat_poll_period:
# self.logcat_poller = LogcatPoller(target, self.logcat_poll_period)
# else:
# self.logcat_poller = None
# def next_job(self, job):
# super(AndroidAssistant, self).next_job(job)
# if self.logcat_poller:
# self.logcat_poller.start()
# def job_done(self, job):
# super(AndroidAssistant, self).job_done(job)
# if self.logcat_poller:
# self.logcat_poller.stop()
# outfile = os.path.join(self.context.output_directory, 'logcat.log')
# self.logger.debug('Dumping logcat to {}'.format(outfile))
# self.dump_logcat(outfile)
# self.clear()
def dump_logcat(self, outfile):
if self.logcat_poller:
self.logcat_poller.write_log(outfile)
else:
self.target.dump_logcat(outfile)
def clear_logcat(self):
if self.logcat_poller:
self.logcat_poller.clear_buffer()
class LogcatPoller(threading.Thread):
def __init__(self, target, period=60, timeout=30):
super(LogcatPoller, self).__init__()
self.target = target
self.logger = logging.getLogger('logcat')
self.period = period
self.timeout = timeout
self.stop_signal = threading.Event()
self.lock = threading.Lock()
self.buffer_file = tempfile.mktemp()
self.last_poll = 0
self.daemon = True
self.exc = None
def start(self):
self.logger.debug('starting polling')
try:
while True:
if self.stop_signal.is_set():
break
with self.lock:
current_time = time.time()
if (current_time - self.last_poll) >= self.period:
self.poll()
time.sleep(0.5)
except Exception: # pylint: disable=W0703
self.exc = WorkerThreadError(self.name, sys.exc_info())
self.logger.debug('polling stopped')
def stop(self):
self.logger.debug('Stopping logcat polling')
self.stop_signal.set()
self.join(self.timeout)
if self.is_alive():
self.logger.error('Could not join logcat poller thread.')
if self.exc:
raise self.exc # pylint: disable=E0702
def clear_buffer(self):
self.logger.debug('clearing logcat buffer')
with self.lock:
self.target.clear_logcat()
with open(self.buffer_file, 'w') as _: # NOQA
pass
def write_log(self, outfile):
with self.lock:
self.poll()
if os.path.isfile(self.buffer_file):
shutil.copy(self.buffer_file, outfile)
else: # there was no logcat trace at this time
with open(outfile, 'w') as _: # NOQA
pass
def close(self):
self.logger.debug('closing poller')
if os.path.isfile(self.buffer_file):
os.remove(self.buffer_file)
def poll(self):
self.last_poll = time.time()
self.target.dump_logcat(self.buffer_file, append=True, timeout=self.timeout)
self.target.clear_logcat()
self.assistant = instantiate_assistant(tdesc, self.parameters, self.target)

View File

@ -66,10 +66,16 @@ class Workload(TargetedPlugin):
"""
pass
def update_result(self, context):
def extract_results(self, context):
"""
Update the result within the specified execution context with the
metrics form this workload iteration.
Extract results on the target
"""
pass
def update_output(self, context):
"""
Update the output within the specified execution context with the
metrics and artifacts form this workload iteration.
"""
pass

View File

@ -105,12 +105,13 @@ class Dhrystone(Workload):
self.target.killall('dhrystone')
raise
def update_result(self, context):
def extract_results(self, context):
outfile = os.path.join(context.output_directory, 'dhrystone.output')
with open(outfile, 'w') as wfh:
wfh.write(self.output)
context.add_artifact('dhrystone-output', outfile, 'raw', "dhrystone's stdout")
def update_output(self, context):
score_count = 0
dmips_count = 0
total_score = 0