diff --git a/wlauto/external/daq_server/daqpower-1.0.1.tar.gz b/wlauto/external/daq_server/daqpower-1.0.1.tar.gz deleted file mode 100644 index 671a45e8..00000000 Binary files a/wlauto/external/daq_server/daqpower-1.0.1.tar.gz and /dev/null differ diff --git a/wlauto/external/daq_server/daqpower-1.0.2.tar.gz b/wlauto/external/daq_server/daqpower-1.0.2.tar.gz new file mode 100644 index 00000000..7e63bf0e Binary files /dev/null and b/wlauto/external/daq_server/daqpower-1.0.2.tar.gz differ diff --git a/wlauto/external/daq_server/src/daqpower/__init__.py b/wlauto/external/daq_server/src/daqpower/__init__.py index ed442117..97b0d54a 100644 --- a/wlauto/external/daq_server/src/daqpower/__init__.py +++ b/wlauto/external/daq_server/src/daqpower/__init__.py @@ -14,4 +14,4 @@ # -__version__ = '1.0.1' +__version__ = '1.0.2' diff --git a/wlauto/external/daq_server/src/daqpower/client.py b/wlauto/external/daq_server/src/daqpower/client.py index b129dc77..14fe69d4 100644 --- a/wlauto/external/daq_server/src/daqpower/client.py +++ b/wlauto/external/daq_server/src/daqpower/client.py @@ -128,10 +128,11 @@ class CommandExecutorProtocol(Protocol): response.message = 'No ports were returned.' def processDevicesResponse(self, response): - if 'devices' not in response.data: - self.errorOut('Response did not containt devices data: {} ({}).'.format(response, response.data)) - ports = response.data['devices'] - response.data = ports + if response.status == Status.OK: + if 'devices' not in response.data: + self.errorOut('Response did not containt devices data: {} ({}).'.format(response, response.data)) + devices = response.data['devices'] + response.data = devices def sendPullRequest(self, port_id): self.sendRequest('pull', port_id=port_id) @@ -167,7 +168,7 @@ class CommandExecutorProtocol(Protocol): class CommandExecutorFactory(ClientFactory): protocol = CommandExecutorProtocol - wait_delay = 1 + wait_delay = 1 def __init__(self, config, command, timeout=10, retries=1): self.config = config @@ -186,19 +187,19 @@ class CommandExecutorFactory(ClientFactory): os.makedirs(self.output_directory) def buildProtocol(self, addr): - protocol = CommandExecutorProtocol(self.command, self.timeout, self.retries) + protocol = CommandExecutorProtocol(self.command, self.timeout, self.retries) protocol.factory = self return protocol def initiateFileTransfer(self, filename, port): log.debug('Downloading {} from port {}'.format(filename, port)) filepath = os.path.join(self.output_directory, filename) - session = FileReceiverFactory(filepath, self) + session = FileReceiverFactory(filepath, self) connector = reactor.connectTCP(self.config.host, port, session) self.transfers_in_progress[session] = connector def transferComplete(self, session): - connector = self.transfers_in_progress[session] + connector = self.transfers_in_progress[session] log.debug('Transfer on port {} complete.'.format(connector.port)) del self.transfers_in_progress[session] @@ -321,7 +322,7 @@ def execute_command(server_config, command, **kwargs): # so in the long run, we need to do this properly and get the FDs # from the reactor. after_fds = _get_open_fds() - for fd in (after_fds - before_fds): + for fd in after_fds - before_fds: try: os.close(int(fd[1:])) except OSError: @@ -338,8 +339,7 @@ def _get_open_fds(): if os.name == 'posix': import subprocess pid = os.getpid() - procs = subprocess.check_output( - [ "lsof", '-w', '-Ff', "-p", str( pid ) ] ) + procs = subprocess.check_output(["lsof", '-w', '-Ff', "-p", str(pid)]) return set(procs.split()) else: # TODO: Implement the Windows equivalent. @@ -362,7 +362,7 @@ def run_send_command(): else: log.start_logging('INFO', fmt='%(levelname)-8s %(message)s') - if args.command == 'configure': + if args.command == 'configure': args.device_config.validate() command = Command(args.command, config=args.device_config) elif args.command == 'get_data': diff --git a/wlauto/external/daq_server/src/daqpower/config.py b/wlauto/external/daq_server/src/daqpower/config.py index bfc3280f..86343424 100644 --- a/wlauto/external/daq_server/src/daqpower/config.py +++ b/wlauto/external/daq_server/src/daqpower/config.py @@ -44,9 +44,9 @@ class DeviceConfiguration(Serializable): def __init__(self, **kwargs): # pylint: disable=W0231 try: self.device_id = kwargs.pop('device_id') or self.default_device_id - self.v_range = float(kwargs.pop('v_range') or self.default_v_range) + self.v_range = float(kwargs.pop('v_range') or self.default_v_range) self.dv_range = float(kwargs.pop('dv_range') or self.default_dv_range) - self.sampling_rate = int(kwargs.pop('sampling_rate') or self.default_sampling_rate) + self.sampling_rate = int(kwargs.pop('sampling_rate') or self.default_sampling_rate) self.resistor_values = kwargs.pop('resistor_values') or [] self.channel_map = kwargs.pop('channel_map') or self.default_channel_map self.labels = (kwargs.pop('labels') or @@ -98,7 +98,7 @@ class UpdateDeviceConfig(argparse.Action): setting = option_string.strip('-').replace('-', '_') if setting not in DeviceConfiguration.valid_settings: raise ConfigurationError('Unkown option: {}'.format(option_string)) - setattr(namespace._device_config, setting, values) + setattr(namespace._device_config, setting, values) # pylint: disable=protected-access class UpdateServerConfig(argparse.Action): diff --git a/wlauto/external/daq_server/src/daqpower/daq.py b/wlauto/external/daq_server/src/daqpower/daq.py index 22951ab9..d70399b0 100644 --- a/wlauto/external/daq_server/src/daqpower/daq.py +++ b/wlauto/external/daq_server/src/daqpower/daq.py @@ -42,7 +42,7 @@ Port 0 :sampling_rate: The rate at which DAQ takes a sample from each channel. """ -# pylint: disable=F0401,E1101,W0621 +# pylint: disable=F0401,E1101,W0621,no-name-in-module import os import sys import csv @@ -52,23 +52,41 @@ from Queue import Queue, Empty import numpy -from PyDAQmx import Task -from PyDAQmx.DAQmxFunctions import DAQmxGetSysDevNames +from PyDAQmx import Task, DAQError +try: + from PyDAQmx.DAQmxFunctions import DAQmxGetSysDevNames + CAN_ENUMERATE_DEVICES = True +except ImportError: # earlier driver version + DAQmxGetSysDevNames = None + CAN_ENUMERATE_DEVICES = False + from PyDAQmx.DAQmxTypes import int32, byref, create_string_buffer from PyDAQmx.DAQmxConstants import (DAQmx_Val_Diff, DAQmx_Val_Volts, DAQmx_Val_GroupByScanNumber, DAQmx_Val_Auto, - DAQmx_Val_Acquired_Into_Buffer, DAQmx_Val_Rising, DAQmx_Val_ContSamps) + DAQmx_Val_Rising, DAQmx_Val_ContSamps) + +try: + from PyDAQmx.DAQmxConstants import DAQmx_Val_Acquired_Into_Buffer + callbacks_supported = True +except ImportError: # earlier driver version + DAQmx_Val_Acquired_Into_Buffer = None + callbacks_supported = False + from daqpower import log + def list_available_devices(): """Returns the list of DAQ devices visible to the driver.""" - bufsize = 2048 # Should be plenty for all but the most pathalogical of situations. - buf = create_string_buffer('\000' * bufsize) - DAQmxGetSysDevNames(buf, bufsize) - return buf.value.split(',') + if DAQmxGetSysDevNames: + bufsize = 2048 # Should be plenty for all but the most pathalogical of situations. + buf = create_string_buffer('\000' * bufsize) + DAQmxGetSysDevNames(buf, bufsize) + return buf.value.split(',') + else: + return [] -class ReadSamplesTask(Task): +class ReadSamplesBaseTask(Task): def __init__(self, config, consumer): Task.__init__(self) @@ -93,11 +111,27 @@ class ReadSamplesTask(Task): DAQmx_Val_Rising, DAQmx_Val_ContSamps, self.config.sampling_rate) + + +class ReadSamplesCallbackTask(ReadSamplesBaseTask): + """ + More recent verisons of the driver (on Windows) support callbacks + + """ + + def __init__(self, config, consumer): + ReadSamplesBaseTask.__init__(self, config, consumer) # register callbacks self.AutoRegisterEveryNSamplesEvent(DAQmx_Val_Acquired_Into_Buffer, self.config.sampling_rate // 2, 0) self.AutoRegisterDoneEvent(0) def EveryNCallback(self): + # Note to future self: do NOT try to "optimize" this but re-using the same array and just + # zeroing it out each time. The writes happen asynchronously and if your zero it out too soon, + # you'll see a whole bunch of 0.0's in the output. If you wanna go down that route, you'll need + # cycler through several arrays and have the code that's actually doing the writing zero them out + # mark them as available to be used by this call. But, honestly, numpy array allocation does not + # appear to be a bottleneck at the moment, so the current solution is "good enough". samples_buffer = numpy.zeros((self.sample_buffer_size,), dtype=numpy.float64) self.ReadAnalogF64(DAQmx_Val_Auto, 0.0, DAQmx_Val_GroupByScanNumber, samples_buffer, self.sample_buffer_size, byref(self.samples_read), None) @@ -107,6 +141,51 @@ class ReadSamplesTask(Task): return 0 # The function should return an integer +class ReadSamplesThreadedTask(ReadSamplesBaseTask): + """ + Earlier verisons of the driver (on CentOS) do not support callbacks. So need + to create a thread to periodically poll the buffer + + """ + + def __init__(self, config, consumer): + ReadSamplesBaseTask.__init__(self, config, consumer) + self.poller = DaqPoller(self) + + def StartTask(self): + ReadSamplesBaseTask.StartTask(self) + self.poller.start() + + def StopTask(self): + self.poller.stop() + ReadSamplesBaseTask.StopTask(self) + + +class DaqPoller(threading.Thread): + + def __init__(self, task, wait_period=1): + super(DaqPoller, self).__init__() + self.task = task + self.wait_period = wait_period + self._stop_signal = threading.Event() + self.samples_buffer = numpy.zeros((self.task.sample_buffer_size,), dtype=numpy.float64) + + def run(self): + while not self._stop_signal.is_set(): + # Note to future self: see the comment inside EventNCallback() above + samples_buffer = numpy.zeros((self.task.sample_buffer_size,), dtype=numpy.float64) + try: + self.task.ReadAnalogF64(DAQmx_Val_Auto, self.wait_period, DAQmx_Val_GroupByScanNumber, samples_buffer, + self.task.sample_buffer_size, byref(self.task.samples_read), None) + except DAQError: + pass + self.task.consumer.write((samples_buffer, self.task.samples_read.value)) + + def stop(self): + self._stop_signal.set() + self.join() + + class AsyncWriter(threading.Thread): def __init__(self, wait_period=1): @@ -220,7 +299,10 @@ class DaqRunner(object): def __init__(self, config, output_directory): self.config = config self.processor = SampleProcessor(config.resistor_values, output_directory, config.labels) - self.task = ReadSamplesTask(config, self.processor) + if callbacks_supported: + self.task = ReadSamplesCallbackTask(config, self.processor) + else: + self.task = ReadSamplesThreadedTask(config, self.processor) self.is_running = False def start(self): diff --git a/wlauto/external/daq_server/src/daqpower/server.py b/wlauto/external/daq_server/src/daqpower/server.py index 9aac51a2..4dc23272 100644 --- a/wlauto/external/daq_server/src/daqpower/server.py +++ b/wlauto/external/daq_server/src/daqpower/server.py @@ -18,7 +18,6 @@ from __future__ import division import os import sys -import socket import argparse import shutil import time @@ -37,11 +36,13 @@ from daqpower import log from daqpower.config import DeviceConfiguration from daqpower.common import DaqServerRequest, DaqServerResponse, Status try: - from daqpower.daq import DaqRunner, list_available_devices -except ImportError: + from daqpower.daq import DaqRunner, list_available_devices, CAN_ENUMERATE_DEVICES + __import_error = None +except ImportError as e: # May be using debug mode. + __import_error = e DaqRunner = None - list_available_devices = lambda : ['Dev1'] + list_available_devices = lambda: ['Dev1'] class ProtocolError(Exception): @@ -68,7 +69,7 @@ class DummyDaqRunner(object): log.info('runner started') for i in xrange(self.config.number_of_ports): rows = [['power', 'voltage']] + [[random.gauss(1.0, 1.0), random.gauss(1.0, 0.1)] - for j in xrange(self.num_rows)] + for _ in xrange(self.num_rows)] with open(self.get_port_file_path(self.config.labels[i]), 'wb') as wfh: writer = csv.writer(wfh) writer.writerows(rows) @@ -139,7 +140,7 @@ class DaqServer(object): else: raise ProtocolError('Stop called before a session has been configured.') - def list_devices(self): + def list_devices(self): # pylint: disable=no-self-use return list_available_devices() def list_ports(self): @@ -205,7 +206,10 @@ class DaqControlProtocol(LineReceiver): # pylint: disable=W0223 try: request = DaqServerRequest.deserialize(line) except Exception, e: # pylint: disable=W0703 - self.sendError('Received bad request ({}: {})'.format(e.__class__.__name__, e.message)) + # PyDAQmx exceptions use "mess" rather than the standard "message" + # to pass errors... + message = getattr(e, 'mess', e.message) + self.sendError('Received bad request ({}: {})'.format(e.__class__.__name__, message)) else: self.processRequest(request) @@ -269,8 +273,12 @@ class DaqControlProtocol(LineReceiver): # pylint: disable=W0223 self.sendError('Invalid pull request; port id not provided.') def list_devices(self, request): - devices = self.daq_server.list_devices() - self.sendResponse(Status.OK, data={'devices': devices}) + if CAN_ENUMERATE_DEVICES: + devices = self.daq_server.list_devices() + self.sendResponse(Status.OK, data={'devices': devices}) + else: + message = "Server does not support DAQ device enumration" + self.sendResponse(Status.OKISH, message=message) def list_ports(self, request): port_labels = self.daq_server.list_ports() @@ -303,7 +311,7 @@ class DaqControlProtocol(LineReceiver): # pylint: disable=W0223 def sendLine(self, line): log.info('Responding: {}'.format(line)) - LineReceiver.sendLine(self, line.replace('\r\n','')) + LineReceiver.sendLine(self, line.replace('\r\n', '')) def _initiate_file_transfer(self, filepath): sender_factory = FileSenderFactory(filepath, self.factory) @@ -463,7 +471,7 @@ def run_server(): DaqRunner = DummyDaqRunner else: if not DaqRunner: - raise ImportError('DaqRunner') + raise __import_error # pylint: disable=raising-bad-type if args.verbose or args.debug: log.start_logging('DEBUG') else: @@ -471,7 +479,8 @@ def run_server(): server = DaqServer(args.directory) reactor.listenTCP(args.port, DaqFactory(server)).getHost() - hostname = socket.gethostbyname(socket.gethostname()) + #hostname = socket.gethostbyname(socket.gethostname()) + hostname = '192.168.108.131' log.info('Listening on {}:{}'.format(hostname, args.port)) reactor.run() diff --git a/wlauto/instrumentation/daq/__init__.py b/wlauto/instrumentation/daq/__init__.py index 60d759ee..16e11907 100644 --- a/wlauto/instrumentation/daq/__init__.py +++ b/wlauto/instrumentation/daq/__init__.py @@ -134,8 +134,8 @@ class Daq(Instrument): ] def initialize(self, context): - devices = self._execute_command('list_devices') - if not devices: + status, devices = self._execute_command('list_devices') + if status == daq.Status.OK and not devices: raise InstrumentError('DAQ: server did not report any devices registered with the driver.') self._results = OrderedDict() @@ -235,7 +235,7 @@ class Daq(Instrument): raise InstrumentError('DAQ: {}'.format(result.message)) else: raise InstrumentError('DAQ: Unexpected result: {} - {}'.format(result.status, result.message)) - return result.data + return (result.status, result.data) def _send_daq_command(q, *args, **kwargs):