import os from devlib import SerialTraceCollector from wa import Instrument, Parameter, InstrumentError, hostside class SerialMon(Instrument): name = 'serialmon' description = """ Records the traffic on a serial connection The traffic on a serial connection is monitored and logged to a file. In the event that the device is reset, the instrument will stop monitoring during the reset, and will reconnect once the reset has completed. This is to account for devices (i.e., the Juno) which utilise the serial connection to reset the board. """ parameters = [ Parameter('serial_port', kind=str, default="/dev/ttyS0", description=""" The serial device to monitor. """), Parameter('baudrate', kind=int, default=115200, description=""" The baud-rate to use when connecting to the serial connection. """), ] def __init__(self, target, **kwargs): super(SerialMon, self).__init__(target, **kwargs) self._collector = SerialTraceCollector(target, self.serial_port, self.baudrate) self._collector.reset() def start_logging(self, context): self.logger.debug("Acquiring serial port ({})".format(self.serial_port)) if self._collector.collecting: self.stop_logging(context) self._collector.start() def stop_logging(self, context, filename="serial.log", identifier="job"): self.logger.debug("Releasing serial port ({})".format(self.serial_port)) if self._collector.collecting: self._collector.stop() outpath = os.path.join(context.output_directory, filename) self._collector.get_trace(outpath) context.add_artifact("{}_serial_log".format(identifier), outpath, kind="log") def on_run_start(self, context): self.start_logging(context) def before_job_queue_execution(self, context): self.stop_logging(context, "preamble_serial.log", "preamble") def after_job_queue_execution(self, context): self.start_logging(context) def on_run_end(self, context): self.stop_logging(context, "postamble_serial.log", "postamble") def on_job_start(self, context): self.start_logging(context) def on_job_end(self, context): self.stop_logging(context) @hostside def before_reboot(self, context): self.stop_logging(context)