1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-02-21 12:28:44 +00:00

Properly initialize output directory and run state

This commit is contained in:
Sergei Trofimov 2017-02-13 17:04:50 +00:00
parent 3d8503b056
commit 9cfa4e7f51
10 changed files with 279 additions and 133 deletions

View File

@ -24,6 +24,7 @@ from wlauto.core import pluginloader
from wlauto.core.configuration import RunConfiguration from wlauto.core.configuration import RunConfiguration
from wlauto.core.configuration.parsers import AgendaParser, ConfigParser from wlauto.core.configuration.parsers import AgendaParser, ConfigParser
from wlauto.core.execution import Executor from wlauto.core.execution import Executor
from wlauto.core.output import init_wa_output
from wlauto.exceptions import NotFoundError, ConfigError from wlauto.exceptions import NotFoundError, ConfigError
from wlauto.utils.log import add_log_file from wlauto.utils.log import add_log_file
from wlauto.utils.types import toggle_set from wlauto.utils.types import toggle_set
@ -74,8 +75,8 @@ class RunCommand(Command):
""") """)
def execute(self, state, args): def execute(self, state, args):
output_directory = self.set_up_output_directory(args) output = self.set_up_output_directory(state, args)
add_log_file(os.path.join(output_directory, "run.log")) add_log_file(output.logfile)
disabled_instruments = toggle_set(["~{}".format(i) disabled_instruments = toggle_set(["~{}".format(i)
for i in args.instruments_to_disable]) for i in args.instruments_to_disable])
@ -87,7 +88,7 @@ class RunCommand(Command):
parser.load_from_path(state, args.agenda) parser.load_from_path(state, args.agenda)
else: else:
try: try:
pluginloader.get_workload(args.agenda) pluginloader.get_plugin_class(args.agenda, kind='workload')
agenda = {'workloads': [{'name': args.agenda}]} agenda = {'workloads': [{'name': args.agenda}]}
parser.load(state, agenda, 'CMDLINE_ARGS') parser.load(state, agenda, 'CMDLINE_ARGS')
except NotFoundError: except NotFoundError:
@ -97,24 +98,21 @@ class RunCommand(Command):
raise ConfigError(msg.format(args.agenda)) raise ConfigError(msg.format(args.agenda))
executor = Executor() executor = Executor()
# TODO: fix executor executor.execute(state, output)
# executor.execute(state, selectors={'ids': args.only_run_ids})
def set_up_output_directory(self, args): def set_up_output_directory(self, state, args):
if args.output_directory: if args.output_directory:
output_directory = args.output_directory output_directory = args.output_directory
else: else:
output_directory = settings.default_output_directory output_directory = settings.default_output_directory
self.logger.debug('Using output directory: {}'.format(output_directory)) self.logger.debug('Using output directory: {}'.format(output_directory))
if os.path.exists(output_directory): try:
if args.force: return init_wa_output(output_directory, state, args.force)
self.logger.info('Removing existing output directory.') except RuntimeError as e:
shutil.rmtree(os.path.abspath(output_directory)) if 'path exists' in str(e):
else: msg = 'Output directory "{}" exists.\nPlease specify another '\
self.logger.error('Output directory {} exists.'.format(output_directory)) 'location, or use -f option to overwrite.'
self.logger.error('Please specify another location, or use -f option to overwrite.\n') self.logger.critical(msg.format(output_directory))
sys.exit(1) sys.exit(1)
else:
self.logger.info('Creating output directory.') raise e
os.makedirs(output_directory)
return output_directory

View File

@ -614,7 +614,7 @@ class WAConfiguration(Configuration):
@property @property
def user_config_file(self): def user_config_file(self):
return os.path.joion(self.user_directory, 'config.yaml') return os.path.join(self.user_directory, 'config.yaml')
def __init__(self, environ): def __init__(self, environ):
super(WAConfiguration, self).__init__() super(WAConfiguration, self).__init__()
@ -738,6 +738,8 @@ class RunConfiguration(Configuration):
def __init__(self): def __init__(self):
super(RunConfiguration, self).__init__() super(RunConfiguration, self).__init__()
for confpoint in self.meta_data:
confpoint.set_value(self, check_mandatory=False)
self.device_config = None self.device_config = None
def merge_device_config(self, plugin_cache): def merge_device_config(self, plugin_cache):
@ -836,7 +838,7 @@ class JobSpec(Configuration):
for k, v in values.iteritems(): for k, v in values.iteritems():
if k == "id": if k == "id":
continue continue
elif k in ["workload_parameters", "runtime_parameters", "boot_parameters"]: elif k.endswith('_parameters'):
if v: if v:
self.to_merge[k][source] = copy(v) self.to_merge[k][source] = copy(v)
else: else:
@ -846,27 +848,27 @@ class JobSpec(Configuration):
msg = 'Error in {}:\n\t{}' msg = 'Error in {}:\n\t{}'
raise ConfigError(msg.format(source.name, e.message)) raise ConfigError(msg.format(source.name, e.message))
# pylint: disable=no-member
# Only call after the rest of the JobSpec is merged
def merge_workload_parameters(self, plugin_cache): def merge_workload_parameters(self, plugin_cache):
# merge global generic and specific config # merge global generic and specific config
workload_params = plugin_cache.get_plugin_config(self.workload_name, workload_params = plugin_cache.get_plugin_config(self.workload_name,
generic_name="workload_parameters") generic_name="workload_parameters")
# Merge entry "workload_parameters"
# TODO: Wrap in - "error in [agenda path]"
cfg_points = plugin_cache.get_plugin_parameters(self.workload_name) cfg_points = plugin_cache.get_plugin_parameters(self.workload_name)
for source in self._sources: for source in self._sources:
if source in self.to_merge["workload_params"]: config = self.to_merge["workload_parameters"].get(source)
config = self.to_merge["workload_params"][source] if config is None:
for name, cfg_point in cfg_points.iteritems(): continue
if name in config:
value = config.pop(name) for name, cfg_point in cfg_points.iteritems():
cfg_point.set_value(workload_params, value, check_mandatory=False) if name in config:
if config: value = config.pop(name)
msg = 'conflicting entry(ies) for "{}" in {}: "{}"' cfg_point.set_value(workload_params, value,
msg = msg.format(self.workload_name, source.name, check_mandatory=False)
'", "'.join(workload_params[source])) if config:
msg = 'conflicting entry(ies) for "{}" in {}: "{}"'
msg = msg.format(self.workload_name, source.name,
'", "'.join(workload_params[source]))
self.workload_parameters = workload_params self.workload_parameters = workload_params
@ -920,12 +922,6 @@ class JobGenerator(object):
self._read_enabled_instruments = True self._read_enabled_instruments = True
return self._enabled_instruments return self._enabled_instruments
def update_enabled_instruments(self, value):
if self._read_enabled_instruments:
msg = "'enabled_instruments' cannot be updated after it has been accessed"
raise RuntimeError(msg)
self._enabled_instruments.update(value)
def __init__(self, plugin_cache): def __init__(self, plugin_cache):
self.plugin_cache = plugin_cache self.plugin_cache = plugin_cache
self.ids_to_run = [] self.ids_to_run = []
@ -962,55 +958,58 @@ class JobGenerator(object):
#TODO: Validate #TODO: Validate
self.disabled_instruments = ["~{}".format(i) for i in instruments] self.disabled_instruments = ["~{}".format(i) for i in instruments]
def update_enabled_instruments(self, value):
if self._read_enabled_instruments:
msg = "'enabled_instruments' cannot be updated after it has been accessed"
raise RuntimeError(msg)
self._enabled_instruments.update(value)
def only_run_ids(self, ids): def only_run_ids(self, ids):
if isinstance(ids, str): if isinstance(ids, str):
ids = [ids] ids = [ids]
self.ids_to_run = ids self.ids_to_run = ids
def generate_job_specs(self, target_manager): def generate_job_specs(self, target_manager):
for leaf in self.root_node.leaves(): for leaf in self.root_node.leaves():
# PHASE 1: Gather workload and section entries for this leaf
workload_entries = leaf.workload_entries workload_entries = leaf.workload_entries
sections = [leaf] sections = [leaf]
for ancestor in leaf.ancestors(): for ancestor in leaf.ancestors():
workload_entries = ancestor.workload_entries + workload_entries workload_entries = ancestor.workload_entries + workload_entries
sections.insert(0, ancestor) sections.insert(0, ancestor)
# PHASE 2: Create job specs for this leaf
for workload_entry in workload_entries: for workload_entry in workload_entries:
job_spec = JobSpec() # Loads defaults job_spec = create_job_spec(workload_entry, sections, target_manager)
for job_id in self.ids_to_run:
# PHASE 2.1: Merge general job spec configuration if job_id in job_spec.id:
for section in sections: break
job_spec.update_config(section, check_mandatory=False) else:
job_spec.update_config(workload_entry, check_mandatory=False) continue
# PHASE 2.2: Merge global, section and workload entry "workload_parameters"
job_spec.merge_workload_parameters(self.plugin_cache)
target_manager.static_runtime_parameter_validation(job_spec.runtime_parameters)
# TODO: PHASE 2.3: Validate device runtime/boot paramerers
job_spec.merge_runtime_parameters(self.plugin_cache, target_manager)
target_manager.validate_runtime_parameters(job_spec.runtime_parameters)
# PHASE 2.4: Disable globally disabled instrumentation
job_spec.set("instrumentation", self.disabled_instruments)
job_spec.finalize()
# PHASE 2.5: Skip job_spec if part of it's ID is not in self.ids_to_run
if self.ids_to_run:
for job_id in self.ids_to_run:
if job_id in job_spec.id:
#TODO: logging
break
else:
continue
# PHASE 2.6: Update list of instruments that need to be setup
# pylint: disable=no-member
self.update_enabled_instruments(job_spec.instrumentation.values()) self.update_enabled_instruments(job_spec.instrumentation.values())
yield job_spec
def create_job_spec(workload_entry, sections, target_manager):
job_spec = JobSpec()
# PHASE 2.1: Merge general job spec configuration
for section in sections:
job_spec.update_config(section, check_mandatory=False)
job_spec.update_config(workload_entry, check_mandatory=False)
# PHASE 2.2: Merge global, section and workload entry "workload_parameters"
job_spec.merge_workload_parameters(self.plugin_cache)
target_manager.static_runtime_parameter_validation(job_spec.runtime_parameters)
# TODO: PHASE 2.3: Validate device runtime/boot paramerers
job_spec.merge_runtime_parameters(self.plugin_cache, target_manager)
target_manager.validate_runtime_parameters(job_spec.runtime_parameters)
# PHASE 2.4: Disable globally disabled instrumentation
job_spec.set("instrumentation", self.disabled_instruments)
job_spec.finalize()
return job_spec
yield job_spec
settings = WAConfiguration(os.environ) settings = WAConfiguration(os.environ)

View File

@ -28,7 +28,7 @@ from wlauto.core.configuration.configuration import JobSpec
class ConfigParser(object): class ConfigParser(object):
def load_from_path(self, state, filepath): def load_from_path(self, state, filepath):
self.load(_load_file(filepath, "Config"), filepath) self.load(state, _load_file(filepath, "Config"), filepath)
def load(self, state, raw, source, wrap_exceptions=True): # pylint: disable=too-many-branches def load(self, state, raw, source, wrap_exceptions=True): # pylint: disable=too-many-branches
try: try:

View File

@ -105,7 +105,8 @@ class PluginCache(object):
for source in self.sources: for source in self.sources:
if source not in self.global_alias_values[alias]: if source not in self.global_alias_values[alias]:
continue continue
param.set_value(config, value=self.global_alias_values[alias][source]) val = self.global_alias_values[alias][source]
param.set_value(config, value=val)
# Merge user config # Merge user config
# Perform a simple merge with the order of sources representing priority # Perform a simple merge with the order of sources representing priority
@ -128,27 +129,34 @@ class PluginCache(object):
return {param.name: param for param in params} return {param.name: param for param in params}
# pylint: disable=too-many-nested-blocks, too-many-branches # pylint: disable=too-many-nested-blocks, too-many-branches
def _merge_using_priority_specificity(self, specific_name, generic_name, final_config): def _merge_using_priority_specificity(self, specific_name,
generic_name, final_config):
""" """
WA configuration can come from various sources of increasing priority, as well WA configuration can come from various sources of increasing priority,
as being specified in a generic and specific manner (e.g. ``device_config`` as well as being specified in a generic and specific manner (e.g.
and ``nexus10`` respectivly). WA has two rules for the priority of configuration: ``device_config`` and ``nexus10`` respectivly). WA has two rules for
the priority of configuration:
- Configuration from higher priority sources overrides configuration from - Configuration from higher priority sources overrides
lower priority sources. configuration from lower priority sources.
- More specific configuration overrides less specific configuration. - More specific configuration overrides less specific configuration.
There is a situation where these two rules come into conflict. When a generic There is a situation where these two rules come into conflict. When a
configuration is given in config source of high priority and a specific generic configuration is given in config source of high priority and a
configuration is given in a config source of lower priority. In this situation specific configuration is given in a config source of lower priority.
it is not possible to know the end users intention and WA will error. In this situation it is not possible to know the end users intention
and WA will error.
:param generic_name: The name of the generic configuration e.g ``device_config`` :param generic_name: The name of the generic configuration
:param specific_name: The name of the specific configuration used, e.g ``nexus10`` e.g ``device_config``
:param cfg_point: A dict of ``ConfigurationPoint``s to be used when merging configuration. :param specific_name: The name of the specific configuration used
keys=config point name, values=config point e.g ``nexus10``
:param cfg_point: A dict of ``ConfigurationPoint``s to be used when
merging configuration. keys=config point name,
values=config point
:rtype: A fully merged and validated configuration in the form of a obj_dict. :rtype: A fully merged and validated configuration in the form of a
obj_dict.
""" """
generic_config = copy(self.plugin_configs[generic_name]) generic_config = copy(self.plugin_configs[generic_name])
specific_config = copy(self.plugin_configs[specific_name]) specific_config = copy(self.plugin_configs[specific_name])

View File

@ -68,6 +68,7 @@ def main():
settings.set("verbosity", args.verbose) settings.set("verbosity", args.verbose)
state.load_config_file(settings.user_config_file)
for config_file in args.config: for config_file in args.config:
if not os.path.exists(config_file): if not os.path.exists(config_file):
raise ConfigError("Config file {} not found".format(config_file)) raise ConfigError("Config file {} not found".format(config_file))

View File

@ -71,33 +71,6 @@ MAX_REBOOT_ATTEMPTS = 3
REBOOT_DELAY = 3 REBOOT_DELAY = 3
class RunInfo(object):
"""
Information about the current run, such as its unique ID, run
time, etc.
"""
def __init__(self, config):
self.config = config
self.uuid = uuid.uuid4()
self.start_time = None
self.end_time = None
self.duration = None
self.project = config.project
self.project_stage = config.project_stage
self.run_name = config.run_name or "{}_{}".format(os.path.split(config.output_directory)[1],
datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S"))
self.notes = None
self.device_properties = {}
def to_dict(self):
d = copy(self.__dict__)
d['uuid'] = str(self.uuid)
return d
#TODO: pod
class ExecutionContext(object): class ExecutionContext(object):
""" """
Provides a context for instrumentation. Keeps track of things like Provides a context for instrumentation. Keeps track of things like
@ -264,33 +237,26 @@ class Executor(object):
self.device = None self.device = None
self.context = None self.context = None
def execute(self, state, selectors=None): # NOQA def execute(self, state, output):
""" """
Execute the run specified by an agenda. Optionally, selectors may be Execute the run specified by an agenda. Optionally, selectors may be
used to only selecute a subset of the specified agenda. used to only selecute a subset of the specified agenda.
Params:: Params::
:agenda: an ``Agenda`` instance to be executed. :state: a ``WAState`` containing processed configuraiton
:selectors: A dict mapping selector name to the coresponding values. :output: an initialized ``RunOutput`` that will be used to
store the results.
**Selectors**
Currently, the following seectors are supported:
ids
The value must be a sequence of workload specfication IDs to be
executed. Note that if sections are specified inthe agenda, the
workload specifacation ID will be a combination of the section and
workload IDs.
""" """
signal.connect(self._error_signalled_callback, signal.ERROR_LOGGED) signal.connect(self._error_signalled_callback, signal.ERROR_LOGGED)
signal.connect(self._warning_signalled_callback, signal.WARNING_LOGGED) signal.connect(self._warning_signalled_callback, signal.WARNING_LOGGED)
self.logger.info('Initializing') self.logger.info('Initializing run')
self.logger.debug('Loading run configuration.') self.logger.debug('Loading run configuration.')
def old_exec(self, agenda, selectors={}):
self.config.set_agenda(agenda, selectors) self.config.set_agenda(agenda, selectors)
self.config.finalize() self.config.finalize()
config_outfile = os.path.join(self.config.meta_directory, 'run_config.json') config_outfile = os.path.join(self.config.meta_directory, 'run_config.json')
@ -300,7 +266,8 @@ class Executor(object):
self.logger.debug('Initialising device configuration.') self.logger.debug('Initialising device configuration.')
if not self.config.device: if not self.config.device:
raise ConfigError('Make sure a device is specified in the config.') raise ConfigError('Make sure a device is specified in the config.')
self.device_manager = pluginloader.get_manager(self.config.device, **self.config.device_config) self.device_manager = pluginloader.get_manager(self.config.device,
**self.config.device_config)
self.device_manager.validate() self.device_manager.validate()
self.device = self.device_manager.target self.device = self.device_manager.target

143
wlauto/core/output.py Normal file
View File

@ -0,0 +1,143 @@
import logging
import os
import shutil
import string
import sys
import uuid
from copy import copy
from wlauto.utils.misc import touch
from wlauto.utils.serializer import write_pod, read_pod
logger = logging.getLogger('output')
class RunInfo(object):
"""
Information about the current run, such as its unique ID, run
time, etc.
"""
@staticmethod
def from_pod(pod):
uid = pod.pop('uuid')
if uid is not None:
uid = uuid.UUID(uid)
instance = RunInfo(**pod)
instance.uuid = uid
return instance
def __init__(self, run_name=None, project=None, project_stage=None,
start_time=None, end_time=None, duration=None):
self.uuid = uuid.uuid4()
self.run_name = None
self.project = None
self.project_stage = None
self.start_time = None
self.end_time = None
self.duration = None
def to_pod(self):
d = copy(self.__dict__)
d['uuid'] = str(self.uuid)
return d
class RunState(object):
"""
Represents the state of a WA run.
"""
@staticmethod
def from_pod(pod):
return RunState()
def __init__(self):
pass
def to_pod(self):
return {}
class RunOutput(object):
@property
def logfile(self):
return os.path.join(self.basepath, 'run.log')
@property
def metadir(self):
return os.path.join(self.basepath, '__meta')
@property
def infofile(self):
return os.path.join(self.metadir, 'run_info.json')
@property
def statefile(self):
return os.path.join(self.basepath, '.run_state.json')
def __init__(self, path):
self.basepath = path
self.info = None
self.state = None
if (not os.path.isfile(self.statefile) or
not os.path.isfile(self.infofile)):
msg = '"{}" does not exist or is not a valid WA output directory.'
raise ValueError(msg.format(self.basepath))
self.reload()
def reload(self):
self.info = RunInfo.from_pod(read_pod(self.infofile))
self.state = RunState.from_pod(read_pod(self.statefile))
def write_info(self):
write_pod(self.info.to_pod(), self.infofile)
def write_state(self):
write_pod(self.state.to_pod(), self.statefile)
def init_wa_output(path, wa_state, force=False):
if os.path.exists(path):
if force:
logger.info('Removing existing output directory.')
shutil.rmtree(os.path.abspath(path))
else:
raise RuntimeError('path exists: {}'.format(path))
logger.info('Creating output directory.')
os.makedirs(path)
meta_dir = os.path.join(path, '__meta')
os.makedirs(meta_dir)
_save_raw_config(meta_dir, wa_state)
touch(os.path.join(path, 'run.log'))
info = RunInfo(
run_name=wa_state.run_config.run_name,
project=wa_state.run_config.project,
project_stage=wa_state.run_config.project_stage,
)
write_pod(info.to_pod(), os.path.join(meta_dir, 'run_info.json'))
with open(os.path.join(path, '.run_state.json'), 'w') as wfh:
wfh.write('{}')
return RunOutput(path)
def _save_raw_config(meta_dir, state):
raw_config_dir = os.path.join(meta_dir, 'raw_config')
os.makedirs(raw_config_dir)
for i, source in enumerate(state.loaded_config_sources):
if not os.path.isfile(source):
continue
basename = os.path.basename(source)
dest_path = os.path.join(raw_config_dir, 'cfg{}-{}'.format(i, basename))
shutil.copy(source, dest_path)

View File

@ -18,11 +18,15 @@ class WAState(object):
self.run_config = RunConfiguration() self.run_config = RunConfiguration()
self.plugin_cache = PluginCache() self.plugin_cache = PluginCache()
self.jobs_config = JobGenerator(self.plugin_cache) self.jobs_config = JobGenerator(self.plugin_cache)
self.loaded_config_sources = []
self._config_parser = ConfigParser() self._config_parser = ConfigParser()
def load_config_file(self, filepath): def load_config_file(self, filepath):
self._config_parser.load_from_path(self, filepath) self._config_parser.load_from_path(self, filepath)
self.loaded_config_sources.append(filepath)
def load_config(self, values, source, wrap_exceptions=True): def load_config(self, values, source, wrap_exceptions=True):
self._config_parser.load(self, values, source) self._config_parser.load(self, values, source)
self.loaded_config_sources.append(source)

View File

@ -593,3 +593,8 @@ def merge_dicts_simple(base, other):
for key, value in (base or {}).iteritems(): for key, value in (base or {}).iteritems():
result[key] = merge_config_values(result.get(key), value) result[key] = merge_config_values(result.get(key), value)
return result return result
def touch(path):
with open(path, 'w'):
pass

View File

@ -228,6 +228,16 @@ def read_pod(source, fmt=None):
message = 'source must be a path or an open file handle; got {}' message = 'source must be a path or an open file handle; got {}'
raise ValueError(message.format(type(source))) raise ValueError(message.format(type(source)))
def write_pod(pod, dest, fmt=None):
if isinstance(dest, basestring):
with open(dest, 'w') as wfh:
return _write_pod(pod, wfh, fmt)
elif hasattr(dest, 'write') and (hasattr(dest, 'name') or fmt):
return _write_pod(pod, dest, fmt)
else:
message = 'dest must be a path or an open file handle; got {}'
raise ValueError(message.format(type(dest)))
def dump(o, wfh, fmt='json', *args, **kwargs): def dump(o, wfh, fmt='json', *args, **kwargs):
serializer = {'yaml': yaml, serializer = {'yaml': yaml,
@ -256,6 +266,17 @@ def _read_pod(fh, fmt=None):
else: else:
raise ValueError('Unknown format "{}": {}'.format(fmt, getattr(fh, 'name', '<none>'))) raise ValueError('Unknown format "{}": {}'.format(fmt, getattr(fh, 'name', '<none>')))
def _write_pod(pod, wfh, fmt=None):
if fmt is None:
fmt = os.path.splitext(wfh.name)[1].lower().strip('.')
if fmt == 'yaml':
return yaml.dump(pod, wfh)
elif fmt == 'json':
return json.dump(pod, wfh)
elif fmt == 'py':
raise ValueError('Serializing to Python is not supported')
else:
raise ValueError('Unknown format "{}": {}'.format(fmt, getattr(wfh, 'name', '<none>')))
def is_pod(obj): def is_pod(obj):
return type(obj) in POD_TYPES return type(obj) in POD_TYPES