From 6492b95edd53b53913a1ae066c817d40dfdf98c6 Mon Sep 17 00:00:00 2001 From: Sebastian Goscik Date: Thu, 5 Nov 2015 11:11:38 +0000 Subject: [PATCH] Added ability to merge DAQ channels If the parameter merge_channels is set to true (false by default). DAQ channels that have consecutive letters on the end of their names will be summed together. E.g: A7, A15a, A15b becomes A7, A15 You can also manually specify a channel mapping by setting merge_channels to a dict. . --- wlauto/instrumentation/daq/__init__.py | 81 ++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 5 deletions(-) diff --git a/wlauto/instrumentation/daq/__init__.py b/wlauto/instrumentation/daq/__init__.py index 62c799af..8c3d60e1 100644 --- a/wlauto/instrumentation/daq/__init__.py +++ b/wlauto/instrumentation/daq/__init__.py @@ -13,7 +13,6 @@ # limitations under the License. # - # pylint: disable=W0613,E1101,access-member-before-definition,attribute-defined-outside-init from __future__ import division import os @@ -21,14 +20,16 @@ import sys import csv import shutil import tempfile -from collections import OrderedDict +from collections import OrderedDict, defaultdict +from string import ascii_lowercase + from multiprocessing import Process, Queue from wlauto import Instrument, Parameter from wlauto.core import signal from wlauto.exceptions import ConfigError, InstrumentError, DeviceError from wlauto.utils.misc import ensure_directory_exists as _d -from wlauto.utils.types import list_of_ints, list_of_strs +from wlauto.utils.types import list_of_ints, list_of_strs, boolean daqpower_path = os.path.join(os.path.dirname(__file__), '..', '..', 'external', 'daq_server', 'src') sys.path.insert(0, daqpower_path) @@ -52,6 +53,15 @@ GPIO_ROOT = '/sys/class/gpio' TRACE_MARKER_PATH = '/sys/kernel/debug/tracing/trace_marker' +def dict_or_bool(value): + """ + Ensures that either a dictionary or a boolean is used as a parameter. + """ + if isinstance(value, dict): + return value + return boolean(value) + + class Daq(Instrument): name = 'daq' @@ -158,6 +168,21 @@ class Daq(Instrument): to facillitate syncing kernel trace events to DAQ power trace. """), + Parameter('merge_channels', kind=dict_or_bool, default=False, + description=""" + If set to ``True``, channels with consecutive letter suffixes will be summed. + e.g. If you have channels A7a, A7b, A7c, A15a, A15b they will be summed to A7, A15 + + You can also manually specify the name of channels to be merged and the name of the + result like so: + + merge_channels: + A15: [A15dvfs, A15ram] + NonCPU: [GPU, RoS, Mem] + + In the above exaples the DAQ channels labeled A15a and A15b will be summed together + with the results being saved as 'channel' ''a''. A7, GPU and RoS will be summed to 'c' + """) ] def initialize(self, context): @@ -198,6 +223,10 @@ class Daq(Instrument): self.logger.debug('Downloading data files.') output_directory = _d(os.path.join(context.output_directory, 'daq')) self._execute_command('get_data', output_directory=output_directory) + + if self.merge_channels: + self._merge_channels(context) + for entry in os.listdir(output_directory): context.add_iteration_artifact('DAQ_{}'.format(os.path.splitext(entry)[0]), path=os.path.join('daq', entry), @@ -224,7 +253,6 @@ class Daq(Instrument): self._metrics |= set(metrics) rows = _get_rows(reader, writer, self.negative_samples) - #data = [map(float, d) for d in zip(*rows)] data = zip(*rows) if writer: @@ -249,7 +277,7 @@ class Daq(Instrument): unexport_path = self.device.path.join(GPIO_ROOT, 'unexport') self.device.set_sysfile_value(unexport_path, self.gpio_sync, verify=False) - def validate(self): + def validate(self): # pylint: disable=too-many-branches if not daq: raise ImportError(import_error_mesg) self._results = None @@ -273,6 +301,28 @@ class Daq(Instrument): self.device_config.validate() except ConfigurationError, ex: raise ConfigError('DAQ configuration: ' + ex.message) # Re-raise as a WA error + self.grouped_suffixes = defaultdict(str) + if isinstance(self.merge_channels, bool) and self.merge_channels: + # Create a dict of potential prefixes and a list of their suffixes + grouped_suffixes = {label[:-1]: label for label in sorted(self.labels) if len(label) > 1} + # Only merge channels if more than one channel has the same prefix and the prefixes + # are consecutive letters starting with 'a'. + self.label_map = {} + for channel, suffixes in grouped_suffixes.iteritems(): + if len(suffixes) > 1: + if "".join([s[-1] for s in suffixes]) in ascii_lowercase[:len(suffixes)]: + self.label_map[channel] = suffixes + + elif isinstance(self.merge_channels, dict): + # Check if given channel names match labels + for old_names in self.merge_channels.values(): + for name in old_names: + if name not in self.labels: + raise ConfigError("No channel with label {} specified".format(name)) + self.label_map = self.merge_channels + self.merge_channels = True + else: # Should never reach here + raise AssertionError("Merge files is of invalid type") def before_overall_results_processing(self, context): if self._results: @@ -315,6 +365,27 @@ class Daq(Instrument): raise InstrumentError('DAQ: Unexpected result: {} - {}'.format(result.status, result.message)) return (result.status, result.data) + def _merge_channels(self, context): # pylint: disable=r0914 + output_directory = _d(os.path.join(context.output_directory, 'daq')) + for name, labels in self.label_map.iteritems(): + summed = None + for label in labels: + path = os.path.join(output_directory, "{}.csv".format(label)) + with open(path) as fh: + reader = csv.reader(fh) + metrics = reader.next() + rows = _get_rows(reader, None, self.negative_samples) + if summed: + summed = [[x + y for x, y in zip(a, b)] for a, b in zip(rows, summed)] + else: + summed = rows + output_path = os.path.join(output_directory, "{}.csv".format(name)) + with open(output_path, 'wb') as wfh: + writer = csv.writer(wfh) + writer.writerow(metrics) + for row in summed: + writer.writerow(row) + def _send_daq_command(q, *args, **kwargs): result = daq.execute_command(*args, **kwargs)