mirror of
https://github.com/ARM-software/workload-automation.git
synced 2025-01-19 04:21:17 +00:00
112 lines
3.7 KiB
Python
112 lines
3.7 KiB
Python
|
# Copyright 2013-2015 ARM Limited
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
# you may not use this file except in compliance with the License.
|
||
|
# You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
# See the License for the specific language governing permissions and
|
||
|
# limitations under the License.
|
||
|
#
|
||
|
|
||
|
|
||
|
import time
|
||
|
from contextlib import contextmanager
|
||
|
|
||
|
import serial
|
||
|
import fdpexpect
|
||
|
# Adding pexpect exceptions into this module's namespace
|
||
|
from pexpect import EOF, TIMEOUT # NOQA pylint: disable=W0611
|
||
|
|
||
|
from wlauto.exceptions import HostError
|
||
|
from wlauto.utils.log import LogWriter
|
||
|
|
||
|
|
||
|
class PexpectLogger(LogWriter):
|
||
|
|
||
|
def __init__(self, kind):
|
||
|
"""
|
||
|
File-like object class designed to be used for logging with pexpect or
|
||
|
fdpexect. Each complete line (terminated by new line character) gets logged
|
||
|
at DEBUG level. In complete lines are buffered until the next new line.
|
||
|
|
||
|
:param kind: This specified which of pexpect logfile attributes this logger
|
||
|
will be set to. It should be "read" for logfile_read, "send" for
|
||
|
logfile_send, and "" (emtpy string) for logfile.
|
||
|
|
||
|
"""
|
||
|
if kind not in ('read', 'send', ''):
|
||
|
raise ValueError('kind must be "read", "send" or ""; got {}'.format(kind))
|
||
|
self.kind = kind
|
||
|
logger_name = 'serial_{}'.format(kind) if kind else 'serial'
|
||
|
super(PexpectLogger, self).__init__(logger_name)
|
||
|
|
||
|
|
||
|
def pulse_dtr(conn, state=True, duration=0.1):
|
||
|
"""Set the DTR line of the specified serial connection to the specified state
|
||
|
for the specified duration (note: the initial state of the line is *not* checked."""
|
||
|
conn.setDTR(state)
|
||
|
time.sleep(duration)
|
||
|
conn.setDTR(not state)
|
||
|
|
||
|
|
||
|
@contextmanager
|
||
|
def open_serial_connection(timeout, get_conn=False, init_dtr=None, *args, **kwargs):
|
||
|
"""
|
||
|
Opens a serial connection to a device.
|
||
|
|
||
|
:param timeout: timeout for the fdpexpect spawn object.
|
||
|
:param conn: ``bool`` that specfies whether the underlying connection
|
||
|
object should be yielded as well.
|
||
|
:param init_dtr: specifies the initial DTR state stat should be set.
|
||
|
|
||
|
All arguments are passed into the __init__ of serial.Serial. See
|
||
|
pyserial documentation for details:
|
||
|
|
||
|
http://pyserial.sourceforge.net/pyserial_api.html#serial.Serial
|
||
|
|
||
|
:returns: a pexpect spawn object connected to the device.
|
||
|
See: http://pexpect.sourceforge.net/pexpect.html
|
||
|
|
||
|
"""
|
||
|
if init_dtr is not None:
|
||
|
kwargs['dsrdtr'] = True
|
||
|
try:
|
||
|
conn = serial.Serial(*args, **kwargs)
|
||
|
except serial.SerialException as e:
|
||
|
raise HostError(e.message)
|
||
|
if init_dtr is not None:
|
||
|
conn.setDTR(init_dtr)
|
||
|
conn.nonblocking()
|
||
|
conn.flushOutput()
|
||
|
target = fdpexpect.fdspawn(conn.fileno(), timeout=timeout)
|
||
|
target.logfile_read = PexpectLogger('read')
|
||
|
target.logfile_send = PexpectLogger('send')
|
||
|
|
||
|
# Monkey-patching sendline to introduce a short delay after
|
||
|
# chacters are sent to the serial. If two sendline s are issued
|
||
|
# one after another the second one might start putting characters
|
||
|
# into the serial device before the first one has finished, causing
|
||
|
# corruption. The delay prevents that.
|
||
|
tsln = target.sendline
|
||
|
|
||
|
def sendline(x):
|
||
|
tsln(x)
|
||
|
time.sleep(0.1)
|
||
|
|
||
|
target.sendline = sendline
|
||
|
|
||
|
if get_conn:
|
||
|
yield target, conn
|
||
|
else:
|
||
|
yield target
|
||
|
|
||
|
target.close() # Closes the file descriptor used by the conn.
|
||
|
del conn
|
||
|
|
||
|
|