1
0
mirror of https://github.com/ARM-software/devlib.git synced 2025-01-31 02:00:45 +00:00
devlib/devlib/connection.py
Marc Bonnici bc9478c324 connection/send_signal: Use signal value instead of name
Some targets do not support killing via signal name so use the signal
number for greater compatibility.
2020-07-06 17:24:48 +01:00

352 lines
9.1 KiB
Python

# Copyright 2019 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time
import subprocess
import signal
import threading
from weakref import WeakSet
from abc import ABC, abstractmethod
from devlib.utils.misc import InitCheckpoint
_KILL_TIMEOUT = 3
def _kill_pgid_cmd(pgid, sig):
return 'kill -{} -{}'.format(sig.value, pgid)
class ConnectionBase(InitCheckpoint):
"""
Base class for all connections.
"""
def __init__(self):
self._current_bg_cmds = WeakSet()
self._closed = False
self._close_lock = threading.Lock()
def cancel_running_command(self):
bg_cmds = set(self._current_bg_cmds)
for bg_cmd in bg_cmds:
bg_cmd.cancel()
@abstractmethod
def _close(self):
"""
Close the connection.
The public :meth:`close` method makes sure that :meth:`_close` will
only be called once, and will serialize accesses to it if it happens to
be called from multiple threads at once.
"""
def close(self):
# Locking the closing allows any thread to safely call close() as long
# as the connection can be closed from a thread that is not the one it
# started its life in.
with self._close_lock:
if not self._closed:
self._close()
self._closed = True
# Ideally, that should not be relied upon but that will improve the chances
# of the connection being properly cleaned up when it's not in use anymore.
def __del__(self):
# Since __del__ will be called if an exception is raised in __init__
# (e.g. we cannot connect), we only run close() when we are sure
# __init__ has completed successfully.
if self.initialized:
self.close()
class BackgroundCommand(ABC):
"""
Allows managing a running background command using a subset of the
:class:`subprocess.Popen` API.
Instances of this class can be used as context managers, with the same
semantic as :class:`subprocess.Popen`.
"""
@abstractmethod
def send_signal(self, sig):
"""
Send a POSIX signal to the background command's process group ID
(PGID).
:param signal: Signal to send.
:type signal: signal.Signals
"""
def kill(self):
"""
Send SIGKILL to the background command.
"""
self.send_signal(signal.SIGKILL)
@abstractmethod
def cancel(self, kill_timeout=_KILL_TIMEOUT):
"""
Try to gracefully terminate the process by sending ``SIGTERM``, then
waiting for ``kill_timeout`` to send ``SIGKILL``.
"""
@abstractmethod
def wait(self):
"""
Block until the background command completes, and return its exit code.
"""
@abstractmethod
def poll(self):
"""
Return exit code if the command has exited, None otherwise.
"""
@property
@abstractmethod
def stdin(self):
"""
File-like object connected to the background's command stdin.
"""
@property
@abstractmethod
def stdout(self):
"""
File-like object connected to the background's command stdout.
"""
@property
@abstractmethod
def stderr(self):
"""
File-like object connected to the background's command stderr.
"""
@property
@abstractmethod
def pid(self):
"""
Process Group ID (PGID) of the background command.
Since the command is usually wrapped in shell processes for IO
redirections, sudo etc, the PID cannot be assumed to be the actual PID
of the command passed by the user. It's is guaranteed to be a PGID
instead, which means signals sent to it as such will target all
subprocesses involved in executing that command.
"""
@abstractmethod
def close(self):
"""
Close all opened streams and then wait for command completion.
:returns: Exit code of the command.
.. note:: If the command is writing to its stdout/stderr, it might be
blocked on that and die when the streams are closed.
"""
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.close()
class PopenBackgroundCommand(BackgroundCommand):
"""
:class:`subprocess.Popen`-based background command.
"""
def __init__(self, popen):
self.popen = popen
def send_signal(self, sig):
return os.killpg(self.popen.pid, sig)
@property
def stdin(self):
return self.popen.stdin
@property
def stdout(self):
return self.popen.stdout
@property
def stderr(self):
return self.popen.stderr
@property
def pid(self):
return self.popen.pid
def wait(self):
return self.popen.wait()
def poll(self):
return self.popen.poll()
def cancel(self, kill_timeout=_KILL_TIMEOUT):
popen = self.popen
popen.send_signal(signal.SIGTERM)
try:
popen.wait(timeout=_KILL_TIMEOUT)
except subprocess.TimeoutExpired:
popen.kill()
def close(self):
self.popen.__exit__(None, None, None)
return self.popen.returncode
def __enter__(self):
self.popen.__enter__()
return self
def __exit__(self, *args, **kwargs):
self.popen.__exit__(*args, **kwargs)
class ParamikoBackgroundCommand(BackgroundCommand):
"""
:mod:`paramiko`-based background command.
"""
def __init__(self, conn, chan, pid, as_root, stdin, stdout, stderr, redirect_thread):
self.chan = chan
self.as_root = as_root
self.conn = conn
self._pid = pid
self._stdin = stdin
self._stdout = stdout
self._stderr = stderr
self.redirect_thread = redirect_thread
def send_signal(self, sig):
# If the command has already completed, we don't want to send a signal
# to another process that might have gotten that PID in the meantime.
if self.poll() is not None:
return
# Use -PGID to target a process group rather than just the process
# itself
cmd = _kill_pgid_cmd(self.pid, sig)
self.conn.execute(cmd, as_root=self.as_root)
@property
def pid(self):
return self._pid
def wait(self):
return self.chan.recv_exit_status()
def poll(self):
if self.chan.exit_status_ready():
return self.wait()
else:
return None
def cancel(self, kill_timeout=_KILL_TIMEOUT):
self.send_signal(signal.SIGTERM)
# Check if the command terminated quickly
time.sleep(10e-3)
# Otherwise wait for the full timeout and kill it
if self.poll() is None:
time.sleep(kill_timeout)
self.send_signal(signal.SIGKILL)
self.wait()
@property
def stdin(self):
return self._stdin
@property
def stdout(self):
return self._stdout
@property
def stderr(self):
return self._stderr
def close(self):
for x in (self.stdin, self.stdout, self.stderr):
if x is not None:
x.close()
exit_code = self.wait()
thread = self.redirect_thread
if thread:
thread.join()
return exit_code
class AdbBackgroundCommand(BackgroundCommand):
"""
``adb``-based background command.
"""
def __init__(self, conn, adb_popen, pid, as_root):
self.conn = conn
self.as_root = as_root
self.adb_popen = adb_popen
self._pid = pid
def send_signal(self, sig):
self.conn.execute(
_kill_pgid_cmd(self.pid, sig),
as_root=self.as_root,
)
@property
def stdin(self):
return self.adb_popen.stdin
@property
def stdout(self):
return self.adb_popen.stdout
@property
def stderr(self):
return self.adb_popen.stderr
@property
def pid(self):
return self._pid
def wait(self):
return self.adb_popen.wait()
def poll(self):
return self.adb_popen.poll()
def cancel(self, kill_timeout=_KILL_TIMEOUT):
self.send_signal(signal.SIGTERM)
try:
self.adb_popen.wait(timeout=_KILL_TIMEOUT)
except subprocess.TimeoutExpired:
self.send_signal(signal.SIGKILL)
self.adb_popen.kill()
def close(self):
self.adb_popen.__exit__(None, None, None)
return self.adb_popen.returncode
def __enter__(self):
self.adb_popen.__enter__()
return self
def __exit__(self, *args, **kwargs):
self.adb_popen.__exit__(*args, **kwargs)