mirror of https://github.com/ARM-software/workload-automation.git

Instrument initialization, job generation + bits

Sergei Trofimov 2017-02-21 10:55:21 +00:00
parent 390e9ca78a
commit 18d001fd76
12 changed files with 228 additions and 19 deletions

View File

@@ -90,6 +90,7 @@ class RunCommand(Command):
         parser = AgendaParser()
         if os.path.isfile(args.agenda):
             parser.load_from_path(config, args.agenda)
+            shutil.copy(args.agenda, output.raw_config_dir)
         else:
             try:
                 pluginloader.get_plugin_class(args.agenda, kind='workload')

View File

@@ -541,10 +541,9 @@ class Configuration(object):
     def to_pod(self):
         pod = {}
-        for cfg_point in self.configuration.itervalues():
+        for cfg_point in self.config_points:
             value = getattr(self, cfg_point.name, None)
-            if value is not None:
-                pod[cfg_point.name] = _to_pod(cfg_point, value)
+            pod[cfg_point.name] = _to_pod(cfg_point, value)
         return pod
@@ -848,6 +847,16 @@ class JobSpec(Configuration):
         instance['id'] = job_id
         return instance

+    @property
+    def section_id(self):
+        if self.id is not None:
+            return self.id.rsplit('-', 1)[0]
+
+    @property
+    def workload_id(self):
+        if self.id is not None:
+            return self.id.rsplit('-', 1)[-1]
+
     def __init__(self):
         super(JobSpec, self).__init__()
         self.to_merge = defaultdict(OrderedDict)
@@ -1001,7 +1010,6 @@ class JobGenerator(object):
         return specs


 def create_job_spec(workload_entry, sections, target_manager, plugin_cache,
                     disabled_instruments):
     job_spec = JobSpec()
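
For reference, a hypothetical illustration (not part of the commit) of how the new section_id and workload_id properties split a composite job id, assuming ids take the "<section>-<workload>" form produced by the agenda parser:

    job_id = 's1-wk2'                  # assumed "<section>-<workload>" format
    print job_id.rsplit('-', 1)[0]     # 's1'  -> section_id
    print job_id.rsplit('-', 1)[-1]    # 'wk2' -> workload_id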

View File

@@ -1,3 +1,7 @@
+import random
+from itertools import izip_longest, groupby, chain
+
+from wlauto.core import pluginloader
 from wlauto.core.configuration.configuration import (MetaConfiguration,
                                                      RunConfiguration,
                                                      JobGenerator, settings)
@@ -10,7 +14,7 @@ class CombinedConfig(object):
     @staticmethod
     def from_pod(pod):
         instance = CombinedConfig()
-        instance.settings = MetaConfiguration.from_pod(pod.get('setttings', {}))
+        instance.settings = MetaConfiguration.from_pod(pod.get('settings', {}))
         instance.run_config = RunConfiguration.from_pod(pod.get('run_config', {}))
         return instance
@@ -23,6 +27,24 @@ class CombinedConfig(object):
                 'run_config': self.run_config.to_pod()}


+class Job(object):
+
+    def __init__(self, spec, iteration, context):
+        self.spec = spec
+        self.iteration = iteration
+        self.context = context
+        self.status = 'new'
+        self.workload = None
+        self.output = None
+
+    def load(self, target, loader=pluginloader):
+        self.workload = loader.get_workload(self.spec.workload_name,
+                                            target,
+                                            **self.spec.workload_parameters)
+        self.workload.init_resources(self.context)
+        self.workload.validate()
+
+
 class ConfigManager(object):
     """
     Represents run-time state of WA. Mostly used as a container for loaded
@@ -32,6 +54,26 @@ class ConfigManager(object):
     instance of wA itself.
     """

+    @property
+    def enabled_instruments(self):
+        return self.jobs_config.enabled_instruments
+
+    @property
+    def job_specs(self):
+        if not self._jobs_generated:
+            msg = 'Attempting to access job specs before '\
+                  'jobs have been generated'
+            raise RuntimeError(msg)
+        return [j.spec for j in self._jobs]
+
+    @property
+    def jobs(self):
+        if not self._jobs_generated:
+            msg = 'Attempting to access jobs before '\
+                  'they have been generated'
+            raise RuntimeError(msg)
+        return self._jobs
+
     def __init__(self, settings=settings):
         self.settings = settings
         self.run_config = RunConfiguration()
@@ -39,8 +81,9 @@ class ConfigManager(object):
         self.jobs_config = JobGenerator(self.plugin_cache)
         self.loaded_config_sources = []
         self._config_parser = ConfigParser()
-        self._job_specs = []
-        self.jobs = []
+        self._jobs = []
+        self._jobs_generated = False
+        self.agenda = None

     def load_config_file(self, filepath):
         self._config_parser.load_from_path(self, filepath)
@@ -50,7 +93,121 @@ class ConfigManager(object):
         self._config_parser.load(self, values, source)
         self.loaded_config_sources.append(source)

+    def get_plugin(self, name=None, kind=None, *args, **kwargs):
+        return self.plugin_cache.get_plugin(name, kind, *args, **kwargs)
+
+    def get_instruments(self, target):
+        instruments = []
+        for name in self.enabled_instruments:
+            instruments.append(self.get_plugin(name, kind='instrument',
+                                               target=target))
+        return instruments
+
     def finalize(self):
+        if not self.agenda:
+            msg = 'Attempting to finalize config before agenda has been set'
+            raise RuntimeError(msg)
         self.run_config.merge_device_config(self.plugin_cache)
         return CombinedConfig(self.settings, self.run_config)

+    def generate_jobs(self, context):
+        job_specs = self.jobs_config.generate_job_specs(context.tm)
+        exec_order = self.run_config.execution_order
+        for spec, i in permute_iterations(job_specs, exec_order):
+            job = Job(spec, i, context)
+            job.load(context.tm.target)
+            self._jobs.append(job)
+        self._jobs_generated = True
+
+
+def permute_by_job(specs):
+    """
+    This is the "classic" implementation that executes all iterations of a
+    workload spec before proceeding onto the next spec.
+
+    """
+    for spec in specs:
+        for i in range(1, spec.iterations + 1):
+            yield (spec, i)
+
+
+def permute_by_iteration(specs):
+    """
+    Runs the first iteration for all benchmarks first, before proceeding to
+    the next iteration, i.e. A1, B1, C1, A2, B2, C2... instead of A1, A2, B1,
+    B2, C1, C2...
+
+    If multiple sections were specified in the agenda, this will run all
+    sections for the first global spec first, followed by all sections for the
+    second spec, etc.
+
+    e.g. given sections X and Y, and global specs A and B, with 2 iterations,
+    this will run
+
+    X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2
+
+    """
+    groups = [list(g) for k, g in groupby(specs, lambda s: s.workload_id)]
+
+    all_tuples = []
+    for spec in chain(*groups):
+        all_tuples.append([(spec, i + 1)
+                           for i in xrange(spec.iterations)])
+    for t in chain(*map(list, izip_longest(*all_tuples))):
+        if t is not None:
+            yield t
+
+
+def permute_by_section(specs):
+    """
+    Runs the first iteration for all benchmarks first, before proceeding to
+    the next iteration, i.e. A1, B1, C1, A2, B2, C2... instead of A1, A2, B1,
+    B2, C1, C2...
+
+    If multiple sections were specified in the agenda, this will run all specs
+    for the first section followed by all specs for the second section, etc.
+
+    e.g. given sections X and Y, and global specs A and B, with 2 iterations,
+    this will run
+
+    X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2
+
+    """
+    groups = [list(g) for k, g in groupby(specs, lambda s: s.section_id)]
+
+    all_tuples = []
+    for spec in chain(*groups):
+        all_tuples.append([(spec, i + 1)
+                           for i in xrange(spec.iterations)])
+    for t in chain(*map(list, izip_longest(*all_tuples))):
+        if t is not None:
+            yield t
+
+
+def permute_randomly(specs):
+    """
+    This will generate a random permutation of specs/iteration tuples.
+
+    """
+    result = []
+    for spec in specs:
+        for i in xrange(1, spec.iterations + 1):
+            result.append((spec, i))
+    random.shuffle(result)
+    for t in result:
+        yield t
+
+
+permute_map = {
+    'by_iteration': permute_by_iteration,
+    'by_job': permute_by_job,
+    'by_section': permute_by_section,
+    'random': permute_randomly,
+}
+
+
+def permute_iterations(specs, exec_order):
+    if exec_order not in permute_map:
+        msg = 'Unknown execution order "{}"; must be in: {}'
+        raise ValueError(msg.format(exec_order, permute_map.keys()))
+    return permute_map[exec_order](specs)
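
As a standalone illustration (not part of the commit), the following Python 2 sketch mirrors the grouping-and-interleaving pattern used by permute_by_iteration above, with hypothetical stand-in specs in place of real JobSpec objects; it reproduces the ordering given in the docstring.

    from collections import namedtuple
    from itertools import izip_longest, groupby, chain

    # Hypothetical stand-in for JobSpec; only the fields the permuter needs.
    FakeSpec = namedtuple('FakeSpec', 'id section_id workload_id iterations')

    specs = [FakeSpec('X-A', 'X', 'A', 2), FakeSpec('Y-A', 'Y', 'A', 2),
             FakeSpec('X-B', 'X', 'B', 2), FakeSpec('Y-B', 'Y', 'B', 2)]

    # Group specs by workload, expand each spec into its iterations, then
    # interleave the per-spec lists round-robin, as the generator above does.
    groups = [list(g) for _, g in groupby(specs, lambda s: s.workload_id)]
    all_tuples = [[(s, i + 1) for i in xrange(s.iterations)]
                  for s in chain(*groups)]
    order = [t for t in chain(*map(list, izip_longest(*all_tuples)))
             if t is not None]

    print [s.id + str(i) for s, i in order]
    # ['X-A1', 'Y-A1', 'X-B1', 'Y-B1', 'X-A2', 'Y-A2', 'X-B2', 'Y-B2']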

View File

@@ -95,6 +95,8 @@ class AgendaParser(object):
             self._process_global_workloads(state, global_workloads, wkl_ids)
             self._process_sections(state, sections, sect_ids, wkl_ids)
+            state.agenda = source
+
         except (ConfigError, SerializerSyntaxError) as e:
             raise ConfigError('Error in "{}":\n\t{}'.format(source, str(e)))
@@ -156,7 +158,7 @@
                                                 state.jobs_config)
             workloads.append(workload)

-        section = _construct_valid_entry(section, seen_section_ids,
+        section = _construct_valid_entry(section, seen_sect_ids,
                                          "s", state.jobs_config)
         state.jobs_config.add_section(section, workloads)

View File

@@ -123,6 +123,11 @@ class PluginCache(object):
         return config

+    def get_plugin(self, name, kind=None, *args, **kwargs):
+        config = self.get_plugin_config(name)
+        kwargs = dict(config.items() + kwargs.items())
+        return self.loader.get_plugin(name, kind=kind, *args, **kwargs)
+
     @memoized
     def get_plugin_parameters(self, name):
         params = self.loader.get_plugin_class(name).parameters
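
A small hypothetical illustration (not part of the commit) of the kwargs merge in the new PluginCache.get_plugin: building a dict from config.items() + kwargs.items() lets explicitly passed keyword arguments override the cached plugin configuration (Python 2 dict/list semantics, later items win).

    config = {'period': 10, 'sites': ['home']}   # assumed cached plugin config
    kwargs = {'period': 5}                       # caller override
    merged = dict(config.items() + kwargs.items())
    print merged['period']   # 5 -- the caller's kwarg wins over the cached value
    print merged['sites']    # ['home'] -- untouched settings carry over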

View File

@@ -73,6 +73,19 @@ REBOOT_DELAY = 3

 class ExecutionContext(object):

+    def __init__(self, cm, tm, output):
+        self.logger = logging.getLogger('ExecContext')
+        self.cm = cm
+        self.tm = tm
+        self.output = output
+        self.logger.debug('Loading resource discoverers')
+        self.resolver = ResourceResolver(cm)
+        self.resolver.load()
+
+
+class OldExecutionContext(object):
+
     """
     Provides a context for instrumentation. Keeps track of things like
     current workload and iteration.
@@ -214,8 +227,8 @@ def _check_artifact_path(path, rootpath):
     return full_path


 class FakeTargetManager(object):
+    # TODO: this is a FAKE

     def __init__(self, name, config):
         self.device_name = name
@@ -286,9 +299,17 @@ class Executor(object):
         target_manager = init_target_manager(config.run_config)
         output.write_target_info(target_manager.get_target_info())

-        self.logger.info('Generationg jobs')
-        job_specs = config_manager.jobs_config.generate_job_specs(target_manager)
-        output.write_job_specs(job_specs)
+        self.logger.info('Initializing execution context')
+        context = ExecutionContext(config_manager, target_manager, output)
+
+        self.logger.info('Generating jobs')
+        config_manager.generate_jobs(context)
+        output.write_job_specs(config_manager.job_specs)
+
+        self.logger.info('Installing instrumentation')
+        for instrument in config_manager.get_instruments(target_manager.target):
+            instrumentation.install(instrument)
+        instrumentation.validate()

     def old_exec(self, agenda, selectors={}):
         self.config.set_agenda(agenda, selectors)
@@ -396,6 +417,12 @@ class Executor(object):
         signal.disconnect(self._warning_signalled_callback, signal.WARNING_LOGGED)


+class Runner(object):
+    """
+
+    """
+
+
 class RunnerJob(object):
     """
     Represents a single execution of a ``RunnerJobDescription``. There will be one created for each iteration
@@ -410,7 +437,7 @@ class RunnerJob(object):
         self.result = IterationResult(self.spec)


-class Runner(object):
+class OldRunner(object):
     """
     This class is responsible for actually performing a workload automation
     run. The main responsibility of this class is to emit appropriate signals

View File

@@ -380,9 +380,9 @@ class Instrument(Plugin):
     """
     kind = "instrument"

-    def __init__(self, device, **kwargs):
+    def __init__(self, target, **kwargs):
         super(Instrument, self).__init__(**kwargs)
-        self.device = device
+        self.target = target
         self.is_enabled = True
         self.is_broken = False
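
For context, a hypothetical sketch (not from the commit) of an instrument written against the new constructor signature; it mirrors the ExecutionTimeInstrument change further down, storing the passed-in target as self.target instead of the old device object.

    class MyTimerInstrument(Instrument):

        name = 'my-timer'   # hypothetical plugin name

        def __init__(self, target, **kwargs):
            super(MyTimerInstrument, self).__init__(target, **kwargs)
            self.start_time = None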

View File

@@ -93,6 +93,10 @@ class RunOutput(object):
     def jobsfile(self):
         return os.path.join(self.metadir, 'jobs.json')

+    @property
+    def raw_config_dir(self):
+        return os.path.join(self.metadir, 'raw_config')
+
     def __init__(self, path):
         self.basepath = path
         self.info = None

View File

@@ -557,6 +557,8 @@ class PluginLoader(object):
     def update(self, packages=None, paths=None, ignore_paths=None):
         """ Load plugins from the specified paths/packages
        without clearing or reloading existing plugin. """
+        msg = 'Updating from: packages={} paths={}'
+        self.logger.debug(msg.format(packages, paths))
         if packages:
             self.packages.extend(packages)
             self._discover_from_packages(packages)
@@ -572,6 +574,7 @@ class PluginLoader(object):
     def reload(self):
         """ Clear all discovered items and re-run the discovery. """
+        self.logger.debug('Reloading')
         self.clear()
         self._discover_from_packages(self.packages)
         self._discover_from_paths(self.paths, self.ignore_paths)
@@ -591,7 +594,8 @@ class PluginLoader(object):
             raise ValueError('Unknown plugin type: {}'.format(kind))
         store = self.kind_map[kind]
         if name not in store:
-            raise NotFoundError('plugins {} is not {} {}.'.format(name, get_article(kind), kind))
+            msg = 'plugins {} is not {} {}.'
+            raise NotFoundError(msg.format(name, get_article(kind), kind))
         return store[name]

     def get_plugin(self, name=None, kind=None, *args, **kwargs):

View File

@@ -48,7 +48,7 @@ class ResourceResolver(object):
         """
-        for rescls in self.config.ext_loader.list_resource_getters():
+        for rescls in pluginloader.list_resource_getters():
             getter = self.config.get_plugin(name=rescls.name, kind="resource_getter", resolver=self)
             getter.register()

View File

@@ -207,8 +207,8 @@ class ExecutionTimeInstrument(Instrument):
     priority = 15

-    def __init__(self, device, **kwargs):
-        super(ExecutionTimeInstrument, self).__init__(device, **kwargs)
+    def __init__(self, target, **kwargs):
+        super(ExecutionTimeInstrument, self).__init__(target, **kwargs)
         self.start_time = None
         self.end_time = None

View File

@@ -404,6 +404,7 @@ class toggle_set(set):
     def to_pod(self):
         return list(self.values())

+
 class ID(str):

     def merge_with(self, other):