mirror of
https://github.com/ARM-software/workload-automation.git
synced 2025-09-03 03:42:35 +01:00
Initial commit of open source Workload Automation.
This commit is contained in:
214
wlauto/utils/uefi.py
Normal file
214
wlauto/utils/uefi.py
Normal file
@@ -0,0 +1,214 @@
|
||||
# Copyright 2014-2015 ARM Limited
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
|
||||
import re
|
||||
import time
|
||||
import logging
|
||||
|
||||
from wlauto.utils.serial_port import TIMEOUT
|
||||
|
||||
|
||||
logger = logging.getLogger('UEFI')
|
||||
|
||||
|
||||
class UefiMenu(object):
|
||||
"""
|
||||
Allows navigating UEFI menu over serial (it relies on a pexpect connection).
|
||||
|
||||
"""
|
||||
|
||||
option_regex = re.compile(r'^\[(\d+)\]\s+([^\r]+)\r\n', re.M)
|
||||
prompt_regex = re.compile(r'^([^\r\n]+):\s*', re.M)
|
||||
invalid_regex = re.compile(r'Invalid input \(max (\d+)\)', re.M)
|
||||
|
||||
load_delay = 1 # seconds
|
||||
default_timeout = 60 # seconds
|
||||
|
||||
def __init__(self, conn, prompt='The default boot selection will start in'):
|
||||
"""
|
||||
:param conn: A serial connection as returned by ``pexect.spawn()``.
|
||||
:param prompt: The starting prompt to wait for during ``open()``.
|
||||
|
||||
"""
|
||||
self.conn = conn
|
||||
self.start_prompt = prompt
|
||||
self.options = {}
|
||||
self.prompt = None
|
||||
|
||||
def open(self, timeout=default_timeout):
|
||||
"""
|
||||
"Open" the UEFI menu by sending an interrupt on STDIN after seeing the
|
||||
starting prompt (configurable upon creation of the ``UefiMenu`` object.
|
||||
|
||||
"""
|
||||
self.conn.expect(self.start_prompt, timeout)
|
||||
self.conn.sendline('')
|
||||
time.sleep(self.load_delay)
|
||||
|
||||
def create_entry(self, name, image, args, fdt_support, initrd=None, fdt_path=None):
|
||||
"""Create a new UEFI entry using the parameters. The menu is assumed
|
||||
to be at the top level. Upon return, the menu will be at the top level."""
|
||||
logger.debug('Creating UEFI entry {}'.format(name))
|
||||
self.nudge()
|
||||
self.select('Boot Manager')
|
||||
self.select('Add Boot Device Entry')
|
||||
self.select('NOR Flash')
|
||||
self.enter(image)
|
||||
self.enter('y' if fdt_support else 'n')
|
||||
if initrd:
|
||||
self.enter('y')
|
||||
self.enter(initrd)
|
||||
else:
|
||||
self.enter('n')
|
||||
self.enter(args)
|
||||
self.enter(name)
|
||||
|
||||
if fdt_path:
|
||||
self.select('Update FDT path')
|
||||
self.enter(fdt_path)
|
||||
|
||||
self.select('Return to main menu')
|
||||
|
||||
def delete_entry(self, name):
|
||||
"""Delete the specified UEFI entry. The menu is assumed
|
||||
to be at the top level. Upon return, the menu will be at the top level."""
|
||||
logger.debug('Removing UEFI entry {}'.format(name))
|
||||
self.nudge()
|
||||
self.select('Boot Manager')
|
||||
self.select('Remove Boot Device Entry')
|
||||
self.select(name)
|
||||
self.select('Return to main menu')
|
||||
|
||||
def select(self, option, timeout=default_timeout):
|
||||
"""
|
||||
Select the specified option from the current menu.
|
||||
|
||||
:param option: Could be an ``int`` index of the option, or a string/regex to
|
||||
match option text against.
|
||||
:param timeout: If a non-``int`` option is specified, the option list may need
|
||||
need to be parsed (if it hasn't been already), this may block
|
||||
and the timeout is used to cap that , resulting in a ``TIMEOUT``
|
||||
exception.
|
||||
:param delay: A fixed delay to wait after sending the input to the serial connection.
|
||||
This should be set if input this action is known to result in a
|
||||
long-running operation.
|
||||
|
||||
"""
|
||||
if isinstance(option, basestring):
|
||||
option = self.get_option_index(option, timeout)
|
||||
self.enter(option)
|
||||
|
||||
def enter(self, value, delay=load_delay):
|
||||
"""Like ``select()`` except no resolution is performed -- the value is sent directly
|
||||
to the serial connection."""
|
||||
# Empty the buffer first, so that only response to the input about to
|
||||
# be sent will be processed by subsequent commands.
|
||||
value = str(value)
|
||||
self._reset()
|
||||
self.write_characters(value)
|
||||
# TODO: in case the value is long an complicated, things may get
|
||||
# screwed up (e.g. there may be line breaks injected), additionally,
|
||||
# special chars might cause regex to fail. To avoid these issues i'm
|
||||
# only matching against the first 5 chars of the value. This is
|
||||
# entirely arbitrary and I'll probably have to find a better way of
|
||||
# doing this at some point.
|
||||
self.conn.expect(value[:5], timeout=delay)
|
||||
time.sleep(self.load_delay)
|
||||
|
||||
def read_menu(self, timeout=default_timeout):
|
||||
"""Parse serial output to get the menu options and the following prompt."""
|
||||
attempting_timeout_retry = False
|
||||
attempting_invalid_retry = False
|
||||
while True:
|
||||
index = self.conn.expect([self.option_regex, self.prompt_regex, self.invalid_regex, TIMEOUT],
|
||||
timeout=timeout)
|
||||
match = self.conn.match
|
||||
if index == 0: # matched menu option
|
||||
self.options[match.group(1)] = match.group(2)
|
||||
elif index == 1: # matched prompt
|
||||
self.prompt = match.group(1)
|
||||
break
|
||||
elif index == 2: # matched invalid selection
|
||||
# We've sent an invalid input (which includes an empty line) at
|
||||
# the top-level menu. To get back the menu options, it seems we
|
||||
# need to enter what the error reports as the max + 1, so...
|
||||
if not attempting_invalid_retry:
|
||||
attempting_invalid_retry = True
|
||||
val = int(match.group(1)) + 1
|
||||
self.empty_buffer()
|
||||
self.enter(val)
|
||||
else: # OK, that didn't work; panic!
|
||||
raise RuntimeError('Could not read menu entries stuck on "{}" prompt'.format(self.prompt))
|
||||
elif index == 3: # timed out
|
||||
if not attempting_timeout_retry:
|
||||
attempting_timeout_retry = True
|
||||
self.nudge()
|
||||
else: # Didn't help. Run away!
|
||||
raise RuntimeError('Did not see a valid UEFI menu.')
|
||||
else:
|
||||
raise AssertionError('Unexpected response waiting for UEFI menu') # should never get here
|
||||
|
||||
def get_option_index(self, text, timeout=default_timeout):
|
||||
"""Returns the menu index of the specified option text (uses regex matching). If the option
|
||||
is not in the current menu, ``LookupError`` will be raised."""
|
||||
if not self.prompt:
|
||||
self.read_menu(timeout)
|
||||
for k, v in self.options.iteritems():
|
||||
if re.search(text, v):
|
||||
return k
|
||||
raise LookupError(text)
|
||||
|
||||
def has_option(self, text, timeout=default_timeout):
|
||||
"""Returns ``True`` if at least one of the options in the current menu has
|
||||
matched (using regex) the specified text."""
|
||||
try:
|
||||
self.get_option_index(text, timeout)
|
||||
return True
|
||||
except LookupError:
|
||||
return False
|
||||
|
||||
def nudge(self):
|
||||
"""Send a little nudge to ensure there is something to read. This is useful when you're not
|
||||
sure if all out put from the serial has been read already."""
|
||||
self.enter('')
|
||||
|
||||
def empty_buffer(self):
|
||||
"""Read everything from the serial and clear the internal pexpect buffer. This ensures
|
||||
that the next ``expect()`` call will time out (unless further input will be sent to the
|
||||
serial beforehand. This is used to create a "known" state and avoid unexpected matches."""
|
||||
try:
|
||||
while True:
|
||||
time.sleep(0.1)
|
||||
self.conn.read_nonblocking(size=1024, timeout=0.1)
|
||||
except TIMEOUT:
|
||||
pass
|
||||
self.conn.buffer = ''
|
||||
|
||||
def write_characters(self, line):
|
||||
"""Write a single line out to serial charcter-by-character. This will ensure that nothing will
|
||||
be dropped for longer lines."""
|
||||
line = line.rstrip('\r\n')
|
||||
for c in line:
|
||||
self.conn.send(c)
|
||||
time.sleep(0.05)
|
||||
self.conn.sendline('')
|
||||
|
||||
def _reset(self):
|
||||
self.options = {}
|
||||
self.prompt = None
|
||||
self.empty_buffer()
|
||||
|
||||
|
Reference in New Issue
Block a user