1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-19 04:21:17 +00:00

Merge pull request #54 from ep1cman/daq_merge

Added ability to merge DAQ channels
This commit is contained in:
setrofim 2015-11-13 11:00:36 +00:00
commit 9fe4887626

View File

@ -13,7 +13,6 @@
# limitations under the License. # limitations under the License.
# #
# pylint: disable=W0613,E1101,access-member-before-definition,attribute-defined-outside-init # pylint: disable=W0613,E1101,access-member-before-definition,attribute-defined-outside-init
from __future__ import division from __future__ import division
import os import os
@ -21,14 +20,16 @@ import sys
import csv import csv
import shutil import shutil
import tempfile import tempfile
from collections import OrderedDict from collections import OrderedDict, defaultdict
from string import ascii_lowercase
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from wlauto import Instrument, Parameter from wlauto import Instrument, Parameter
from wlauto.core import signal from wlauto.core import signal
from wlauto.exceptions import ConfigError, InstrumentError, DeviceError from wlauto.exceptions import ConfigError, InstrumentError, DeviceError
from wlauto.utils.misc import ensure_directory_exists as _d from wlauto.utils.misc import ensure_directory_exists as _d
from wlauto.utils.types import list_of_ints, list_of_strs from wlauto.utils.types import list_of_ints, list_of_strs, boolean
daqpower_path = os.path.join(os.path.dirname(__file__), '..', '..', 'external', 'daq_server', 'src') daqpower_path = os.path.join(os.path.dirname(__file__), '..', '..', 'external', 'daq_server', 'src')
sys.path.insert(0, daqpower_path) sys.path.insert(0, daqpower_path)
@ -52,6 +53,15 @@ GPIO_ROOT = '/sys/class/gpio'
TRACE_MARKER_PATH = '/sys/kernel/debug/tracing/trace_marker' TRACE_MARKER_PATH = '/sys/kernel/debug/tracing/trace_marker'
def dict_or_bool(value):
"""
Ensures that either a dictionary or a boolean is used as a parameter.
"""
if isinstance(value, dict):
return value
return boolean(value)
class Daq(Instrument): class Daq(Instrument):
name = 'daq' name = 'daq'
@ -158,6 +168,21 @@ class Daq(Instrument):
to facillitate syncing kernel trace events to DAQ power to facillitate syncing kernel trace events to DAQ power
trace. trace.
"""), """),
Parameter('merge_channels', kind=dict_or_bool, default=False,
description="""
If set to ``True``, channels with consecutive letter suffixes will be summed.
e.g. If you have channels A7a, A7b, A7c, A15a, A15b they will be summed to A7, A15
You can also manually specify the name of channels to be merged and the name of the
result like so:
merge_channels:
A15: [A15dvfs, A15ram]
NonCPU: [GPU, RoS, Mem]
In the above exaples the DAQ channels labeled A15a and A15b will be summed together
with the results being saved as 'channel' ''a''. A7, GPU and RoS will be summed to 'c'
""")
] ]
def initialize(self, context): def initialize(self, context):
@ -198,6 +223,10 @@ class Daq(Instrument):
self.logger.debug('Downloading data files.') self.logger.debug('Downloading data files.')
output_directory = _d(os.path.join(context.output_directory, 'daq')) output_directory = _d(os.path.join(context.output_directory, 'daq'))
self._execute_command('get_data', output_directory=output_directory) self._execute_command('get_data', output_directory=output_directory)
if self.merge_channels:
self._merge_channels(context)
for entry in os.listdir(output_directory): for entry in os.listdir(output_directory):
context.add_iteration_artifact('DAQ_{}'.format(os.path.splitext(entry)[0]), context.add_iteration_artifact('DAQ_{}'.format(os.path.splitext(entry)[0]),
path=os.path.join('daq', entry), path=os.path.join('daq', entry),
@ -224,7 +253,6 @@ class Daq(Instrument):
self._metrics |= set(metrics) self._metrics |= set(metrics)
rows = _get_rows(reader, writer, self.negative_samples) rows = _get_rows(reader, writer, self.negative_samples)
#data = [map(float, d) for d in zip(*rows)]
data = zip(*rows) data = zip(*rows)
if writer: if writer:
@ -249,7 +277,7 @@ class Daq(Instrument):
unexport_path = self.device.path.join(GPIO_ROOT, 'unexport') unexport_path = self.device.path.join(GPIO_ROOT, 'unexport')
self.device.set_sysfile_value(unexport_path, self.gpio_sync, verify=False) self.device.set_sysfile_value(unexport_path, self.gpio_sync, verify=False)
def validate(self): def validate(self): # pylint: disable=too-many-branches
if not daq: if not daq:
raise ImportError(import_error_mesg) raise ImportError(import_error_mesg)
self._results = None self._results = None
@ -273,6 +301,28 @@ class Daq(Instrument):
self.device_config.validate() self.device_config.validate()
except ConfigurationError, ex: except ConfigurationError, ex:
raise ConfigError('DAQ configuration: ' + ex.message) # Re-raise as a WA error raise ConfigError('DAQ configuration: ' + ex.message) # Re-raise as a WA error
self.grouped_suffixes = defaultdict(str)
if isinstance(self.merge_channels, bool) and self.merge_channels:
# Create a dict of potential prefixes and a list of their suffixes
grouped_suffixes = {label[:-1]: label for label in sorted(self.labels) if len(label) > 1}
# Only merge channels if more than one channel has the same prefix and the prefixes
# are consecutive letters starting with 'a'.
self.label_map = {}
for channel, suffixes in grouped_suffixes.iteritems():
if len(suffixes) > 1:
if "".join([s[-1] for s in suffixes]) in ascii_lowercase[:len(suffixes)]:
self.label_map[channel] = suffixes
elif isinstance(self.merge_channels, dict):
# Check if given channel names match labels
for old_names in self.merge_channels.values():
for name in old_names:
if name not in self.labels:
raise ConfigError("No channel with label {} specified".format(name))
self.label_map = self.merge_channels
self.merge_channels = True
else: # Should never reach here
raise AssertionError("Merge files is of invalid type")
def before_overall_results_processing(self, context): def before_overall_results_processing(self, context):
if self._results: if self._results:
@ -315,6 +365,27 @@ class Daq(Instrument):
raise InstrumentError('DAQ: Unexpected result: {} - {}'.format(result.status, result.message)) raise InstrumentError('DAQ: Unexpected result: {} - {}'.format(result.status, result.message))
return (result.status, result.data) return (result.status, result.data)
def _merge_channels(self, context): # pylint: disable=r0914
output_directory = _d(os.path.join(context.output_directory, 'daq'))
for name, labels in self.label_map.iteritems():
summed = None
for label in labels:
path = os.path.join(output_directory, "{}.csv".format(label))
with open(path) as fh:
reader = csv.reader(fh)
metrics = reader.next()
rows = _get_rows(reader, None, self.negative_samples)
if summed:
summed = [[x + y for x, y in zip(a, b)] for a, b in zip(rows, summed)]
else:
summed = rows
output_path = os.path.join(output_directory, "{}.csv".format(name))
with open(output_path, 'wb') as wfh:
writer = csv.writer(wfh)
writer.writerow(metrics)
for row in summed:
writer.writerow(row)
def _send_daq_command(q, *args, **kwargs): def _send_daq_command(q, *args, **kwargs):
result = daq.execute_command(*args, **kwargs) result = daq.execute_command(*args, **kwargs)