diff --git a/devlib/connection.py b/devlib/connection.py index b10fde3..f098a54 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -62,7 +62,7 @@ class ConnectionBase(InitCheckpoint): Base class for all connections. """ def __init__(self): - self._current_bg_cmds = WeakSet() + self._current_bg_cmds = set() self._closed = False self._close_lock = threading.Lock() self.busybox = None @@ -123,8 +123,18 @@ class BackgroundCommand(ABC): def __init__(self, conn): self.conn = conn + conn._current_bg_cmds.add(self) + + def _deregister(self): + try: + self.conn._current_bg_cmds.remove(self) + except KeyError: + pass @abstractmethod + def _send_signal(self, sig): + pass + def send_signal(self, sig): """ Send a POSIX signal to the background command's process group ID @@ -133,6 +143,11 @@ class BackgroundCommand(ABC): :param signal: Signal to send. :type signal: signal.Signals """ + try: + self._send_signal(signal.SIGKILL) + finally: + # Deregister if the command has finished + self.poll() def kill(self): """ @@ -145,8 +160,11 @@ class BackgroundCommand(ABC): Try to gracefully terminate the process by sending ``SIGTERM``, then waiting for ``kill_timeout`` to send ``SIGKILL``. """ - if self.poll() is None: - self._cancel(kill_timeout=kill_timeout) + try: + if self.poll() is None: + self._cancel(kill_timeout=kill_timeout) + finally: + self._deregister() @abstractmethod def _cancel(self, kill_timeout): @@ -156,10 +174,17 @@ class BackgroundCommand(ABC): pass @abstractmethod + def _wait(self): + pass + def wait(self): """ Block until the background command completes, and return its exit code. """ + try: + self._wait() + finally: + self._deregister() def communicate(self, input=b'', timeout=None): """ @@ -177,10 +202,17 @@ class BackgroundCommand(ABC): pass @abstractmethod + def _poll(self): + pass + def poll(self): """ Return exit code if the command has exited, None otherwise. """ + retcode = self._poll() + if retcode is not None: + self._deregister() + return retcode @property @abstractmethod @@ -217,6 +249,9 @@ class BackgroundCommand(ABC): """ @abstractmethod + def _close(self): + pass + def close(self): """ Close all opened streams and then wait for command completion. @@ -226,6 +261,10 @@ class BackgroundCommand(ABC): .. note:: If the command is writing to its stdout/stderr, it might be blocked on that and die when the streams are closed. """ + try: + return self._close() + finally: + self._deregister() def __enter__(self): return self @@ -243,7 +282,7 @@ class PopenBackgroundCommand(BackgroundCommand): super().__init__(conn=conn) self.popen = popen - def send_signal(self, sig): + def _send_signal(self, sig): return os.killpg(self.popen.pid, sig) @property @@ -262,13 +301,13 @@ class PopenBackgroundCommand(BackgroundCommand): def pid(self): return self.popen.pid - def wait(self): + def _wait(self): return self.popen.wait() def _communicate(self, input, timeout): return _popen_communicate(self, self.popen, input, timeout) - def poll(self): + def _poll(self): return self.popen.poll() def _cancel(self, kill_timeout): @@ -279,17 +318,15 @@ class PopenBackgroundCommand(BackgroundCommand): except subprocess.TimeoutExpired: os.killpg(os.getpgid(popen.pid), signal.SIGKILL) - def close(self): + def _close(self): self.popen.__exit__(None, None, None) return self.popen.returncode def __enter__(self): + super().__enter__() self.popen.__enter__() return self - def __exit__(self, *args, **kwargs): - self.popen.__exit__(*args, **kwargs) - class ParamikoBackgroundCommand(BackgroundCommand): """ @@ -306,7 +343,7 @@ class ParamikoBackgroundCommand(BackgroundCommand): self.redirect_thread = redirect_thread self.cmd = cmd - def send_signal(self, sig): + def _send_signal(self, sig): # If the command has already completed, we don't want to send a signal # to another process that might have gotten that PID in the meantime. if self.poll() is not None: @@ -320,7 +357,7 @@ class ParamikoBackgroundCommand(BackgroundCommand): def pid(self): return self._pid - def wait(self): + def _wait(self): status = self.chan.recv_exit_status() # Ensure that the redirection thread is finished copying the content # from paramiko to the pipe. @@ -406,7 +443,7 @@ class ParamikoBackgroundCommand(BackgroundCommand): else: return (_stdout, _stderr) - def poll(self): + def _poll(self): # Wait for the redirection thread to finish, otherwise we would # indicate the caller that the command is finished and that the streams # are safe to drain, but actually the redirection thread is not @@ -440,7 +477,7 @@ class ParamikoBackgroundCommand(BackgroundCommand): def stderr(self): return self._stderr - def close(self): + def _close(self): for x in (self.stdin, self.stdout, self.stderr): if x is not None: x.close() @@ -464,7 +501,7 @@ class AdbBackgroundCommand(BackgroundCommand): self.adb_popen = adb_popen self._pid = pid - def send_signal(self, sig): + def _send_signal(self, sig): self.conn.execute( _kill_pgid_cmd(self.pid, sig, self.conn.busybox), as_root=self.as_root, @@ -486,14 +523,13 @@ class AdbBackgroundCommand(BackgroundCommand): def pid(self): return self._pid - def wait(self): + def _wait(self): return self.adb_popen.wait() def _communicate(self, input, timeout): return _popen_communicate(self, self.adb_popen, input, timeout) - - def poll(self): + def _poll(self): return self.adb_popen.poll() def _cancel(self, kill_timeout): @@ -504,17 +540,15 @@ class AdbBackgroundCommand(BackgroundCommand): self.send_signal(signal.SIGKILL) self.adb_popen.kill() - def close(self): + def _close(self): self.adb_popen.__exit__(None, None, None) return self.adb_popen.returncode def __enter__(self): + super().__enter__() self.adb_popen.__enter__() return self - def __exit__(self, *args, **kwargs): - self.adb_popen.__exit__(*args, **kwargs) - class TransferManagerBase(ABC): diff --git a/devlib/host.py b/devlib/host.py index d067d80..68c535d 100644 --- a/devlib/host.py +++ b/devlib/host.py @@ -142,7 +142,6 @@ class LocalConnection(ConnectionBase): preexec_fn=preexec_fn, ) bg_cmd = PopenBackgroundCommand(self, popen) - self._current_bg_cmds.add(bg_cmd) return bg_cmd def _close(self): diff --git a/devlib/utils/android.py b/devlib/utils/android.py index 50bca30..1cae582 100755 --- a/devlib/utils/android.py +++ b/devlib/utils/android.py @@ -368,7 +368,6 @@ class AdbConnection(ConnectionBase): if as_root and self.connected_as_root: as_root = False bg_cmd = self._background(command, stdout, stderr, as_root) - self._current_bg_cmds.add(bg_cmd) return bg_cmd def _background(self, command, stdout, stderr, as_root): diff --git a/devlib/utils/ssh.py b/devlib/utils/ssh.py index c3f71f5..b848521 100644 --- a/devlib/utils/ssh.py +++ b/devlib/utils/ssh.py @@ -553,10 +553,7 @@ class SshConnection(SshConnectionBase): def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False): with _handle_paramiko_exceptions(command): - bg_cmd = self._background(command, stdout, stderr, as_root) - - self._current_bg_cmds.add(bg_cmd) - return bg_cmd + return self._background(command, stdout, stderr, as_root) def _background(self, command, stdout, stderr, as_root): orig_command = command