1
0
mirror of https://github.com/ARM-software/devlib.git synced 2025-01-31 02:00:45 +00:00
devlib/devlib/connection.py
Metin Kaya 38d8053f2f devlib: Remove unused imports
Also import 'warnings' before 'wrapt' module to address a pylint
warning.

Signed-off-by: Metin Kaya <metin.kaya@arm.com>
2024-01-23 06:51:59 -08:00

723 lines
20 KiB
Python

# Copyright 2024 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABC, abstractmethod
from contextlib import contextmanager, nullcontext
from shlex import quote
import os
import signal
import subprocess
import threading
import time
import logging
import select
import fcntl
from devlib.utils.misc import InitCheckpoint
_KILL_TIMEOUT = 3
def _kill_pgid_cmd(pgid, sig, busybox):
return '{} kill -{} -{}'.format(busybox, sig.value, pgid)
def _popen_communicate(bg, popen, input, timeout):
try:
stdout, stderr = popen.communicate(input=input, timeout=timeout)
except subprocess.TimeoutExpired:
bg.cancel()
raise
ret = popen.returncode
if ret:
raise subprocess.CalledProcessError(
ret,
popen.args,
stdout,
stderr,
)
else:
return (stdout, stderr)
class ConnectionBase(InitCheckpoint):
"""
Base class for all connections.
"""
def __init__(
self,
poll_transfers=False,
start_transfer_poll_delay=30,
total_transfer_timeout=3600,
transfer_poll_period=30,
):
self._current_bg_cmds = set()
self._closed = False
self._close_lock = threading.Lock()
self.busybox = None
self.logger = logging.getLogger('Connection')
self.transfer_manager = TransferManager(
self,
start_transfer_poll_delay=start_transfer_poll_delay,
total_transfer_timeout=total_transfer_timeout,
transfer_poll_period=transfer_poll_period,
) if poll_transfers else NoopTransferManager()
def cancel_running_command(self):
bg_cmds = set(self._current_bg_cmds)
for bg_cmd in bg_cmds:
bg_cmd.cancel()
@abstractmethod
def _close(self):
"""
Close the connection.
The public :meth:`close` method makes sure that :meth:`_close` will
only be called once, and will serialize accesses to it if it happens to
be called from multiple threads at once.
"""
def close(self):
def finish_bg():
bg_cmds = set(self._current_bg_cmds)
n = len(bg_cmds)
if n:
self.logger.debug(f'Canceling {n} background commands before closing connection')
for bg_cmd in bg_cmds:
bg_cmd.cancel()
# Locking the closing allows any thread to safely call close() as long
# as the connection can be closed from a thread that is not the one it
# started its life in.
with self._close_lock:
if not self._closed:
finish_bg()
self._close()
self._closed = True
# Ideally, that should not be relied upon but that will improve the chances
# of the connection being properly cleaned up when it's not in use anymore.
def __del__(self):
# Since __del__ will be called if an exception is raised in __init__
# (e.g. we cannot connect), we only run close() when we are sure
# __init__ has completed successfully.
if self.initialized:
self.close()
class BackgroundCommand(ABC):
"""
Allows managing a running background command using a subset of the
:class:`subprocess.Popen` API.
Instances of this class can be used as context managers, with the same
semantic as :class:`subprocess.Popen`.
"""
def __init__(self, conn):
self.conn = conn
# Poll currently opened background commands on that connection to make
# them deregister themselves if they are completed. This avoids
# accumulating terminated commands and therefore leaking associated
# resources if the user is not careful and does not use the context
# manager API.
for bg_cmd in set(conn._current_bg_cmds):
try:
bg_cmd.poll()
# We don't want anything to fail here because of another command
except Exception:
pass
conn._current_bg_cmds.add(self)
def _deregister(self):
try:
self.conn._current_bg_cmds.remove(self)
except KeyError:
pass
@abstractmethod
def _send_signal(self, sig):
pass
def send_signal(self, sig):
"""
Send a POSIX signal to the background command's process group ID
(PGID).
:param signal: Signal to send.
:type signal: signal.Signals
"""
try:
return self._send_signal(sig)
finally:
# Deregister if the command has finished
self.poll()
def kill(self):
"""
Send SIGKILL to the background command.
"""
self.send_signal(signal.SIGKILL)
def cancel(self, kill_timeout=_KILL_TIMEOUT):
"""
Try to gracefully terminate the process by sending ``SIGTERM``, then
waiting for ``kill_timeout`` to send ``SIGKILL``.
"""
try:
if self.poll() is None:
return self._cancel(kill_timeout=kill_timeout)
finally:
self._deregister()
@abstractmethod
def _cancel(self, kill_timeout):
"""
Method to override in subclasses to implement :meth:`cancel`.
"""
pass
@abstractmethod
def _wait(self):
pass
def wait(self):
"""
Block until the background command completes, and return its exit code.
"""
try:
return self._wait()
finally:
self._deregister()
def communicate(self, input=b'', timeout=None):
"""
Block until the background command completes while reading stdout and stderr.
Return ``tuple(stdout, stderr)``. If the return code is non-zero,
raises a :exc:`subprocess.CalledProcessError` exception.
"""
try:
return self._communicate(input=input, timeout=timeout)
finally:
self.close()
@abstractmethod
def _communicate(self, input, timeout):
pass
@abstractmethod
def _poll(self):
pass
def poll(self):
"""
Return exit code if the command has exited, None otherwise.
"""
retcode = self._poll()
if retcode is not None:
self._deregister()
return retcode
@property
@abstractmethod
def stdin(self):
"""
File-like object connected to the background's command stdin.
"""
@property
@abstractmethod
def stdout(self):
"""
File-like object connected to the background's command stdout.
"""
@property
@abstractmethod
def stderr(self):
"""
File-like object connected to the background's command stderr.
"""
@property
@abstractmethod
def pid(self):
"""
Process Group ID (PGID) of the background command.
Since the command is usually wrapped in shell processes for IO
redirections, sudo etc, the PID cannot be assumed to be the actual PID
of the command passed by the user. It's is guaranteed to be a PGID
instead, which means signals sent to it as such will target all
subprocesses involved in executing that command.
"""
@abstractmethod
def _close(self):
pass
def close(self):
"""
Close all opened streams and then wait for command completion.
:returns: Exit code of the command.
.. note:: If the command is writing to its stdout/stderr, it might be
blocked on that and die when the streams are closed.
"""
try:
return self._close()
finally:
self._deregister()
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.close()
class PopenBackgroundCommand(BackgroundCommand):
"""
:class:`subprocess.Popen`-based background command.
"""
def __init__(self, conn, popen):
super().__init__(conn=conn)
self.popen = popen
def _send_signal(self, sig):
return os.killpg(self.popen.pid, sig)
@property
def stdin(self):
return self.popen.stdin
@property
def stdout(self):
return self.popen.stdout
@property
def stderr(self):
return self.popen.stderr
@property
def pid(self):
return self.popen.pid
def _wait(self):
return self.popen.wait()
def _communicate(self, input, timeout):
return _popen_communicate(self, self.popen, input, timeout)
def _poll(self):
return self.popen.poll()
def _cancel(self, kill_timeout):
popen = self.popen
os.killpg(os.getpgid(popen.pid), signal.SIGTERM)
try:
popen.wait(timeout=kill_timeout)
except subprocess.TimeoutExpired:
os.killpg(os.getpgid(popen.pid), signal.SIGKILL)
def _close(self):
self.popen.__exit__(None, None, None)
return self.popen.returncode
def __enter__(self):
super().__enter__()
self.popen.__enter__()
return self
class ParamikoBackgroundCommand(BackgroundCommand):
"""
:mod:`paramiko`-based background command.
"""
def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread):
super().__init__(conn=conn)
self.chan = chan
self.as_root = as_root
self._pid = pid
self._stdin = stdin
self._stdout = stdout
self._stderr = stderr
self.redirect_thread = redirect_thread
self.cmd = cmd
def _send_signal(self, sig):
# If the command has already completed, we don't want to send a signal
# to another process that might have gotten that PID in the meantime.
if self.poll() is not None:
return
# Use -PGID to target a process group rather than just the process
# itself
cmd = _kill_pgid_cmd(self.pid, sig, self.conn.busybox)
self.conn.execute(cmd, as_root=self.as_root)
@property
def pid(self):
return self._pid
def _wait(self):
status = self.chan.recv_exit_status()
# Ensure that the redirection thread is finished copying the content
# from paramiko to the pipe.
self.redirect_thread.join()
return status
def _communicate(self, input, timeout):
stdout = self._stdout
stderr = self._stderr
stdin = self._stdin
chan = self.chan
# For some reason, file descriptors in the read-list of select() can
# still end up blocking in .read(), so make the non-blocking to avoid a
# deadlock. Since _communicate() will consume all input and all output
# until the command dies, we can do whatever we want with the pipe
# without affecting external users.
for s in (stdout, stderr):
fcntl.fcntl(s.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
out = {stdout: [], stderr: []}
ret = None
can_send = True
select_timeout = 1
if timeout is not None:
select_timeout = min(select_timeout, 1)
def create_out():
return (
b''.join(out[stdout]),
b''.join(out[stderr])
)
start = time.monotonic()
while ret is None:
# Even if ret is not None anymore, we need to drain the streams
ret = self.poll()
if timeout is not None and ret is None and time.monotonic() - start >= timeout:
self.cancel()
_stdout, _stderr = create_out()
raise subprocess.TimeoutExpired(self.cmd, timeout, _stdout, _stderr)
can_send &= (not chan.closed) & bool(input)
wlist = [chan] if can_send else []
if can_send and chan.send_ready():
try:
n = chan.send(input)
# stdin might have been closed already
except OSError:
can_send = False
chan.shutdown_write()
else:
input = input[n:]
if not input:
# Send EOF on stdin
chan.shutdown_write()
rs, ws, _ = select.select(
[x for x in (stdout, stderr) if not x.closed],
wlist,
[],
select_timeout,
)
for r in rs:
chunk = r.read()
if chunk:
out[r].append(chunk)
_stdout, _stderr = create_out()
if ret:
raise subprocess.CalledProcessError(
ret,
self.cmd,
_stdout,
_stderr,
)
else:
return (_stdout, _stderr)
def _poll(self):
# Wait for the redirection thread to finish, otherwise we would
# indicate the caller that the command is finished and that the streams
# are safe to drain, but actually the redirection thread is not
# finished yet, which would end up in lost data.
if self.redirect_thread.is_alive():
return None
elif self.chan.exit_status_ready():
return self.wait()
else:
return None
def _cancel(self, kill_timeout):
self.send_signal(signal.SIGTERM)
# Check if the command terminated quickly
time.sleep(10e-3)
# Otherwise wait for the full timeout and kill it
if self.poll() is None:
time.sleep(kill_timeout)
self.send_signal(signal.SIGKILL)
self.wait()
@property
def stdin(self):
return self._stdin
@property
def stdout(self):
return self._stdout
@property
def stderr(self):
return self._stderr
def _close(self):
for x in (self.stdin, self.stdout, self.stderr):
if x is not None:
x.close()
exit_code = self.wait()
thread = self.redirect_thread
if thread:
thread.join()
return exit_code
class AdbBackgroundCommand(BackgroundCommand):
"""
``adb``-based background command.
"""
def __init__(self, conn, adb_popen, pid, as_root):
super().__init__(conn=conn)
self.as_root = as_root
self.adb_popen = adb_popen
self._pid = pid
def _send_signal(self, sig):
self.conn.execute(
_kill_pgid_cmd(self.pid, sig, self.conn.busybox),
as_root=self.as_root,
)
@property
def stdin(self):
return self.adb_popen.stdin
@property
def stdout(self):
return self.adb_popen.stdout
@property
def stderr(self):
return self.adb_popen.stderr
@property
def pid(self):
return self._pid
def _wait(self):
return self.adb_popen.wait()
def _communicate(self, input, timeout):
return _popen_communicate(self, self.adb_popen, input, timeout)
def _poll(self):
return self.adb_popen.poll()
def _cancel(self, kill_timeout):
self.send_signal(signal.SIGTERM)
try:
self.adb_popen.wait(timeout=kill_timeout)
except subprocess.TimeoutExpired:
self.send_signal(signal.SIGKILL)
self.adb_popen.kill()
def _close(self):
self.adb_popen.__exit__(None, None, None)
return self.adb_popen.returncode
def __enter__(self):
super().__enter__()
self.adb_popen.__enter__()
return self
class TransferManager:
def __init__(self, conn, transfer_poll_period=30, start_transfer_poll_delay=30, total_transfer_timeout=3600):
self.conn = conn
self.transfer_poll_period = transfer_poll_period
self.total_transfer_timeout = total_transfer_timeout
self.start_transfer_poll_delay = start_transfer_poll_delay
self.logger = logging.getLogger('FileTransfer')
@contextmanager
def manage(self, sources, dest, direction, handle):
excep = None
stop_thread = threading.Event()
def monitor():
nonlocal excep
def cancel(reason):
self.logger.warning(
f'Cancelling file transfer {sources} -> {dest} due to: {reason}'
)
handle.cancel()
start_t = time.monotonic()
stop_thread.wait(self.start_transfer_poll_delay)
while not stop_thread.wait(self.transfer_poll_period):
if not handle.isactive():
cancel(reason='transfer inactive')
elif time.monotonic() - start_t > self.total_transfer_timeout:
cancel(reason='transfer timed out')
excep = TimeoutError(f'{direction}: {sources} -> {dest}')
m_thread = threading.Thread(target=monitor, daemon=True)
try:
m_thread.start()
yield self
finally:
stop_thread.set()
m_thread.join()
if excep is not None:
raise excep
class NoopTransferManager:
def manage(self, *args, **kwargs):
return nullcontext(self)
class TransferHandleBase(ABC):
def __init__(self, manager):
self.manager = manager
@property
def logger(self):
return self.manager.logger
@abstractmethod
def isactive(self):
pass
@abstractmethod
def cancel(self):
pass
class PopenTransferHandle(TransferHandleBase):
def __init__(self, bg_cmd, dest, direction, *args, **kwargs):
super().__init__(*args, **kwargs)
if direction == 'push':
sample_size = self._push_dest_size
elif direction == 'pull':
sample_size = self._pull_dest_size
else:
raise ValueError(f'Unknown direction: {direction}')
self.sample_size = lambda: sample_size(dest)
self.bg_cmd = bg_cmd
self.last_sample = 0
@staticmethod
def _pull_dest_size(dest):
if os.path.isdir(dest):
return sum(
os.stat(os.path.join(dirpath, f)).st_size
for dirpath, _, fnames in os.walk(dest)
for f in fnames
)
else:
return os.stat(dest).st_size
def _push_dest_size(self, dest):
conn = self.manager.conn
cmd = '{} du -s -- {}'.format(quote(conn.busybox), quote(dest))
out = conn.execute(cmd)
return int(out.split()[0])
def cancel(self):
self.bg_cmd.cancel()
def isactive(self):
try:
curr_size = self.sample_size()
except Exception as e:
self.logger.debug(f'File size polling failed: {e}')
return True
else:
self.logger.debug(f'Polled file transfer, destination size: {curr_size}')
if curr_size:
active = curr_size > self.last_sample
self.last_sample = curr_size
return active
# If the file is empty it will never grow in size, so we assume
# everything is going well.
else:
return True
class SSHTransferHandle(TransferHandleBase):
def __init__(self, handle, *args, **kwargs):
super().__init__(*args, **kwargs)
# SFTPClient or SSHClient
self.handle = handle
self.progressed = False
self.transferred = 0
self.to_transfer = 0
def cancel(self):
self.handle.close()
def isactive(self):
progressed = self.progressed
if progressed:
self.progressed = False
pc = (self.transferred / self.to_transfer) * 100
self.logger.debug(
f'Polled transfer: {pc:.2f}% [{self.transferred}B/{self.to_transfer}B]'
)
return progressed
def progress_cb(self, transferred, to_transfer):
self.progressed = True
self.transferred = transferred
self.to_transfer = to_transfer