1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-19 12:24:32 +00:00

74 lines
2.5 KiB
Python
Raw Normal View History

import os
from devlib import SerialTraceCollector
from wa import Instrument, Parameter, InstrumentError, hostside
class SerialMon(Instrument):
name = 'serialmon'
description = """
Records the traffic on a serial connection
The traffic on a serial connection is monitored and logged to a
file. In the event that the device is reset, the instrument will
stop monitoring during the reset, and will reconnect once the
reset has completed. This is to account for devices (i.e., the
Juno) which utilise the serial connection to reset the board.
"""
parameters = [
Parameter('serial_port', kind=str, default="/dev/ttyS0",
description="""
The serial device to monitor.
"""),
Parameter('baudrate', kind=int, default=115200,
description="""
The baud-rate to use when connecting to the serial connection.
"""),
]
def __init__(self, target, **kwargs):
super(SerialMon, self).__init__(target, **kwargs)
self._collector = SerialTraceCollector(target, self.serial_port, self.baudrate)
self._collector.reset()
def start_logging(self, context):
self.logger.debug("Acquiring serial port ({})".format(self.serial_port))
if self._collector.collecting:
self.stop_logging(context)
self._collector.start()
def stop_logging(self, context, filename="serial.log", identifier="job"):
self.logger.debug("Releasing serial port ({})".format(self.serial_port))
if self._collector.collecting:
self._collector.stop()
outpath = os.path.join(context.output_directory, filename)
self._collector.get_trace(outpath)
context.add_artifact("{}_serial_log".format(identifier),
outpath, kind="log")
def on_run_start(self, context):
self.start_logging(context)
def before_job_queue_execution(self, context):
self.stop_logging(context, "preamble_serial.log", "preamble")
def after_job_queue_execution(self, context):
self.start_logging(context)
def on_run_end(self, context):
self.stop_logging(context, "postamble_serial.log", "postamble")
def on_job_start(self, context):
self.start_logging(context)
def on_job_end(self, context):
self.stop_logging(context)
@hostside
def before_reboot(self, context):
self.stop_logging(context)