1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-18 20:11:20 +00:00

Merge pull request #371 from setrofim/next

Result processors, resource resolvers,  plus minor fixes
This commit is contained in:
setrofim 2017-03-22 15:54:50 +00:00 committed by GitHub
commit b6faab6abc
24 changed files with 918 additions and 817 deletions

View File

@ -1,6 +1,7 @@
from wa.framework import pluginloader, log, signal
from wa.framework.command import Command
from wa.framework.configuration import settings
from wa.framework.configuration.core import Status
from wa.framework.exception import HostError, JobError, InstrumentError, ConfigError
from wa.framework.exception import (ResultProcessorError, ResourceError,
CommandError, ToolError)
@ -10,4 +11,7 @@ from wa.framework.exception import WorkerThreadError, PluginLoaderError
from wa.framework.instrumentation import (Instrument, very_slow, slow, normal, fast,
very_fast)
from wa.framework.plugin import Plugin, Parameter
from wa.framework.processor import ResultProcessor
from wa.framework.resource import (NO_ONE, JarFile, ApkFile, ReventFile, File,
Executable)
from wa.framework.workload import Workload

View File

@ -32,11 +32,10 @@ KIND_MAP = {
dict: OrderedDict,
}
RunStatus = enum(['NEW', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING',
'ABORTED', 'FAILED', 'PARTIAL', 'OK'])
Status = enum(['UNKNOWN', 'NEW', 'PENDING',
'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING',
'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK'])
JobStatus = enum(['NEW', 'PENDING', 'RUNNING',
'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK'])
##########################
@ -549,12 +548,11 @@ class MetaConfiguration(Configuration):
plugin_packages = [
'wa.commands',
'wa.workloads',
'wa.instrumentation',
#'wa.result_processors',
#'wa.managers',
'wa.framework.getters',
'wa.framework.target.descriptor',
'wa.framework.resource_getters',
'wa.instrumentation',
'wa.processors',
'wa.workloads',
]
config_points = [
@ -741,9 +739,9 @@ class RunConfiguration(Configuration):
),
ConfigurationPoint(
'retry_on_status',
kind=list_of(JobStatus),
kind=list_of(Status),
default=['FAILED', 'PARTIAL'],
allowed_values=JobStatus.values,
allowed_values=Status.values[Status.RUNNING.value:],
description='''
This is list of statuses on which a job will be cosidered to have
failed and will be automatically retried up to ``max_retries``
@ -774,6 +772,17 @@ class RunConfiguration(Configuration):
.. note:: this number does not include the original attempt
''',
),
ConfigurationPoint(
'result_processors',
kind=toggle_set,
default=['csv', 'status'],
description='''
The list of output processors to be used for this run. Output processors
post-process data generated by workloads and instruments, e.g. to
generate additional reports, format the output in a certain way, or
export the output to an exeternal location.
''',
),
]
configuration = {cp.name: cp for cp in config_points + meta_data}

View File

@ -4,7 +4,7 @@ from itertools import izip_longest, groupby, chain
from wa.framework import pluginloader
from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration,
JobGenerator, JobStatus, settings)
JobGenerator, Status, settings)
from wa.framework.configuration.parsers import ConfigParser
from wa.framework.configuration.plugin_cache import PluginCache
from wa.framework.exception import NotFoundError
@ -88,12 +88,23 @@ class ConfigManager(object):
for name in self.enabled_instruments:
try:
instruments.append(self.get_plugin(name, kind='instrument',
target=target))
target=target))
except NotFoundError:
msg = 'Instrument "{}" not found'
raise NotFoundError(msg.format(name))
return instruments
def get_processors(self):
processors = []
for name in self.run_config.result_processors:
try:
proc = self.plugin_cache.get_plugin(name, kind='result_processor')
except NotFoundError:
msg = 'Result processor "{}" not found'
raise NotFoundError(msg.format(name))
processors.append(proc)
return processors
def finalize(self):
if not self.agenda:
msg = 'Attempting to finalize config before agenda has been set'

View File

@ -95,6 +95,9 @@ class PluginCache(object):
def is_global_alias(self, name):
return name in self._list_of_global_aliases
def list_plugins(self, kind=None):
return self.loader.list_plugins(kind)
def get_plugin_config(self, plugin_name, generic_name=None):
config = obj_dict(not_in_dict=['name'])
config.name = plugin_name

View File

@ -28,12 +28,13 @@ from itertools import izip_longest
import wa.framework.signal as signal
from wa.framework import instrumentation, pluginloader
from wa.framework.configuration.core import settings, RunStatus, JobStatus
from wa.framework.configuration.core import settings, Status
from wa.framework.exception import (WAError, ConfigError, TimeoutError,
InstrumentError, TargetError,
TargetNotRespondingError)
from wa.framework.output import init_job_output
from wa.framework.plugin import Artifact
from wa.framework.processor import ProcessorManager
from wa.framework.resource import ResourceResolver
from wa.framework.run import RunState
from wa.framework.target.info import TargetInfo
@ -95,23 +96,36 @@ class ExecutionContext(object):
self.tm = tm
self.run_output = output
self.run_state = output.state
self.target_info = self.tm.get_target_info()
self.logger.debug('Loading resource discoverers')
self.resolver = ResourceResolver(cm)
self.resolver = ResourceResolver(cm.plugin_cache)
self.resolver.load()
self.job_queue = None
self.completed_jobs = None
self.current_job = None
self.successful_jobs = 0
self.failed_jobs = 0
def start_run(self):
self.output.info.start_time = datetime.now()
self.output.info.start_time = datetime.utcnow()
self.output.write_info()
self.job_queue = copy(self.cm.jobs)
self.completed_jobs = []
self.run_state.status = RunStatus.STARTED
self.run_state.status = Status.STARTED
self.output.status = Status.STARTED
self.output.write_state()
def end_run(self):
self.output.info.end_time = datetime.now()
if self.successful_jobs:
if self.failed_jobs:
status = Status.PARTIAL
else:
status = Status.OK
else:
status = Status.FAILED
self.run_state.status = status
self.output.status = status
self.output.info.end_time = datetime.utcnow()
self.output.info.duration = self.output.info.end_time -\
self.output.info.start_time
self.output.write_info()
@ -144,7 +158,7 @@ class ExecutionContext(object):
def skip_remaining_jobs(self):
while self.job_queue:
job = self.job_queue.pop(0)
job.status = JobStatus.SKIPPED
job.status = Status.SKIPPED
self.run_state.update_job(job)
self.completed_jobs.append(job)
self.write_state()
@ -166,6 +180,9 @@ class ExecutionContext(object):
classifiers=None):
self.run_output.add_artifact(name, path, kind, description, classifiers)
def add_event(self, message):
self.output.add_event(message)
class Executor(object):
"""
@ -228,8 +245,14 @@ class Executor(object):
instrumentation.install(instrument)
instrumentation.validate()
self.logger.info('Installing result processors')
pm = ProcessorManager()
for proc in config_manager.get_processors():
pm.install(proc)
pm.validate()
self.logger.info('Starting run')
runner = Runner(context)
runner = Runner(context, pm)
signal.send(signal.RUN_STARTED, self)
runner.run()
self.execute_postamble(context, output)
@ -244,7 +267,7 @@ class Executor(object):
counter = context.run_state.get_status_counts()
parts = []
for status in reversed(JobStatus.values):
for status in reversed(Status.values):
if status in counter:
parts.append('{} {}'.format(counter[status], status))
self.logger.info(status_summary + ', '.join(parts))
@ -272,9 +295,10 @@ class Runner(object):
"""
def __init__(self, context):
def __init__(self, context, pm):
self.logger = logging.getLogger('runner')
self.context = context
self.pm = pm
self.output = self.context.output
self.config = self.context.cm
@ -290,6 +314,7 @@ class Runner(object):
except KeyboardInterrupt:
self.context.skip_remaining_jobs()
except Exception as e:
self.context.add_event(e.message)
if (not getattr(e, 'logged', None) and
not isinstance(e, KeyboardInterrupt)):
log.log_error(e, self.logger)
@ -302,6 +327,7 @@ class Runner(object):
def initialize_run(self):
self.logger.info('Initializing run')
self.context.start_run()
self.pm.initialize()
log.indent()
for job in self.context.job_queue:
job.initialize(self.context)
@ -311,6 +337,9 @@ class Runner(object):
def finalize_run(self):
self.logger.info('Finalizing run')
self.context.end_run()
self.pm.process_run_output(self.context)
self.pm.export_run_output(self.context)
self.pm.finalize()
def run_next_job(self, context):
job = context.start_job()
@ -319,12 +348,13 @@ class Runner(object):
try:
log.indent()
self.do_run_job(job, context)
job.status = JobStatus.OK
job.status = Status.OK
except KeyboardInterrupt:
job.status = JobStatus.ABORTED
job.status = Status.ABORTED
raise
except Exception as e:
job.status = JobStatus.FAILED
job.status = Status.FAILED
context.add_event(e.message)
if not getattr(e, 'logged', None):
log.log_error(e, self.logger)
e.logged = True
@ -337,7 +367,7 @@ class Runner(object):
self.check_job(job)
def do_run_job(self, job, context):
job.status = JobStatus.RUNNING
job.status = Status.RUNNING
self.send(signal.JOB_STARTED)
with signal.wrap('JOB_TARGET_CONFIG', self):
@ -353,15 +383,18 @@ class Runner(object):
try:
with signal.wrap('JOB_OUTPUT_PROCESSED', self):
job.process_output(context)
self.pm.process_job_output(context)
self.pm.export_job_output(context)
except Exception:
job.status = JobStatus.PARTIAL
job.status = Status.PARTIAL
raise
except KeyboardInterrupt:
job.status = JobStatus.ABORTED
job.status = Status.ABORTED
self.logger.info('Got CTRL-C. Aborting.')
raise
except Exception as e:
job.status = JobStatus.FAILED
job.status = Status.FAILED
if not getattr(e, 'logged', None):
log.log_error(e, self.logger)
e.logged = True
@ -380,15 +413,17 @@ class Runner(object):
self.logger.error(msg.format(job.id, job.status, job.iteration))
self.context.move_failed(job)
job.retries += 1
job.status = JobStatus.PENDING
job.status = Status.PENDING
self.context.job_queue.insert(0, job)
self.context.write_state()
else:
msg = 'Job {} iteration {} completed with status {}. '\
'Max retries exceeded.'
self.logger.error(msg.format(job.id, job.status, job.iteration))
self.context.failed_jobs += 1
else: # status not in retry_on_status
self.logger.info('Job completed with status {}'.format(job.status))
self.context.successful_jobs += 1
def send(self, s):
signal.send(s, self, self.context)

331
wa/framework/getters.py Normal file
View File

@ -0,0 +1,331 @@
# Copyright 2013-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the standard set of resource getters used by Workload Automation.
"""
import httplib
import inspect
import json
import logging
import os
import re
import shutil
import sys
import requests
from devlib.utils.android import ApkInfo
from wa import Parameter, settings, __file__ as __base_filepath
from wa.framework.resource import ResourceGetter, SourcePriority, NO_ONE
from wa.framework.exception import ResourceError
from wa.utils.misc import (ensure_directory_exists as _d,
ensure_file_directory_exists as _f, sha256, urljoin)
from wa.utils.types import boolean, caseless_string
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logger = logging.getLogger('resource')
def get_by_extension(path, ext):
if not ext.startswith('.'):
ext = '.' + ext
ext = caseless_string(ext)
found = []
for entry in os.listdir(path):
entry_ext = os.path.splitext(entry)
if entry_ext == ext:
found.append(os.path.join(path, entry))
return found
def get_generic_resource(resource, files):
matches = []
for f in files:
if resource.match(f):
matches.append(f)
if not matches:
return None
if len(matches) > 1:
msg = 'Multiple matches for {}: {}'
return ResourceError(msg.format(resource, matches))
return matches[0]
def get_from_location(basepath, resource):
if resource.kind == 'file':
path = os.path.join(basepath, resource.path)
if os.path.exists(path):
return path
elif resource.kind == 'executable':
path = os.path.join(basepath, 'bin', resource.abi, resource.filename)
if os.path.exists(path):
return path
elif resource.kind in ['apk', 'jar', 'revent']:
files = get_by_extension(basepath, resource.kind)
return get_generic_resource(resource, files)
return None
class Package(ResourceGetter):
name = 'package'
def register(self, resolver):
resolver.register(self.get, SourcePriority.package)
def get(self, resource):
if resource.owner == NO_ONE:
basepath = os.path.join(os.path.dirname(__base_filepath), 'assets')
else:
modname = resource.owner.__module__
basepath = os.path.dirname(sys.modules[modname].__file__)
return get_from_location(basepath, resource)
class UserDirectory(ResourceGetter):
name = 'user'
def register(self, resolver):
resolver.register(self.get, SourcePriority.local)
def get(self, resource):
basepath = settings.dependencies_directory
return get_from_location(basepath, resource)
class Http(ResourceGetter):
name = 'http'
description = """
Downloads resources from a server based on an index fetched from the
specified URL.
Given a URL, this will try to fetch ``<URL>/index.json``. The index file
maps extension names to a list of corresponing asset descriptons. Each
asset description continas a path (relative to the base URL) of the
resource and a SHA256 hash, so that this Getter can verify whether the
resource on the remote has changed.
For example, let's assume we want to get the APK file for workload "foo",
and that assets are hosted at ``http://example.com/assets``. This Getter
will first try to donwload ``http://example.com/assests/index.json``. The
index file may contian something like ::
{
"foo": [
{
"path": "foo-app.apk",
"sha256": "b14530bb47e04ed655ac5e80e69beaa61c2020450e18638f54384332dffebe86"
},
{
"path": "subdir/some-other-asset.file",
"sha256": "48d9050e9802246d820625717b72f1c2ba431904b8484ca39befd68d1dbedfff"
}
]
}
This Getter will look through the list of assets for "foo" (in this case,
two) check the paths until it finds one matching the resource (in this
case, "foo-app.apk"). Finally, it will try to dowload that file relative
to the base URL and extension name (in this case,
"http://example.com/assets/foo/foo-app.apk"). The downloaded version will
be cached locally, so that in the future, the getter will check the SHA256
hash of the local file against the one advertised inside index.json, and
provided that hasn't changed, it won't try to download the file again.
"""
parameters = [
Parameter('url', global_alias='remote_assets_url',
description="""
URL of the index file for assets on an HTTP server.
"""),
Parameter('username',
description="""
User name for authenticating with assets URL
"""),
Parameter('password',
description="""
Password for authenticationg with assets URL
"""),
Parameter('always_fetch', kind=boolean, default=False,
global_alias='always_fetch_remote_assets',
description="""
If ``True``, will always attempt to fetch assets from the
remote, even if a local cached copy is available.
"""),
Parameter('chunk_size', kind=int, default=1024,
description="""
Chunk size for streaming large assets.
"""),
]
def __init__(self, **kwargs):
super(Http, self).__init__(**kwargs)
self.logger = logger
self.index = None
def register(self, resolver):
resolver.register(self.get, SourcePriority.remote)
def get(self, resource):
if not resource.owner:
return # TODO: add support for unowned resources
if not self.index:
self.index = self.fetch_index()
asset = self.resolve_resource(resource)
if not asset:
return
return self.download_asset(asset, resource.owner.name)
def fetch_index(self):
if not self.url:
return {}
index_url = urljoin(self.url, 'index.json')
response = self.geturl(index_url)
if response.status_code != httplib.OK:
message = 'Could not fetch "{}"; recieved "{} {}"'
self.logger.error(message.format(index_url,
response.status_code,
response.reason))
return {}
return json.loads(response.content)
def download_asset(self, asset, owner_name):
url = urljoin(self.url, owner_name, asset['path'])
local_path = _f(os.path.join(settings.dependencies_directory, '__remote',
owner_name, asset['path'].replace('/', os.sep)))
if os.path.exists(local_path) and not self.always_fetch:
local_sha = sha256(local_path)
if local_sha == asset['sha256']:
self.logger.debug('Local SHA256 matches; not re-downloading')
return local_path
self.logger.debug('Downloading {}'.format(url))
response = self.geturl(url, stream=True)
if response.status_code != httplib.OK:
message = 'Could not download asset "{}"; recieved "{} {}"'
self.logger.warning(message.format(url,
response.status_code,
response.reason))
return
with open(local_path, 'wb') as wfh:
for chunk in response.iter_content(chunk_size=self.chunk_size):
wfh.write(chunk)
return local_path
def geturl(self, url, stream=False):
if self.username:
auth = (self.username, self.password)
else:
auth = None
return requests.get(url, auth=auth, stream=stream)
def resolve_resource(self, resource):
# pylint: disable=too-many-branches,too-many-locals
assets = self.index.get(resource.owner.name, {})
if not assets:
return {}
asset_map = {a['path']: a for a in assets}
if resource.kind in ['apk', 'jar', 'revent']:
if resource.kind == 'apk' and resource.version:
# TODO: modify the index format to attach version info to the
# APK entries.
msg = 'Versions of APKs cannot be fetched over HTTP at this time'
self.logger.warning(msg)
return {}
path = get_generic_resource(resource, asset_map.keys())
if path:
return asset_map[path]
elif resource.kind == 'executable':
path = '/'.join(['bin', resource.abi, resource.filename])
for asset in assets:
if asset['path'].lower() == path.lower():
return asset
else: # file
for asset in assets:
if asset['path'].lower() == resource.path.lower():
return asset
class Filer(ResourceGetter):
name = 'filer'
description = """
Finds resources on a (locally mounted) remote filer and caches them
locally.
This assumes that the filer is mounted on the local machine (e.g. as a
samba share).
"""
parameters = [
Parameter('remote_path', global_alias='remote_assets_path', default='',
description="""
Path, on the local system, where the assets are located.
"""),
Parameter('always_fetch', kind=boolean, default=False,
global_alias='always_fetch_remote_assets',
description="""
If ``True``, will always attempt to fetch assets from the
remote, even if a local cached copy is available.
"""),
]
def register(self, resolver):
resolver.register(self.get, SourcePriority.lan)
def get(self, resource):
if resource.owner:
remote_path = os.path.join(self.remote_path, resource.owner.name)
local_path = os.path.join(settings.dependencies_directory, '__filer',
resource.owner.dependencies_directory)
return self.try_get_resource(resource, remote_path, local_path)
else: # No owner
result = None
for entry in os.listdir(remote_path):
remote_path = os.path.join(self.remote_path, entry)
local_path = os.path.join(settings.dependencies_directory, '__filer',
settings.dependencies_directory, entry)
result = self.try_get_resource(resource, remote_path, local_path)
if result:
break
return result
def try_get_resource(self, resource, remote_path, local_path):
if not self.always_fetch:
result = get_from_location(local_path, resource)
if result:
return result
if remote_path:
# Didn't find it cached locally; now check the remoted
result = get_from_location(remote_path, resource)
if not result:
return result
else: # remote path is not set
return None
# Found it remotely, cache locally, then return it
local_full_path = os.path.join(_d(local_path), os.path.basename(result))
self.logger.debug('cp {} {}'.format(result, local_full_path))
shutil.copy(result, local_full_path)

View File

@ -1,7 +1,7 @@
import logging
from wa.framework import pluginloader, signal
from wa.framework.configuration.core import JobStatus
from wa.framework.configuration.core import Status
class Job(object):
@ -18,15 +18,25 @@ class Job(object):
def classifiers(self):
return self.spec.classifiers
@property
def status(self):
return self._status
@status.setter
def status(self, value):
self._status = value
if self.output:
self.output.status = value
def __init__(self, spec, iteration, context):
self.logger = logging.getLogger('job')
self.spec = spec
self.iteration = iteration
self.context = context
self.status = JobStatus.NEW
self.workload = None
self.output = None
self.retries = 0
self._status = Status.NEW
def load(self, target, loader=pluginloader):
self.logger.info('Loading job {}'.format(self.id))
@ -40,7 +50,7 @@ class Job(object):
self.logger.info('Initializing job {}'.format(self.id))
with signal.wrap('WORKLOAD_INITIALIZED', self, context):
self.workload.initialize(context)
self.status = JobStatus.PENDING
self.status = Status.PENDING
context.update_job_state(self)
def configure_target(self, context):

View File

@ -5,10 +5,11 @@ import string
import sys
import uuid
from copy import copy
from datetime import timedelta
from datetime import datetime, timedelta
from wa.framework.configuration.core import JobSpec, RunStatus
from wa.framework.configuration.manager import ConfigManager
from wa.framework.configuration.core import JobSpec, Status
from wa.framework.configuration.execution import ConfigManager
from wa.framework.exception import HostError
from wa.framework.run import RunState, RunInfo
from wa.framework.target.info import TargetInfo
from wa.utils.misc import touch, ensure_directory_exists
@ -21,13 +22,37 @@ logger = logging.getLogger('output')
class Output(object):
kind = None
@property
def resultfile(self):
return os.path.join(self.basepath, 'result.json')
@property
def event_summary(self):
num_events = len(self.events)
if num_events:
lines = self.events[0].message.split('\n')
message = '({} event(s)): {}'
if num_events > 1 or len(lines) > 1:
message += '[...]'
return message.format(num_events, lines[0])
return ''
@property
def status(self):
if self.result is None:
return None
return self.result.status
@status.setter
def status(self, value):
self.result.status = value
def __init__(self, path):
self.basepath = path
self.result = None
self.events = []
def reload(self):
pod = read_pod(self.resultfile)
@ -36,11 +61,16 @@ class Output(object):
def write_result(self):
write_pod(self.result.to_pod(), self.resultfile)
def get_path(self, subpath):
return os.path.join(self.basepath, subpath.strip(os.sep))
def add_metric(self, name, value, units=None, lower_is_better=False,
classifiers=None):
self.result.add_metric(name, value, units, lower_is_better, classifiers)
def add_artifact(self, name, path, kind, description=None, classifiers=None):
if not os.path.exists(path):
path = self.get_path(path)
if not os.path.exists(path):
msg = 'Attempting to add non-existing artifact: {}'
raise HostError(msg.format(path))
@ -51,9 +81,14 @@ class Output(object):
self.result.add_artifact(name, path, kind, description, classifiers)
def add_event(self, message):
self.result.add_event(message)
class RunOutput(Output):
kind = 'run'
@property
def logfile(self):
return os.path.join(self.basepath, 'run.log')
@ -96,7 +131,7 @@ class RunOutput(Output):
self.info = None
self.state = None
self.result = None
self.jobs = None
self.jobs = []
if (not os.path.isfile(self.statefile) or
not os.path.isfile(self.infofile)):
msg = '"{}" does not exist or is not a valid WA output directory.'
@ -126,7 +161,7 @@ class RunOutput(Output):
def write_target_info(self, ti):
write_pod(ti.to_pod(), self.targetfile)
def read_config(self):
def read_target_config(self):
if not os.path.isfile(self.targetfile):
return None
return TargetInfo.from_pod(read_pod(self.targetfile))
@ -155,8 +190,10 @@ class RunOutput(Output):
class JobOutput(Output):
kind = 'job'
def __init__(self, path, id, label, iteration, retry):
self.basepath = path
super(JobOutput, self).__init__(path)
self.id = id
self.label = label
self.iteration = iteration
@ -170,13 +207,17 @@ class Result(object):
@staticmethod
def from_pod(pod):
instance = Result()
instance.status = Status(pod['status'])
instance.metrics = [Metric.from_pod(m) for m in pod['metrics']]
instance.artifacts = [Artifact.from_pod(a) for a in pod['artifacts']]
instance.events = [Event.from_pod(e) for e in pod['events']]
return instance
def __init__(self):
self.status = Status.NEW
self.metrics = []
self.artifacts = []
self.events = []
def add_metric(self, name, value, units=None, lower_is_better=False,
classifiers=None):
@ -190,10 +231,15 @@ class Result(object):
logger.debug('Adding artifact: {}'.format(artifact))
self.artifacts.append(artifact)
def add_event(self, message):
self.events.append(Event(message))
def to_pod(self):
return dict(
status=str(self.status),
metrics=[m.to_pod() for m in self.metrics],
artifacts=[a.to_pod() for a in self.artifacts],
events=[e.to_pod() for e in self.events],
)
@ -349,6 +395,43 @@ class Metric(object):
return '<{}>'.format(text)
class Event(object):
"""
An event that occured during a run.
"""
__slots__ = ['timestamp', 'message']
@staticmethod
def from_pod(pod):
instance = Event(pod['message'])
instance.timestamp = pod['timestamp']
return instance
@property
def summary(self):
lines = self.message.split('\n')
result = lines[0]
if len(lines) > 1:
result += '[...]'
return result
def __init__(self, message):
self.timestamp = datetime.utcnow()
self.message = message
def to_pod(self):
return dict(
timestamp=self.timestamp,
message=self.message,
)
def __str__(self):
return '[{}] {}'.format(self.timestamp, self.message)
__repr__ = __str__
def init_run_output(path, wa_state, force=False):
if os.path.exists(path):
@ -382,7 +465,10 @@ def init_job_output(run_output, job):
path = os.path.join(run_output.basepath, output_name)
ensure_directory_exists(path)
write_pod(Result().to_pod(), os.path.join(path, 'result.json'))
return JobOutput(path, job.id, job.iteration, job.label, job.retries)
job_output = JobOutput(path, job.id, job.iteration, job.label, job.retries)
job_output.status = job.status
run_output.jobs.append(job_output)
return job_output
def _save_raw_config(meta_dir, state):

87
wa/framework/processor.py Normal file
View File

@ -0,0 +1,87 @@
import logging
from wa.framework import pluginloader
from wa.framework.exception import ConfigError
from wa.framework.instrumentation import is_installed
from wa.framework.plugin import Plugin
from wa.utils.log import log_error, indent, dedent
class ResultProcessor(Plugin):
kind = 'result_processor'
requires = []
def validate(self):
super(ResultProcessor, self).validate()
for instrument in self.requires:
if not is_installed(instrument):
msg = 'Instrument "{}" is required by {}, but is not installed.'
raise ConfigError(msg.format(instrument, self.name))
def initialize(self):
pass
def finalize(self):
pass
class ProcessorManager(object):
def __init__(self, loader=pluginloader):
self.loader = loader
self.logger = logging.getLogger('processor')
self.processors = []
def install(self, processor):
if not isinstance(processor, ResultProcessor):
processor = self.loader.get_result_processor(processor)
self.logger.debug('Installing {}'.format(processor.name))
self.processors.append(processor)
def validate(self):
for proc in self.processors:
proc.validate()
def initialize(self):
for proc in self.processors:
proc.initialize()
def finalize(self):
for proc in self.processors:
proc.finalize()
def process_job_output(self, context):
self.do_for_each_proc('process_job_output', 'processing using "{}"',
context.job_output, context.target_info,
context.run_output)
def export_job_output(self, context):
self.do_for_each_proc('export_job_output', 'Exporting using "{}"',
context.job_output, context.target_info,
context.run_output)
def process_run_output(self, context):
self.do_for_each_proc('process_run_output', 'Processing using "{}"',
context.run_output, context.target_info)
def export_run_output(self, context):
self.do_for_each_proc('export_run_output', 'Exporting using "{}"',
context.run_output, context.target_info)
def do_for_each_proc(self, method_name, message, *args):
try:
indent()
for proc in self.processors:
proc_func = getattr(proc, method_name, None)
if proc_func is None:
continue
try:
self.logger.info(message.format(proc.name))
proc_func(*args)
except Exception as e:
if isinstance(e, KeyboardInterrupt):
raise
log_error(e, self.logger)
finally:
dedent()

View File

@ -12,52 +12,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import glob
import shutil
import inspect
import logging
import os
import re
import shutil
import sys
from collections import defaultdict
from devlib.utils.android import ApkInfo
from wa.framework import pluginloader
from wa.framework.plugin import Plugin, Parameter
from wa.framework.exception import ResourceError
from wa.framework.configuration import settings
from wa.utils.misc import ensure_directory_exists as _d
from wa.utils.types import boolean
from wa.utils.types import prioritylist
from wa.utils import log
from wa.utils.misc import ensure_directory_exists as _d, get_object_name
from wa.utils.types import boolean, prioritylist, enum
class GetterPriority(object):
"""
Enumerates standard ResourceGetter priorities. In general, getters should
register under one of these, rather than specifying other priority values.
:cached: The cached version of the resource. Look here first. This
priority also implies
that the resource at this location is a "cache" and is not
the only version of the resource, so it may be cleared without
losing access to the resource.
:preferred: Take this resource in favour of the environment resource.
:environment: Found somewhere under ~/.workload_automation/ or equivalent,
or from environment variables, external configuration
files, etc. These will override resource supplied with
the package.
:external_package: Resource provided by another package. :package:
Resource provided with the package. :remote:
Resource will be downloaded from a remote location
(such as an HTTP server or a samba share). Try this
only if no other getter was successful.
"""
cached = 20
preferred = 10
environment = 0
external_package = -5
package = -10
remote = -20
SourcePriority = enum(['package', 'remote', 'lan', 'external_package', 'local',
'perferred'], start=0, step=10)
class __NullOwner(object):
@ -77,6 +54,7 @@ class __NullOwner(object):
NO_ONE = __NullOwner()
class Resource(object):
"""
Represents a resource that needs to be resolved. This can be pretty much
@ -88,96 +66,106 @@ class Resource(object):
"""
name = None
kind = None
def __init__(self, owner):
def __init__(self, owner=NO_ONE):
self.owner = owner
def delete(self, instance):
"""
Delete an instance of this resource type. This must be implemented
by the concrete subclasses based on what the resource looks like,
e.g. deleting a file or a directory tree, or removing an entry from
a database.
:note: Implementation should *not* contain any logic for deciding
whether or not a resource should be deleted, only the actual
deletion. The assumption is that if this method is invoked,
then the decision has already been made.
"""
def match(self, path):
raise NotImplementedError()
def __str__(self):
return '<{}\'s {}>'.format(self.owner, self.name)
class FileResource(Resource):
"""
Base class for all resources that are a regular file in the
file system.
class File(Resource):
"""
def delete(self, instance):
os.remove(instance)
class File(FileResource):
name = 'file'
def __init__(self, owner, path, url=None):
super(File, self).__init__(owner)
self.path = path
self.url = url
def __str__(self):
return '<{}\'s {} {}>'.format(self.owner, self.name, self.path or self.url)
class PluginAsset(File):
name = 'plugin_asset'
kind = 'file'
def __init__(self, owner, path):
super(PluginAsset, self).__init__(owner, os.path.join(owner.name, path))
super(File, self).__init__(owner)
self.path = path
class Executable(FileResource):
name = 'executable'
def __init__(self, owner, platform, filename):
super(Executable, self).__init__(owner)
self.platform = platform
self.filename = filename
def match(self, path):
return self.path == path
def __str__(self):
return '<{}\'s {} {}>'.format(self.owner, self.platform, self.filename)
return '<{}\'s {} {}>'.format(self.owner, self.kind, self.path)
class ReventFile(FileResource):
name = 'revent'
class Executable(Resource):
def __init__(self, owner, stage):
kind = 'executable'
def __init__(self, owner, abi, filename):
super(Executable, self).__init__(owner)
self.abi = abi
self.filename = filename
def match(self, path):
return self.filename == os.path.basename(path)
def __str__(self):
return '<{}\'s {} {}>'.format(self.owner, self.abi, self.filename)
class ReventFile(Resource):
kind = 'revent'
def __init__(self, owner, stage, target):
super(ReventFile, self).__init__(owner)
self.stage = stage
self.target = target
def match(self, path):
filename = os.path.basename(path)
parts = filename.split('.')
if len(parts) > 2:
target, stage = parts[:2]
return target == self.target and stage == self.stage
else:
stage = parts[0]
return stage == self.stage
class JarFile(FileResource):
class JarFile(Resource):
name = 'jar'
kind = 'jar'
def match(self, path):
# An owner always has at most one jar file, so
# always match
return True
class ApkFile(FileResource):
class ApkFile(Resource):
name = 'apk'
kind = 'apk'
def __init__(self, owner, version):
def __init__(self, owner, variant=None, version=None):
super(ApkFile, self).__init__(owner)
self.variant = variant
self.version = version
def match(self, path):
name_matches = True
version_matches = True
if self.version is not None:
version_matches = apk_version_matches(path, self.version)
if self.variant is not None:
name_matches = file_name_matches(path, self.variant)
return name_matches and version_matches:
def __str__(self):
text = '<{}\'s apk'.format(self.owner)
if self.variant:
text += ' {}'.format(self.variant)
if self.version:
text += ' {}'.format(self.version)
text += '>'
return text
class ResourceGetter(Plugin):
"""
@ -192,10 +180,6 @@ class ResourceGetter(Plugin):
:name: Name that uniquely identifies this getter. Must be set by any
concrete subclass.
:resource_type: Identifies resource type(s) that this getter can
handle. This must be either a string (for a single type)
or a list of strings for multiple resource types. This
must be set by any concrete subclass.
:priority: Priority with which this getter will be invoked. This should
be one of the standard priorities specified in
``GetterPriority`` enumeration. If not set, this will default
@ -205,74 +189,12 @@ class ResourceGetter(Plugin):
name = None
kind = 'resource_getter'
resource_type = None
priority = GetterPriority.environment
def __init__(self, resolver, **kwargs):
super(ResourceGetter, self).__init__(**kwargs)
self.resolver = resolver
def register(self):
"""
Registers with a resource resolver. Concrete implementations must
override this to invoke ``self.resolver.register()`` method to register
``self`` for specific resource types.
"""
if self.resource_type is None:
message = 'No resource type specified for {}'
raise ValueError(message.format(self.name))
elif isinstance(self.resource_type, list):
for rt in self.resource_type:
self.resolver.register(self, rt, self.priority)
else:
self.resolver.register(self, self.resource_type, self.priority)
def unregister(self):
"""Unregister from a resource resolver."""
if self.resource_type is None:
message = 'No resource type specified for {}'
raise ValueError(message.format(self.name))
elif isinstance(self.resource_type, list):
for rt in self.resource_type:
self.resolver.unregister(self, rt)
else:
self.resolver.unregister(self, self.resource_type)
def get(self, resource, **kwargs):
"""
This will get invoked by the resolver when attempting to resolve a
resource, passing in the resource to be resolved as the first
parameter. Any additional parameters would be specific to a particular
resource type.
This method will only be invoked for resource types that the getter has
registered for.
:param resource: an instance of :class:`wlauto.core.resource.Resource`.
:returns: Implementations of this method must return either the
discovered resource or ``None`` if the resource could not
be discovered.
"""
def register(self, resolver):
raise NotImplementedError()
def delete(self, resource, *args, **kwargs):
"""
Delete the resource if it is discovered. All arguments are passed to a
call to``self.get()``. If that call returns a resource, it is deleted.
:returns: ``True`` if the specified resource has been discovered
and deleted, and ``False`` otherwise.
"""
discovered = self.get(resource, *args, **kwargs)
if discovered:
resource.delete(discovered)
return True
else:
return False
def initialize(self):
pass
def __str__(self):
return '<ResourceGetter {}>'.format(self.name)
@ -285,23 +207,31 @@ class ResourceResolver(object):
"""
def __init__(self, config):
def __init__(self, loader=pluginloader):
self.loader = loader
self.logger = logging.getLogger('resolver')
self.getters = defaultdict(prioritylist)
self.config = config
self.getters = []
self.sources = prioritylist()
def load(self):
"""
Discover getters under the specified source. The source could
be either a python package/module or a path.
for gettercls in self.loader.list_plugins('resource_getter'):
self.logger.debug('Loading getter {}'.format(gettercls.name))
getter = self.loader.get_plugin(name=gettercls.name,
kind="resource_getter")
log.indent()
try:
getter.initialize()
getter.register(self)
finally:
log.dedent()
self.getters.append(getter)
"""
def register(self, source, priority=SourcePriority.local):
msg = 'Registering "{}" with priority "{}"'
self.logger.debug(msg.format(get_object_name(source), priority))
self.sources.add(source, priority)
for rescls in pluginloader.list_resource_getters():
getter = self.config.get_plugin(name=rescls.name, kind="resource_getter", resolver=self)
getter.register()
def get(self, resource, strict=True, *args, **kwargs):
def get(self, resource, strict=True):
"""
Uses registered getters to attempt to discover a resource of the specified
kind and matching the specified criteria. Returns path to the resource that
@ -311,11 +241,13 @@ class ResourceResolver(object):
"""
self.logger.debug('Resolving {}'.format(resource))
for getter in self.getters[resource.name]:
self.logger.debug('Trying {}'.format(getter))
result = getter.get(resource, *args, **kwargs)
for source in self.sources:
source_name = get_object_name(source)
self.logger.debug('Trying {}'.format(source_name))
result = source(resource)
if result is not None:
self.logger.debug('Resource {} found using {}:'.format(resource, getter))
msg = 'Resource {} found using {}:'
self.logger.debug(msg.format(resource, source_name))
self.logger.debug('\t{}'.format(result))
return result
if strict:
@ -323,61 +255,19 @@ class ResourceResolver(object):
self.logger.debug('Resource {} not found.'.format(resource))
return None
def register(self, getter, kind, priority=0):
"""
Register the specified resource getter as being able to discover a resource
of the specified kind with the specified priority.
This method would typically be invoked by a getter inside its __init__.
The idea being that getters register themselves for resources they know
they can discover.
*priorities*
getters that are registered with the highest priority will be invoked first. If
multiple getters are registered under the same priority, they will be invoked
in the order they were registered (i.e. in the order they were discovered). This is
essentially non-deterministic.
Generally getters that are more likely to find a resource, or would find a
"better" version of the resource should register with higher (positive) priorities.
Fall-back getters that should only be invoked if a resource is not found by usual
means should register with lower (negative) priorities.
"""
self.logger.debug('Registering {} for {} resources'.format(getter.name, kind))
self.getters[kind].add(getter, priority)
def unregister(self, getter, kind):
"""
Unregister a getter that has been registered earlier.
"""
self.logger.debug('Unregistering {}'.format(getter.name))
try:
self.getters[kind].remove(getter)
except ValueError:
raise ValueError('Resource getter {} is not installed.'.format(getter.name))
# Utility functions
def get_from_location_by_extension(resource, location, extension, version=None):
found_files = glob.glob(os.path.join(location, '*.{}'.format(extension)))
if version:
found_files = [ff for ff in found_files
if version.lower() in os.path.basename(ff).lower()]
if len(found_files) == 1:
return found_files[0]
elif not found_files:
return None
else:
raise ResourceError('More than one .{} found in {} for {}.'.format(extension,
location,
resource.owner.name))
def apk_version_matches(path, version):
info = ApkInfo(path)
if info.version_name == version or info.version_code == version:
return True
return False
def _get_owner_path(resource):
if resource.owner is NO_ONE:
return os.path.join(os.path.dirname(__base_filepath), 'common')
else:
return os.path.dirname(sys.modules[resource.owner.__module__].__file__)
def file_name_matches(path, pattern):
filename = os.path.basename(path)
if pattern in filename:
return True
if re.search(pattern, filename):
return True
return False

View File

@ -1,510 +0,0 @@
# Copyright 2013-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the standard set of resource getters used by Workload Automation.
"""
import os
import sys
import shutil
import inspect
import httplib
import logging
import json
import requests
from wa import Parameter, settings, __file__ as __base_filepath
from wa.framework.resource import ResourceGetter, GetterPriority, NO_ONE
from wa.framework.exception import ResourceError
from wa.utils.misc import (ensure_directory_exists as _d,
ensure_file_directory_exists as _f, sha256, urljoin)
from wa.utils.types import boolean
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
class PackageFileGetter(ResourceGetter):
name = 'package_file'
description = """
Looks for exactly one file with the specified plugin in the owner's directory. If a version
is specified on invocation of get, it will filter the discovered file based on that version.
Versions are treated as case-insensitive.
"""
plugin = None
def register(self):
self.resolver.register(self, self.plugin, GetterPriority.package)
def get(self, resource, **kwargs):
resource_dir = os.path.dirname(sys.modules[resource.owner.__module__].__file__)
version = kwargs.get('version')
return get_from_location_by_plugin(resource, resource_dir, self.plugin, version)
class EnvironmentFileGetter(ResourceGetter):
name = 'environment_file'
description = """Looks for exactly one file with the specified plugin in the owner's directory. If a version
is specified on invocation of get, it will filter the discovered file based on that version.
Versions are treated as case-insensitive."""
plugin = None
def register(self):
self.resolver.register(self, self.plugin, GetterPriority.environment)
def get(self, resource, **kwargs):
resource_dir = resource.owner.dependencies_directory
version = kwargs.get('version')
return get_from_location_by_plugin(resource, resource_dir, self.plugin, version)
class ReventGetter(ResourceGetter):
"""Implements logic for identifying revent files."""
def get_base_location(self, resource):
raise NotImplementedError()
def register(self):
self.resolver.register(self, 'revent', GetterPriority.package)
def get(self, resource, **kwargs):
filename = '.'.join([resource.owner.device.model, resource.stage, 'revent']).lower()
location = _d(os.path.join(self.get_base_location(resource), 'revent_files'))
for candidate in os.listdir(location):
if candidate.lower() == filename.lower():
return os.path.join(location, candidate)
class PackageApkGetter(PackageFileGetter):
name = 'package_apk'
plugin = 'apk'
class PackageJarGetter(PackageFileGetter):
name = 'package_jar'
plugin = 'jar'
class PackageReventGetter(ReventGetter):
name = 'package_revent'
def get_base_location(self, resource):
return get_owner_path(resource)
class EnvironmentApkGetter(EnvironmentFileGetter):
name = 'environment_apk'
plugin = 'apk'
class EnvironmentJarGetter(EnvironmentFileGetter):
name = 'environment_jar'
plugin = 'jar'
class EnvironmentReventGetter(ReventGetter):
name = 'enviroment_revent'
def get_base_location(self, resource):
return resource.owner.dependencies_directory
class ExecutableGetter(ResourceGetter):
name = 'exe_getter'
resource_type = 'executable'
priority = GetterPriority.environment
def get(self, resource, **kwargs):
if settings.assets_repository:
path = os.path.join(settings.assets_repository, resource.platform, resource.filename)
if os.path.isfile(path):
return path
class PackageExecutableGetter(ExecutableGetter):
name = 'package_exe_getter'
priority = GetterPriority.package
def get(self, resource, **kwargs):
path = os.path.join(get_owner_path(resource), 'bin', resource.platform, resource.filename)
if os.path.isfile(path):
return path
class EnvironmentExecutableGetter(ExecutableGetter):
name = 'env_exe_getter'
def get(self, resource, **kwargs):
paths = [
os.path.join(resource.owner.dependencies_directory, 'bin',
resource.platform, resource.filename),
os.path.join(settings.user_directory, 'bin',
resource.platform, resource.filename),
]
for path in paths:
if os.path.isfile(path):
return path
class DependencyFileGetter(ResourceGetter):
name = 'filer'
description = """
Gets resources from the specified mount point. Copies them the local dependencies
directory, and returns the path to the local copy.
"""
resource_type = 'file'
relative_path = '' # May be overridden by subclasses.
priority = GetterPriority.remote
parameters = [
Parameter('mount_point', default='/', global_alias='remote_assets_path',
description='Local mount point for the remote filer.'),
]
def __init__(self, resolver, **kwargs):
super(DependencyFileGetter, self).__init__(resolver, **kwargs)
def get(self, resource, **kwargs):
force = kwargs.get('force')
remote_path = os.path.join(self.mount_point, self.relative_path, resource.path)
local_path = os.path.join(resource.owner.dependencies_directory, os.path.basename(resource.path))
if not os.path.isfile(local_path) or force:
if not os.path.isfile(remote_path):
return None
self.logger.debug('Copying {} to {}'.format(remote_path, local_path))
shutil.copy(remote_path, local_path)
return local_path
class PackageCommonDependencyGetter(ResourceGetter):
name = 'packaged_common_dependency'
resource_type = 'file'
priority = GetterPriority.package - 1 # check after owner-specific locations
def get(self, resource, **kwargs):
path = os.path.join(settings.package_directory, 'common', resource.path)
if os.path.exists(path):
return path
class EnvironmentCommonDependencyGetter(ResourceGetter):
name = 'environment_common_dependency'
resource_type = 'file'
priority = GetterPriority.environment - 1 # check after owner-specific locations
def get(self, resource, **kwargs):
path = os.path.join(settings.dependencies_directory,
os.path.basename(resource.path))
if os.path.exists(path):
return path
class PackageDependencyGetter(ResourceGetter):
name = 'packaged_dependency'
resource_type = 'file'
priority = GetterPriority.package
def get(self, resource, **kwargs):
owner_path = inspect.getfile(resource.owner.__class__)
path = os.path.join(os.path.dirname(owner_path), resource.path)
if os.path.exists(path):
return path
class EnvironmentDependencyGetter(ResourceGetter):
name = 'environment_dependency'
resource_type = 'file'
priority = GetterPriority.environment
def get(self, resource, **kwargs):
path = os.path.join(resource.owner.dependencies_directory, os.path.basename(resource.path))
if os.path.exists(path):
return path
class PluginAssetGetter(DependencyFileGetter):
name = 'plugin_asset'
resource_type = 'plugin_asset'
class HttpGetter(ResourceGetter):
name = 'http_assets'
description = """
Downloads resources from a server based on an index fetched from the specified URL.
Given a URL, this will try to fetch ``<URL>/index.json``. The index file maps plugin
names to a list of corresponing asset descriptons. Each asset description continas a path
(relative to the base URL) of the resource and a SHA256 hash, so that this Getter can
verify whether the resource on the remote has changed.
For example, let's assume we want to get the APK file for workload "foo", and that
assets are hosted at ``http://example.com/assets``. This Getter will first try to
donwload ``http://example.com/assests/index.json``. The index file may contian
something like ::
{
"foo": [
{
"path": "foo-app.apk",
"sha256": "b14530bb47e04ed655ac5e80e69beaa61c2020450e18638f54384332dffebe86"
},
{
"path": "subdir/some-other-asset.file",
"sha256": "48d9050e9802246d820625717b72f1c2ba431904b8484ca39befd68d1dbedfff"
}
]
}
This Getter will look through the list of assets for "foo" (in this case, two) check
the paths until it finds one matching the resource (in this case, "foo-app.apk").
Finally, it will try to dowload that file relative to the base URL and plugin name
(in this case, "http://example.com/assets/foo/foo-app.apk"). The downloaded version
will be cached locally, so that in the future, the getter will check the SHA256 hash
of the local file against the one advertised inside index.json, and provided that hasn't
changed, it won't try to download the file again.
"""
priority = GetterPriority.remote
resource_type = ['apk', 'file', 'jar', 'revent']
parameters = [
Parameter('url', global_alias='remote_assets_url',
description="""URL of the index file for assets on an HTTP server."""),
Parameter('username',
description="""User name for authenticating with assets URL"""),
Parameter('password',
description="""Password for authenticationg with assets URL"""),
Parameter('always_fetch', kind=boolean, default=False, global_alias='always_fetch_remote_assets',
description="""If ``True``, will always attempt to fetch assets from the remote, even if
a local cached copy is available."""),
Parameter('chunk_size', kind=int, default=1024,
description="""Chunk size for streaming large assets."""),
]
def __init__(self, resolver, **kwargs):
super(HttpGetter, self).__init__(resolver, **kwargs)
self.index = None
def get(self, resource, **kwargs):
if not resource.owner:
return # TODO: add support for unowned resources
if not self.index:
self.index = self.fetch_index()
asset = self.resolve_resource(resource)
if not asset:
return
return self.download_asset(asset, resource.owner.name)
def fetch_index(self):
if not self.url:
return {}
index_url = urljoin(self.url, 'index.json')
response = self.geturl(index_url)
if response.status_code != httplib.OK:
message = 'Could not fetch "{}"; recieved "{} {}"'
self.logger.error(message.format(index_url, response.status_code, response.reason))
return {}
return json.loads(response.content)
def download_asset(self, asset, owner_name):
url = urljoin(self.url, owner_name, asset['path'])
local_path = _f(os.path.join(settings.dependencies_directory, '__remote',
owner_name, asset['path'].replace('/', os.sep)))
if os.path.isfile(local_path) and not self.always_fetch:
local_sha = sha256(local_path)
if local_sha == asset['sha256']:
self.logger.debug('Local SHA256 matches; not re-downloading')
return local_path
self.logger.debug('Downloading {}'.format(url))
response = self.geturl(url, stream=True)
if response.status_code != httplib.OK:
message = 'Could not download asset "{}"; recieved "{} {}"'
self.logger.warning(message.format(url, response.status_code, response.reason))
return
with open(local_path, 'wb') as wfh:
for chunk in response.iter_content(chunk_size=self.chunk_size):
wfh.write(chunk)
return local_path
def geturl(self, url, stream=False):
if self.username:
auth = (self.username, self.password)
else:
auth = None
return requests.get(url, auth=auth, stream=stream)
def resolve_resource(self, resource):
assets = self.index.get(resource.owner.name, {})
if not assets:
return {}
if resource.name in ['apk', 'jar']:
paths = [a['path'] for a in assets]
version = getattr(resource, 'version', None)
found = get_from_list_by_plugin(resource, paths, resource.name, version)
if found:
for a in assets:
if a['path'] == found:
return a
elif resource.name == 'revent':
filename = '.'.join([resource.owner.device.name, resource.stage, 'revent']).lower()
for asset in assets:
pathname = os.path.basename(asset['path']).lower()
if pathname == filename:
return asset
else: # file
for asset in assets:
if asset['path'].lower() == resource.path.lower():
return asset
class RemoteFilerGetter(ResourceGetter):
name = 'filer_assets'
description = """
Finds resources on a (locally mounted) remote filer and caches them locally.
This assumes that the filer is mounted on the local machine (e.g. as a samba share).
"""
priority = GetterPriority.remote
resource_type = ['apk', 'file', 'jar', 'revent']
parameters = [
Parameter('remote_path', global_alias='remote_assets_path', default='',
description="""Path, on the local system, where the assets are located."""),
Parameter('always_fetch', kind=boolean, default=False, global_alias='always_fetch_remote_assets',
description="""If ``True``, will always attempt to fetch assets from the remote, even if
a local cached copy is available."""),
]
def get(self, resource, **kwargs):
version = kwargs.get('version')
if resource.owner:
remote_path = os.path.join(self.remote_path, resource.owner.name)
local_path = os.path.join(settings.user_directory, '__filer', resource.owner.dependencies_directory)
return self.try_get_resource(resource, version, remote_path, local_path)
else:
result = None
for entry in os.listdir(remote_path):
remote_path = os.path.join(self.remote_path, entry)
local_path = os.path.join(settings.user_directory, '__filer', settings.dependencies_directory, entry)
result = self.try_get_resource(resource, version, remote_path, local_path)
if result:
break
return result
def try_get_resource(self, resource, version, remote_path, local_path):
if not self.always_fetch:
result = self.get_from(resource, version, local_path)
if result:
return result
if remote_path:
# Didn't find it cached locally; now check the remoted
result = self.get_from(resource, version, remote_path)
if not result:
return result
else: # remote path is not set
return None
# Found it remotely, cache locally, then return it
local_full_path = os.path.join(_d(local_path), os.path.basename(result))
self.logger.debug('cp {} {}'.format(result, local_full_path))
shutil.copy(result, local_full_path)
return local_full_path
def get_from(self, resource, version, location): # pylint: disable=no-self-use
if resource.name in ['apk', 'jar']:
return get_from_location_by_plugin(resource, location, resource.name, version)
elif resource.name == 'file':
filepath = os.path.join(location, resource.path)
if os.path.exists(filepath):
return filepath
elif resource.name == 'revent':
filename = '.'.join([resource.owner.device.model, resource.stage, 'revent']).lower()
alternate_location = os.path.join(location, 'revent_files')
# There tends to be some confusion as to where revent files should
# be placed. This looks both in the plugin's directory, and in
# 'revent_files' subdirectory under it, if it exists.
if os.path.isdir(alternate_location):
for candidate in os.listdir(alternate_location):
if candidate.lower() == filename.lower():
return os.path.join(alternate_location, candidate)
if os.path.isdir(location):
for candidate in os.listdir(location):
if candidate.lower() == filename.lower():
return os.path.join(location, candidate)
else:
raise ValueError('Unexpected resource type: {}'.format(resource.name))
# Utility functions
def get_from_location_by_plugin(resource, location, plugin, version=None):
try:
found_files = [os.path.join(location, f) for f in os.listdir(location)]
except OSError:
return None
try:
return get_from_list_by_plugin(resource, found_files, plugin, version)
except ResourceError:
raise ResourceError('More than one .{} found in {} for {}.'.format(plugin,
location,
resource.owner.name))
def get_from_list_by_plugin(resource, filelist, plugin, version=None):
filelist = [ff for ff in filelist
if os.path.splitext(ff)[1].lower().endswith(plugin)]
if version:
filelist = [ff for ff in filelist if version.lower() in os.path.basename(ff).lower()]
if len(filelist) == 1:
return filelist[0]
elif not filelist:
return None
else:
raise ResourceError('More than one .{} found in {} for {}.'.format(plugin,
filelist,
resource.owner.name))
def get_owner_path(resource):
if resource.owner is NO_ONE:
return os.path.join(os.path.dirname(__base_filepath), 'common')
else:
return os.path.dirname(sys.modules[resource.owner.__module__].__file__)

View File

@ -18,7 +18,7 @@ from collections import OrderedDict, Counter
from copy import copy
from datetime import datetime, timedelta
from wa.framework.configuration.core import RunStatus, JobStatus
from wa.framework.configuration.core import Status
class RunInfo(object):
@ -67,7 +67,7 @@ class RunState(object):
@staticmethod
def from_pod(pod):
instance = RunState()
instance.status = RunStatus(pod['status'])
instance.status = Status(pod['status'])
instance.timestamp = pod['timestamp']
jss = [JobState.from_pod(j) for j in pod['jobs']]
instance.jobs = OrderedDict(((js.id, js.iteration), js) for js in jss)
@ -76,12 +76,12 @@ class RunState(object):
@property
def num_completed_jobs(self):
return sum(1 for js in self.jobs.itervalues()
if js.status > JobStatus.SKIPPED)
if js.status > Status.SKIPPED)
def __init__(self):
self.jobs = OrderedDict()
self.status = RunStatus.NEW
self.timestamp = datetime.now()
self.status = Status.NEW
self.timestamp = datetime.utcnow()
def add_job(self, job):
job_state = JobState(job.id, job.label, job.iteration, job.status)
@ -90,7 +90,7 @@ class RunState(object):
def update_job(self, job):
state = self.jobs[(job.id, job.iteration)]
state.status = job.status
state.timestamp = datetime.now()
state.timestamp = datetime.utcnow()
def get_status_counts(self):
counter = Counter()
@ -110,7 +110,7 @@ class JobState(object):
@staticmethod
def from_pod(pod):
instance = JobState(pod['id'], pod['label'], JobStatus(pod['status']))
instance = JobState(pod['id'], pod['label'], Status(pod['status']))
instance.retries = pod['retries']
instance.iteration = pod['iteration']
instance.timestamp = pod['timestamp']
@ -126,7 +126,7 @@ class JobState(object):
self.iteration = iteration
self.status = status
self.retries = 0
self.timestamp = datetime.now()
self.timestamp = datetime.utcnow()
def to_pod(self):
return OrderedDict(

View File

@ -22,7 +22,7 @@ from wa.utils.serializer import json
from devlib import LocalLinuxTarget, LinuxTarget, AndroidTarget
from devlib.utils.types import identifier
# from wa.target.manager import AndroidTargetManager, LinuxTargetManager
from devlib.utils.misc import memoized
class TargetManager(object):
@ -52,6 +52,7 @@ class TargetManager(object):
]
def __init__(self, name, parameters):
self.logger = logging.getLogger('tm')
self.target_name = name
self.target = None
self.assistant = None
@ -82,6 +83,7 @@ class TargetManager(object):
if any(parameter in name for parameter in cfg.supported_parameters):
cfg.add(name, self.parameters.pop(name))
@memoized
def get_target_info(self):
return TargetInfo(self.target)
@ -108,6 +110,7 @@ class TargetManager(object):
self.target = instantiate_target(tdesc, self.parameters, connect=False)
with signal.wrap('TARGET_CONNECT'):
self.target.connect()
self.logger.info('Setting up target')
self.target.setup()
def _init_assistant(self):

View File

87
wa/processors/csvproc.py Normal file
View File

@ -0,0 +1,87 @@
import csv
from wa import ResultProcessor, Parameter
from wa.framework.exception import ConfigError
from wa.utils.types import list_of_strings
class CsvReportProcessor(ResultProcessor):
name = 'csv'
description = """
Creates a ``results.csv`` in the output directory containing results for
all iterations in CSV format, each line containing a single metric.
"""
parameters = [
Parameter('use_all_classifiers', kind=bool, default=False,
global_alias='use_all_classifiers',
description="""
If set to ``True``, this will add a column for every classifier
that features in at least one collected metric.
.. note:: This cannot be ``True`` if ``extra_columns`` is set.
"""),
Parameter('extra_columns', kind=list_of_strings,
description="""
List of classifiers to use as columns.
.. note:: This cannot be set if ``use_all_classifiers`` is
``True``.
"""),
]
def validate(self):
super(CsvReportProcessor, self).validate()
if self.use_all_classifiers and self.extra_columns:
msg = 'extra_columns cannot be specified when '\
'use_all_classifiers is True'
raise ConfigError(msg)
def initialize(self):
self.results_so_far = [] # pylint: disable=attribute-defined-outside-init
self.artifact_added = False
def process_job_output(self, output, target_info, run_output):
self.results_so_far.append(output)
self._write_results(self.results_so_far, run_output)
if not self.artifact_added:
run_output.add_artifact('run_result_csv', 'results.csv', 'export')
self.artifact_added = True
def process_run_result(self, output, target_info):
self.results_so_far.append(output.result)
self._write_results(self.rsults_so_far, output)
if not self.artifact_added:
output.add_artifact('run_result_csv', 'results.csv', 'export')
self.artifact_added = True
def _write_results(self, results, output):
if self.use_all_classifiers:
classifiers = set([])
for result in results:
for metric in result.metrics:
classifiers.update(metric.classifiers.keys())
extra_columns = list(classifiers)
elif self.extra_columns:
extra_columns = self.extra_columns
else:
extra_columns = []
outfile = output.get_path('results.csv')
with open(outfile, 'wb') as wfh:
writer = csv.writer(wfh)
writer.writerow(['id', 'workload', 'iteration', 'metric', ] +
extra_columns + ['value', 'units'])
for o in results:
header = [o.id, o.label, o.iteration]
for metric in o.result.metrics:
row = (header + [metric.name] +
[str(metric.classifiers.get(c, ''))
for c in extra_columns] +
[str(metric.value), metric.units or ''])
writer.writerow(row)

59
wa/processors/status.py Normal file
View File

@ -0,0 +1,59 @@
# Copyright 2013-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=R0201
import os
import time
from collections import Counter
from wa import ResultProcessor, Status
from wa.utils.misc import write_table
class StatusTxtReporter(ResultProcessor):
name = 'status'
description = """
Outputs a txt file containing general status information about which runs
failed and which were successful
"""
def process_run_output(self, output, target_info):
counter = Counter()
for jo in output.jobs:
counter[jo.status] += 1
outfile = output.get_path('status.txt')
self.logger.info('Status available in {}'.format(outfile))
with open(outfile, 'w') as wfh:
wfh.write('Run name: {}\n'.format(output.info.run_name))
wfh.write('Run status: {}\n'.format(output.status))
wfh.write('Date: {}\n'.format(time.strftime("%c")))
if output.events:
wfh.write('Events:\n')
for event in output.events:
wfh.write('\t{}\n'.format(event.summary))
txt = '{}/{} iterations completed without error\n'
wfh.write(txt.format(counter[Status.OK], len(output.jobs)))
wfh.write('\n')
status_lines = [map(str, [o.id, o.label, o.iteration, o.status,
o.event_summary])
for o in output.jobs]
write_table(status_lines, wfh, align='<<>><')
output.add_artifact('run_status_summary', 'status.txt', 'export')

View File

@ -583,3 +583,18 @@ def merge_dicts_simple(base, other):
def touch(path):
with open(path, 'w'):
pass
def get_object_name(obj):
if hasattr(obj, 'name'):
return obj.name
elif hasattr(obj, 'im_func'):
return '{}.{}'.format(get_object_name(obj.im_class),
obj.im_func.func_name)
elif hasattr(obj, 'func_name'):
return obj.func_name
elif hasattr(obj, '__name__'):
return obj.__name__
elif hasattr(obj, '__class__'):
return obj.__class__.__name__
return None

View File

@ -18,10 +18,7 @@
import os
import re
from wa import Workload, Parameter, ConfigError
this_dir = os.path.dirname(__file__)
from wa import Workload, Parameter, ConfigError, Executable
class Dhrystone(Workload):
@ -75,7 +72,8 @@ class Dhrystone(Workload):
]
def initialize(self, context):
host_exe = os.path.join(this_dir, 'dhrystone')
resource = Executable(self, self.target.abi, 'dhrystone')
host_exe = context.resolver.get(resource)
Dhrystone.target_exe = self.target.install(host_exe)
def setup(self, context):

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -13,11 +13,5 @@
# limitations under the License.
#
ndk-build
if [[ -f libs/armeabi/dhrystone ]]; then
echo "Dhrystone binary updated."
cp libs/armeabi/dhrystone ..
rm -rf libs
rm -rf obj
fi
dhrystone: dhrystone.c
$(CROSS_COMPILE)gcc -O3 -static dhrystone.c -o dhrystone

View File

@ -1,11 +0,0 @@
LOCAL_PATH:= $(call my-dir)
include $(CLEAR_VARS)
LOCAL_SRC_FILES:= dhrystone.c
LOCAL_MODULE := dhrystone
LOCAL_MODULE_TAGS := optional
LOCAL_STATIC_LIBRARIES := libc
LOCAL_SHARED_LIBRARIES := liblog
LOCAL_LDLIBS := -llog
LOCAL_CFLAGS := -O2
include $(BUILD_EXECUTABLE)