diff --git a/wa/__init__.py b/wa/__init__.py index ab6a7cca..cf6a4d2e 100644 --- a/wa/__init__.py +++ b/wa/__init__.py @@ -1,6 +1,7 @@ from wa.framework import pluginloader, log, signal from wa.framework.command import Command from wa.framework.configuration import settings +from wa.framework.configuration.core import Status from wa.framework.exception import HostError, JobError, InstrumentError, ConfigError from wa.framework.exception import (ResultProcessorError, ResourceError, CommandError, ToolError) @@ -10,4 +11,7 @@ from wa.framework.exception import WorkerThreadError, PluginLoaderError from wa.framework.instrumentation import (Instrument, very_slow, slow, normal, fast, very_fast) from wa.framework.plugin import Plugin, Parameter +from wa.framework.processor import ResultProcessor +from wa.framework.resource import (NO_ONE, JarFile, ApkFile, ReventFile, File, + Executable) from wa.framework.workload import Workload diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index 8e2ed919..04924397 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -32,11 +32,10 @@ KIND_MAP = { dict: OrderedDict, } -RunStatus = enum(['NEW', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING', - 'ABORTED', 'FAILED', 'PARTIAL', 'OK']) +Status = enum(['UNKNOWN', 'NEW', 'PENDING', + 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING', + 'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK']) -JobStatus = enum(['NEW', 'PENDING', 'RUNNING', - 'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK']) ########################## @@ -549,12 +548,11 @@ class MetaConfiguration(Configuration): plugin_packages = [ 'wa.commands', - 'wa.workloads', - 'wa.instrumentation', - #'wa.result_processors', - #'wa.managers', + 'wa.framework.getters', 'wa.framework.target.descriptor', - 'wa.framework.resource_getters', + 'wa.instrumentation', + 'wa.processors', + 'wa.workloads', ] config_points = [ @@ -741,9 +739,9 @@ class RunConfiguration(Configuration): ), ConfigurationPoint( 'retry_on_status', - kind=list_of(JobStatus), + kind=list_of(Status), default=['FAILED', 'PARTIAL'], - allowed_values=JobStatus.values, + allowed_values=Status.values[Status.RUNNING.value:], description=''' This is list of statuses on which a job will be cosidered to have failed and will be automatically retried up to ``max_retries`` @@ -774,6 +772,17 @@ class RunConfiguration(Configuration): .. note:: this number does not include the original attempt ''', ), + ConfigurationPoint( + 'result_processors', + kind=toggle_set, + default=['csv', 'status'], + description=''' + The list of output processors to be used for this run. Output processors + post-process data generated by workloads and instruments, e.g. to + generate additional reports, format the output in a certain way, or + export the output to an exeternal location. + ''', + ), ] configuration = {cp.name: cp for cp in config_points + meta_data} diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index 66df05ca..e568ad34 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -4,7 +4,7 @@ from itertools import izip_longest, groupby, chain from wa.framework import pluginloader from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration, - JobGenerator, JobStatus, settings) + JobGenerator, Status, settings) from wa.framework.configuration.parsers import ConfigParser from wa.framework.configuration.plugin_cache import PluginCache from wa.framework.exception import NotFoundError @@ -88,12 +88,23 @@ class ConfigManager(object): for name in self.enabled_instruments: try: instruments.append(self.get_plugin(name, kind='instrument', - target=target)) + target=target)) except NotFoundError: msg = 'Instrument "{}" not found' raise NotFoundError(msg.format(name)) return instruments + def get_processors(self): + processors = [] + for name in self.run_config.result_processors: + try: + proc = self.plugin_cache.get_plugin(name, kind='result_processor') + except NotFoundError: + msg = 'Result processor "{}" not found' + raise NotFoundError(msg.format(name)) + processors.append(proc) + return processors + def finalize(self): if not self.agenda: msg = 'Attempting to finalize config before agenda has been set' diff --git a/wa/framework/configuration/plugin_cache.py b/wa/framework/configuration/plugin_cache.py index d8a3f8e8..3c87753e 100644 --- a/wa/framework/configuration/plugin_cache.py +++ b/wa/framework/configuration/plugin_cache.py @@ -95,6 +95,9 @@ class PluginCache(object): def is_global_alias(self, name): return name in self._list_of_global_aliases + def list_plugins(self, kind=None): + return self.loader.list_plugins(kind) + def get_plugin_config(self, plugin_name, generic_name=None): config = obj_dict(not_in_dict=['name']) config.name = plugin_name diff --git a/wa/framework/execution.py b/wa/framework/execution.py index d4e1672d..12b7be8a 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -28,12 +28,13 @@ from itertools import izip_longest import wa.framework.signal as signal from wa.framework import instrumentation, pluginloader -from wa.framework.configuration.core import settings, RunStatus, JobStatus +from wa.framework.configuration.core import settings, Status from wa.framework.exception import (WAError, ConfigError, TimeoutError, InstrumentError, TargetError, TargetNotRespondingError) from wa.framework.output import init_job_output from wa.framework.plugin import Artifact +from wa.framework.processor import ProcessorManager from wa.framework.resource import ResourceResolver from wa.framework.run import RunState from wa.framework.target.info import TargetInfo @@ -95,23 +96,36 @@ class ExecutionContext(object): self.tm = tm self.run_output = output self.run_state = output.state + self.target_info = self.tm.get_target_info() self.logger.debug('Loading resource discoverers') - self.resolver = ResourceResolver(cm) + self.resolver = ResourceResolver(cm.plugin_cache) self.resolver.load() self.job_queue = None self.completed_jobs = None self.current_job = None + self.successful_jobs = 0 + self.failed_jobs = 0 def start_run(self): - self.output.info.start_time = datetime.now() + self.output.info.start_time = datetime.utcnow() self.output.write_info() self.job_queue = copy(self.cm.jobs) self.completed_jobs = [] - self.run_state.status = RunStatus.STARTED + self.run_state.status = Status.STARTED + self.output.status = Status.STARTED self.output.write_state() def end_run(self): - self.output.info.end_time = datetime.now() + if self.successful_jobs: + if self.failed_jobs: + status = Status.PARTIAL + else: + status = Status.OK + else: + status = Status.FAILED + self.run_state.status = status + self.output.status = status + self.output.info.end_time = datetime.utcnow() self.output.info.duration = self.output.info.end_time -\ self.output.info.start_time self.output.write_info() @@ -144,7 +158,7 @@ class ExecutionContext(object): def skip_remaining_jobs(self): while self.job_queue: job = self.job_queue.pop(0) - job.status = JobStatus.SKIPPED + job.status = Status.SKIPPED self.run_state.update_job(job) self.completed_jobs.append(job) self.write_state() @@ -166,6 +180,9 @@ class ExecutionContext(object): classifiers=None): self.run_output.add_artifact(name, path, kind, description, classifiers) + def add_event(self, message): + self.output.add_event(message) + class Executor(object): """ @@ -228,8 +245,14 @@ class Executor(object): instrumentation.install(instrument) instrumentation.validate() + self.logger.info('Installing result processors') + pm = ProcessorManager() + for proc in config_manager.get_processors(): + pm.install(proc) + pm.validate() + self.logger.info('Starting run') - runner = Runner(context) + runner = Runner(context, pm) signal.send(signal.RUN_STARTED, self) runner.run() self.execute_postamble(context, output) @@ -244,7 +267,7 @@ class Executor(object): counter = context.run_state.get_status_counts() parts = [] - for status in reversed(JobStatus.values): + for status in reversed(Status.values): if status in counter: parts.append('{} {}'.format(counter[status], status)) self.logger.info(status_summary + ', '.join(parts)) @@ -272,9 +295,10 @@ class Runner(object): """ - def __init__(self, context): + def __init__(self, context, pm): self.logger = logging.getLogger('runner') self.context = context + self.pm = pm self.output = self.context.output self.config = self.context.cm @@ -290,6 +314,7 @@ class Runner(object): except KeyboardInterrupt: self.context.skip_remaining_jobs() except Exception as e: + self.context.add_event(e.message) if (not getattr(e, 'logged', None) and not isinstance(e, KeyboardInterrupt)): log.log_error(e, self.logger) @@ -302,6 +327,7 @@ class Runner(object): def initialize_run(self): self.logger.info('Initializing run') self.context.start_run() + self.pm.initialize() log.indent() for job in self.context.job_queue: job.initialize(self.context) @@ -311,6 +337,9 @@ class Runner(object): def finalize_run(self): self.logger.info('Finalizing run') self.context.end_run() + self.pm.process_run_output(self.context) + self.pm.export_run_output(self.context) + self.pm.finalize() def run_next_job(self, context): job = context.start_job() @@ -319,12 +348,13 @@ class Runner(object): try: log.indent() self.do_run_job(job, context) - job.status = JobStatus.OK + job.status = Status.OK except KeyboardInterrupt: - job.status = JobStatus.ABORTED + job.status = Status.ABORTED raise except Exception as e: - job.status = JobStatus.FAILED + job.status = Status.FAILED + context.add_event(e.message) if not getattr(e, 'logged', None): log.log_error(e, self.logger) e.logged = True @@ -337,7 +367,7 @@ class Runner(object): self.check_job(job) def do_run_job(self, job, context): - job.status = JobStatus.RUNNING + job.status = Status.RUNNING self.send(signal.JOB_STARTED) with signal.wrap('JOB_TARGET_CONFIG', self): @@ -353,15 +383,18 @@ class Runner(object): try: with signal.wrap('JOB_OUTPUT_PROCESSED', self): job.process_output(context) + self.pm.process_job_output(context) + self.pm.export_job_output(context) except Exception: - job.status = JobStatus.PARTIAL + job.status = Status.PARTIAL raise + except KeyboardInterrupt: - job.status = JobStatus.ABORTED + job.status = Status.ABORTED self.logger.info('Got CTRL-C. Aborting.') raise except Exception as e: - job.status = JobStatus.FAILED + job.status = Status.FAILED if not getattr(e, 'logged', None): log.log_error(e, self.logger) e.logged = True @@ -380,15 +413,17 @@ class Runner(object): self.logger.error(msg.format(job.id, job.status, job.iteration)) self.context.move_failed(job) job.retries += 1 - job.status = JobStatus.PENDING + job.status = Status.PENDING self.context.job_queue.insert(0, job) self.context.write_state() else: msg = 'Job {} iteration {} completed with status {}. '\ 'Max retries exceeded.' self.logger.error(msg.format(job.id, job.status, job.iteration)) + self.context.failed_jobs += 1 else: # status not in retry_on_status self.logger.info('Job completed with status {}'.format(job.status)) + self.context.successful_jobs += 1 def send(self, s): signal.send(s, self, self.context) diff --git a/wa/framework/getters.py b/wa/framework/getters.py new file mode 100644 index 00000000..9ce5f94e --- /dev/null +++ b/wa/framework/getters.py @@ -0,0 +1,331 @@ +# Copyright 2013-2015 ARM Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +""" +This module contains the standard set of resource getters used by Workload Automation. + +""" +import httplib +import inspect +import json +import logging +import os +import re +import shutil +import sys + +import requests + +from devlib.utils.android import ApkInfo + +from wa import Parameter, settings, __file__ as __base_filepath +from wa.framework.resource import ResourceGetter, SourcePriority, NO_ONE +from wa.framework.exception import ResourceError +from wa.utils.misc import (ensure_directory_exists as _d, + ensure_file_directory_exists as _f, sha256, urljoin) +from wa.utils.types import boolean, caseless_string + + +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + +logger = logging.getLogger('resource') + + +def get_by_extension(path, ext): + if not ext.startswith('.'): + ext = '.' + ext + ext = caseless_string(ext) + + found = [] + for entry in os.listdir(path): + entry_ext = os.path.splitext(entry) + if entry_ext == ext: + found.append(os.path.join(path, entry)) + return found + + +def get_generic_resource(resource, files): + matches = [] + for f in files: + if resource.match(f): + matches.append(f) + if not matches: + return None + if len(matches) > 1: + msg = 'Multiple matches for {}: {}' + return ResourceError(msg.format(resource, matches)) + return matches[0] + + +def get_from_location(basepath, resource): + if resource.kind == 'file': + path = os.path.join(basepath, resource.path) + if os.path.exists(path): + return path + elif resource.kind == 'executable': + path = os.path.join(basepath, 'bin', resource.abi, resource.filename) + if os.path.exists(path): + return path + elif resource.kind in ['apk', 'jar', 'revent']: + files = get_by_extension(basepath, resource.kind) + return get_generic_resource(resource, files) + + return None + + +class Package(ResourceGetter): + + name = 'package' + + def register(self, resolver): + resolver.register(self.get, SourcePriority.package) + + def get(self, resource): + if resource.owner == NO_ONE: + basepath = os.path.join(os.path.dirname(__base_filepath), 'assets') + else: + modname = resource.owner.__module__ + basepath = os.path.dirname(sys.modules[modname].__file__) + return get_from_location(basepath, resource) + + +class UserDirectory(ResourceGetter): + + name = 'user' + + def register(self, resolver): + resolver.register(self.get, SourcePriority.local) + + def get(self, resource): + basepath = settings.dependencies_directory + return get_from_location(basepath, resource) + + +class Http(ResourceGetter): + + name = 'http' + description = """ + Downloads resources from a server based on an index fetched from the + specified URL. + + Given a URL, this will try to fetch ``/index.json``. The index file + maps extension names to a list of corresponing asset descriptons. Each + asset description continas a path (relative to the base URL) of the + resource and a SHA256 hash, so that this Getter can verify whether the + resource on the remote has changed. + + For example, let's assume we want to get the APK file for workload "foo", + and that assets are hosted at ``http://example.com/assets``. This Getter + will first try to donwload ``http://example.com/assests/index.json``. The + index file may contian something like :: + + { + "foo": [ + { + "path": "foo-app.apk", + "sha256": "b14530bb47e04ed655ac5e80e69beaa61c2020450e18638f54384332dffebe86" + }, + { + "path": "subdir/some-other-asset.file", + "sha256": "48d9050e9802246d820625717b72f1c2ba431904b8484ca39befd68d1dbedfff" + } + ] + } + + This Getter will look through the list of assets for "foo" (in this case, + two) check the paths until it finds one matching the resource (in this + case, "foo-app.apk"). Finally, it will try to dowload that file relative + to the base URL and extension name (in this case, + "http://example.com/assets/foo/foo-app.apk"). The downloaded version will + be cached locally, so that in the future, the getter will check the SHA256 + hash of the local file against the one advertised inside index.json, and + provided that hasn't changed, it won't try to download the file again. + + """ + parameters = [ + Parameter('url', global_alias='remote_assets_url', + description=""" + URL of the index file for assets on an HTTP server. + """), + Parameter('username', + description=""" + User name for authenticating with assets URL + """), + Parameter('password', + description=""" + Password for authenticationg with assets URL + """), + Parameter('always_fetch', kind=boolean, default=False, + global_alias='always_fetch_remote_assets', + description=""" + If ``True``, will always attempt to fetch assets from the + remote, even if a local cached copy is available. + """), + Parameter('chunk_size', kind=int, default=1024, + description=""" + Chunk size for streaming large assets. + """), + ] + + def __init__(self, **kwargs): + super(Http, self).__init__(**kwargs) + self.logger = logger + self.index = None + + def register(self, resolver): + resolver.register(self.get, SourcePriority.remote) + + def get(self, resource): + if not resource.owner: + return # TODO: add support for unowned resources + if not self.index: + self.index = self.fetch_index() + asset = self.resolve_resource(resource) + if not asset: + return + return self.download_asset(asset, resource.owner.name) + + def fetch_index(self): + if not self.url: + return {} + index_url = urljoin(self.url, 'index.json') + response = self.geturl(index_url) + if response.status_code != httplib.OK: + message = 'Could not fetch "{}"; recieved "{} {}"' + self.logger.error(message.format(index_url, + response.status_code, + response.reason)) + return {} + return json.loads(response.content) + + def download_asset(self, asset, owner_name): + url = urljoin(self.url, owner_name, asset['path']) + local_path = _f(os.path.join(settings.dependencies_directory, '__remote', + owner_name, asset['path'].replace('/', os.sep))) + if os.path.exists(local_path) and not self.always_fetch: + local_sha = sha256(local_path) + if local_sha == asset['sha256']: + self.logger.debug('Local SHA256 matches; not re-downloading') + return local_path + self.logger.debug('Downloading {}'.format(url)) + response = self.geturl(url, stream=True) + if response.status_code != httplib.OK: + message = 'Could not download asset "{}"; recieved "{} {}"' + self.logger.warning(message.format(url, + response.status_code, + response.reason)) + return + with open(local_path, 'wb') as wfh: + for chunk in response.iter_content(chunk_size=self.chunk_size): + wfh.write(chunk) + return local_path + + def geturl(self, url, stream=False): + if self.username: + auth = (self.username, self.password) + else: + auth = None + return requests.get(url, auth=auth, stream=stream) + + def resolve_resource(self, resource): + # pylint: disable=too-many-branches,too-many-locals + assets = self.index.get(resource.owner.name, {}) + if not assets: + return {} + + asset_map = {a['path']: a for a in assets} + if resource.kind in ['apk', 'jar', 'revent']: + if resource.kind == 'apk' and resource.version: + # TODO: modify the index format to attach version info to the + # APK entries. + msg = 'Versions of APKs cannot be fetched over HTTP at this time' + self.logger.warning(msg) + return {} + path = get_generic_resource(resource, asset_map.keys()) + if path: + return asset_map[path] + elif resource.kind == 'executable': + path = '/'.join(['bin', resource.abi, resource.filename]) + for asset in assets: + if asset['path'].lower() == path.lower(): + return asset + else: # file + for asset in assets: + if asset['path'].lower() == resource.path.lower(): + return asset + + +class Filer(ResourceGetter): + + name = 'filer' + description = """ + Finds resources on a (locally mounted) remote filer and caches them + locally. + + This assumes that the filer is mounted on the local machine (e.g. as a + samba share). + + """ + parameters = [ + Parameter('remote_path', global_alias='remote_assets_path', default='', + description=""" + Path, on the local system, where the assets are located. + """), + Parameter('always_fetch', kind=boolean, default=False, + global_alias='always_fetch_remote_assets', + description=""" + If ``True``, will always attempt to fetch assets from the + remote, even if a local cached copy is available. + """), + ] + + def register(self, resolver): + resolver.register(self.get, SourcePriority.lan) + + def get(self, resource): + if resource.owner: + remote_path = os.path.join(self.remote_path, resource.owner.name) + local_path = os.path.join(settings.dependencies_directory, '__filer', + resource.owner.dependencies_directory) + return self.try_get_resource(resource, remote_path, local_path) + else: # No owner + result = None + for entry in os.listdir(remote_path): + remote_path = os.path.join(self.remote_path, entry) + local_path = os.path.join(settings.dependencies_directory, '__filer', + settings.dependencies_directory, entry) + result = self.try_get_resource(resource, remote_path, local_path) + if result: + break + return result + + def try_get_resource(self, resource, remote_path, local_path): + if not self.always_fetch: + result = get_from_location(local_path, resource) + if result: + return result + if remote_path: + # Didn't find it cached locally; now check the remoted + result = get_from_location(remote_path, resource) + if not result: + return result + else: # remote path is not set + return None + # Found it remotely, cache locally, then return it + local_full_path = os.path.join(_d(local_path), os.path.basename(result)) + self.logger.debug('cp {} {}'.format(result, local_full_path)) + shutil.copy(result, local_full_path) diff --git a/wa/framework/job.py b/wa/framework/job.py index 3a3ee2d4..9b7bc7da 100644 --- a/wa/framework/job.py +++ b/wa/framework/job.py @@ -1,7 +1,7 @@ import logging from wa.framework import pluginloader, signal -from wa.framework.configuration.core import JobStatus +from wa.framework.configuration.core import Status class Job(object): @@ -18,15 +18,25 @@ class Job(object): def classifiers(self): return self.spec.classifiers + @property + def status(self): + return self._status + + @status.setter + def status(self, value): + self._status = value + if self.output: + self.output.status = value + def __init__(self, spec, iteration, context): self.logger = logging.getLogger('job') self.spec = spec self.iteration = iteration self.context = context - self.status = JobStatus.NEW self.workload = None self.output = None self.retries = 0 + self._status = Status.NEW def load(self, target, loader=pluginloader): self.logger.info('Loading job {}'.format(self.id)) @@ -40,7 +50,7 @@ class Job(object): self.logger.info('Initializing job {}'.format(self.id)) with signal.wrap('WORKLOAD_INITIALIZED', self, context): self.workload.initialize(context) - self.status = JobStatus.PENDING + self.status = Status.PENDING context.update_job_state(self) def configure_target(self, context): diff --git a/wa/framework/output.py b/wa/framework/output.py index 6111ecad..daf3d83a 100644 --- a/wa/framework/output.py +++ b/wa/framework/output.py @@ -5,10 +5,11 @@ import string import sys import uuid from copy import copy -from datetime import timedelta +from datetime import datetime, timedelta -from wa.framework.configuration.core import JobSpec, RunStatus -from wa.framework.configuration.manager import ConfigManager +from wa.framework.configuration.core import JobSpec, Status +from wa.framework.configuration.execution import ConfigManager +from wa.framework.exception import HostError from wa.framework.run import RunState, RunInfo from wa.framework.target.info import TargetInfo from wa.utils.misc import touch, ensure_directory_exists @@ -21,13 +22,37 @@ logger = logging.getLogger('output') class Output(object): + kind = None + @property def resultfile(self): return os.path.join(self.basepath, 'result.json') + @property + def event_summary(self): + num_events = len(self.events) + if num_events: + lines = self.events[0].message.split('\n') + message = '({} event(s)): {}' + if num_events > 1 or len(lines) > 1: + message += '[...]' + return message.format(num_events, lines[0]) + return '' + + @property + def status(self): + if self.result is None: + return None + return self.result.status + + @status.setter + def status(self, value): + self.result.status = value + def __init__(self, path): self.basepath = path self.result = None + self.events = [] def reload(self): pod = read_pod(self.resultfile) @@ -36,11 +61,16 @@ class Output(object): def write_result(self): write_pod(self.result.to_pod(), self.resultfile) + def get_path(self, subpath): + return os.path.join(self.basepath, subpath.strip(os.sep)) + def add_metric(self, name, value, units=None, lower_is_better=False, classifiers=None): self.result.add_metric(name, value, units, lower_is_better, classifiers) def add_artifact(self, name, path, kind, description=None, classifiers=None): + if not os.path.exists(path): + path = self.get_path(path) if not os.path.exists(path): msg = 'Attempting to add non-existing artifact: {}' raise HostError(msg.format(path)) @@ -51,9 +81,14 @@ class Output(object): self.result.add_artifact(name, path, kind, description, classifiers) + def add_event(self, message): + self.result.add_event(message) + class RunOutput(Output): + kind = 'run' + @property def logfile(self): return os.path.join(self.basepath, 'run.log') @@ -96,7 +131,7 @@ class RunOutput(Output): self.info = None self.state = None self.result = None - self.jobs = None + self.jobs = [] if (not os.path.isfile(self.statefile) or not os.path.isfile(self.infofile)): msg = '"{}" does not exist or is not a valid WA output directory.' @@ -126,7 +161,7 @@ class RunOutput(Output): def write_target_info(self, ti): write_pod(ti.to_pod(), self.targetfile) - def read_config(self): + def read_target_config(self): if not os.path.isfile(self.targetfile): return None return TargetInfo.from_pod(read_pod(self.targetfile)) @@ -155,8 +190,10 @@ class RunOutput(Output): class JobOutput(Output): + kind = 'job' + def __init__(self, path, id, label, iteration, retry): - self.basepath = path + super(JobOutput, self).__init__(path) self.id = id self.label = label self.iteration = iteration @@ -170,13 +207,17 @@ class Result(object): @staticmethod def from_pod(pod): instance = Result() + instance.status = Status(pod['status']) instance.metrics = [Metric.from_pod(m) for m in pod['metrics']] instance.artifacts = [Artifact.from_pod(a) for a in pod['artifacts']] + instance.events = [Event.from_pod(e) for e in pod['events']] return instance def __init__(self): + self.status = Status.NEW self.metrics = [] self.artifacts = [] + self.events = [] def add_metric(self, name, value, units=None, lower_is_better=False, classifiers=None): @@ -190,10 +231,15 @@ class Result(object): logger.debug('Adding artifact: {}'.format(artifact)) self.artifacts.append(artifact) + def add_event(self, message): + self.events.append(Event(message)) + def to_pod(self): return dict( + status=str(self.status), metrics=[m.to_pod() for m in self.metrics], artifacts=[a.to_pod() for a in self.artifacts], + events=[e.to_pod() for e in self.events], ) @@ -349,6 +395,43 @@ class Metric(object): return '<{}>'.format(text) +class Event(object): + """ + An event that occured during a run. + + """ + + __slots__ = ['timestamp', 'message'] + + @staticmethod + def from_pod(pod): + instance = Event(pod['message']) + instance.timestamp = pod['timestamp'] + return instance + + @property + def summary(self): + lines = self.message.split('\n') + result = lines[0] + if len(lines) > 1: + result += '[...]' + return result + + def __init__(self, message): + self.timestamp = datetime.utcnow() + self.message = message + + def to_pod(self): + return dict( + timestamp=self.timestamp, + message=self.message, + ) + + def __str__(self): + return '[{}] {}'.format(self.timestamp, self.message) + + __repr__ = __str__ + def init_run_output(path, wa_state, force=False): if os.path.exists(path): @@ -382,7 +465,10 @@ def init_job_output(run_output, job): path = os.path.join(run_output.basepath, output_name) ensure_directory_exists(path) write_pod(Result().to_pod(), os.path.join(path, 'result.json')) - return JobOutput(path, job.id, job.iteration, job.label, job.retries) + job_output = JobOutput(path, job.id, job.iteration, job.label, job.retries) + job_output.status = job.status + run_output.jobs.append(job_output) + return job_output def _save_raw_config(meta_dir, state): diff --git a/wa/framework/processor.py b/wa/framework/processor.py new file mode 100644 index 00000000..aca240ae --- /dev/null +++ b/wa/framework/processor.py @@ -0,0 +1,87 @@ +import logging + +from wa.framework import pluginloader +from wa.framework.exception import ConfigError +from wa.framework.instrumentation import is_installed +from wa.framework.plugin import Plugin +from wa.utils.log import log_error, indent, dedent + + +class ResultProcessor(Plugin): + + kind = 'result_processor' + requires = [] + + def validate(self): + super(ResultProcessor, self).validate() + for instrument in self.requires: + if not is_installed(instrument): + msg = 'Instrument "{}" is required by {}, but is not installed.' + raise ConfigError(msg.format(instrument, self.name)) + + def initialize(self): + pass + + def finalize(self): + pass + + +class ProcessorManager(object): + + def __init__(self, loader=pluginloader): + self.loader = loader + self.logger = logging.getLogger('processor') + self.processors = [] + + def install(self, processor): + if not isinstance(processor, ResultProcessor): + processor = self.loader.get_result_processor(processor) + self.logger.debug('Installing {}'.format(processor.name)) + self.processors.append(processor) + + def validate(self): + for proc in self.processors: + proc.validate() + + def initialize(self): + for proc in self.processors: + proc.initialize() + + def finalize(self): + for proc in self.processors: + proc.finalize() + + def process_job_output(self, context): + self.do_for_each_proc('process_job_output', 'processing using "{}"', + context.job_output, context.target_info, + context.run_output) + + def export_job_output(self, context): + self.do_for_each_proc('export_job_output', 'Exporting using "{}"', + context.job_output, context.target_info, + context.run_output) + + def process_run_output(self, context): + self.do_for_each_proc('process_run_output', 'Processing using "{}"', + context.run_output, context.target_info) + + def export_run_output(self, context): + self.do_for_each_proc('export_run_output', 'Exporting using "{}"', + context.run_output, context.target_info) + + def do_for_each_proc(self, method_name, message, *args): + try: + indent() + for proc in self.processors: + proc_func = getattr(proc, method_name, None) + if proc_func is None: + continue + try: + self.logger.info(message.format(proc.name)) + proc_func(*args) + except Exception as e: + if isinstance(e, KeyboardInterrupt): + raise + log_error(e, self.logger) + finally: + dedent() diff --git a/wa/framework/resource.py b/wa/framework/resource.py index 6ef1992c..1167b14c 100644 --- a/wa/framework/resource.py +++ b/wa/framework/resource.py @@ -12,52 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import os -import sys import glob -import shutil import inspect import logging +import os +import re +import shutil +import sys from collections import defaultdict +from devlib.utils.android import ApkInfo + from wa.framework import pluginloader from wa.framework.plugin import Plugin, Parameter from wa.framework.exception import ResourceError from wa.framework.configuration import settings -from wa.utils.misc import ensure_directory_exists as _d -from wa.utils.types import boolean -from wa.utils.types import prioritylist +from wa.utils import log +from wa.utils.misc import ensure_directory_exists as _d, get_object_name +from wa.utils.types import boolean, prioritylist, enum -class GetterPriority(object): - """ - Enumerates standard ResourceGetter priorities. In general, getters should - register under one of these, rather than specifying other priority values. - - :cached: The cached version of the resource. Look here first. This - priority also implies - that the resource at this location is a "cache" and is not - the only version of the resource, so it may be cleared without - losing access to the resource. - :preferred: Take this resource in favour of the environment resource. - :environment: Found somewhere under ~/.workload_automation/ or equivalent, - or from environment variables, external configuration - files, etc. These will override resource supplied with - the package. - :external_package: Resource provided by another package. :package: - Resource provided with the package. :remote: - Resource will be downloaded from a remote location - (such as an HTTP server or a samba share). Try this - only if no other getter was successful. - - """ - cached = 20 - preferred = 10 - environment = 0 - external_package = -5 - package = -10 - remote = -20 +SourcePriority = enum(['package', 'remote', 'lan', 'external_package', 'local', + 'perferred'], start=0, step=10) class __NullOwner(object): @@ -77,6 +54,7 @@ class __NullOwner(object): NO_ONE = __NullOwner() + class Resource(object): """ Represents a resource that needs to be resolved. This can be pretty much @@ -88,96 +66,106 @@ class Resource(object): """ - name = None + kind = None - def __init__(self, owner): + def __init__(self, owner=NO_ONE): self.owner = owner - def delete(self, instance): - """ - Delete an instance of this resource type. This must be implemented - by the concrete subclasses based on what the resource looks like, - e.g. deleting a file or a directory tree, or removing an entry from - a database. - - :note: Implementation should *not* contain any logic for deciding - whether or not a resource should be deleted, only the actual - deletion. The assumption is that if this method is invoked, - then the decision has already been made. - - """ + def match(self, path): raise NotImplementedError() def __str__(self): return '<{}\'s {}>'.format(self.owner, self.name) -class FileResource(Resource): - """ - Base class for all resources that are a regular file in the - file system. +class File(Resource): - """ - - def delete(self, instance): - os.remove(instance) - - -class File(FileResource): - - name = 'file' - - def __init__(self, owner, path, url=None): - super(File, self).__init__(owner) - self.path = path - self.url = url - - def __str__(self): - return '<{}\'s {} {}>'.format(self.owner, self.name, self.path or self.url) - - -class PluginAsset(File): - - name = 'plugin_asset' + kind = 'file' def __init__(self, owner, path): - super(PluginAsset, self).__init__(owner, os.path.join(owner.name, path)) + super(File, self).__init__(owner) + self.path = path - -class Executable(FileResource): - - name = 'executable' - - def __init__(self, owner, platform, filename): - super(Executable, self).__init__(owner) - self.platform = platform - self.filename = filename + def match(self, path): + return self.path == path def __str__(self): - return '<{}\'s {} {}>'.format(self.owner, self.platform, self.filename) + return '<{}\'s {} {}>'.format(self.owner, self.kind, self.path) -class ReventFile(FileResource): - name = 'revent' +class Executable(Resource): - def __init__(self, owner, stage): + kind = 'executable' + + def __init__(self, owner, abi, filename): + super(Executable, self).__init__(owner) + self.abi = abi + self.filename = filename + + def match(self, path): + return self.filename == os.path.basename(path) + + def __str__(self): + return '<{}\'s {} {}>'.format(self.owner, self.abi, self.filename) + + +class ReventFile(Resource): + + kind = 'revent' + + def __init__(self, owner, stage, target): super(ReventFile, self).__init__(owner) self.stage = stage + self.target = target + + def match(self, path): + filename = os.path.basename(path) + parts = filename.split('.') + if len(parts) > 2: + target, stage = parts[:2] + return target == self.target and stage == self.stage + else: + stage = parts[0] + return stage == self.stage -class JarFile(FileResource): +class JarFile(Resource): - name = 'jar' + kind = 'jar' + + def match(self, path): + # An owner always has at most one jar file, so + # always match + return True -class ApkFile(FileResource): +class ApkFile(Resource): - name = 'apk' + kind = 'apk' - def __init__(self, owner, version): + def __init__(self, owner, variant=None, version=None): super(ApkFile, self).__init__(owner) + self.variant = variant self.version = version + def match(self, path): + name_matches = True + version_matches = True + if self.version is not None: + version_matches = apk_version_matches(path, self.version) + if self.variant is not None: + name_matches = file_name_matches(path, self.variant) + return name_matches and version_matches: + + def __str__(self): + text = '<{}\'s apk'.format(self.owner) + if self.variant: + text += ' {}'.format(self.variant) + if self.version: + text += ' {}'.format(self.version) + text += '>' + return text + class ResourceGetter(Plugin): """ @@ -192,10 +180,6 @@ class ResourceGetter(Plugin): :name: Name that uniquely identifies this getter. Must be set by any concrete subclass. - :resource_type: Identifies resource type(s) that this getter can - handle. This must be either a string (for a single type) - or a list of strings for multiple resource types. This - must be set by any concrete subclass. :priority: Priority with which this getter will be invoked. This should be one of the standard priorities specified in ``GetterPriority`` enumeration. If not set, this will default @@ -205,74 +189,12 @@ class ResourceGetter(Plugin): name = None kind = 'resource_getter' - resource_type = None - priority = GetterPriority.environment - def __init__(self, resolver, **kwargs): - super(ResourceGetter, self).__init__(**kwargs) - self.resolver = resolver - - def register(self): - """ - Registers with a resource resolver. Concrete implementations must - override this to invoke ``self.resolver.register()`` method to register - ``self`` for specific resource types. - - """ - if self.resource_type is None: - message = 'No resource type specified for {}' - raise ValueError(message.format(self.name)) - elif isinstance(self.resource_type, list): - for rt in self.resource_type: - self.resolver.register(self, rt, self.priority) - else: - self.resolver.register(self, self.resource_type, self.priority) - - def unregister(self): - """Unregister from a resource resolver.""" - if self.resource_type is None: - message = 'No resource type specified for {}' - raise ValueError(message.format(self.name)) - elif isinstance(self.resource_type, list): - for rt in self.resource_type: - self.resolver.unregister(self, rt) - else: - self.resolver.unregister(self, self.resource_type) - - def get(self, resource, **kwargs): - """ - This will get invoked by the resolver when attempting to resolve a - resource, passing in the resource to be resolved as the first - parameter. Any additional parameters would be specific to a particular - resource type. - - This method will only be invoked for resource types that the getter has - registered for. - - :param resource: an instance of :class:`wlauto.core.resource.Resource`. - - :returns: Implementations of this method must return either the - discovered resource or ``None`` if the resource could not - be discovered. - - """ + def register(self, resolver): raise NotImplementedError() - def delete(self, resource, *args, **kwargs): - """ - Delete the resource if it is discovered. All arguments are passed to a - call to``self.get()``. If that call returns a resource, it is deleted. - - :returns: ``True`` if the specified resource has been discovered - and deleted, and ``False`` otherwise. - - """ - discovered = self.get(resource, *args, **kwargs) - if discovered: - resource.delete(discovered) - return True - else: - return False + def initialize(self): + pass def __str__(self): return ''.format(self.name) @@ -285,23 +207,31 @@ class ResourceResolver(object): """ - def __init__(self, config): + def __init__(self, loader=pluginloader): + self.loader = loader self.logger = logging.getLogger('resolver') - self.getters = defaultdict(prioritylist) - self.config = config + self.getters = [] + self.sources = prioritylist() def load(self): - """ - Discover getters under the specified source. The source could - be either a python package/module or a path. + for gettercls in self.loader.list_plugins('resource_getter'): + self.logger.debug('Loading getter {}'.format(gettercls.name)) + getter = self.loader.get_plugin(name=gettercls.name, + kind="resource_getter") + log.indent() + try: + getter.initialize() + getter.register(self) + finally: + log.dedent() + self.getters.append(getter) - """ + def register(self, source, priority=SourcePriority.local): + msg = 'Registering "{}" with priority "{}"' + self.logger.debug(msg.format(get_object_name(source), priority)) + self.sources.add(source, priority) - for rescls in pluginloader.list_resource_getters(): - getter = self.config.get_plugin(name=rescls.name, kind="resource_getter", resolver=self) - getter.register() - - def get(self, resource, strict=True, *args, **kwargs): + def get(self, resource, strict=True): """ Uses registered getters to attempt to discover a resource of the specified kind and matching the specified criteria. Returns path to the resource that @@ -311,11 +241,13 @@ class ResourceResolver(object): """ self.logger.debug('Resolving {}'.format(resource)) - for getter in self.getters[resource.name]: - self.logger.debug('Trying {}'.format(getter)) - result = getter.get(resource, *args, **kwargs) + for source in self.sources: + source_name = get_object_name(source) + self.logger.debug('Trying {}'.format(source_name)) + result = source(resource) if result is not None: - self.logger.debug('Resource {} found using {}:'.format(resource, getter)) + msg = 'Resource {} found using {}:' + self.logger.debug(msg.format(resource, source_name)) self.logger.debug('\t{}'.format(result)) return result if strict: @@ -323,61 +255,19 @@ class ResourceResolver(object): self.logger.debug('Resource {} not found.'.format(resource)) return None - def register(self, getter, kind, priority=0): - """ - Register the specified resource getter as being able to discover a resource - of the specified kind with the specified priority. - This method would typically be invoked by a getter inside its __init__. - The idea being that getters register themselves for resources they know - they can discover. - - *priorities* - - getters that are registered with the highest priority will be invoked first. If - multiple getters are registered under the same priority, they will be invoked - in the order they were registered (i.e. in the order they were discovered). This is - essentially non-deterministic. - - Generally getters that are more likely to find a resource, or would find a - "better" version of the resource should register with higher (positive) priorities. - Fall-back getters that should only be invoked if a resource is not found by usual - means should register with lower (negative) priorities. - - """ - self.logger.debug('Registering {} for {} resources'.format(getter.name, kind)) - self.getters[kind].add(getter, priority) - - def unregister(self, getter, kind): - """ - Unregister a getter that has been registered earlier. - - """ - self.logger.debug('Unregistering {}'.format(getter.name)) - try: - self.getters[kind].remove(getter) - except ValueError: - raise ValueError('Resource getter {} is not installed.'.format(getter.name)) - -# Utility functions - -def get_from_location_by_extension(resource, location, extension, version=None): - found_files = glob.glob(os.path.join(location, '*.{}'.format(extension))) - if version: - found_files = [ff for ff in found_files - if version.lower() in os.path.basename(ff).lower()] - if len(found_files) == 1: - return found_files[0] - elif not found_files: - return None - else: - raise ResourceError('More than one .{} found in {} for {}.'.format(extension, - location, - resource.owner.name)) +def apk_version_matches(path, version): + info = ApkInfo(path) + if info.version_name == version or info.version_code == version: + return True + return False -def _get_owner_path(resource): - if resource.owner is NO_ONE: - return os.path.join(os.path.dirname(__base_filepath), 'common') - else: - return os.path.dirname(sys.modules[resource.owner.__module__].__file__) +def file_name_matches(path, pattern): + filename = os.path.basename(path) + if pattern in filename: + return True + if re.search(pattern, filename): + return True + return False + diff --git a/wa/framework/resource_getters.py b/wa/framework/resource_getters.py deleted file mode 100644 index 2b49863d..00000000 --- a/wa/framework/resource_getters.py +++ /dev/null @@ -1,510 +0,0 @@ -# Copyright 2013-2015 ARM Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -""" -This module contains the standard set of resource getters used by Workload Automation. - -""" -import os -import sys -import shutil -import inspect -import httplib -import logging -import json - -import requests - -from wa import Parameter, settings, __file__ as __base_filepath -from wa.framework.resource import ResourceGetter, GetterPriority, NO_ONE -from wa.framework.exception import ResourceError -from wa.utils.misc import (ensure_directory_exists as _d, - ensure_file_directory_exists as _f, sha256, urljoin) -from wa.utils.types import boolean - - -logging.getLogger("requests").setLevel(logging.WARNING) -logging.getLogger("urllib3").setLevel(logging.WARNING) - - -class PackageFileGetter(ResourceGetter): - - name = 'package_file' - description = """ - Looks for exactly one file with the specified plugin in the owner's directory. If a version - is specified on invocation of get, it will filter the discovered file based on that version. - Versions are treated as case-insensitive. - """ - - plugin = None - - def register(self): - self.resolver.register(self, self.plugin, GetterPriority.package) - - def get(self, resource, **kwargs): - resource_dir = os.path.dirname(sys.modules[resource.owner.__module__].__file__) - version = kwargs.get('version') - return get_from_location_by_plugin(resource, resource_dir, self.plugin, version) - - -class EnvironmentFileGetter(ResourceGetter): - - name = 'environment_file' - description = """Looks for exactly one file with the specified plugin in the owner's directory. If a version - is specified on invocation of get, it will filter the discovered file based on that version. - Versions are treated as case-insensitive.""" - - plugin = None - - def register(self): - self.resolver.register(self, self.plugin, GetterPriority.environment) - - def get(self, resource, **kwargs): - resource_dir = resource.owner.dependencies_directory - - version = kwargs.get('version') - return get_from_location_by_plugin(resource, resource_dir, self.plugin, version) - - -class ReventGetter(ResourceGetter): - """Implements logic for identifying revent files.""" - - def get_base_location(self, resource): - raise NotImplementedError() - - def register(self): - self.resolver.register(self, 'revent', GetterPriority.package) - - def get(self, resource, **kwargs): - filename = '.'.join([resource.owner.device.model, resource.stage, 'revent']).lower() - location = _d(os.path.join(self.get_base_location(resource), 'revent_files')) - for candidate in os.listdir(location): - if candidate.lower() == filename.lower(): - return os.path.join(location, candidate) - - -class PackageApkGetter(PackageFileGetter): - name = 'package_apk' - plugin = 'apk' - - -class PackageJarGetter(PackageFileGetter): - name = 'package_jar' - plugin = 'jar' - - -class PackageReventGetter(ReventGetter): - - name = 'package_revent' - - def get_base_location(self, resource): - return get_owner_path(resource) - - -class EnvironmentApkGetter(EnvironmentFileGetter): - name = 'environment_apk' - plugin = 'apk' - - -class EnvironmentJarGetter(EnvironmentFileGetter): - name = 'environment_jar' - plugin = 'jar' - - -class EnvironmentReventGetter(ReventGetter): - - name = 'enviroment_revent' - - def get_base_location(self, resource): - return resource.owner.dependencies_directory - - -class ExecutableGetter(ResourceGetter): - - name = 'exe_getter' - resource_type = 'executable' - priority = GetterPriority.environment - - def get(self, resource, **kwargs): - if settings.assets_repository: - path = os.path.join(settings.assets_repository, resource.platform, resource.filename) - if os.path.isfile(path): - return path - - -class PackageExecutableGetter(ExecutableGetter): - - name = 'package_exe_getter' - priority = GetterPriority.package - - def get(self, resource, **kwargs): - path = os.path.join(get_owner_path(resource), 'bin', resource.platform, resource.filename) - if os.path.isfile(path): - return path - - -class EnvironmentExecutableGetter(ExecutableGetter): - - name = 'env_exe_getter' - - def get(self, resource, **kwargs): - paths = [ - os.path.join(resource.owner.dependencies_directory, 'bin', - resource.platform, resource.filename), - os.path.join(settings.user_directory, 'bin', - resource.platform, resource.filename), - ] - for path in paths: - if os.path.isfile(path): - return path - - -class DependencyFileGetter(ResourceGetter): - - name = 'filer' - description = """ - Gets resources from the specified mount point. Copies them the local dependencies - directory, and returns the path to the local copy. - - """ - resource_type = 'file' - relative_path = '' # May be overridden by subclasses. - - priority = GetterPriority.remote - - parameters = [ - Parameter('mount_point', default='/', global_alias='remote_assets_path', - description='Local mount point for the remote filer.'), - ] - - def __init__(self, resolver, **kwargs): - super(DependencyFileGetter, self).__init__(resolver, **kwargs) - - def get(self, resource, **kwargs): - force = kwargs.get('force') - remote_path = os.path.join(self.mount_point, self.relative_path, resource.path) - local_path = os.path.join(resource.owner.dependencies_directory, os.path.basename(resource.path)) - - if not os.path.isfile(local_path) or force: - if not os.path.isfile(remote_path): - return None - self.logger.debug('Copying {} to {}'.format(remote_path, local_path)) - shutil.copy(remote_path, local_path) - - return local_path - - -class PackageCommonDependencyGetter(ResourceGetter): - - name = 'packaged_common_dependency' - resource_type = 'file' - priority = GetterPriority.package - 1 # check after owner-specific locations - - def get(self, resource, **kwargs): - path = os.path.join(settings.package_directory, 'common', resource.path) - if os.path.exists(path): - return path - - -class EnvironmentCommonDependencyGetter(ResourceGetter): - - name = 'environment_common_dependency' - resource_type = 'file' - priority = GetterPriority.environment - 1 # check after owner-specific locations - - def get(self, resource, **kwargs): - path = os.path.join(settings.dependencies_directory, - os.path.basename(resource.path)) - if os.path.exists(path): - return path - - -class PackageDependencyGetter(ResourceGetter): - - name = 'packaged_dependency' - resource_type = 'file' - priority = GetterPriority.package - - def get(self, resource, **kwargs): - owner_path = inspect.getfile(resource.owner.__class__) - path = os.path.join(os.path.dirname(owner_path), resource.path) - if os.path.exists(path): - return path - - -class EnvironmentDependencyGetter(ResourceGetter): - - name = 'environment_dependency' - resource_type = 'file' - priority = GetterPriority.environment - - def get(self, resource, **kwargs): - path = os.path.join(resource.owner.dependencies_directory, os.path.basename(resource.path)) - if os.path.exists(path): - return path - - -class PluginAssetGetter(DependencyFileGetter): - - name = 'plugin_asset' - resource_type = 'plugin_asset' - - -class HttpGetter(ResourceGetter): - - name = 'http_assets' - description = """ - Downloads resources from a server based on an index fetched from the specified URL. - - Given a URL, this will try to fetch ``/index.json``. The index file maps plugin - names to a list of corresponing asset descriptons. Each asset description continas a path - (relative to the base URL) of the resource and a SHA256 hash, so that this Getter can - verify whether the resource on the remote has changed. - - For example, let's assume we want to get the APK file for workload "foo", and that - assets are hosted at ``http://example.com/assets``. This Getter will first try to - donwload ``http://example.com/assests/index.json``. The index file may contian - something like :: - - { - "foo": [ - { - "path": "foo-app.apk", - "sha256": "b14530bb47e04ed655ac5e80e69beaa61c2020450e18638f54384332dffebe86" - }, - { - "path": "subdir/some-other-asset.file", - "sha256": "48d9050e9802246d820625717b72f1c2ba431904b8484ca39befd68d1dbedfff" - } - ] - } - - This Getter will look through the list of assets for "foo" (in this case, two) check - the paths until it finds one matching the resource (in this case, "foo-app.apk"). - Finally, it will try to dowload that file relative to the base URL and plugin name - (in this case, "http://example.com/assets/foo/foo-app.apk"). The downloaded version - will be cached locally, so that in the future, the getter will check the SHA256 hash - of the local file against the one advertised inside index.json, and provided that hasn't - changed, it won't try to download the file again. - - """ - priority = GetterPriority.remote - resource_type = ['apk', 'file', 'jar', 'revent'] - - parameters = [ - Parameter('url', global_alias='remote_assets_url', - description="""URL of the index file for assets on an HTTP server."""), - Parameter('username', - description="""User name for authenticating with assets URL"""), - Parameter('password', - description="""Password for authenticationg with assets URL"""), - Parameter('always_fetch', kind=boolean, default=False, global_alias='always_fetch_remote_assets', - description="""If ``True``, will always attempt to fetch assets from the remote, even if - a local cached copy is available."""), - Parameter('chunk_size', kind=int, default=1024, - description="""Chunk size for streaming large assets."""), - ] - - def __init__(self, resolver, **kwargs): - super(HttpGetter, self).__init__(resolver, **kwargs) - self.index = None - - def get(self, resource, **kwargs): - if not resource.owner: - return # TODO: add support for unowned resources - if not self.index: - self.index = self.fetch_index() - asset = self.resolve_resource(resource) - if not asset: - return - return self.download_asset(asset, resource.owner.name) - - def fetch_index(self): - if not self.url: - return {} - index_url = urljoin(self.url, 'index.json') - response = self.geturl(index_url) - if response.status_code != httplib.OK: - message = 'Could not fetch "{}"; recieved "{} {}"' - self.logger.error(message.format(index_url, response.status_code, response.reason)) - return {} - return json.loads(response.content) - - def download_asset(self, asset, owner_name): - url = urljoin(self.url, owner_name, asset['path']) - local_path = _f(os.path.join(settings.dependencies_directory, '__remote', - owner_name, asset['path'].replace('/', os.sep))) - if os.path.isfile(local_path) and not self.always_fetch: - local_sha = sha256(local_path) - if local_sha == asset['sha256']: - self.logger.debug('Local SHA256 matches; not re-downloading') - return local_path - self.logger.debug('Downloading {}'.format(url)) - response = self.geturl(url, stream=True) - if response.status_code != httplib.OK: - message = 'Could not download asset "{}"; recieved "{} {}"' - self.logger.warning(message.format(url, response.status_code, response.reason)) - return - with open(local_path, 'wb') as wfh: - for chunk in response.iter_content(chunk_size=self.chunk_size): - wfh.write(chunk) - return local_path - - def geturl(self, url, stream=False): - if self.username: - auth = (self.username, self.password) - else: - auth = None - return requests.get(url, auth=auth, stream=stream) - - def resolve_resource(self, resource): - assets = self.index.get(resource.owner.name, {}) - if not assets: - return {} - if resource.name in ['apk', 'jar']: - paths = [a['path'] for a in assets] - version = getattr(resource, 'version', None) - found = get_from_list_by_plugin(resource, paths, resource.name, version) - if found: - for a in assets: - if a['path'] == found: - return a - elif resource.name == 'revent': - filename = '.'.join([resource.owner.device.name, resource.stage, 'revent']).lower() - for asset in assets: - pathname = os.path.basename(asset['path']).lower() - if pathname == filename: - return asset - else: # file - for asset in assets: - if asset['path'].lower() == resource.path.lower(): - return asset - - -class RemoteFilerGetter(ResourceGetter): - - name = 'filer_assets' - description = """ - Finds resources on a (locally mounted) remote filer and caches them locally. - - This assumes that the filer is mounted on the local machine (e.g. as a samba share). - - """ - priority = GetterPriority.remote - resource_type = ['apk', 'file', 'jar', 'revent'] - - parameters = [ - Parameter('remote_path', global_alias='remote_assets_path', default='', - description="""Path, on the local system, where the assets are located."""), - Parameter('always_fetch', kind=boolean, default=False, global_alias='always_fetch_remote_assets', - description="""If ``True``, will always attempt to fetch assets from the remote, even if - a local cached copy is available."""), - ] - - def get(self, resource, **kwargs): - version = kwargs.get('version') - if resource.owner: - remote_path = os.path.join(self.remote_path, resource.owner.name) - local_path = os.path.join(settings.user_directory, '__filer', resource.owner.dependencies_directory) - return self.try_get_resource(resource, version, remote_path, local_path) - else: - result = None - for entry in os.listdir(remote_path): - remote_path = os.path.join(self.remote_path, entry) - local_path = os.path.join(settings.user_directory, '__filer', settings.dependencies_directory, entry) - result = self.try_get_resource(resource, version, remote_path, local_path) - if result: - break - return result - - def try_get_resource(self, resource, version, remote_path, local_path): - if not self.always_fetch: - result = self.get_from(resource, version, local_path) - if result: - return result - if remote_path: - # Didn't find it cached locally; now check the remoted - result = self.get_from(resource, version, remote_path) - if not result: - return result - else: # remote path is not set - return None - # Found it remotely, cache locally, then return it - local_full_path = os.path.join(_d(local_path), os.path.basename(result)) - self.logger.debug('cp {} {}'.format(result, local_full_path)) - shutil.copy(result, local_full_path) - return local_full_path - - def get_from(self, resource, version, location): # pylint: disable=no-self-use - if resource.name in ['apk', 'jar']: - return get_from_location_by_plugin(resource, location, resource.name, version) - elif resource.name == 'file': - filepath = os.path.join(location, resource.path) - if os.path.exists(filepath): - return filepath - elif resource.name == 'revent': - filename = '.'.join([resource.owner.device.model, resource.stage, 'revent']).lower() - alternate_location = os.path.join(location, 'revent_files') - # There tends to be some confusion as to where revent files should - # be placed. This looks both in the plugin's directory, and in - # 'revent_files' subdirectory under it, if it exists. - if os.path.isdir(alternate_location): - for candidate in os.listdir(alternate_location): - if candidate.lower() == filename.lower(): - return os.path.join(alternate_location, candidate) - if os.path.isdir(location): - for candidate in os.listdir(location): - if candidate.lower() == filename.lower(): - return os.path.join(location, candidate) - else: - raise ValueError('Unexpected resource type: {}'.format(resource.name)) - - -# Utility functions - -def get_from_location_by_plugin(resource, location, plugin, version=None): - try: - found_files = [os.path.join(location, f) for f in os.listdir(location)] - except OSError: - return None - try: - return get_from_list_by_plugin(resource, found_files, plugin, version) - except ResourceError: - raise ResourceError('More than one .{} found in {} for {}.'.format(plugin, - location, - resource.owner.name)) - - -def get_from_list_by_plugin(resource, filelist, plugin, version=None): - filelist = [ff for ff in filelist - if os.path.splitext(ff)[1].lower().endswith(plugin)] - if version: - filelist = [ff for ff in filelist if version.lower() in os.path.basename(ff).lower()] - if len(filelist) == 1: - return filelist[0] - elif not filelist: - return None - else: - raise ResourceError('More than one .{} found in {} for {}.'.format(plugin, - filelist, - resource.owner.name)) - - -def get_owner_path(resource): - if resource.owner is NO_ONE: - return os.path.join(os.path.dirname(__base_filepath), 'common') - else: - return os.path.dirname(sys.modules[resource.owner.__module__].__file__) diff --git a/wa/framework/run.py b/wa/framework/run.py index 50c35be5..815ae4c9 100644 --- a/wa/framework/run.py +++ b/wa/framework/run.py @@ -18,7 +18,7 @@ from collections import OrderedDict, Counter from copy import copy from datetime import datetime, timedelta -from wa.framework.configuration.core import RunStatus, JobStatus +from wa.framework.configuration.core import Status class RunInfo(object): @@ -67,7 +67,7 @@ class RunState(object): @staticmethod def from_pod(pod): instance = RunState() - instance.status = RunStatus(pod['status']) + instance.status = Status(pod['status']) instance.timestamp = pod['timestamp'] jss = [JobState.from_pod(j) for j in pod['jobs']] instance.jobs = OrderedDict(((js.id, js.iteration), js) for js in jss) @@ -76,12 +76,12 @@ class RunState(object): @property def num_completed_jobs(self): return sum(1 for js in self.jobs.itervalues() - if js.status > JobStatus.SKIPPED) + if js.status > Status.SKIPPED) def __init__(self): self.jobs = OrderedDict() - self.status = RunStatus.NEW - self.timestamp = datetime.now() + self.status = Status.NEW + self.timestamp = datetime.utcnow() def add_job(self, job): job_state = JobState(job.id, job.label, job.iteration, job.status) @@ -90,7 +90,7 @@ class RunState(object): def update_job(self, job): state = self.jobs[(job.id, job.iteration)] state.status = job.status - state.timestamp = datetime.now() + state.timestamp = datetime.utcnow() def get_status_counts(self): counter = Counter() @@ -110,7 +110,7 @@ class JobState(object): @staticmethod def from_pod(pod): - instance = JobState(pod['id'], pod['label'], JobStatus(pod['status'])) + instance = JobState(pod['id'], pod['label'], Status(pod['status'])) instance.retries = pod['retries'] instance.iteration = pod['iteration'] instance.timestamp = pod['timestamp'] @@ -126,7 +126,7 @@ class JobState(object): self.iteration = iteration self.status = status self.retries = 0 - self.timestamp = datetime.now() + self.timestamp = datetime.utcnow() def to_pod(self): return OrderedDict( diff --git a/wa/framework/target/manager.py b/wa/framework/target/manager.py index 545178e6..043dbcb4 100644 --- a/wa/framework/target/manager.py +++ b/wa/framework/target/manager.py @@ -22,7 +22,7 @@ from wa.utils.serializer import json from devlib import LocalLinuxTarget, LinuxTarget, AndroidTarget from devlib.utils.types import identifier -# from wa.target.manager import AndroidTargetManager, LinuxTargetManager +from devlib.utils.misc import memoized class TargetManager(object): @@ -52,6 +52,7 @@ class TargetManager(object): ] def __init__(self, name, parameters): + self.logger = logging.getLogger('tm') self.target_name = name self.target = None self.assistant = None @@ -82,6 +83,7 @@ class TargetManager(object): if any(parameter in name for parameter in cfg.supported_parameters): cfg.add(name, self.parameters.pop(name)) + @memoized def get_target_info(self): return TargetInfo(self.target) @@ -108,6 +110,7 @@ class TargetManager(object): self.target = instantiate_target(tdesc, self.parameters, connect=False) with signal.wrap('TARGET_CONNECT'): self.target.connect() + self.logger.info('Setting up target') self.target.setup() def _init_assistant(self): diff --git a/wa/processors/__init__.py b/wa/processors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/wa/processors/csvproc.py b/wa/processors/csvproc.py new file mode 100644 index 00000000..8e9f68ea --- /dev/null +++ b/wa/processors/csvproc.py @@ -0,0 +1,87 @@ +import csv + +from wa import ResultProcessor, Parameter +from wa.framework.exception import ConfigError +from wa.utils.types import list_of_strings + + +class CsvReportProcessor(ResultProcessor): + + name = 'csv' + description = """ + Creates a ``results.csv`` in the output directory containing results for + all iterations in CSV format, each line containing a single metric. + + """ + + parameters = [ + Parameter('use_all_classifiers', kind=bool, default=False, + global_alias='use_all_classifiers', + description=""" + If set to ``True``, this will add a column for every classifier + that features in at least one collected metric. + + .. note:: This cannot be ``True`` if ``extra_columns`` is set. + + """), + Parameter('extra_columns', kind=list_of_strings, + description=""" + List of classifiers to use as columns. + + .. note:: This cannot be set if ``use_all_classifiers`` is + ``True``. + + """), + ] + + def validate(self): + super(CsvReportProcessor, self).validate() + if self.use_all_classifiers and self.extra_columns: + msg = 'extra_columns cannot be specified when '\ + 'use_all_classifiers is True' + raise ConfigError(msg) + + def initialize(self): + self.results_so_far = [] # pylint: disable=attribute-defined-outside-init + self.artifact_added = False + + def process_job_output(self, output, target_info, run_output): + self.results_so_far.append(output) + self._write_results(self.results_so_far, run_output) + if not self.artifact_added: + run_output.add_artifact('run_result_csv', 'results.csv', 'export') + self.artifact_added = True + + def process_run_result(self, output, target_info): + self.results_so_far.append(output.result) + self._write_results(self.rsults_so_far, output) + if not self.artifact_added: + output.add_artifact('run_result_csv', 'results.csv', 'export') + self.artifact_added = True + + def _write_results(self, results, output): + if self.use_all_classifiers: + classifiers = set([]) + for result in results: + for metric in result.metrics: + classifiers.update(metric.classifiers.keys()) + extra_columns = list(classifiers) + elif self.extra_columns: + extra_columns = self.extra_columns + else: + extra_columns = [] + + outfile = output.get_path('results.csv') + with open(outfile, 'wb') as wfh: + writer = csv.writer(wfh) + writer.writerow(['id', 'workload', 'iteration', 'metric', ] + + extra_columns + ['value', 'units']) + + for o in results: + header = [o.id, o.label, o.iteration] + for metric in o.result.metrics: + row = (header + [metric.name] + + [str(metric.classifiers.get(c, '')) + for c in extra_columns] + + [str(metric.value), metric.units or '']) + writer.writerow(row) diff --git a/wa/processors/status.py b/wa/processors/status.py new file mode 100644 index 00000000..9744519f --- /dev/null +++ b/wa/processors/status.py @@ -0,0 +1,59 @@ +# Copyright 2013-2015 ARM Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# pylint: disable=R0201 +import os +import time +from collections import Counter + +from wa import ResultProcessor, Status +from wa.utils.misc import write_table + + +class StatusTxtReporter(ResultProcessor): + name = 'status' + description = """ + Outputs a txt file containing general status information about which runs + failed and which were successful + + """ + + def process_run_output(self, output, target_info): + counter = Counter() + for jo in output.jobs: + counter[jo.status] += 1 + + outfile = output.get_path('status.txt') + self.logger.info('Status available in {}'.format(outfile)) + with open(outfile, 'w') as wfh: + wfh.write('Run name: {}\n'.format(output.info.run_name)) + wfh.write('Run status: {}\n'.format(output.status)) + wfh.write('Date: {}\n'.format(time.strftime("%c"))) + if output.events: + wfh.write('Events:\n') + for event in output.events: + wfh.write('\t{}\n'.format(event.summary)) + + txt = '{}/{} iterations completed without error\n' + wfh.write(txt.format(counter[Status.OK], len(output.jobs))) + wfh.write('\n') + status_lines = [map(str, [o.id, o.label, o.iteration, o.status, + o.event_summary]) + for o in output.jobs] + write_table(status_lines, wfh, align='<<>><') + + output.add_artifact('run_status_summary', 'status.txt', 'export') + diff --git a/wa/utils/misc.py b/wa/utils/misc.py index bb3f647d..bb81a0d6 100644 --- a/wa/utils/misc.py +++ b/wa/utils/misc.py @@ -583,3 +583,18 @@ def merge_dicts_simple(base, other): def touch(path): with open(path, 'w'): pass + + +def get_object_name(obj): + if hasattr(obj, 'name'): + return obj.name + elif hasattr(obj, 'im_func'): + return '{}.{}'.format(get_object_name(obj.im_class), + obj.im_func.func_name) + elif hasattr(obj, 'func_name'): + return obj.func_name + elif hasattr(obj, '__name__'): + return obj.__name__ + elif hasattr(obj, '__class__'): + return obj.__class__.__name__ + return None diff --git a/wa/workloads/dhrystone/__init__.py b/wa/workloads/dhrystone/__init__.py index 60b66d78..106d0483 100644 --- a/wa/workloads/dhrystone/__init__.py +++ b/wa/workloads/dhrystone/__init__.py @@ -18,10 +18,7 @@ import os import re -from wa import Workload, Parameter, ConfigError - - -this_dir = os.path.dirname(__file__) +from wa import Workload, Parameter, ConfigError, Executable class Dhrystone(Workload): @@ -75,7 +72,8 @@ class Dhrystone(Workload): ] def initialize(self, context): - host_exe = os.path.join(this_dir, 'dhrystone') + resource = Executable(self, self.target.abi, 'dhrystone') + host_exe = context.resolver.get(resource) Dhrystone.target_exe = self.target.install(host_exe) def setup(self, context): diff --git a/wa/workloads/dhrystone/bin/arm64/dhrystone b/wa/workloads/dhrystone/bin/arm64/dhrystone new file mode 100755 index 00000000..d88dca21 Binary files /dev/null and b/wa/workloads/dhrystone/bin/arm64/dhrystone differ diff --git a/wa/workloads/dhrystone/bin/armeabi/dhrystone b/wa/workloads/dhrystone/bin/armeabi/dhrystone new file mode 100755 index 00000000..cf05362e Binary files /dev/null and b/wa/workloads/dhrystone/bin/armeabi/dhrystone differ diff --git a/wa/workloads/dhrystone/dhrystone b/wa/workloads/dhrystone/dhrystone deleted file mode 100755 index 68cd9b71..00000000 Binary files a/wa/workloads/dhrystone/dhrystone and /dev/null differ diff --git a/wa/workloads/dhrystone/src/build.sh b/wa/workloads/dhrystone/src/Makefile old mode 100755 new mode 100644 similarity index 80% rename from wa/workloads/dhrystone/src/build.sh rename to wa/workloads/dhrystone/src/Makefile index 61fcce5d..4a7ce8f5 --- a/wa/workloads/dhrystone/src/build.sh +++ b/wa/workloads/dhrystone/src/Makefile @@ -13,11 +13,5 @@ # limitations under the License. # - -ndk-build -if [[ -f libs/armeabi/dhrystone ]]; then - echo "Dhrystone binary updated." - cp libs/armeabi/dhrystone .. - rm -rf libs - rm -rf obj -fi +dhrystone: dhrystone.c + $(CROSS_COMPILE)gcc -O3 -static dhrystone.c -o dhrystone diff --git a/wa/workloads/dhrystone/src/jni/dhrystone.c b/wa/workloads/dhrystone/src/dhrystone.c similarity index 100% rename from wa/workloads/dhrystone/src/jni/dhrystone.c rename to wa/workloads/dhrystone/src/dhrystone.c diff --git a/wa/workloads/dhrystone/src/jni/Android.mk b/wa/workloads/dhrystone/src/jni/Android.mk deleted file mode 100644 index 2f974319..00000000 --- a/wa/workloads/dhrystone/src/jni/Android.mk +++ /dev/null @@ -1,11 +0,0 @@ -LOCAL_PATH:= $(call my-dir) - -include $(CLEAR_VARS) -LOCAL_SRC_FILES:= dhrystone.c -LOCAL_MODULE := dhrystone -LOCAL_MODULE_TAGS := optional -LOCAL_STATIC_LIBRARIES := libc -LOCAL_SHARED_LIBRARIES := liblog -LOCAL_LDLIBS := -llog -LOCAL_CFLAGS := -O2 -include $(BUILD_EXECUTABLE)