From 1239fd922e097e00c219c539bc4e6e3349e9130a Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 6 Apr 2023 21:07:29 +0100 Subject: [PATCH] connection: Make BackgroundCommand deregister itself Instead of loosely tracking the current BackgroundCommand for a connection in _current_bg_cmds WeakSet attribute, use a normal set and make the BackgroundCommand deregister itself upon termination. This allows canceling any outstanding BackgroundCommand when the connection is closed. Currently, destroying a BackgroundCommand will not cancel the command but devlib will simply loose track of it, and some threads will likely fail in the background if they try to use the now broken connection. --- devlib/connection.py | 78 +++++++++++++++++++++++++++++------------ devlib/host.py | 1 - devlib/utils/android.py | 1 - devlib/utils/ssh.py | 5 +-- 4 files changed, 57 insertions(+), 28 deletions(-) diff --git a/devlib/connection.py b/devlib/connection.py index b10fde3..f098a54 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -62,7 +62,7 @@ class ConnectionBase(InitCheckpoint): Base class for all connections. """ def __init__(self): - self._current_bg_cmds = WeakSet() + self._current_bg_cmds = set() self._closed = False self._close_lock = threading.Lock() self.busybox = None @@ -123,8 +123,18 @@ class BackgroundCommand(ABC): def __init__(self, conn): self.conn = conn + conn._current_bg_cmds.add(self) + + def _deregister(self): + try: + self.conn._current_bg_cmds.remove(self) + except KeyError: + pass @abstractmethod + def _send_signal(self, sig): + pass + def send_signal(self, sig): """ Send a POSIX signal to the background command's process group ID @@ -133,6 +143,11 @@ class BackgroundCommand(ABC): :param signal: Signal to send. :type signal: signal.Signals """ + try: + self._send_signal(signal.SIGKILL) + finally: + # Deregister if the command has finished + self.poll() def kill(self): """ @@ -145,8 +160,11 @@ class BackgroundCommand(ABC): Try to gracefully terminate the process by sending ``SIGTERM``, then waiting for ``kill_timeout`` to send ``SIGKILL``. """ - if self.poll() is None: - self._cancel(kill_timeout=kill_timeout) + try: + if self.poll() is None: + self._cancel(kill_timeout=kill_timeout) + finally: + self._deregister() @abstractmethod def _cancel(self, kill_timeout): @@ -156,10 +174,17 @@ class BackgroundCommand(ABC): pass @abstractmethod + def _wait(self): + pass + def wait(self): """ Block until the background command completes, and return its exit code. """ + try: + self._wait() + finally: + self._deregister() def communicate(self, input=b'', timeout=None): """ @@ -177,10 +202,17 @@ class BackgroundCommand(ABC): pass @abstractmethod + def _poll(self): + pass + def poll(self): """ Return exit code if the command has exited, None otherwise. """ + retcode = self._poll() + if retcode is not None: + self._deregister() + return retcode @property @abstractmethod @@ -217,6 +249,9 @@ class BackgroundCommand(ABC): """ @abstractmethod + def _close(self): + pass + def close(self): """ Close all opened streams and then wait for command completion. @@ -226,6 +261,10 @@ class BackgroundCommand(ABC): .. note:: If the command is writing to its stdout/stderr, it might be blocked on that and die when the streams are closed. """ + try: + return self._close() + finally: + self._deregister() def __enter__(self): return self @@ -243,7 +282,7 @@ class PopenBackgroundCommand(BackgroundCommand): super().__init__(conn=conn) self.popen = popen - def send_signal(self, sig): + def _send_signal(self, sig): return os.killpg(self.popen.pid, sig) @property @@ -262,13 +301,13 @@ class PopenBackgroundCommand(BackgroundCommand): def pid(self): return self.popen.pid - def wait(self): + def _wait(self): return self.popen.wait() def _communicate(self, input, timeout): return _popen_communicate(self, self.popen, input, timeout) - def poll(self): + def _poll(self): return self.popen.poll() def _cancel(self, kill_timeout): @@ -279,17 +318,15 @@ class PopenBackgroundCommand(BackgroundCommand): except subprocess.TimeoutExpired: os.killpg(os.getpgid(popen.pid), signal.SIGKILL) - def close(self): + def _close(self): self.popen.__exit__(None, None, None) return self.popen.returncode def __enter__(self): + super().__enter__() self.popen.__enter__() return self - def __exit__(self, *args, **kwargs): - self.popen.__exit__(*args, **kwargs) - class ParamikoBackgroundCommand(BackgroundCommand): """ @@ -306,7 +343,7 @@ class ParamikoBackgroundCommand(BackgroundCommand): self.redirect_thread = redirect_thread self.cmd = cmd - def send_signal(self, sig): + def _send_signal(self, sig): # If the command has already completed, we don't want to send a signal # to another process that might have gotten that PID in the meantime. if self.poll() is not None: @@ -320,7 +357,7 @@ class ParamikoBackgroundCommand(BackgroundCommand): def pid(self): return self._pid - def wait(self): + def _wait(self): status = self.chan.recv_exit_status() # Ensure that the redirection thread is finished copying the content # from paramiko to the pipe. @@ -406,7 +443,7 @@ class ParamikoBackgroundCommand(BackgroundCommand): else: return (_stdout, _stderr) - def poll(self): + def _poll(self): # Wait for the redirection thread to finish, otherwise we would # indicate the caller that the command is finished and that the streams # are safe to drain, but actually the redirection thread is not @@ -440,7 +477,7 @@ class ParamikoBackgroundCommand(BackgroundCommand): def stderr(self): return self._stderr - def close(self): + def _close(self): for x in (self.stdin, self.stdout, self.stderr): if x is not None: x.close() @@ -464,7 +501,7 @@ class AdbBackgroundCommand(BackgroundCommand): self.adb_popen = adb_popen self._pid = pid - def send_signal(self, sig): + def _send_signal(self, sig): self.conn.execute( _kill_pgid_cmd(self.pid, sig, self.conn.busybox), as_root=self.as_root, @@ -486,14 +523,13 @@ class AdbBackgroundCommand(BackgroundCommand): def pid(self): return self._pid - def wait(self): + def _wait(self): return self.adb_popen.wait() def _communicate(self, input, timeout): return _popen_communicate(self, self.adb_popen, input, timeout) - - def poll(self): + def _poll(self): return self.adb_popen.poll() def _cancel(self, kill_timeout): @@ -504,17 +540,15 @@ class AdbBackgroundCommand(BackgroundCommand): self.send_signal(signal.SIGKILL) self.adb_popen.kill() - def close(self): + def _close(self): self.adb_popen.__exit__(None, None, None) return self.adb_popen.returncode def __enter__(self): + super().__enter__() self.adb_popen.__enter__() return self - def __exit__(self, *args, **kwargs): - self.adb_popen.__exit__(*args, **kwargs) - class TransferManagerBase(ABC): diff --git a/devlib/host.py b/devlib/host.py index d067d80..68c535d 100644 --- a/devlib/host.py +++ b/devlib/host.py @@ -142,7 +142,6 @@ class LocalConnection(ConnectionBase): preexec_fn=preexec_fn, ) bg_cmd = PopenBackgroundCommand(self, popen) - self._current_bg_cmds.add(bg_cmd) return bg_cmd def _close(self): diff --git a/devlib/utils/android.py b/devlib/utils/android.py index 50bca30..1cae582 100755 --- a/devlib/utils/android.py +++ b/devlib/utils/android.py @@ -368,7 +368,6 @@ class AdbConnection(ConnectionBase): if as_root and self.connected_as_root: as_root = False bg_cmd = self._background(command, stdout, stderr, as_root) - self._current_bg_cmds.add(bg_cmd) return bg_cmd def _background(self, command, stdout, stderr, as_root): diff --git a/devlib/utils/ssh.py b/devlib/utils/ssh.py index c3f71f5..b848521 100644 --- a/devlib/utils/ssh.py +++ b/devlib/utils/ssh.py @@ -553,10 +553,7 @@ class SshConnection(SshConnectionBase): def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False): with _handle_paramiko_exceptions(command): - bg_cmd = self._background(command, stdout, stderr, as_root) - - self._current_bg_cmds.add(bg_cmd) - return bg_cmd + return self._background(command, stdout, stderr, as_root) def _background(self, command, stdout, stderr, as_root): orig_command = command