From 6eb5c3681d5fbeb3f45e63f1603ae78e5c2ad4a1 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Mon, 6 Mar 2017 11:10:25 +0000 Subject: [PATCH 1/5] New target description + moving target stuff under "framework" Changing the way target descriptions work from a static mapping to something that is dynamically generated and is extensible via plugins. Also moving core target implementation stuff under "framework". --- wa/framework/configuration/core.py | 6 +- wa/framework/configuration/parsers.py | 1 - wa/framework/configuration/plugin_cache.py | 85 ++++++---------- wa/framework/execution.py | 28 +++++- wa/framework/output.py | 10 +- wa/framework/target/descriptor.py | 41 -------- wa/framework/target/info.py | 8 +- wa/framework/target/manager.py | 108 ++++++++++++++------- wa/utils/log.py | 2 +- 9 files changed, 140 insertions(+), 149 deletions(-) diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index 8d11ceb5..c79df8b8 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -638,8 +638,7 @@ class RunConfiguration(Configuration): name = "Run Configuration" - # Metadata is separated out because it is not loaded into the auto - # generated config file + # Metadata is separated out because it is not loaded into the auto generated config file meta_data = [ ConfigurationPoint('run_name', kind=str, description=''' @@ -918,8 +917,7 @@ class JobSpec(Configuration): except NotFoundError: global_runtime_params = {} for source in plugin_cache.sources: - if source in global_runtime_params: - runtime_parameters[source] = global_runtime_params[source] + runtime_parameters[source] = global_runtime_params[source] # Add runtime parameters from JobSpec for source, values in self.to_merge['runtime_parameters'].iteritems(): diff --git a/wa/framework/configuration/parsers.py b/wa/framework/configuration/parsers.py index 70f50857..df6d019e 100644 --- a/wa/framework/configuration/parsers.py +++ b/wa/framework/configuration/parsers.py @@ -32,7 +32,6 @@ class ConfigParser(object): def load(self, state, raw, source, wrap_exceptions=True): # pylint: disable=too-many-branches try: - state.plugin_cache.add_source(source) if 'run_name' in raw: msg = '"run_name" can only be specified in the config '\ 'section of an agenda' diff --git a/wa/framework/configuration/plugin_cache.py b/wa/framework/configuration/plugin_cache.py index d8a3f8e8..bfabb97c 100644 --- a/wa/framework/configuration/plugin_cache.py +++ b/wa/framework/configuration/plugin_cache.py @@ -181,74 +181,47 @@ class PluginCache(object): :rtype: A fully merged and validated configuration in the form of a obj_dict. """ - ms = MergeState() - ms.generic_name = generic_name - ms.specific_name = specific_name - ms.generic_config = copy(self.plugin_configs[generic_name]) - ms.specific_config = copy(self.plugin_configs[specific_name]) - ms.cfg_points = self.get_plugin_parameters(specific_name) + generic_config = copy(self.plugin_configs[generic_name]) + specific_config = copy(self.plugin_configs[specific_name]) + cfg_points = self.get_plugin_parameters(specific_name) sources = self.sources + seen_specific_config = defaultdict(list) # set_value uses the 'name' attribute of the passed object in it error # messages, to ensure these messages make sense the name will have to be # changed several times during this function. final_config.name = specific_name + # pylint: disable=too-many-nested-blocks for source in sources: try: - update_config_from_source(final_config, source, ms) + if source in generic_config: + final_config.name = generic_name + for name, cfg_point in cfg_points.iteritems(): + if name in generic_config[source]: + if name in seen_specific_config: + msg = ('"{generic_name}" configuration "{config_name}" has already been ' + 'specified more specifically for {specific_name} in:\n\t\t{sources}') + msg = msg.format(generic_name=generic_name, + config_name=name, + specific_name=specific_name, + sources=", ".join(seen_specific_config[name])) + raise ConfigError(msg) + value = generic_config[source][name] + cfg_point.set_value(final_config, value, check_mandatory=False) + + if source in specific_config: + final_config.name = specific_name + for name, cfg_point in cfg_points.iteritems(): + if name in specific_config[source]: + seen_specific_config[name].append(str(source)) + value = specific_config[source][name] + cfg_point.set_value(final_config, value, check_mandatory=False) + except ConfigError as e: raise ConfigError('Error in "{}":\n\t{}'.format(source, str(e))) # Validate final configuration final_config.name = specific_name - for cfg_point in ms.cfg_points.itervalues(): + for cfg_point in cfg_points.itervalues(): cfg_point.validate(final_config) - - -class MergeState(object): - - def __init__(self): - self.generic_name = None - self.specific_name = None - self.generic_config = None - self.specific_config = None - self.cfg_points = None - self.seen_specific_config = defaultdict(list) - - -def update_config_from_source(final_config, source, state): - if source in state.generic_config: - final_config.name = state.generic_name - for name, cfg_point in state.cfg_points.iteritems(): - if name in state.generic_config[source]: - if name in state.seen_specific_config: - msg = ('"{generic_name}" configuration "{config_name}" has ' - 'already been specified more specifically for ' - '{specific_name} in:\n\t\t{sources}') - seen_sources = state.seen_specific_config[name] - msg = msg.format(generic_name=generic_name, - config_name=name, - specific_name=specific_name, - sources=", ".join(seen_sources)) - raise ConfigError(msg) - value = state.generic_config[source].pop(name) - cfg_point.set_value(final_config, value, check_mandatory=False) - - if state.generic_config[source]: - msg = 'Unexected values for {}: {}' - raise ConfigError(msg.format(state.generic_name, - state.generic_config[source])) - - if source in state.specific_config: - final_config.name = state.specific_name - for name, cfg_point in state.cfg_points.iteritems(): - if name in state.specific_config[source]: - seen_state.specific_config[name].append(str(source)) - value = state.specific_config[source].pop(name) - cfg_point.set_value(final_config, value, check_mandatory=False) - - if state.specific_config[source]: - msg = 'Unexected values for {}: {}' - raise ConfigError(msg.format(state.specific_name, - state.specific_config[source])) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 699f6494..a5c79714 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -57,7 +57,6 @@ from wa.framework.exception import (WAError, ConfigError, TimeoutError, from wa.framework.plugin import Artifact from wa.framework.resource import ResourceResolver from wa.framework.target.info import TargetInfo -from wa.framework.target.manager import TargetManager from wa.utils.misc import (ensure_directory_exists as _d, get_traceback, format_duration) from wa.utils.serializer import json @@ -229,6 +228,30 @@ def _check_artifact_path(path, rootpath): return full_path +class FakeTargetManager(object): + # TODO: this is a FAKE + + def __init__(self, name, config): + self.device_name = name + self.device_config = config + + from devlib import LocalLinuxTarget + self.target = LocalLinuxTarget({'unrooted': True}) + + def get_target_info(self): + return TargetInfo(self.target) + + def validate_runtime_parameters(self, params): + pass + + def merge_runtime_parameters(self, params): + pass + + +def init_target_manager(config): + return FakeTargetManager(config.device, config.device_config) + + class Executor(object): """ The ``Executor``'s job is to set up the execution context and pass to a @@ -274,8 +297,7 @@ class Executor(object): output.write_config(config) self.logger.info('Connecting to target') - target_manager = TargetManager(config.run_config.device, - config.run_config.device_config) + target_manager = init_target_manager(config.run_config) output.write_target_info(target_manager.get_target_info()) self.logger.info('Initializing execution conetext') diff --git a/wa/framework/output.py b/wa/framework/output.py index 07912bb4..77d5853e 100644 --- a/wa/framework/output.py +++ b/wa/framework/output.py @@ -6,11 +6,11 @@ import sys import uuid from copy import copy -from wa.framework.configuration.core import JobSpec -from wa.framework.configuration.manager import ConfigManager -from wa.framework.target.info import TargetInfo -from wa.utils.misc import touch -from wa.utils.serializer import write_pod, read_pod +from wlauto.core.configuration.configuration import JobSpec +from wlauto.core.configuration.manager import ConfigManager +from wlauto.core.device_manager import TargetInfo +from wlauto.utils.misc import touch +from wlauto.utils.serializer import write_pod, read_pod logger = logging.getLogger('output') diff --git a/wa/framework/target/descriptor.py b/wa/framework/target/descriptor.py index c1252289..34966367 100644 --- a/wa/framework/target/descriptor.py +++ b/wa/framework/target/descriptor.py @@ -24,35 +24,6 @@ def get_target_descriptions(loader=pluginloader): return targets.values() -def instantiate_target(tdesc, params, connect=None): - target_params = {p.name: p for p in tdesc.target_params} - platform_params = {p.name: p for p in tdesc.platform_params} - conn_params = {p.name: p for p in tdesc.conn_params} - - tp, pp, cp = {}, {}, {} - - for name, value in params.iteritems(): - if name in target_params: - tp[name] = value - elif name in platform_params: - pp[name] = value - elif name in conn_params: - cp[name] = value - else: - msg = 'Unexpected parameter for {}: {}' - raise ValueError(msg.format(tdesc.name, name)) - - tp['platform'] = (tdesc.platform or Platform)(**pp) - if cp: - tp['connection_settings'] = cp - if tdesc.connection: - tp['conn_cls'] = tdesc.connection - if connect is not None: - tp['connect'] = connect - - return tdesc.target(**tp) - - class TargetDescription(object): def __init__(self, name, source, description=None, target=None, platform=None, @@ -115,18 +86,6 @@ COMMON_TARGET_PARAMS = [ Please see ``devlab`` documentation for information on the available modules. '''), - Parameter('load_default_modules', kind=bool, default=True, - description=''' - A number of modules (e.g. for working with the cpufreq subsystem) are - loaded by default when a Target is instantiated. Setting this to - ``True`` would suppress that, ensuring that only the base Target - interface is initialized. - - You may want to set this if there is a problem with one or more default - modules on your platform (e.g. your device is unrooted and cpufreq is - not accessible to unprivileged users), or if Target initialization is - taking too long for your platform. - '''), ] COMMON_PLATFORM_PARAMS = [ diff --git a/wa/framework/target/info.py b/wa/framework/target/info.py index f3e40119..4341e155 100644 --- a/wa/framework/target/info.py +++ b/wa/framework/target/info.py @@ -1,7 +1,6 @@ from devlib import AndroidTarget from devlib.exception import TargetError from devlib.target import KernelConfig, KernelVersion, Cpuinfo -from devlib.utils.android import AndroidProperties class TargetInfo(object): @@ -22,9 +21,8 @@ class TargetInfo(object): if pod["target"] == "AndroidTarget": instance.screen_resolution = pod['screen_resolution'] - instance.prop = AndroidProperties('') - instance.prop._properties = pod['prop'] - instance.android_id = pod['android_id'] + instance.prop = pod['prop'] + instance.prop = pod['android_id'] return instance @@ -74,7 +72,7 @@ class TargetInfo(object): if self.target == "AndroidTarget": pod['screen_resolution'] = self.screen_resolution - pod['prop'] = self.prop._properties + pod['prop'] = self.prop pod['android_id'] = self.android_id return pod diff --git a/wa/framework/target/manager.py b/wa/framework/target/manager.py index 545178e6..659516d6 100644 --- a/wa/framework/target/manager.py +++ b/wa/framework/target/manager.py @@ -9,8 +9,6 @@ import sys from wa.framework import signal from wa.framework.exception import WorkerThreadError, ConfigError from wa.framework.plugin import Parameter -from wa.framework.target.descriptor import (get_target_descriptions, - instantiate_target) from wa.framework.target.info import TargetInfo from wa.framework.target.runtime_config import (SysfileValuesRuntimeConfig, HotplugRuntimeConfig, @@ -43,26 +41,54 @@ class TargetManager(object): """), ] + DEVICE_MAPPING = {'test' : {'platform_name':'generic', + 'target_name': 'android'}, + 'other': {'platform_name':'test', + 'target_name': 'linux'}, + } + runtime_config_cls = [ - # order matters - SysfileValuesRuntimeConfig, - HotplugRuntimeConfig, - CpufreqRuntimeConfig, - CpuidleRuntimeConfig, - ] + # order matters + SysfileValuesRuntimeConfig, + HotplugRuntimeConfig, + CpufreqRuntimeConfig, + CpuidleRuntimeConfig, + ] def __init__(self, name, parameters): - self.target_name = name + self.name = name self.target = None self.assistant = None + self.target_name = None self.platform_name = None self.parameters = parameters self.disconnect = parameters.get('disconnect') self.info = TargetInfo() - self._init_target() - self._init_assistant() + # Determine platform and target based on passed name + self._parse_name() + # Create target + self._get_target() + # Create an assistant to perform target specific configuration + self._get_assistant() + + ### HERE FOR TESTING, WILL BE CALLED EXTERNALLY ### + # Connect to device and retrieve details. + # self.initialize() + # self.add_parameters() + # self.validate_parameters() + # self.set_parameters() + + def initialize(self): self.runtime_configs = [cls(self.target) for cls in self.runtime_config_cls] + # if self.parameters: + # self.logger.info('Connecting to the device') + with signal.wrap('TARGET_CONNECT'): + self.target.connect() + # self.info.load(self.target) + # info_file = os.path.join(self.context.info_directory, 'target.json') + # with open(info_file, 'w') as wfh: + # json.dump(self.info.to_pod(), wfh) def finalize(self): # self.logger.info('Disconnecting from the device') @@ -82,16 +108,10 @@ class TargetManager(object): if any(parameter in name for parameter in cfg.supported_parameters): cfg.add(name, self.parameters.pop(name)) - def get_target_info(self): - return TargetInfo(self.target) - - def validate_runtime_parameters(self, params): + def validate_parameters(self): for cfg in self.runtime_configs: cfg.validate() - def merge_runtime_parameters(self, params): - pass - def set_parameters(self): for cfg in self.runtime_configs: cfg.set() @@ -100,25 +120,47 @@ class TargetManager(object): for cfg in self.runtime_configs: cfg.clear() - def _init_target(self): - target_map = {td.name: td for td in get_target_descriptions()} - if self.target_name not in target_map: - raise ValueError('Unknown Target: {}'.format(self.target_name)) - tdesc = target_map[self.target_name] - self.target = instantiate_target(tdesc, self.parameters, connect=False) - with signal.wrap('TARGET_CONNECT'): - self.target.connect() - self.target.setup() + def _parse_name(self): + # Try and get platform and target + self.name = identifier(self.name.replace('-', '_')) + if '_' in self.name: + self.platform_name, self.target_name = self.name.split('_', 1) + elif self.name in self.DEVICE_MAPPING: + self.platform_name = self.DEVICE_MAPPING[self.name]['platform_name'] + self.target_name = self.DEVICE_MAPPING[self.name]['target_name'] + else: + raise ConfigError('Unknown Device Specified {}'.format(self.name)) - def _init_assistant(self): - # Create a corresponding target and target-assistant to help with - # platformy stuff? - if self.target.os == 'android': + def _get_target(self): + # Create a corresponding target and target-assistant + if self.target_name == 'android': + self.target = AndroidTarget() + elif self.target_name == 'linux': + self.target = LinuxTarget() # pylint: disable=redefined-variable-type + elif self.target_name == 'localLinux': + self.target = LocalLinuxTarget() + else: + raise ConfigError('Unknown Target Specified {}'.format(self.target_name)) + + def _get_assistant(self): + # Create a corresponding target and target-assistant to help with platformy stuff? + if self.target_name == 'android': self.assistant = AndroidAssistant(self.target) - elif self.target.os == 'linux': + elif self.target_name in ['linux', 'localLinux']: self.assistant = LinuxAssistant(self.target) # pylint: disable=redefined-variable-type else: - raise ValueError('Unknown Target OS: {}'.format(self.target.os)) + raise ConfigError('Unknown Target Specified {}'.format(self.target_name)) + + # def validate_runtime_parameters(self, parameters): + # for name, value in parameters.iteritems(): + # self.add_parameter(name, value) + # self.validate_parameters() + + # def set_runtime_parameters(self, parameters): + # # self.clear() + # for name, value in parameters.iteritems(): + # self.add_parameter(name, value) + # self.set_parameters() class LinuxAssistant(object): diff --git a/wa/utils/log.py b/wa/utils/log.py index 8dbe5f20..567943e5 100644 --- a/wa/utils/log.py +++ b/wa/utils/log.py @@ -78,7 +78,7 @@ def set_level(level): def add_file(filepath, level=logging.DEBUG, - fmt='%(asctime)s %(levelname)-8s %(name)s: %(message)-10.10s'): + fmt='%(asctime)s %(levelname)-8s %(name)10.10s: %(message)s'): root_logger = logging.getLogger() file_handler = logging.FileHandler(filepath) file_handler.setLevel(level) From 011fd684bdee096a7dc158df96c58a0d948b80e3 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Thu, 9 Mar 2017 14:44:26 +0000 Subject: [PATCH 2/5] Skeleton job execution --- setup.py | 7 +- wa/__init__.py | 15 +- wa/framework/configuration/core.py | 2 +- wa/framework/configuration/execution.py | 52 +++- wa/framework/entrypoint.py | 24 +- wa/framework/execution.py | 195 +++++++++--- wa/framework/instrumentation.py | 2 +- wa/framework/output.py | 8 + wa/framework/plugin.py | 2 +- wa/framework/resource.py | 2 +- wa/framework/signal.py | 39 ++- wa/instrumentation/__init__.py | 0 wa/instrumentation/misc/__init__.py | 391 ++++++++++++++++++++++++ wa/utils/types.py | 67 ++++ 14 files changed, 714 insertions(+), 92 deletions(-) create mode 100644 wa/instrumentation/__init__.py create mode 100644 wa/instrumentation/misc/__init__.py diff --git a/setup.py b/setup.py index 0b9bdf04..4a9b43dd 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,7 @@ scripts = [os.path.join('scripts', s) for s in os.listdir('scripts')] params = dict( name='wlauto', - description='A framework for automating workload execution and measurment collection on ARM devices.', + description='A framework for automating workload execution and measurement collection on ARM devices.', version=get_wa_version(), packages=packages, package_data=data_files, @@ -72,13 +72,14 @@ params = dict( maintainer_email='workload-automation@arm.com', install_requires=[ 'python-dateutil', # converting between UTC and local time. - 'pexpect>=3.3', # Send/recieve to/from device + 'pexpect>=3.3', # Send/receive to/from device 'pyserial', # Serial port interface 'colorama', # Printing with colors 'pyYAML', # YAML-formatted agenda parsing 'requests', # Fetch assets over HTTP 'devlib', # Interacting with devices - 'louie' # callbacks dispatch + 'louie', # callbacks dispatch + 'wrapt', # better decorators ], extras_require={ 'other': ['jinja2', 'pandas>=0.13.1'], diff --git a/wa/__init__.py b/wa/__init__.py index 262984be..575c5963 100644 --- a/wa/__init__.py +++ b/wa/__init__.py @@ -1,11 +1,12 @@ from wa.framework import pluginloader, log, signal -from wa.framework.configuration import settings -from wa.framework.plugin import Plugin, Parameter from wa.framework.command import Command -from wa.framework.workload import Workload - -from wa.framework.exception import WAError, NotFoundError, ValidationError, WorkloadError +from wa.framework.configuration import settings from wa.framework.exception import HostError, JobError, InstrumentError, ConfigError -from wa.framework.exception import ResultProcessorError, ResourceError, CommandError, ToolError +from wa.framework.exception import (ResultProcessorError, ResourceError, + CommandError, ToolError) +from wa.framework.exception import (WAError, NotFoundError, ValidationError, + WorkloadError) from wa.framework.exception import WorkerThreadError, PluginLoaderError - +from wa.framework.instrumentation import Instrument +from wa.framework.plugin import Plugin, Parameter +from wa.framework.workload import Workload diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index c79df8b8..82237ce6 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -555,7 +555,7 @@ class MetaConfiguration(Configuration): plugin_packages = [ 'wa.commands', 'wa.workloads', - #'wa.instrumentation', + 'wa.instrumentation', #'wa.result_processors', #'wa.managers', 'wa.framework.target.descriptor', diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index 442adf21..032d9e52 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -1,4 +1,5 @@ import random +import logging from itertools import izip_longest, groupby, chain from wa.framework import pluginloader @@ -6,6 +7,8 @@ from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration JobGenerator, settings) from wa.framework.configuration.parsers import ConfigParser from wa.framework.configuration.plugin_cache import PluginCache +from wa.framework.exception import NotFoundError +from wa.utils.types import enum class CombinedConfig(object): @@ -26,33 +29,54 @@ class CombinedConfig(object): 'run_config': self.run_config.to_pod()} -class JobStatus: - PENDING = 0 - RUNNING = 1 - OK = 2 - FAILED = 3 - PARTIAL = 4 - ABORTED = 5 - PASSED = 6 - +JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING', + 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) class Job(object): + @property + def id(self): + return self.spec.id + def __init__(self, spec, iteration, context): + self.logger = logging.getLogger('job') self.spec = spec self.iteration = iteration self.context = context - self.status = 'new' + self.status = JobStatus.NEW self.workload = None self.output = None def load(self, target, loader=pluginloader): + self.logger.debug('Loading job {}'.format(self.id)) self.workload = loader.get_workload(self.spec.workload_name, target, **self.spec.workload_parameters) self.workload.init_resources(self.context) self.workload.validate() + self.status = JobStatus.LOADED + def initialize(self, context): + self.logger.info('Initializing job {}'.format(self.id)) + self.status = JobStatus.PENDING + + def configure_target(self, context): + self.logger.info('Configuring target for job {}'.format(self.id)) + + def setup(self, context): + self.logger.info('Setting up job {}'.format(self.id)) + + def run(self, context): + self.logger.info('Running job {}'.format(self.id)) + + def process_output(self, context): + self.looger.info('Processing output for job {}'.format(self.id)) + + def teardown(self, context): + self.logger.info('Tearing down job {}'.format(self.id)) + + def finalize(self, context): + self.logger.info('Finalizing job {}'.format(self.id)) class ConfigManager(object): """ @@ -108,8 +132,12 @@ class ConfigManager(object): def get_instruments(self, target): instruments = [] for name in self.enabled_instruments: - instruments.append(self.get_plugin(name, kind='instrument', - target=target)) + try: + instruments.append(self.get_plugin(name, kind='instrument', + target=target)) + except NotFoundError: + msg = 'Instrument "{}" not found' + raise NotFoundError(msg.format(name)) return instruments def finalize(self): diff --git a/wa/framework/entrypoint.py b/wa/framework/entrypoint.py index 3e73b910..db72b687 100644 --- a/wa/framework/entrypoint.py +++ b/wa/framework/entrypoint.py @@ -85,27 +85,7 @@ def main(): except KeyboardInterrupt: logging.info('Got CTRL-C. Aborting.') sys.exit(3) - except (WAError, DevlibError) as e: - logging.critical(e) - sys.exit(1) - except subprocess.CalledProcessError as e: - tb = get_traceback() - logging.critical(tb) - command = e.cmd - if e.args: - command = '{} {}'.format(command, ' '.join(e.args)) - message = 'Command \'{}\' returned non-zero exit status {}\nOUTPUT:\n{}\n' - logging.critical(message.format(command, e.returncode, e.output)) - sys.exit(2) - except SyntaxError as e: - tb = get_traceback() - logging.critical(tb) - message = 'Syntax Error in {}, line {}, offset {}:' - logging.critical(message.format(e.filename, e.lineno, e.offset)) - logging.critical('\t{}'.format(e.msg)) - sys.exit(2) except Exception as e: # pylint: disable=broad-except - tb = get_traceback() - logging.critical(tb) - logging.critical('{}({})'.format(e.__class__.__name__, e)) + if not getattr(e, 'logged', None): + log.log_error(e, logger) sys.exit(2) diff --git a/wa/framework/execution.py b/wa/framework/execution.py index a5c79714..3b562fad 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -57,6 +57,8 @@ from wa.framework.exception import (WAError, ConfigError, TimeoutError, from wa.framework.plugin import Artifact from wa.framework.resource import ResourceResolver from wa.framework.target.info import TargetInfo +from wa.framework.target.manager import TargetManager +from wa.utils import log from wa.utils.misc import (ensure_directory_exists as _d, get_traceback, format_duration) from wa.utils.serializer import json @@ -74,15 +76,70 @@ REBOOT_DELAY = 3 class ExecutionContext(object): + @property + def previous_job(self): + if not self.job_queue: + return None + return self.job_queue[0] + + @property + def next_job(self): + if not self.completed_jobs: + return None + return self.completed_jobs[-1] + + @property + def spec_changed(self): + if self.previous_job is None and self.current_job is not None: # Start of run + return True + if self.previous_job is not None and self.current_job is None: # End of run + return True + return self.current_job.spec.id != self.previous_job.spec.id + + @property + def spec_will_change(self): + if self.current_job is None and self.next_job is not None: # Start of run + return True + if self.current_job is not None and self.next_job is None: # End of run + return True + return self.current_job.spec.id != self.next_job.spec.id + def __init__(self, cm, tm, output): - self.logger = logging.getLogger('ExecContext') + self.logger = logging.getLogger('context') self.cm = cm self.tm = tm self.output = output self.logger.debug('Loading resource discoverers') self.resolver = ResourceResolver(cm) self.resolver.load() + self.job_queue = None + self.completed_jobs = None + self.current_job = None + + def start_run(self): + self.output.info.start_time = datetime.now() + self.output.write_info() + self.job_queue = copy(self.cm.jobs) + self.completed_jobs = [] + + def end_run(self): + self.output.info.end_time = datetime.now() + self.output.info.duration = self.output.info.end_time -\ + self.output.info.start_time + self.output.write_info() + + def start_job(self): + if not self.job_queue: + raise RuntimeError('No jobs to run') + self.current_job = self.job_queue.pop(0) + return self.current_job + + def end_job(self): + if not self.current_job: + raise RuntimeError('No jobs in progress') + self.completed_jobs.append(self.current_job) + self.current_job = None class OldExecutionContext(object): @@ -147,22 +204,6 @@ class OldExecutionContext(object): self.job_iteration_counts = defaultdict(int) self.aborted = False self.runner = None - if config.agenda.filepath: - self.run_artifacts.append(Artifact('agenda', - os.path.join(self.host_working_directory, - os.path.basename(config.agenda.filepath)), - 'meta', - mandatory=True, - description='Agenda for this run.')) - for i, filepath in enumerate(settings.config_paths, 1): - name = 'config_{}'.format(i) - path = os.path.join(self.host_working_directory, - name + os.path.splitext(filepath)[1]) - self.run_artifacts.append(Artifact(name, - path, - kind='meta', - mandatory=True, - description='Config file used for the run.')) def initialize(self): if not os.path.isdir(self.run_output_directory): @@ -228,30 +269,6 @@ def _check_artifact_path(path, rootpath): return full_path -class FakeTargetManager(object): - # TODO: this is a FAKE - - def __init__(self, name, config): - self.device_name = name - self.device_config = config - - from devlib import LocalLinuxTarget - self.target = LocalLinuxTarget({'unrooted': True}) - - def get_target_info(self): - return TargetInfo(self.target) - - def validate_runtime_parameters(self, params): - pass - - def merge_runtime_parameters(self, params): - pass - - -def init_target_manager(config): - return FakeTargetManager(config.device, config.device_config) - - class Executor(object): """ The ``Executor``'s job is to set up the execution context and pass to a @@ -268,7 +285,7 @@ class Executor(object): # pylint: disable=R0915 def __init__(self): - self.logger = logging.getLogger('Executor') + self.logger = logging.getLogger('executor') self.error_logged = False self.warning_logged = False pluginloader = None @@ -297,7 +314,8 @@ class Executor(object): output.write_config(config) self.logger.info('Connecting to target') - target_manager = init_target_manager(config.run_config) + target_manager = TargetManager(config.run_config.device, + config.run_config.device_config) output.write_target_info(target_manager.get_target_info()) self.logger.info('Initializing execution conetext') @@ -312,6 +330,11 @@ class Executor(object): instrumentation.install(instrument) instrumentation.validate() + self.logger.info('Starting run') + runner = Runner(context) + runner.run() + + def execute_postamble(self): """ @@ -370,6 +393,92 @@ class Runner(object): """ + def __init__(self, context): + self.logger = logging.getLogger('runner') + self.context = context + self.output = self.context.output + self.config = self.context.cm + + def run(self): + self.send(signal.RUN_STARTED) + try: + self.initialize_run() + self.send(signal.RUN_INITIALIZED) + + while self.context.job_queue: + with signal.wrap('JOB_EXECUTION', self): + self.run_next_job(self.context) + except Exception as e: + if (not getattr(e, 'logged', None) and + not isinstance(e, KeyboardInterrupt)): + log.log_error(e, self.logger) + e.logged = True + raise e + finally: + self.finalize_run() + self.send(signal.RUN_COMPLETED) + + def initialize_run(self): + self.logger.info('Initializing run') + self.context.start_run() + log.indent() + for job in self.context.job_queue: + job.initialize(self.context) + log.dedent() + + def finalize_run(self): + self.logger.info('Finalizing run') + self.context.end_run() + + def run_next_job(self, context): + job = context.start_job() + self.logger.info('Running job {}'.format(job.id)) + job.status = JobStatus.RUNNING + log.indent() + self.send(signal.JOB_STARTED) + + with signal.wrap('JOB_TARGET_CONFIG', self): + job.configure_target(context) + + with signal.wrap('JOB_SETUP', self): + job.setup(context) + + try: + with signal.wrap('JOB_EXECUTION', self): + job.run(context) + + try: + with signal.wrap('JOB_OUTPUT_PROCESSED', self): + job.run(context) + except Exception: + job.status = JobStatus.PARTIAL + raise + except KeyboardInterrupt: + job.status = JobStatus.ABORTED + raise + except Exception as e: + job.status = JobStatus.FAILED + if not getattr(e, 'logged', None): + log.log_error(e, self.logger) + e.logged = True + raise e + finally: + # If setup was successfully completed, teardown must + # run even if the job failed + with signal.wrap('JOB_TEARDOWN', self): + job.teardown(context) + + log.dedent() + self.logger.info('Completing job {}'.format(job.id)) + self.send(signal.JOB_COMPLETED) + context.end_job() + + def send(self, s): + signal.send(s, self, self.context) + + def __str__(self): + return 'runner' + class RunnerJob(object): """ diff --git a/wa/framework/instrumentation.py b/wa/framework/instrumentation.py index f4d3e480..69386f13 100644 --- a/wa/framework/instrumentation.py +++ b/wa/framework/instrumentation.py @@ -102,7 +102,7 @@ import logging import inspect from collections import OrderedDict -import wa.framework.signal as signal +from wa.framework import signal from wa.framework.plugin import Plugin from wa.framework.exception import WAError, TargetNotRespondingError, TimeoutError from wa.utils.misc import get_traceback, isiterable diff --git a/wa/framework/output.py b/wa/framework/output.py index 77d5853e..c8d5dd4a 100644 --- a/wa/framework/output.py +++ b/wa/framework/output.py @@ -5,6 +5,7 @@ import string import sys import uuid from copy import copy +from datetime import timedelta from wlauto.core.configuration.configuration import JobSpec from wlauto.core.configuration.manager import ConfigManager @@ -25,10 +26,13 @@ class RunInfo(object): @staticmethod def from_pod(pod): uid = pod.pop('uuid') + duration = pod.pop('duration') if uid is not None: uid = uuid.UUID(uid) instance = RunInfo(**pod) instance.uuid = uid + instance.duration = duration if duration is None else\ + timedelta(seconds=duration) return instance def __init__(self, run_name=None, project=None, project_stage=None, @@ -44,6 +48,10 @@ class RunInfo(object): def to_pod(self): d = copy(self.__dict__) d['uuid'] = str(self.uuid) + if self.duration is None: + d['duration'] = self.duration + else: + d['duration'] = self.duration.total_seconds() return d diff --git a/wa/framework/plugin.py b/wa/framework/plugin.py index b642ee29..9f06de5e 100644 --- a/wa/framework/plugin.py +++ b/wa/framework/plugin.py @@ -406,7 +406,7 @@ class Plugin(object): return self.__class__.__name__ def __init__(self, **kwargs): - self.logger = logging.getLogger(self._classname) + self.logger = logging.getLogger(self.name) self._modules = [] self.capabilities = getattr(self.__class__, 'capabilities', []) for param in self.parameters: diff --git a/wa/framework/resource.py b/wa/framework/resource.py index e86eb830..6ef1992c 100644 --- a/wa/framework/resource.py +++ b/wa/framework/resource.py @@ -286,7 +286,7 @@ class ResourceResolver(object): """ def __init__(self, config): - self.logger = logging.getLogger(self.__class__.__name__) + self.logger = logging.getLogger('resolver') self.getters = defaultdict(prioritylist) self.config = config diff --git a/wa/framework/signal.py b/wa/framework/signal.py index dd19a5e5..7dbfd73d 100644 --- a/wa/framework/signal.py +++ b/wa/framework/signal.py @@ -27,7 +27,7 @@ from louie import dispatcher from wa.utils.types import prioritylist -logger = logging.getLogger('dispatcher') +logger = logging.getLogger('signal') class Signal(object): @@ -101,6 +101,27 @@ BEFORE_RUN_INIT = Signal('before-run-init', invert_priority=True) SUCCESSFUL_RUN_INIT = Signal('successful-run-init') AFTER_RUN_INIT = Signal('after-run-init') +BEFORE_JOB_TARGET_CONFIG = Signal('before-job-target-config', invert_priority=True) +SUCCESSFUL_JOB_TARGET_CONFIG = Signal('successful-job-target-config') +AFTER_JOB_TARGET_CONFIG = Signal('after-job-target-config') + +BEFORE_JOB_SETUP = Signal('before-job-setup', invert_priority=True) +SUCCESSFUL_JOB_SETUP = Signal('successful-job-setup') +AFTER_JOB_SETUP = Signal('after-job-setup') + +BEFORE_JOB_EXECUTION = Signal('before-job-execution', invert_priority=True) +SUCCESSFUL_JOB_EXECUTION = Signal('successful-job-execution') +AFTER_JOB_EXECUTION = Signal('after-job-execution') + +BEFORE_JOB_OUTPUT_PROCESSED = Signal('before-job-output-processed', + invert_priority=True) +SUCCESSFUL_JOB_OUTPUT_PROCESSED = Signal('successful-job-output-processed') +AFTER_JOB_OUTPUT_PROCESSED = Signal('after-job-output-processed') + +BEFORE_JOB_TEARDOWN = Signal('before-job-teardown', invert_priority=True) +SUCCESSFUL_JOB_TEARDOWN = Signal('successful-job-teardown') +AFTER_JOB_TEARDOWN = Signal('after-job-teardown') + BEFORE_FLASHING = Signal('before-flashing', invert_priority=True) SUCCESSFUL_FLASHING = Signal('successful-flashing') AFTER_FLASHING = Signal('after-flashing') @@ -250,6 +271,7 @@ def send(signal, sender=dispatcher.Anonymous, *args, **kwargs): The rest of the parameters will be passed on as aruments to the handler. """ + logger.debug('Sending {} from {}'.format(signal, sender)) return dispatcher.send(signal, sender, *args, **kwargs) @@ -266,6 +288,7 @@ def safe_send(signal, sender=dispatcher.Anonymous, to just ``[KeyboardInterrupt]``). """ try: + logger.debug('Safe-sending {} from {}'.format(signal, sender)) send(singnal, sender, *args, **kwargs) except Exception as e: if any(isinstance(e, p) for p in propagate): @@ -292,3 +315,17 @@ def wrap(signal_name, sender=dispatcher.Anonymous, safe=False, *args, **kwargs): finally: send_func(after_signal, sender, *args, **kwargs) + +def wrapped(signal_name, sender=dispatcher.Anonymous, safe=False): + """A decorator for wrapping function in signal dispatch.""" + @wrapt.decorator + def signal_wrapped(wrapped, instance, args, kwargs): + func_id = repr(wrapped) + + def signal_wrapper(*args, **kwargs): + with wrap(signal_name, sender, safe): + return wrapped(*args, **kwargs) + + return signal_wrapper(*args, **kwargs) + + return signal_wrapped diff --git a/wa/instrumentation/__init__.py b/wa/instrumentation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/wa/instrumentation/misc/__init__.py b/wa/instrumentation/misc/__init__.py new file mode 100644 index 00000000..634f4014 --- /dev/null +++ b/wa/instrumentation/misc/__init__.py @@ -0,0 +1,391 @@ +# Copyright 2013-2015 ARM Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# pylint: disable=W0613,no-member,attribute-defined-outside-init +""" + +Some "standard" instruments to collect additional info about workload execution. + +.. note:: The run() method of a Workload may perform some "boilerplate" as well as + the actual execution of the workload (e.g. it may contain UI automation + needed to start the workload). This "boilerplate" execution will also + be measured by these instruments. As such, they are not suitable for collected + precise data about specific operations. +""" +import os +import re +import logging +import time +import tarfile +from itertools import izip, izip_longest +from subprocess import CalledProcessError + +from devlib.exception import TargetError + +from devlib.utils.android import ApkInfo + +from wa import Instrument, Parameter +from wa.framework import signal +from wa.framework.exception import ConfigError +from wa.utils.misc import diff_tokens, write_table, check_output, as_relative +from wa.utils.misc import ensure_file_directory_exists as _f +from wa.utils.misc import ensure_directory_exists as _d +from wa.utils.types import list_of_strings + + +logger = logging.getLogger(__name__) + + +class SysfsExtractor(Instrument): + + name = 'sysfs_extractor' + description = """ + Collects the contest of a set of directories, before and after workload execution + and diffs the result. + + """ + + mount_command = 'mount -t tmpfs -o size={} tmpfs {}' + extract_timeout = 30 + tarname = 'sysfs.tar.gz' + DEVICE_PATH = 0 + BEFORE_PATH = 1 + AFTER_PATH = 2 + DIFF_PATH = 3 + + parameters = [ + Parameter('paths', kind=list_of_strings, mandatory=True, + description="""A list of paths to be pulled from the device. These could be directories + as well as files.""", + global_alias='sysfs_extract_dirs'), + Parameter('use_tmpfs', kind=bool, default=None, + description=""" + Specifies whether tmpfs should be used to cache sysfile trees and then pull them down + as a tarball. This is significantly faster then just copying the directory trees from + the device directly, bur requres root and may not work on all devices. Defaults to + ``True`` if the device is rooted and ``False`` if it is not. + """), + Parameter('tmpfs_mount_point', default=None, + description="""Mount point for tmpfs partition used to store snapshots of paths."""), + Parameter('tmpfs_size', default='32m', + description="""Size of the tempfs partition."""), + ] + + def initialize(self, context): + if not self.device.is_rooted and self.use_tmpfs: # pylint: disable=access-member-before-definition + raise ConfigError('use_tempfs must be False for an unrooted device.') + elif self.use_tmpfs is None: # pylint: disable=access-member-before-definition + self.use_tmpfs = self.device.is_rooted + + if self.use_tmpfs: + self.on_device_before = self.device.path.join(self.tmpfs_mount_point, 'before') + self.on_device_after = self.device.path.join(self.tmpfs_mount_point, 'after') + + if not self.device.file_exists(self.tmpfs_mount_point): + self.device.execute('mkdir -p {}'.format(self.tmpfs_mount_point), as_root=True) + self.device.execute(self.mount_command.format(self.tmpfs_size, self.tmpfs_mount_point), + as_root=True) + + def setup(self, context): + before_dirs = [ + _d(os.path.join(context.output_directory, 'before', self._local_dir(d))) + for d in self.paths + ] + after_dirs = [ + _d(os.path.join(context.output_directory, 'after', self._local_dir(d))) + for d in self.paths + ] + diff_dirs = [ + _d(os.path.join(context.output_directory, 'diff', self._local_dir(d))) + for d in self.paths + ] + self.device_and_host_paths = zip(self.paths, before_dirs, after_dirs, diff_dirs) + + if self.use_tmpfs: + for d in self.paths: + before_dir = self.device.path.join(self.on_device_before, + self.device.path.dirname(as_relative(d))) + after_dir = self.device.path.join(self.on_device_after, + self.device.path.dirname(as_relative(d))) + if self.device.file_exists(before_dir): + self.device.execute('rm -rf {}'.format(before_dir), as_root=True) + self.device.execute('mkdir -p {}'.format(before_dir), as_root=True) + if self.device.file_exists(after_dir): + self.device.execute('rm -rf {}'.format(after_dir), as_root=True) + self.device.execute('mkdir -p {}'.format(after_dir), as_root=True) + + def slow_start(self, context): + if self.use_tmpfs: + for d in self.paths: + dest_dir = self.device.path.join(self.on_device_before, as_relative(d)) + if '*' in dest_dir: + dest_dir = self.device.path.dirname(dest_dir) + self.device.execute('{} cp -Hr {} {}'.format(self.device.busybox, d, dest_dir), + as_root=True, check_exit_code=False) + else: # not rooted + for dev_dir, before_dir, _, _ in self.device_and_host_paths: + self.device.pull(dev_dir, before_dir) + + def slow_stop(self, context): + if self.use_tmpfs: + for d in self.paths: + dest_dir = self.device.path.join(self.on_device_after, as_relative(d)) + if '*' in dest_dir: + dest_dir = self.device.path.dirname(dest_dir) + self.device.execute('{} cp -Hr {} {}'.format(self.device.busybox, d, dest_dir), + as_root=True, check_exit_code=False) + else: # not using tmpfs + for dev_dir, _, after_dir, _ in self.device_and_host_paths: + self.device.pull(dev_dir, after_dir) + + def update_result(self, context): + if self.use_tmpfs: + on_device_tarball = self.device.path.join(self.device.working_directory, self.tarname) + on_host_tarball = self.device.path.join(context.output_directory, self.tarname) + self.device.execute('{} tar czf {} -C {} .'.format(self.device.busybox, + on_device_tarball, + self.tmpfs_mount_point), + as_root=True) + self.device.execute('chmod 0777 {}'.format(on_device_tarball), as_root=True) + self.device.pull(on_device_tarball, on_host_tarball) + with tarfile.open(on_host_tarball, 'r:gz') as tf: + tf.extractall(context.output_directory) + self.device.remove(on_device_tarball) + os.remove(on_host_tarball) + + for paths in self.device_and_host_paths: + after_dir = paths[self.AFTER_PATH] + dev_dir = paths[self.DEVICE_PATH].strip('*') # remove potential trailing '*' + if (not os.listdir(after_dir) and + self.device.file_exists(dev_dir) and + self.device.list_directory(dev_dir)): + self.logger.error('sysfs files were not pulled from the device.') + self.device_and_host_paths.remove(paths) # Path is removed to skip diffing it + for _, before_dir, after_dir, diff_dir in self.device_and_host_paths: + _diff_sysfs_dirs(before_dir, after_dir, diff_dir) + + def teardown(self, context): + self._one_time_setup_done = [] + + def finalize(self, context): + if self.use_tmpfs: + try: + self.device.execute('umount {}'.format(self.tmpfs_mount_point), as_root=True) + except (TargetError, CalledProcessError): + # assume a directory but not mount point + pass + self.device.execute('rm -rf {}'.format(self.tmpfs_mount_point), + as_root=True, check_exit_code=False) + + def validate(self): + if not self.tmpfs_mount_point: # pylint: disable=access-member-before-definition + self.tmpfs_mount_point = self.device.path.join(self.device.working_directory, 'temp-fs') + + def _local_dir(self, directory): + return os.path.dirname(as_relative(directory).replace(self.device.path.sep, os.sep)) + + +class ExecutionTimeInstrument(Instrument): + + name = 'execution_time' + description = """ + Measure how long it took to execute the run() methods of a Workload. + + """ + + priority = 15 + + def __init__(self, target, **kwargs): + super(ExecutionTimeInstrument, self).__init__(target, **kwargs) + self.start_time = None + self.end_time = None + + def on_run_start(self, context): + signal.connect(self.get_start_time, signal.BEFORE_WORKLOAD_EXECUTION, priority=self.priority) + signal.connect(self.get_stop_time, signal.AFTER_WORKLOAD_EXECUTION, priority=self.priority) + + def get_start_time(self, context): + self.start_time = time.time() + + def get_stop_time(self, context): + self.end_time = time.time() + + def update_result(self, context): + execution_time = self.end_time - self.start_time + context.result.add_metric('execution_time', execution_time, 'seconds') + + +class ApkVersion(Instrument): + + name = 'apk_version' + description = """ + Extracts APK versions for workloads that have them. + + """ + + def __init__(self, device, **kwargs): + super(ApkVersion, self).__init__(device, **kwargs) + self.apk_info = None + + def setup(self, context): + if hasattr(context.workload, 'apk_file'): + self.apk_info = ApkInfo(context.workload.apk_file) + else: + self.apk_info = None + + def update_result(self, context): + if self.apk_info: + context.result.add_metric(self.name, self.apk_info.version_name) + + +class InterruptStatsInstrument(Instrument): + + name = 'interrupts' + description = """ + Pulls the ``/proc/interrupts`` file before and after workload execution and diffs them + to show what interrupts occurred during that time. + + """ + + def __init__(self, device, **kwargs): + super(InterruptStatsInstrument, self).__init__(device, **kwargs) + self.before_file = None + self.after_file = None + self.diff_file = None + + def setup(self, context): + self.before_file = os.path.join(context.output_directory, 'before', 'proc', 'interrupts') + self.after_file = os.path.join(context.output_directory, 'after', 'proc', 'interrupts') + self.diff_file = os.path.join(context.output_directory, 'diff', 'proc', 'interrupts') + + def start(self, context): + with open(_f(self.before_file), 'w') as wfh: + wfh.write(self.device.execute('cat /proc/interrupts')) + + def stop(self, context): + with open(_f(self.after_file), 'w') as wfh: + wfh.write(self.device.execute('cat /proc/interrupts')) + + def update_result(self, context): + # If workload execution failed, the after_file may not have been created. + if os.path.isfile(self.after_file): + _diff_interrupt_files(self.before_file, self.after_file, _f(self.diff_file)) + + +class DynamicFrequencyInstrument(SysfsExtractor): + + name = 'cpufreq' + description = """ + Collects dynamic frequency (DVFS) settings before and after workload execution. + + """ + + tarname = 'cpufreq.tar.gz' + + parameters = [ + Parameter('paths', mandatory=False, override=True), + ] + + def setup(self, context): + self.paths = ['/sys/devices/system/cpu'] + if self.use_tmpfs: + self.paths.append('/sys/class/devfreq/*') # the '*' would cause problems for adb pull. + super(DynamicFrequencyInstrument, self).setup(context) + + def validate(self): + # temp-fs would have been set in super's validate, if not explicitly specified. + if not self.tmpfs_mount_point.endswith('-cpufreq'): # pylint: disable=access-member-before-definition + self.tmpfs_mount_point += '-cpufreq' + + +def _diff_interrupt_files(before, after, result): # pylint: disable=R0914 + output_lines = [] + with open(before) as bfh: + with open(after) as ofh: + for bline, aline in izip(bfh, ofh): + bchunks = bline.strip().split() + while True: + achunks = aline.strip().split() + if achunks[0] == bchunks[0]: + diffchunks = [''] + diffchunks.append(achunks[0]) + diffchunks.extend([diff_tokens(b, a) for b, a + in zip(bchunks[1:], achunks[1:])]) + output_lines.append(diffchunks) + break + else: # new category appeared in the after file + diffchunks = ['>'] + achunks + output_lines.append(diffchunks) + try: + aline = ofh.next() + except StopIteration: + break + + # Offset heading columns by one to allow for row labels on subsequent + # lines. + output_lines[0].insert(0, '') + + # Any "columns" that do not have headings in the first row are not actually + # columns -- they are a single column where space-spearated words got + # split. Merge them back together to prevent them from being + # column-aligned by write_table. + table_rows = [output_lines[0]] + num_cols = len(output_lines[0]) + for row in output_lines[1:]: + table_row = row[:num_cols] + table_row.append(' '.join(row[num_cols:])) + table_rows.append(table_row) + + with open(result, 'w') as wfh: + write_table(table_rows, wfh) + + +def _diff_sysfs_dirs(before, after, result): # pylint: disable=R0914 + before_files = [] + os.path.walk(before, + lambda arg, dirname, names: arg.extend([os.path.join(dirname, f) for f in names]), + before_files + ) + before_files = filter(os.path.isfile, before_files) + files = [os.path.relpath(f, before) for f in before_files] + after_files = [os.path.join(after, f) for f in files] + diff_files = [os.path.join(result, f) for f in files] + + for bfile, afile, dfile in zip(before_files, after_files, diff_files): + if not os.path.isfile(afile): + logger.debug('sysfs_diff: {} does not exist or is not a file'.format(afile)) + continue + + with open(bfile) as bfh, open(afile) as afh: # pylint: disable=C0321 + with open(_f(dfile), 'w') as dfh: + for i, (bline, aline) in enumerate(izip_longest(bfh, afh), 1): + if aline is None: + logger.debug('Lines missing from {}'.format(afile)) + break + bchunks = re.split(r'(\W+)', bline) + achunks = re.split(r'(\W+)', aline) + if len(bchunks) != len(achunks): + logger.debug('Token length mismatch in {} on line {}'.format(bfile, i)) + dfh.write('xxx ' + bline) + continue + if ((len([c for c in bchunks if c.strip()]) == len([c for c in achunks if c.strip()]) == 2) and + (bchunks[0] == achunks[0])): + # if there are only two columns and the first column is the + # same, assume it's a "header" column and do not diff it. + dchunks = [bchunks[0]] + [diff_tokens(b, a) for b, a in zip(bchunks[1:], achunks[1:])] + else: + dchunks = [diff_tokens(b, a) for b, a in zip(bchunks, achunks)] + dfh.write(''.join(dchunks)) diff --git a/wa/utils/types.py b/wa/utils/types.py index c23d2886..87071205 100644 --- a/wa/utils/types.py +++ b/wa/utils/types.py @@ -29,6 +29,7 @@ import os import re import math import shlex +import string from bisect import insort from collections import defaultdict, MutableMapping from copy import copy @@ -475,3 +476,69 @@ class obj_dict(MutableMapping): return self.__dict__['dict'][name] else: raise AttributeError("No such attribute: " + name) + + +class level(object): + """ + A level has a name and behaves like a string when printed, + however it also has a numeric value which is used in comparisons. + + """ + + def __init__(self, name, value): + self.name = name + self.value = value + + def __str__(self): + return self.name + + def __repr__(self): + return '{}({})'.format(self.name, self.value) + + def __cmp__(self, other): + if isinstance(other, level): + return cmp(self.value, other.value) + else: + return cmp(self.value, other) + + def __eq__(self, other): + if isinstance(other, level): + return self.value == other.value + else: + return self.value == other + + def __ne__(self, other): + if isinstance(other, level): + return self.value != other.value + else: + return self.value != other + + +def enum(args, start=0): + """ + Creates a class with attributes named by the first argument. + Each attribute is a ``level`` so they behave is integers in comparisons. + The value of the first attribute is specified by the second argument + (``0`` if not specified). + + :: + MyEnum = enum(['A', 'B', 'C']) + + is equivalent of:: + + class MyEnum(object): + A = 0 + B = 1 + C = 2 + + """ + + class Enum(object): + pass + + for i, v in enumerate(args, start): + name = string.upper(identifier(v)) + setattr(Enum, name, level(v, i)) + + return Enum + From 547ae1c10eeb3be3d1bf0afb7d1e83168dc98c72 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Thu, 9 Mar 2017 16:26:50 +0000 Subject: [PATCH 3/5] Job output dir handling. --- wa/framework/configuration/core.py | 6 +++++- wa/framework/configuration/execution.py | 4 ++++ wa/framework/execution.py | 6 ++++++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index 82237ce6..d274b6c0 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -927,7 +927,11 @@ class JobSpec(Configuration): self.runtime_parameters = target_manager.merge_runtime_parameters(runtime_parameters) def finalize(self): - self.id = "-".join([source.config['id'] for source in self._sources[1:]]) # ignore first id, "global" + self.id = "-".join([source.config['id'] + for source in self._sources[1:]]) # ignore first id, "global" + if self.label is None: + self.label = self.workload_name + # This is used to construct the list of Jobs WA will run diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index 032d9e52..dd8df4e0 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -38,6 +38,10 @@ class Job(object): def id(self): return self.spec.id + @property + def output_name(self): + return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration) + def __init__(self, spec, iteration, context): self.logger = logging.getLogger('job') self.spec = spec diff --git a/wa/framework/execution.py b/wa/framework/execution.py index 3b562fad..cedc5714 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -104,6 +104,11 @@ class ExecutionContext(object): return True return self.current_job.spec.id != self.next_job.spec.id + @property + def output_directory(self): + if self.current_job: + return os.path.join(self.output.basepath, self.current_job.output_name) + return self.output.basepath def __init__(self, cm, tm, output): self.logger = logging.getLogger('context') @@ -133,6 +138,7 @@ class ExecutionContext(object): if not self.job_queue: raise RuntimeError('No jobs to run') self.current_job = self.job_queue.pop(0) + os.makedirs(self.output_directory) return self.current_job def end_job(self): From ccdc3492e73b9b8b3a1258c99fe766d289db36c3 Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Thu, 9 Mar 2017 17:39:44 +0000 Subject: [PATCH 4/5] Handle retry failed --- wa/framework/configuration/core.py | 25 ++++------- wa/framework/configuration/execution.py | 56 +----------------------- wa/framework/execution.py | 49 ++++++++++++++++++--- wa/framework/job.py | 57 +++++++++++++++++++++++++ wa/framework/output.py | 24 ++++++++--- wa/framework/plugin.py | 4 +- wa/utils/log.py | 4 +- wa/utils/types.py | 28 ++++++++++-- 8 files changed, 159 insertions(+), 88 deletions(-) create mode 100644 wa/framework/job.py diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index d274b6c0..8bd8b3e2 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -20,9 +20,8 @@ from collections import OrderedDict, defaultdict from wa.framework.exception import ConfigError, NotFoundError from wa.framework.configuration.tree import SectionNode from wa.utils.misc import (get_article, merge_config_values) -from wa.utils.types import (identifier, integer, boolean, - list_of_strings, toggle_set, - obj_dict) +from wa.utils.types import (identifier, integer, boolean, list_of_strings, + list_of, toggle_set, obj_dict, enum) from wa.utils.serializer import is_pod # Mapping for kind conversion; see docs for convert_types below @@ -32,17 +31,9 @@ KIND_MAP = { dict: OrderedDict, } -ITERATION_STATUS = [ - 'NOT_STARTED', - 'RUNNING', +JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING', + 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) - 'OK', - 'NONCRITICAL', - 'PARTIAL', - 'FAILED', - 'ABORTED', - 'SKIPPED', -] ########################## ### CONFIG POINT TYPES ### @@ -716,9 +707,9 @@ class RunConfiguration(Configuration): This setting defines what specific Device subclass will be used to interact the connected device. Obviously, this must match your setup. '''), - ConfigurationPoint('retry_on_status', kind=status_list, + ConfigurationPoint('retry_on_status', kind=list_of(JobStatus), default=['FAILED', 'PARTIAL'], - allowed_values=ITERATION_STATUS, + allowed_values=JobStatus.values, description=''' This is list of statuses on which a job will be cosidered to have failed and will be automatically retried up to ``max_retries`` times. This defaults to @@ -736,10 +727,10 @@ class RunConfiguration(Configuration): ``"ABORTED"`` The user interupted the workload '''), - ConfigurationPoint('max_retries', kind=int, default=3, + ConfigurationPoint('max_retries', kind=int, default=2, description=''' The maximum number of times failed jobs will be retried before giving up. If - not set, this will default to ``3``. + not set. .. note:: this number does not include the original attempt '''), diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index dd8df4e0..4cfddd53 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -4,10 +4,11 @@ from itertools import izip_longest, groupby, chain from wa.framework import pluginloader from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration, - JobGenerator, settings) + JobGenerator, JobStatus, settings) from wa.framework.configuration.parsers import ConfigParser from wa.framework.configuration.plugin_cache import PluginCache from wa.framework.exception import NotFoundError +from wa.framework.job import Job from wa.utils.types import enum @@ -29,59 +30,6 @@ class CombinedConfig(object): 'run_config': self.run_config.to_pod()} -JobStatus = enum(['NEW', 'LOADED', 'PENDING', 'RUNNING', - 'OK', 'FAILED', 'PARTIAL', 'ABORTED', 'SKIPPED']) - -class Job(object): - - @property - def id(self): - return self.spec.id - - @property - def output_name(self): - return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration) - - def __init__(self, spec, iteration, context): - self.logger = logging.getLogger('job') - self.spec = spec - self.iteration = iteration - self.context = context - self.status = JobStatus.NEW - self.workload = None - self.output = None - - def load(self, target, loader=pluginloader): - self.logger.debug('Loading job {}'.format(self.id)) - self.workload = loader.get_workload(self.spec.workload_name, - target, - **self.spec.workload_parameters) - self.workload.init_resources(self.context) - self.workload.validate() - self.status = JobStatus.LOADED - - def initialize(self, context): - self.logger.info('Initializing job {}'.format(self.id)) - self.status = JobStatus.PENDING - - def configure_target(self, context): - self.logger.info('Configuring target for job {}'.format(self.id)) - - def setup(self, context): - self.logger.info('Setting up job {}'.format(self.id)) - - def run(self, context): - self.logger.info('Running job {}'.format(self.id)) - - def process_output(self, context): - self.looger.info('Processing output for job {}'.format(self.id)) - - def teardown(self, context): - self.logger.info('Tearing down job {}'.format(self.id)) - - def finalize(self, context): - self.logger.info('Finalizing job {}'.format(self.id)) - class ConfigManager(object): """ Represents run-time state of WA. Mostly used as a container for loaded diff --git a/wa/framework/execution.py b/wa/framework/execution.py index cedc5714..45b78a53 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -147,6 +147,11 @@ class ExecutionContext(object): self.completed_jobs.append(self.current_job) self.current_job = None + def move_failed(self, job): + attempt = job.retries + 1 + failed_name = '{}-attempt{:02}'.format(job.output_name, attempt) + self.output.move_failed(job.output_name, failed_name) + class OldExecutionContext(object): """ @@ -439,8 +444,28 @@ class Runner(object): def run_next_job(self, context): job = context.start_job() self.logger.info('Running job {}'.format(job.id)) + + try: + log.indent() + self.do_run_job(job, context) + except KeyboardInterrupt: + job.status = JobStatus.ABORTED + raise + except Exception as e: + job.status = JobStatus.FAILED + if not getattr(e, 'logged', None): + log.log_error(e, self.logger) + e.logged = True + finally: + self.logger.info('Completing job {}'.format(job.id)) + self.send(signal.JOB_COMPLETED) + context.end_job() + + log.dedent() + self.check_job(job) + + def do_run_job(self, job, context): job.status = JobStatus.RUNNING - log.indent() self.send(signal.JOB_STARTED) with signal.wrap('JOB_TARGET_CONFIG', self): @@ -455,7 +480,7 @@ class Runner(object): try: with signal.wrap('JOB_OUTPUT_PROCESSED', self): - job.run(context) + job.process_output(context) except Exception: job.status = JobStatus.PARTIAL raise @@ -474,10 +499,22 @@ class Runner(object): with signal.wrap('JOB_TEARDOWN', self): job.teardown(context) - log.dedent() - self.logger.info('Completing job {}'.format(job.id)) - self.send(signal.JOB_COMPLETED) - context.end_job() + def check_job(self, job): + rc = self.context.cm.run_config + if job.status in rc.retry_on_status: + if job.retries < rc.max_retries: + msg = 'Job {} iteration {} complted with status {}. retrying...' + self.logger.error(msg.format(job.id, job.status, job.iteration)) + self.context.move_failed(job) + job.retries += 1 + job.status = JobStatus.PENDING + self.context.job_queue.insert(0, job) + else: + msg = 'Job {} iteration {} completed with status {}. '\ + 'Max retries exceeded.' + self.logger.error(msg.format(job.id, job.status, job.iteration)) + else: # status not in retry_on_status + self.logger.info('Job completed with status {}'.format(job.status)) def send(self, s): signal.send(s, self, self.context) diff --git a/wa/framework/job.py b/wa/framework/job.py new file mode 100644 index 00000000..598d6072 --- /dev/null +++ b/wa/framework/job.py @@ -0,0 +1,57 @@ +import logging + +from wa.framework import pluginloader +from wa.framework.configuration.core import JobStatus + + +class Job(object): + + @property + def id(self): + return self.spec.id + + @property + def output_name(self): + return '{}-{}-{}'.format(self.id, self.spec.label, self.iteration) + + def __init__(self, spec, iteration, context): + self.logger = logging.getLogger('job') + self.spec = spec + self.iteration = iteration + self.context = context + self.status = JobStatus.NEW + self.workload = None + self.output = None + self.retries = 0 + + def load(self, target, loader=pluginloader): + self.logger.debug('Loading job {}'.format(self.id)) + self.workload = loader.get_workload(self.spec.workload_name, + target, + **self.spec.workload_parameters) + self.workload.init_resources(self.context) + self.workload.validate() + self.status = JobStatus.LOADED + + def initialize(self, context): + self.logger.info('Initializing job {}'.format(self.id)) + self.status = JobStatus.PENDING + + def configure_target(self, context): + self.logger.info('Configuring target for job {}'.format(self.id)) + + def setup(self, context): + self.logger.info('Setting up job {}'.format(self.id)) + + def run(self, context): + self.logger.info('Running job {}'.format(self.id)) + + def process_output(self, context): + self.logger.info('Processing output for job {}'.format(self.id)) + + def teardown(self, context): + self.logger.info('Tearing down job {}'.format(self.id)) + + def finalize(self, context): + self.logger.info('Finalizing job {}'.format(self.id)) + diff --git a/wa/framework/output.py b/wa/framework/output.py index c8d5dd4a..16853539 100644 --- a/wa/framework/output.py +++ b/wa/framework/output.py @@ -7,11 +7,11 @@ import uuid from copy import copy from datetime import timedelta -from wlauto.core.configuration.configuration import JobSpec -from wlauto.core.configuration.manager import ConfigManager -from wlauto.core.device_manager import TargetInfo -from wlauto.utils.misc import touch -from wlauto.utils.serializer import write_pod, read_pod +from wa.framework.configuration.core import JobSpec +from wa.framework.configuration.manager import ConfigManager +from wa.framework.target.info import TargetInfo +from wa.utils.misc import touch, ensure_directory_exists +from wa.utils.serializer import write_pod, read_pod logger = logging.getLogger('output') @@ -105,6 +105,11 @@ class RunOutput(object): def raw_config_dir(self): return os.path.join(self.metadir, 'raw_config') + @property + def failed_dir(self): + path = os.path.join(self.basepath, '__failed') + return ensure_directory_exists(path) + def __init__(self, path): self.basepath = path self.info = None @@ -152,6 +157,15 @@ class RunOutput(object): pod = read_pod(self.jobsfile) return [JobSpec.from_pod(jp) for jp in pod['jobs']] + def move_failed(self, name, failed_name): + path = os.path.join(self.basepath, name) + failed_path = os.path.join(self.failed_dir, failed_name) + if not os.path.exists(path): + raise ValueError('Path {} does not exist'.format(path)) + if os.path.exists(failed_path): + raise ValueError('Path {} already exists'.format(failed_path)) + shutil.move(path, failed_path) + def init_wa_output(path, wa_state, force=False): if os.path.exists(path): diff --git a/wa/framework/plugin.py b/wa/framework/plugin.py index 9f06de5e..f6ef4e2e 100644 --- a/wa/framework/plugin.py +++ b/wa/framework/plugin.py @@ -26,8 +26,8 @@ from itertools import chain from copy import copy from wa.framework.configuration.core import settings, ConfigurationPoint as Parameter -from wa.framework.exception import (NotFoundError, PluginLoaderError, ValidationError, - ConfigError, HostError) +from wa.framework.exception import (NotFoundError, PluginLoaderError, + ValidationError, ConfigError, HostError) from wa.utils import log from wa.utils.misc import (ensure_directory_exists as _d, walk_modules, load_class, merge_dicts_simple, get_article) diff --git a/wa/utils/log.py b/wa/utils/log.py index 567943e5..906116e6 100644 --- a/wa/utils/log.py +++ b/wa/utils/log.py @@ -22,6 +22,8 @@ import subprocess import colorama +from devlib import DevlibError + from wa.framework import signal from wa.framework.exception import WAError from wa.utils.misc import get_traceback @@ -142,7 +144,7 @@ def log_error(e, logger, critical=False): if isinstance(e, KeyboardInterrupt): log_func('Got CTRL-C. Aborting.') - elif isinstance(e, WAError): + elif isinstance(e, WAError) or isinstance(e, DevlibError): log_func(e) elif isinstance(e, subprocess.CalledProcessError): tb = get_traceback() diff --git a/wa/utils/types.py b/wa/utils/types.py index 87071205..fed3004f 100644 --- a/wa/utils/types.py +++ b/wa/utils/types.py @@ -504,12 +504,16 @@ class level(object): def __eq__(self, other): if isinstance(other, level): return self.value == other.value + elif isinstance(other, basestring): + return self.name == other else: return self.value == other def __ne__(self, other): if isinstance(other, level): return self.value != other.value + elif isinstance(other, basestring): + return self.name != other else: return self.value != other @@ -524,21 +528,39 @@ def enum(args, start=0): :: MyEnum = enum(['A', 'B', 'C']) - is equivalent of:: + is roughly equivalent of:: class MyEnum(object): A = 0 B = 1 C = 2 + however it also implement some specialized behaviors for comparisons and + instantiation. + """ class Enum(object): - pass + def __new__(cls, name): + for attr_name in dir(cls): + if attr_name.startswith('__'): + continue + + attr = getattr(cls, attr_name) + if name == attr: + return attr + + raise ValueError('Invalid enum value: {}'.format(repr(name))) + + levels = [] for i, v in enumerate(args, start): name = string.upper(identifier(v)) - setattr(Enum, name, level(v, i)) + lv = level(v, i) + setattr(Enum, name, lv) + levels.append(lv) + + setattr(Enum, 'values', levels) return Enum From ec109f5d0ba97588b01bb5903e860bdd13b1c63e Mon Sep 17 00:00:00 2001 From: Sergei Trofimov Date: Wed, 15 Mar 2017 14:07:14 +0000 Subject: [PATCH 5/5] fixing "params" handling in section entries "params" is interpreted differently in section vs workload entires in the agenda; previously, this was handled in the generic entry construciton function by examining the ID prefix of the entry to distinguish between the two. This is unreliable as the user may specify their own IDs that won't have the expected prefixes. To handle this, the "params" alias resolution for sections is now handled in section specific part of entry processing (workloads are handled autmatically because that is the default for the corresponding JobConfig config point). --- wa/framework/configuration/parsers.py | 46 +++++++++++---------------- wa/utils/serializer.py | 4 +++ 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/wa/framework/configuration/parsers.py b/wa/framework/configuration/parsers.py index df6d019e..7f1a747c 100644 --- a/wa/framework/configuration/parsers.py +++ b/wa/framework/configuration/parsers.py @@ -15,10 +15,10 @@ import os -from wlauto.exceptions import ConfigError -from wlauto.utils.serializer import read_pod, SerializerSyntaxError -from wlauto.utils.types import toggle_set, counter -from wlauto.core.configuration.configuration import JobSpec +from wa.framework.configuration.core import JobSpec +from wa.framework.exception import ConfigError +from wa.utils.serializer import json, read_pod, SerializerSyntaxError +from wa.utils.types import toggle_set, counter ############### @@ -44,19 +44,19 @@ class ConfigParser(object): # Get WA core configuration for cfg_point in state.settings.configuration.itervalues(): - value = get_aliased_param(cfg_point, raw) + value = pop_aliased_param(cfg_point, raw) if value is not None: state.settings.set(cfg_point.name, value) # Get run specific configuration for cfg_point in state.run_config.configuration.itervalues(): - value = get_aliased_param(cfg_point, raw) + value = pop_aliased_param(cfg_point, raw) if value is not None: state.run_config.set(cfg_point.name, value) # Get global job spec configuration for cfg_point in JobSpec.configuration.itervalues(): - value = get_aliased_param(cfg_point, raw) + value = pop_aliased_param(cfg_point, raw) if value is not None: state.jobs_config.set_global_value(cfg_point.name, value) @@ -158,6 +158,13 @@ class AgendaParser(object): state.jobs_config) workloads.append(workload) + if 'params' in section: + if 'runtime_params' in section: + msg = 'both "params" and "runtime_params" specified in a '\ + 'section: "{}"' + raise ConfigError(msg.format(json.dumps(section, indent=None))) + section['runtime_params'] = section.pop('params') + section = _construct_valid_entry(section, seen_sect_ids, "s", state.jobs_config) state.jobs_config.add_section(section, workloads) @@ -167,7 +174,7 @@ class AgendaParser(object): ### Helper functions ### ######################## -def get_aliased_param(cfg_point, d, default=None, pop=True): +def pop_aliased_param(cfg_point, d, default=None): """ Given a ConfigurationPoint and a dict, this function will search the dict for the ConfigurationPoint's name/aliases. If more than one is found it will raise @@ -180,10 +187,7 @@ def get_aliased_param(cfg_point, d, default=None, pop=True): if len(alias_map) > 1: raise ConfigError(DUPLICATE_ENTRY_ERROR.format(aliases)) elif alias_map: - if pop: - return d.pop(alias_map[0]) - else: - return d[alias_map[0]] + return d.pop(alias_map[0]) else: return default @@ -203,7 +207,7 @@ def _load_file(filepath, error_name): def merge_result_processors_instruments(raw): instr_config = JobSpec.configuration['instrumentation'] - instruments = toggle_set(get_aliased_param(instr_config, raw, default=[])) + instruments = toggle_set(pop_aliased_param(instr_config, raw, default=[])) result_processors = toggle_set(raw.pop('result_processors', [])) if instruments and result_processors: conflicts = instruments.conflicts_with(result_processors) @@ -246,26 +250,12 @@ def _construct_valid_entry(raw, seen_ids, prefix, jobs_config): # Validate all workload_entry for name, cfg_point in JobSpec.configuration.iteritems(): - value = get_aliased_param(cfg_point, raw) + value = pop_aliased_param(cfg_point, raw) if value is not None: value = cfg_point.kind(value) cfg_point.validate_value(name, value) workload_entry[name] = value - wk_id = workload_entry['id'] - param_names = ['workload_params', 'workload_parameters'] - if prefix == 'wk': - param_names += ['params', 'parameters'] - workload_entry["workload_parameters"] = _pop_aliased(raw, param_names, wk_id) - - param_names = ['runtime_parameters', 'runtime_params'] - if prefix == 's': - param_names += ['params', 'parameters'] - workload_entry["runtime_parameters"] = _pop_aliased(raw, param_names, wk_id) - - param_names = ['boot_parameters', 'boot_params'] - workload_entry["boot_parameters"] = _pop_aliased(raw, param_names, wk_id) - if "instrumentation" in workload_entry: jobs_config.update_enabled_instruments(workload_entry["instrumentation"]) diff --git a/wa/utils/serializer.py b/wa/utils/serializer.py index b2535961..56d8f988 100644 --- a/wa/utils/serializer.py +++ b/wa/utils/serializer.py @@ -126,6 +126,10 @@ class json(object): def dump(o, wfh, indent=4, *args, **kwargs): return _json.dump(o, wfh, cls=WAJSONEncoder, indent=indent, *args, **kwargs) + @staticmethod + def dumps(o, indent=4, *args, **kwargs): + return _json.dumps(o, cls=WAJSONEncoder, indent=indent, *args, **kwargs) + @staticmethod def load(fh, *args, **kwargs): try: