1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2024-10-06 02:41:11 +01:00

Updated daqpower package

- Now works with earlier versions of the DAQmx driver. This is needed to
  be able to run the server on Linux systems, which support older
  verisions of the driver only.
- DAQ error messages are now properly propaged to the client (PyDAQmx
  uses "mess" rather than "message" attribute to store the message in
  the Exception obejects).
- pylint and pep8 fixes
This commit is contained in:
Sergei Trofimov 2015-06-09 10:59:56 +01:00
parent 0c19d75bf4
commit c239322c4d
8 changed files with 132 additions and 41 deletions

Binary file not shown.

Binary file not shown.

View File

@ -14,4 +14,4 @@
# #
__version__ = '1.0.1' __version__ = '1.0.2'

View File

@ -128,10 +128,11 @@ class CommandExecutorProtocol(Protocol):
response.message = 'No ports were returned.' response.message = 'No ports were returned.'
def processDevicesResponse(self, response): def processDevicesResponse(self, response):
if 'devices' not in response.data: if response.status == Status.OK:
self.errorOut('Response did not containt devices data: {} ({}).'.format(response, response.data)) if 'devices' not in response.data:
ports = response.data['devices'] self.errorOut('Response did not containt devices data: {} ({}).'.format(response, response.data))
response.data = ports devices = response.data['devices']
response.data = devices
def sendPullRequest(self, port_id): def sendPullRequest(self, port_id):
self.sendRequest('pull', port_id=port_id) self.sendRequest('pull', port_id=port_id)
@ -167,7 +168,7 @@ class CommandExecutorProtocol(Protocol):
class CommandExecutorFactory(ClientFactory): class CommandExecutorFactory(ClientFactory):
protocol = CommandExecutorProtocol protocol = CommandExecutorProtocol
wait_delay = 1 wait_delay = 1
def __init__(self, config, command, timeout=10, retries=1): def __init__(self, config, command, timeout=10, retries=1):
self.config = config self.config = config
@ -186,19 +187,19 @@ class CommandExecutorFactory(ClientFactory):
os.makedirs(self.output_directory) os.makedirs(self.output_directory)
def buildProtocol(self, addr): def buildProtocol(self, addr):
protocol = CommandExecutorProtocol(self.command, self.timeout, self.retries) protocol = CommandExecutorProtocol(self.command, self.timeout, self.retries)
protocol.factory = self protocol.factory = self
return protocol return protocol
def initiateFileTransfer(self, filename, port): def initiateFileTransfer(self, filename, port):
log.debug('Downloading {} from port {}'.format(filename, port)) log.debug('Downloading {} from port {}'.format(filename, port))
filepath = os.path.join(self.output_directory, filename) filepath = os.path.join(self.output_directory, filename)
session = FileReceiverFactory(filepath, self) session = FileReceiverFactory(filepath, self)
connector = reactor.connectTCP(self.config.host, port, session) connector = reactor.connectTCP(self.config.host, port, session)
self.transfers_in_progress[session] = connector self.transfers_in_progress[session] = connector
def transferComplete(self, session): def transferComplete(self, session):
connector = self.transfers_in_progress[session] connector = self.transfers_in_progress[session]
log.debug('Transfer on port {} complete.'.format(connector.port)) log.debug('Transfer on port {} complete.'.format(connector.port))
del self.transfers_in_progress[session] del self.transfers_in_progress[session]
@ -321,7 +322,7 @@ def execute_command(server_config, command, **kwargs):
# so in the long run, we need to do this properly and get the FDs # so in the long run, we need to do this properly and get the FDs
# from the reactor. # from the reactor.
after_fds = _get_open_fds() after_fds = _get_open_fds()
for fd in (after_fds - before_fds): for fd in after_fds - before_fds:
try: try:
os.close(int(fd[1:])) os.close(int(fd[1:]))
except OSError: except OSError:
@ -338,8 +339,7 @@ def _get_open_fds():
if os.name == 'posix': if os.name == 'posix':
import subprocess import subprocess
pid = os.getpid() pid = os.getpid()
procs = subprocess.check_output( procs = subprocess.check_output(["lsof", '-w', '-Ff', "-p", str(pid)])
[ "lsof", '-w', '-Ff', "-p", str( pid ) ] )
return set(procs.split()) return set(procs.split())
else: else:
# TODO: Implement the Windows equivalent. # TODO: Implement the Windows equivalent.
@ -362,7 +362,7 @@ def run_send_command():
else: else:
log.start_logging('INFO', fmt='%(levelname)-8s %(message)s') log.start_logging('INFO', fmt='%(levelname)-8s %(message)s')
if args.command == 'configure': if args.command == 'configure':
args.device_config.validate() args.device_config.validate()
command = Command(args.command, config=args.device_config) command = Command(args.command, config=args.device_config)
elif args.command == 'get_data': elif args.command == 'get_data':

View File

@ -44,9 +44,9 @@ class DeviceConfiguration(Serializable):
def __init__(self, **kwargs): # pylint: disable=W0231 def __init__(self, **kwargs): # pylint: disable=W0231
try: try:
self.device_id = kwargs.pop('device_id') or self.default_device_id self.device_id = kwargs.pop('device_id') or self.default_device_id
self.v_range = float(kwargs.pop('v_range') or self.default_v_range) self.v_range = float(kwargs.pop('v_range') or self.default_v_range)
self.dv_range = float(kwargs.pop('dv_range') or self.default_dv_range) self.dv_range = float(kwargs.pop('dv_range') or self.default_dv_range)
self.sampling_rate = int(kwargs.pop('sampling_rate') or self.default_sampling_rate) self.sampling_rate = int(kwargs.pop('sampling_rate') or self.default_sampling_rate)
self.resistor_values = kwargs.pop('resistor_values') or [] self.resistor_values = kwargs.pop('resistor_values') or []
self.channel_map = kwargs.pop('channel_map') or self.default_channel_map self.channel_map = kwargs.pop('channel_map') or self.default_channel_map
self.labels = (kwargs.pop('labels') or self.labels = (kwargs.pop('labels') or
@ -98,7 +98,7 @@ class UpdateDeviceConfig(argparse.Action):
setting = option_string.strip('-').replace('-', '_') setting = option_string.strip('-').replace('-', '_')
if setting not in DeviceConfiguration.valid_settings: if setting not in DeviceConfiguration.valid_settings:
raise ConfigurationError('Unkown option: {}'.format(option_string)) raise ConfigurationError('Unkown option: {}'.format(option_string))
setattr(namespace._device_config, setting, values) setattr(namespace._device_config, setting, values) # pylint: disable=protected-access
class UpdateServerConfig(argparse.Action): class UpdateServerConfig(argparse.Action):

View File

@ -42,7 +42,7 @@ Port 0
:sampling_rate: The rate at which DAQ takes a sample from each channel. :sampling_rate: The rate at which DAQ takes a sample from each channel.
""" """
# pylint: disable=F0401,E1101,W0621 # pylint: disable=F0401,E1101,W0621,no-name-in-module
import os import os
import sys import sys
import csv import csv
@ -52,23 +52,41 @@ from Queue import Queue, Empty
import numpy import numpy
from PyDAQmx import Task from PyDAQmx import Task, DAQError
from PyDAQmx.DAQmxFunctions import DAQmxGetSysDevNames try:
from PyDAQmx.DAQmxFunctions import DAQmxGetSysDevNames
CAN_ENUMERATE_DEVICES = True
except ImportError: # earlier driver version
DAQmxGetSysDevNames = None
CAN_ENUMERATE_DEVICES = False
from PyDAQmx.DAQmxTypes import int32, byref, create_string_buffer from PyDAQmx.DAQmxTypes import int32, byref, create_string_buffer
from PyDAQmx.DAQmxConstants import (DAQmx_Val_Diff, DAQmx_Val_Volts, DAQmx_Val_GroupByScanNumber, DAQmx_Val_Auto, from PyDAQmx.DAQmxConstants import (DAQmx_Val_Diff, DAQmx_Val_Volts, DAQmx_Val_GroupByScanNumber, DAQmx_Val_Auto,
DAQmx_Val_Acquired_Into_Buffer, DAQmx_Val_Rising, DAQmx_Val_ContSamps) DAQmx_Val_Rising, DAQmx_Val_ContSamps)
try:
from PyDAQmx.DAQmxConstants import DAQmx_Val_Acquired_Into_Buffer
callbacks_supported = True
except ImportError: # earlier driver version
DAQmx_Val_Acquired_Into_Buffer = None
callbacks_supported = False
from daqpower import log from daqpower import log
def list_available_devices(): def list_available_devices():
"""Returns the list of DAQ devices visible to the driver.""" """Returns the list of DAQ devices visible to the driver."""
bufsize = 2048 # Should be plenty for all but the most pathalogical of situations. if DAQmxGetSysDevNames:
buf = create_string_buffer('\000' * bufsize) bufsize = 2048 # Should be plenty for all but the most pathalogical of situations.
DAQmxGetSysDevNames(buf, bufsize) buf = create_string_buffer('\000' * bufsize)
return buf.value.split(',') DAQmxGetSysDevNames(buf, bufsize)
return buf.value.split(',')
else:
return []
class ReadSamplesTask(Task): class ReadSamplesBaseTask(Task):
def __init__(self, config, consumer): def __init__(self, config, consumer):
Task.__init__(self) Task.__init__(self)
@ -93,11 +111,27 @@ class ReadSamplesTask(Task):
DAQmx_Val_Rising, DAQmx_Val_Rising,
DAQmx_Val_ContSamps, DAQmx_Val_ContSamps,
self.config.sampling_rate) self.config.sampling_rate)
class ReadSamplesCallbackTask(ReadSamplesBaseTask):
"""
More recent verisons of the driver (on Windows) support callbacks
"""
def __init__(self, config, consumer):
ReadSamplesBaseTask.__init__(self, config, consumer)
# register callbacks # register callbacks
self.AutoRegisterEveryNSamplesEvent(DAQmx_Val_Acquired_Into_Buffer, self.config.sampling_rate // 2, 0) self.AutoRegisterEveryNSamplesEvent(DAQmx_Val_Acquired_Into_Buffer, self.config.sampling_rate // 2, 0)
self.AutoRegisterDoneEvent(0) self.AutoRegisterDoneEvent(0)
def EveryNCallback(self): def EveryNCallback(self):
# Note to future self: do NOT try to "optimize" this but re-using the same array and just
# zeroing it out each time. The writes happen asynchronously and if your zero it out too soon,
# you'll see a whole bunch of 0.0's in the output. If you wanna go down that route, you'll need
# cycler through several arrays and have the code that's actually doing the writing zero them out
# mark them as available to be used by this call. But, honestly, numpy array allocation does not
# appear to be a bottleneck at the moment, so the current solution is "good enough".
samples_buffer = numpy.zeros((self.sample_buffer_size,), dtype=numpy.float64) samples_buffer = numpy.zeros((self.sample_buffer_size,), dtype=numpy.float64)
self.ReadAnalogF64(DAQmx_Val_Auto, 0.0, DAQmx_Val_GroupByScanNumber, samples_buffer, self.ReadAnalogF64(DAQmx_Val_Auto, 0.0, DAQmx_Val_GroupByScanNumber, samples_buffer,
self.sample_buffer_size, byref(self.samples_read), None) self.sample_buffer_size, byref(self.samples_read), None)
@ -107,6 +141,51 @@ class ReadSamplesTask(Task):
return 0 # The function should return an integer return 0 # The function should return an integer
class ReadSamplesThreadedTask(ReadSamplesBaseTask):
"""
Earlier verisons of the driver (on CentOS) do not support callbacks. So need
to create a thread to periodically poll the buffer
"""
def __init__(self, config, consumer):
ReadSamplesBaseTask.__init__(self, config, consumer)
self.poller = DaqPoller(self)
def StartTask(self):
ReadSamplesBaseTask.StartTask(self)
self.poller.start()
def StopTask(self):
self.poller.stop()
ReadSamplesBaseTask.StopTask(self)
class DaqPoller(threading.Thread):
def __init__(self, task, wait_period=1):
super(DaqPoller, self).__init__()
self.task = task
self.wait_period = wait_period
self._stop_signal = threading.Event()
self.samples_buffer = numpy.zeros((self.task.sample_buffer_size,), dtype=numpy.float64)
def run(self):
while not self._stop_signal.is_set():
# Note to future self: see the comment inside EventNCallback() above
samples_buffer = numpy.zeros((self.task.sample_buffer_size,), dtype=numpy.float64)
try:
self.task.ReadAnalogF64(DAQmx_Val_Auto, self.wait_period, DAQmx_Val_GroupByScanNumber, samples_buffer,
self.task.sample_buffer_size, byref(self.task.samples_read), None)
except DAQError:
pass
self.task.consumer.write((samples_buffer, self.task.samples_read.value))
def stop(self):
self._stop_signal.set()
self.join()
class AsyncWriter(threading.Thread): class AsyncWriter(threading.Thread):
def __init__(self, wait_period=1): def __init__(self, wait_period=1):
@ -220,7 +299,10 @@ class DaqRunner(object):
def __init__(self, config, output_directory): def __init__(self, config, output_directory):
self.config = config self.config = config
self.processor = SampleProcessor(config.resistor_values, output_directory, config.labels) self.processor = SampleProcessor(config.resistor_values, output_directory, config.labels)
self.task = ReadSamplesTask(config, self.processor) if callbacks_supported:
self.task = ReadSamplesCallbackTask(config, self.processor)
else:
self.task = ReadSamplesThreadedTask(config, self.processor)
self.is_running = False self.is_running = False
def start(self): def start(self):

View File

@ -18,7 +18,6 @@
from __future__ import division from __future__ import division
import os import os
import sys import sys
import socket
import argparse import argparse
import shutil import shutil
import time import time
@ -37,11 +36,13 @@ from daqpower import log
from daqpower.config import DeviceConfiguration from daqpower.config import DeviceConfiguration
from daqpower.common import DaqServerRequest, DaqServerResponse, Status from daqpower.common import DaqServerRequest, DaqServerResponse, Status
try: try:
from daqpower.daq import DaqRunner, list_available_devices from daqpower.daq import DaqRunner, list_available_devices, CAN_ENUMERATE_DEVICES
except ImportError: __import_error = None
except ImportError as e:
# May be using debug mode. # May be using debug mode.
__import_error = e
DaqRunner = None DaqRunner = None
list_available_devices = lambda : ['Dev1'] list_available_devices = lambda: ['Dev1']
class ProtocolError(Exception): class ProtocolError(Exception):
@ -68,7 +69,7 @@ class DummyDaqRunner(object):
log.info('runner started') log.info('runner started')
for i in xrange(self.config.number_of_ports): for i in xrange(self.config.number_of_ports):
rows = [['power', 'voltage']] + [[random.gauss(1.0, 1.0), random.gauss(1.0, 0.1)] rows = [['power', 'voltage']] + [[random.gauss(1.0, 1.0), random.gauss(1.0, 0.1)]
for j in xrange(self.num_rows)] for _ in xrange(self.num_rows)]
with open(self.get_port_file_path(self.config.labels[i]), 'wb') as wfh: with open(self.get_port_file_path(self.config.labels[i]), 'wb') as wfh:
writer = csv.writer(wfh) writer = csv.writer(wfh)
writer.writerows(rows) writer.writerows(rows)
@ -139,7 +140,7 @@ class DaqServer(object):
else: else:
raise ProtocolError('Stop called before a session has been configured.') raise ProtocolError('Stop called before a session has been configured.')
def list_devices(self): def list_devices(self): # pylint: disable=no-self-use
return list_available_devices() return list_available_devices()
def list_ports(self): def list_ports(self):
@ -205,7 +206,10 @@ class DaqControlProtocol(LineReceiver): # pylint: disable=W0223
try: try:
request = DaqServerRequest.deserialize(line) request = DaqServerRequest.deserialize(line)
except Exception, e: # pylint: disable=W0703 except Exception, e: # pylint: disable=W0703
self.sendError('Received bad request ({}: {})'.format(e.__class__.__name__, e.message)) # PyDAQmx exceptions use "mess" rather than the standard "message"
# to pass errors...
message = getattr(e, 'mess', e.message)
self.sendError('Received bad request ({}: {})'.format(e.__class__.__name__, message))
else: else:
self.processRequest(request) self.processRequest(request)
@ -269,8 +273,12 @@ class DaqControlProtocol(LineReceiver): # pylint: disable=W0223
self.sendError('Invalid pull request; port id not provided.') self.sendError('Invalid pull request; port id not provided.')
def list_devices(self, request): def list_devices(self, request):
devices = self.daq_server.list_devices() if CAN_ENUMERATE_DEVICES:
self.sendResponse(Status.OK, data={'devices': devices}) devices = self.daq_server.list_devices()
self.sendResponse(Status.OK, data={'devices': devices})
else:
message = "Server does not support DAQ device enumration"
self.sendResponse(Status.OKISH, message=message)
def list_ports(self, request): def list_ports(self, request):
port_labels = self.daq_server.list_ports() port_labels = self.daq_server.list_ports()
@ -303,7 +311,7 @@ class DaqControlProtocol(LineReceiver): # pylint: disable=W0223
def sendLine(self, line): def sendLine(self, line):
log.info('Responding: {}'.format(line)) log.info('Responding: {}'.format(line))
LineReceiver.sendLine(self, line.replace('\r\n','')) LineReceiver.sendLine(self, line.replace('\r\n', ''))
def _initiate_file_transfer(self, filepath): def _initiate_file_transfer(self, filepath):
sender_factory = FileSenderFactory(filepath, self.factory) sender_factory = FileSenderFactory(filepath, self.factory)
@ -463,7 +471,7 @@ def run_server():
DaqRunner = DummyDaqRunner DaqRunner = DummyDaqRunner
else: else:
if not DaqRunner: if not DaqRunner:
raise ImportError('DaqRunner') raise __import_error # pylint: disable=raising-bad-type
if args.verbose or args.debug: if args.verbose or args.debug:
log.start_logging('DEBUG') log.start_logging('DEBUG')
else: else:
@ -471,7 +479,8 @@ def run_server():
server = DaqServer(args.directory) server = DaqServer(args.directory)
reactor.listenTCP(args.port, DaqFactory(server)).getHost() reactor.listenTCP(args.port, DaqFactory(server)).getHost()
hostname = socket.gethostbyname(socket.gethostname()) #hostname = socket.gethostbyname(socket.gethostname())
hostname = '192.168.108.131'
log.info('Listening on {}:{}'.format(hostname, args.port)) log.info('Listening on {}:{}'.format(hostname, args.port))
reactor.run() reactor.run()

View File

@ -134,8 +134,8 @@ class Daq(Instrument):
] ]
def initialize(self, context): def initialize(self, context):
devices = self._execute_command('list_devices') status, devices = self._execute_command('list_devices')
if not devices: if status == daq.Status.OK and not devices:
raise InstrumentError('DAQ: server did not report any devices registered with the driver.') raise InstrumentError('DAQ: server did not report any devices registered with the driver.')
self._results = OrderedDict() self._results = OrderedDict()
@ -235,7 +235,7 @@ class Daq(Instrument):
raise InstrumentError('DAQ: {}'.format(result.message)) raise InstrumentError('DAQ: {}'.format(result.message))
else: else:
raise InstrumentError('DAQ: Unexpected result: {} - {}'.format(result.status, result.message)) raise InstrumentError('DAQ: Unexpected result: {} - {}'.format(result.status, result.message))
return result.data return (result.status, result.data)
def _send_daq_command(q, *args, **kwargs): def _send_daq_command(q, *args, **kwargs):