1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2024-10-06 10:51:13 +01:00
workload-automation/wa/framework/target/manager.py

384 lines
13 KiB
Python
Raw Normal View History

2017-02-21 13:51:12 +00:00
import logging
import tempfile
import threading
import os
import time
import shutil
import sys
from wa.framework import signal
from wa.framework.exception import WorkerThreadError, ConfigError
from wa.framework.plugin import Parameter
from wa.framework.target.info import TargetInfo
from wa.framework.target.runtime_config import (SysfileValuesRuntimeConfig,
HotplugRuntimeConfig,
CpufreqRuntimeConfig,
CpuidleRuntimeConfig)
from wa.utils.misc import isiterable
2017-02-21 13:51:12 +00:00
from wa.utils.serializer import json
from devlib import LocalLinuxTarget, LinuxTarget, AndroidTarget
2017-02-21 15:41:30 +00:00
from devlib.utils.types import identifier
2017-02-21 13:51:12 +00:00
# from wa.target.manager import AndroidTargetManager, LinuxTargetManager
class TargetManager(object):
    """
    Creates a devlib target (and a matching assistant) from a device name and
    drives the runtime configuration objects for that target.

    Construction only instantiates the target and the assistant; call
    ``initialize()`` to build the runtime configs and connect, then
    ``add_parameters()`` / ``validate_parameters()`` / ``set_parameters()``
    to apply runtime parameters.
    """

    name = 'target-manager'

    description = """
    Instanciated the required target and performs configuration and validation
    of the device.
    """

    parameters = [
        Parameter('disconnect', kind=bool, default=False,
                  description="""
                  Specifies whether the target should be disconnected from
                  at the end of the run.
                  """),
    ]

    # Short device names that do not follow the "<platform>_<target>" form.
    DEVICE_MAPPING = {
        'test': {'platform_name': 'generic',
                 'target_name': 'android'},
        'other': {'platform_name': 'test',
                  'target_name': 'linux'},
    }

    runtime_config_cls = [
        # order matters
        SysfileValuesRuntimeConfig,
        HotplugRuntimeConfig,
        CpufreqRuntimeConfig,
        CpuidleRuntimeConfig,
    ]

    def __init__(self, name, parameters):
        """
        :param name: device name; either ``<platform>-<target>`` /
                     ``<platform>_<target>`` or a key of ``DEVICE_MAPPING``.
        :param parameters: mapping of configuration values; ``'disconnect'``
                           is read here, the rest is consumed by
                           ``add_parameters()``.
        :raises ConfigError: if the device or target name is not recognised.
        """
        self.name = name
        self.target = None
        self.assistant = None
        self.target_name = None
        self.platform_name = None
        self.parameters = parameters
        self.disconnect = parameters.get('disconnect')
        self.info = TargetInfo()

        # Determine platform and target based on passed name
        self._parse_name()
        # Create target
        self._get_target()
        # Create an assistant to perform target specific configuration
        self._get_assistant()

        ### HERE FOR TESTING, WILL BE CALLED EXTERNALLY ###
        # Connect to device and retrieve details.
        # self.initialize()
        # self.add_parameters()
        # self.validate_parameters()
        # self.set_parameters()

    def initialize(self):
        """
        Instantiate one runtime config per entry of ``runtime_config_cls``
        and connect to the target inside a ``TARGET_CONNECT`` signal wrapper.
        """
        self.runtime_configs = [cls(self.target) for cls in self.runtime_config_cls]
        # if self.parameters:
        # self.logger.info('Connecting to the device')
        with signal.wrap('TARGET_CONNECT'):
            self.target.connect()
            # self.info.load(self.target)
            # info_file = os.path.join(self.context.info_directory, 'target.json')
            # with open(info_file, 'w') as wfh:
            #     json.dump(self.info.to_pod(), wfh)

    def finalize(self):
        """
        Disconnect from the target (inside a ``TARGET_DISCONNECT`` signal
        wrapper) if the ``disconnect`` parameter was set.
        """
        # self.logger.info('Disconnecting from the device')
        if self.disconnect:
            with signal.wrap('TARGET_DISCONNECT'):
                self.target.disconnect()

    def add_parameters(self, parameters=None):
        """
        Hand each known parameter over to the runtime config that supports it.

        A parameter is claimed by the first runtime config (in
        ``runtime_config_cls`` order) for which one of the config's
        ``supported_parameters`` is a substring of the parameter name.
        Claimed entries are removed from ``self.parameters``; unclaimed ones
        are left in place.

        :param parameters: optional mapping that replaces ``self.parameters``
                           before processing.
        :raises ConfigError: if there are no parameters to process.
        """
        if parameters:
            self.parameters = parameters
        if not self.parameters:
            raise ConfigError('No Configuration Provided')

        # Iterate over a snapshot of the keys: entries are popped from the
        # mapping inside the loop, which is not allowed while iterating a
        # live dict view.
        for name in list(self.parameters):
            for cfg in self.runtime_configs:
                # if name in cfg.supported_parameters:
                if any(parameter in name for parameter in cfg.supported_parameters):
                    cfg.add(name, self.parameters.pop(name))
                    # The entry has been consumed; trying further configs
                    # would pop a key that no longer exists.
                    break

    def validate_parameters(self):
        """Call ``validate()`` on every runtime config."""
        for cfg in self.runtime_configs:
            cfg.validate()

    def set_parameters(self):
        """Call ``set()`` on every runtime config."""
        for cfg in self.runtime_configs:
            cfg.set()

    def clear_parameters(self):
        """Call ``clear()`` on every runtime config."""
        for cfg in self.runtime_configs:
            cfg.clear()

    def _parse_name(self):
        """
        Derive ``platform_name`` and ``target_name`` from ``self.name``.

        Dashes are replaced with underscores and the result is passed through
        ``identifier()``. A name containing an underscore is split at the
        first one into platform and target; otherwise the name is looked up
        in ``DEVICE_MAPPING``.

        :raises ConfigError: if the name matches neither form.
        """
        # Try and get platform and target
        self.name = identifier(self.name.replace('-', '_'))
        if '_' in self.name:
            self.platform_name, self.target_name = self.name.split('_', 1)
        elif self.name in self.DEVICE_MAPPING:
            self.platform_name = self.DEVICE_MAPPING[self.name]['platform_name']
            self.target_name = self.DEVICE_MAPPING[self.name]['target_name']
        else:
            raise ConfigError('Unknown Device Specified {}'.format(self.name))

    def _get_target(self):
        """
        Instantiate the devlib target class selected by ``target_name``.

        :raises ConfigError: if ``target_name`` is not one of ``android``,
                             ``linux`` or ``localLinux``.
        """
        # Create a corresponding target and target-assistant
        if self.target_name == 'android':
            self.target = AndroidTarget()
        elif self.target_name == 'linux':
            self.target = LinuxTarget()  # pylint: disable=redefined-variable-type
        elif self.target_name == 'localLinux':
            self.target = LocalLinuxTarget()
        else:
            raise ConfigError('Unknown Target Specified {}'.format(self.target_name))

    def _get_assistant(self):
        """
        Instantiate the assistant matching ``target_name`` around
        ``self.target``.

        :raises ConfigError: if ``target_name`` is not recognised.
        """
        # Create a corresponding target and target-assistant to help with platformy stuff?
        if self.target_name == 'android':
            self.assistant = AndroidAssistant(self.target)
        elif self.target_name in ['linux', 'localLinux']:
            self.assistant = LinuxAssistant(self.target)  # pylint: disable=redefined-variable-type
        else:
            raise ConfigError('Unknown Target Specified {}'.format(self.target_name))
# def validate_runtime_parameters(self, parameters):
# for name, value in parameters.iteritems():
# self.add_parameter(name, value)
# self.validate_parameters()
# def set_runtime_parameters(self, parameters):
# # self.clear()
# for name, value in parameters.iteritems():
# self.add_parameter(name, value)
# self.set_parameters()
2017-02-21 13:51:12 +00:00
class LinuxAssistant(object):
    """
    Assistant for Linux targets.

    At present it only keeps a reference to the target it was created for.
    """

    name = 'linux-assistant'

    description = """
    Performs configuration, instrumentation, etc. during runs on Linux targets.
    """

    def __init__(self, target, **kwargs):
        """
        :param target: the target this assistant operates on.
        :param kwargs: accepted for signature compatibility with subclasses;
                       not used by this class.
        """
        self.target = target
# parameters = [
# Parameter('disconnect', kind=bool, default=False,
# description="""
# Specifies whether the target should be disconnected from
# at the end of the run.
# """),
# ]
# runtime_config_cls = [
# # order matters
# SysfileValuesRuntimeConfig,
# HotplugRuntimeConfig,
# CpufreqRuntimeConfig,
# CpuidleRuntimeConfig,
# ]
# def __init__(self, target, context, **kwargs):
# # super(LinuxTargetManager, self).__init__(target, context, **kwargs)
# self.target = target
# self.context = context
# self.info = TargetInfo()
# self.runtime_configs = [cls(target) for cls in self.runtime_config_cls]
# def __init__(self):
# # super(LinuxTargetManager, self).__init__(target, context, **kwargs)
# self.target = target
# self.info = TargetInfo()
# self.parameters = parameters
# self.info = TargetInfo()
# self.runtime_configs = [cls(target) for cls in self.runtime_config_cls]
# def initialize(self):
# # self.runtime_configs = [cls(self.target) for cls in self.runtime_config_cls]
# # if self.parameters:
# self.logger.info('Connecting to the device')
# with signal.wrap('TARGET_CONNECT'):
# self.target.connect()
# self.info.load(self.target)
# # info_file = os.path.join(self.context.info_directory, 'target.json')
# # with open(info_file, 'w') as wfh:
# # json.dump(self.info.to_pod(), wfh)
# def finalize(self, runner):
# self.logger.info('Disconnecting from the device')
# if self.disconnect:
# with signal.wrap('TARGET_DISCONNECT'):
# self.target.disconnect()
# def _add_parameters(self):
# for name, value in self.parameters.iteritems():
# self.add_parameter(name, value)
# def validate_runtime_parameters(self, parameters):
# self.clear()
# for name, value in parameters.iteritems():
# self.add_parameter(name, value)
# self.validate_parameters()
# def set_runtime_parameters(self, parameters):
# self.clear()
# for name, value in parameters.iteritems():
# self.add_parameter(name, value)
# self.set_parameters()
# def clear_parameters(self):
# for cfg in self.runtime_configs:
# cfg.clear()
# def add_parameter(self, name, value):
# for cfg in self.runtime_configs:
# if name in cfg.supported_parameters:
# cfg.add(name, value)
# return
# raise ConfigError('Unexpected runtime parameter "{}".'.format(name))
# def validate_parameters(self):
# for cfg in self.runtime_configs:
# cfg.validate()
# def set_parameters(self):
# for cfg in self.runtime_configs:
# cfg.set()
class AndroidAssistant(LinuxAssistant):
    """
    Assistant for Android targets; adds logcat dumping and clearing on top of
    ``LinuxAssistant``, optionally through a ``LogcatPoller``.
    """

    name = 'android-assistant'

    description = """
    Extends ``LinuxTargetManager`` with Android-specific operations.
    """

    parameters = [
        Parameter('logcat_poll_period', kind=int,
                  description="""
                  If specified, logcat will cached in a temporary file on the
                  host every ``logcat_poll_period`` seconds. This is useful for
                  longer job executions, where on-device logcat buffer may not be
                  big enough to capture output for the entire execution.
                  """),
    ]

    def __init__(self, target, **kwargs):
        """
        :param target: the Android target this assistant operates on.
        :param kwargs: may contain ``logcat_poll_period``; when it is truthy a
                       ``LogcatPoller`` with that period is created (it is
                       not started here).
        """
        super(AndroidAssistant, self).__init__(target)
        self.logcat_poll_period = kwargs.get('logcat_poll_period', None)
        if self.logcat_poll_period:
            self.logcat_poller = LogcatPoller(target, self.logcat_poll_period)
        else:
            self.logcat_poller = None

    # def __init__(self, target, context, **kwargs):
    #     super(AndroidAssistant, self).__init__(target, context, **kwargs)
    #     self.logcat_poll_period = kwargs.get('logcat_poll_period', None)
    #     if self.logcat_poll_period:
    #         self.logcat_poller = LogcatPoller(target, self.logcat_poll_period)
    #     else:
    #         self.logcat_poller = None

    # def next_job(self, job):
    #     super(AndroidAssistant, self).next_job(job)
    #     if self.logcat_poller:
    #         self.logcat_poller.start()

    # def job_done(self, job):
    #     super(AndroidAssistant, self).job_done(job)
    #     if self.logcat_poller:
    #         self.logcat_poller.stop()
    #     outfile = os.path.join(self.context.output_directory, 'logcat.log')
    #     self.logger.debug('Dumping logcat to {}'.format(outfile))
    #     self.dump_logcat(outfile)
    #     self.clear()

    def dump_logcat(self, outfile):
        """
        Write the logcat output to ``outfile``: from the poller's buffer when
        a poller is configured, otherwise directly from the target.
        """
        if self.logcat_poller:
            self.logcat_poller.write_log(outfile)
        else:
            self.target.dump_logcat(outfile)

    def clear_logcat(self):
        """
        Clear the logcat: through the poller (which also empties its buffer
        file) when one is configured, otherwise directly on the target.
        """
        if self.logcat_poller:
            self.logcat_poller.clear_buffer()
        else:
            # Mirror dump_logcat(): without a poller, act on the target
            # directly instead of silently doing nothing.
            self.target.clear_logcat()
class LogcatPoller(threading.Thread):
    """
    Daemon thread that periodically appends the target's logcat output to a
    temporary file on the host and then clears the logcat on the target.

    Use ``start()`` (inherited from ``threading.Thread``) to begin polling,
    ``stop()`` to end it, ``write_log()`` to obtain the collected output and
    ``close()`` to delete the temporary buffer file.
    """

    def __init__(self, target, period=60, timeout=30):
        """
        :param target: object providing ``dump_logcat(path, append=...,
                       timeout=...)`` and ``clear_logcat()``.
        :param period: minimum number of seconds between two polls.
        :param timeout: timeout passed to ``dump_logcat`` and used when
                        joining the thread in ``stop()``.
        """
        super(LogcatPoller, self).__init__()
        self.target = target
        self.logger = logging.getLogger('logcat')
        self.period = period
        self.timeout = timeout
        self.stop_signal = threading.Event()
        self.lock = threading.Lock()
        # mkstemp() creates the file securely; mktemp() only returns a name
        # that another process could claim first. Only the path is needed,
        # so the descriptor is closed straight away.
        buffer_fd, self.buffer_file = tempfile.mkstemp()
        os.close(buffer_fd)
        self.last_poll = 0
        self.daemon = True
        self.exc = None

    def run(self):
        """
        Thread body: poll whenever at least ``period`` seconds have passed
        since the last poll, until ``stop_signal`` is set.

        Any exception is captured in ``self.exc`` as a ``WorkerThreadError``
        and re-raised from ``stop()``.
        """
        # This must be run(), not start(): overriding start() would execute
        # the loop in the caller's thread and never spawn the worker.
        self.logger.debug('starting polling')
        try:
            while not self.stop_signal.is_set():
                with self.lock:
                    current_time = time.time()
                    if (current_time - self.last_poll) >= self.period:
                        self.poll()
                # Wait on the event rather than sleeping so that stop()
                # takes effect immediately.
                self.stop_signal.wait(0.5)
        except Exception:  # pylint: disable=W0703
            self.exc = WorkerThreadError(self.name, sys.exc_info())
        self.logger.debug('polling stopped')

    def stop(self):
        """
        Signal the polling loop to end and wait up to ``timeout`` seconds for
        the thread to finish.

        :raises WorkerThreadError: if the polling loop failed with an
                                   exception.
        """
        self.logger.debug('Stopping logcat polling')
        self.stop_signal.set()
        # join() raises RuntimeError for a thread that was never started.
        if self.ident is not None:
            self.join(self.timeout)
        if self.is_alive():
            self.logger.error('Could not join logcat poller thread.')
        if self.exc:
            raise self.exc  # pylint: disable=E0702

    def clear_buffer(self):
        """Clear the logcat on the target and truncate the buffer file."""
        self.logger.debug('clearing logcat buffer')
        with self.lock:
            self.target.clear_logcat()
            with open(self.buffer_file, 'w') as _:  # NOQA
                pass

    def write_log(self, outfile):
        """
        Poll once more, then copy the buffer file to ``outfile``. If the
        buffer file does not exist, an empty ``outfile`` is created instead.
        """
        with self.lock:
            self.poll()
            if os.path.isfile(self.buffer_file):
                shutil.copy(self.buffer_file, outfile)
            else:  # there was no logcat trace at this time
                with open(outfile, 'w') as _:  # NOQA
                    pass

    def close(self):
        """Delete the buffer file if it exists."""
        self.logger.debug('closing poller')
        if os.path.isfile(self.buffer_file):
            os.remove(self.buffer_file)

    def poll(self):
        """
        Append the target's current logcat output to the buffer file and
        clear the logcat on the target. Callers are expected to hold
        ``self.lock``.
        """
        self.last_poll = time.time()
        self.target.dump_logcat(self.buffer_file, append=True, timeout=self.timeout)
        self.target.clear_logcat()