1
0
mirror of https://github.com/ARM-software/devlib.git synced 2025-01-31 02:00:45 +00:00

connection: Make BackgroundCommand deregister itself

Instead of loosely tracking the current BackgroundCommand for a
connection in _current_bg_cmds WeakSet attribute, use a normal set and
make the BackgroundCommand deregister itself upon termination.

This allows canceling any outstanding BackgroundCommand when the
connection is closed. Currently, destroying a BackgroundCommand will not
cancel the command but devlib will simply loose track of it, and some
threads will likely fail in the background if they try to use the now
broken connection.
This commit is contained in:
Douglas Raillard 2023-04-06 21:07:29 +01:00 committed by Marc Bonnici
parent 069d2322f1
commit 1239fd922e
4 changed files with 57 additions and 28 deletions

View File

@ -62,7 +62,7 @@ class ConnectionBase(InitCheckpoint):
Base class for all connections. Base class for all connections.
""" """
def __init__(self): def __init__(self):
self._current_bg_cmds = WeakSet() self._current_bg_cmds = set()
self._closed = False self._closed = False
self._close_lock = threading.Lock() self._close_lock = threading.Lock()
self.busybox = None self.busybox = None
@ -123,8 +123,18 @@ class BackgroundCommand(ABC):
def __init__(self, conn): def __init__(self, conn):
self.conn = conn self.conn = conn
conn._current_bg_cmds.add(self)
def _deregister(self):
try:
self.conn._current_bg_cmds.remove(self)
except KeyError:
pass
@abstractmethod @abstractmethod
def _send_signal(self, sig):
pass
def send_signal(self, sig): def send_signal(self, sig):
""" """
Send a POSIX signal to the background command's process group ID Send a POSIX signal to the background command's process group ID
@ -133,6 +143,11 @@ class BackgroundCommand(ABC):
:param signal: Signal to send. :param signal: Signal to send.
:type signal: signal.Signals :type signal: signal.Signals
""" """
try:
self._send_signal(signal.SIGKILL)
finally:
# Deregister if the command has finished
self.poll()
def kill(self): def kill(self):
""" """
@ -145,8 +160,11 @@ class BackgroundCommand(ABC):
Try to gracefully terminate the process by sending ``SIGTERM``, then Try to gracefully terminate the process by sending ``SIGTERM``, then
waiting for ``kill_timeout`` to send ``SIGKILL``. waiting for ``kill_timeout`` to send ``SIGKILL``.
""" """
if self.poll() is None: try:
self._cancel(kill_timeout=kill_timeout) if self.poll() is None:
self._cancel(kill_timeout=kill_timeout)
finally:
self._deregister()
@abstractmethod @abstractmethod
def _cancel(self, kill_timeout): def _cancel(self, kill_timeout):
@ -156,10 +174,17 @@ class BackgroundCommand(ABC):
pass pass
@abstractmethod @abstractmethod
def _wait(self):
pass
def wait(self): def wait(self):
""" """
Block until the background command completes, and return its exit code. Block until the background command completes, and return its exit code.
""" """
try:
self._wait()
finally:
self._deregister()
def communicate(self, input=b'', timeout=None): def communicate(self, input=b'', timeout=None):
""" """
@ -177,10 +202,17 @@ class BackgroundCommand(ABC):
pass pass
@abstractmethod @abstractmethod
def _poll(self):
pass
def poll(self): def poll(self):
""" """
Return exit code if the command has exited, None otherwise. Return exit code if the command has exited, None otherwise.
""" """
retcode = self._poll()
if retcode is not None:
self._deregister()
return retcode
@property @property
@abstractmethod @abstractmethod
@ -217,6 +249,9 @@ class BackgroundCommand(ABC):
""" """
@abstractmethod @abstractmethod
def _close(self):
pass
def close(self): def close(self):
""" """
Close all opened streams and then wait for command completion. Close all opened streams and then wait for command completion.
@ -226,6 +261,10 @@ class BackgroundCommand(ABC):
.. note:: If the command is writing to its stdout/stderr, it might be .. note:: If the command is writing to its stdout/stderr, it might be
blocked on that and die when the streams are closed. blocked on that and die when the streams are closed.
""" """
try:
return self._close()
finally:
self._deregister()
def __enter__(self): def __enter__(self):
return self return self
@ -243,7 +282,7 @@ class PopenBackgroundCommand(BackgroundCommand):
super().__init__(conn=conn) super().__init__(conn=conn)
self.popen = popen self.popen = popen
def send_signal(self, sig): def _send_signal(self, sig):
return os.killpg(self.popen.pid, sig) return os.killpg(self.popen.pid, sig)
@property @property
@ -262,13 +301,13 @@ class PopenBackgroundCommand(BackgroundCommand):
def pid(self): def pid(self):
return self.popen.pid return self.popen.pid
def wait(self): def _wait(self):
return self.popen.wait() return self.popen.wait()
def _communicate(self, input, timeout): def _communicate(self, input, timeout):
return _popen_communicate(self, self.popen, input, timeout) return _popen_communicate(self, self.popen, input, timeout)
def poll(self): def _poll(self):
return self.popen.poll() return self.popen.poll()
def _cancel(self, kill_timeout): def _cancel(self, kill_timeout):
@ -279,17 +318,15 @@ class PopenBackgroundCommand(BackgroundCommand):
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
os.killpg(os.getpgid(popen.pid), signal.SIGKILL) os.killpg(os.getpgid(popen.pid), signal.SIGKILL)
def close(self): def _close(self):
self.popen.__exit__(None, None, None) self.popen.__exit__(None, None, None)
return self.popen.returncode return self.popen.returncode
def __enter__(self): def __enter__(self):
super().__enter__()
self.popen.__enter__() self.popen.__enter__()
return self return self
def __exit__(self, *args, **kwargs):
self.popen.__exit__(*args, **kwargs)
class ParamikoBackgroundCommand(BackgroundCommand): class ParamikoBackgroundCommand(BackgroundCommand):
""" """
@ -306,7 +343,7 @@ class ParamikoBackgroundCommand(BackgroundCommand):
self.redirect_thread = redirect_thread self.redirect_thread = redirect_thread
self.cmd = cmd self.cmd = cmd
def send_signal(self, sig): def _send_signal(self, sig):
# If the command has already completed, we don't want to send a signal # If the command has already completed, we don't want to send a signal
# to another process that might have gotten that PID in the meantime. # to another process that might have gotten that PID in the meantime.
if self.poll() is not None: if self.poll() is not None:
@ -320,7 +357,7 @@ class ParamikoBackgroundCommand(BackgroundCommand):
def pid(self): def pid(self):
return self._pid return self._pid
def wait(self): def _wait(self):
status = self.chan.recv_exit_status() status = self.chan.recv_exit_status()
# Ensure that the redirection thread is finished copying the content # Ensure that the redirection thread is finished copying the content
# from paramiko to the pipe. # from paramiko to the pipe.
@ -406,7 +443,7 @@ class ParamikoBackgroundCommand(BackgroundCommand):
else: else:
return (_stdout, _stderr) return (_stdout, _stderr)
def poll(self): def _poll(self):
# Wait for the redirection thread to finish, otherwise we would # Wait for the redirection thread to finish, otherwise we would
# indicate the caller that the command is finished and that the streams # indicate the caller that the command is finished and that the streams
# are safe to drain, but actually the redirection thread is not # are safe to drain, but actually the redirection thread is not
@ -440,7 +477,7 @@ class ParamikoBackgroundCommand(BackgroundCommand):
def stderr(self): def stderr(self):
return self._stderr return self._stderr
def close(self): def _close(self):
for x in (self.stdin, self.stdout, self.stderr): for x in (self.stdin, self.stdout, self.stderr):
if x is not None: if x is not None:
x.close() x.close()
@ -464,7 +501,7 @@ class AdbBackgroundCommand(BackgroundCommand):
self.adb_popen = adb_popen self.adb_popen = adb_popen
self._pid = pid self._pid = pid
def send_signal(self, sig): def _send_signal(self, sig):
self.conn.execute( self.conn.execute(
_kill_pgid_cmd(self.pid, sig, self.conn.busybox), _kill_pgid_cmd(self.pid, sig, self.conn.busybox),
as_root=self.as_root, as_root=self.as_root,
@ -486,14 +523,13 @@ class AdbBackgroundCommand(BackgroundCommand):
def pid(self): def pid(self):
return self._pid return self._pid
def wait(self): def _wait(self):
return self.adb_popen.wait() return self.adb_popen.wait()
def _communicate(self, input, timeout): def _communicate(self, input, timeout):
return _popen_communicate(self, self.adb_popen, input, timeout) return _popen_communicate(self, self.adb_popen, input, timeout)
def _poll(self):
def poll(self):
return self.adb_popen.poll() return self.adb_popen.poll()
def _cancel(self, kill_timeout): def _cancel(self, kill_timeout):
@ -504,17 +540,15 @@ class AdbBackgroundCommand(BackgroundCommand):
self.send_signal(signal.SIGKILL) self.send_signal(signal.SIGKILL)
self.adb_popen.kill() self.adb_popen.kill()
def close(self): def _close(self):
self.adb_popen.__exit__(None, None, None) self.adb_popen.__exit__(None, None, None)
return self.adb_popen.returncode return self.adb_popen.returncode
def __enter__(self): def __enter__(self):
super().__enter__()
self.adb_popen.__enter__() self.adb_popen.__enter__()
return self return self
def __exit__(self, *args, **kwargs):
self.adb_popen.__exit__(*args, **kwargs)
class TransferManagerBase(ABC): class TransferManagerBase(ABC):

View File

@ -142,7 +142,6 @@ class LocalConnection(ConnectionBase):
preexec_fn=preexec_fn, preexec_fn=preexec_fn,
) )
bg_cmd = PopenBackgroundCommand(self, popen) bg_cmd = PopenBackgroundCommand(self, popen)
self._current_bg_cmds.add(bg_cmd)
return bg_cmd return bg_cmd
def _close(self): def _close(self):

View File

@ -368,7 +368,6 @@ class AdbConnection(ConnectionBase):
if as_root and self.connected_as_root: if as_root and self.connected_as_root:
as_root = False as_root = False
bg_cmd = self._background(command, stdout, stderr, as_root) bg_cmd = self._background(command, stdout, stderr, as_root)
self._current_bg_cmds.add(bg_cmd)
return bg_cmd return bg_cmd
def _background(self, command, stdout, stderr, as_root): def _background(self, command, stdout, stderr, as_root):

View File

@ -553,10 +553,7 @@ class SshConnection(SshConnectionBase):
def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False): def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False):
with _handle_paramiko_exceptions(command): with _handle_paramiko_exceptions(command):
bg_cmd = self._background(command, stdout, stderr, as_root) return self._background(command, stdout, stderr, as_root)
self._current_bg_cmds.add(bg_cmd)
return bg_cmd
def _background(self, command, stdout, stderr, as_root): def _background(self, command, stdout, stderr, as_root):
orig_command = command orig_command = command