diff --git a/devlib/connection.py b/devlib/connection.py index 1aee3b2..4772bd4 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -27,6 +27,8 @@ import subprocess import threading import time import logging +import select +import fcntl from devlib.utils.misc import InitCheckpoint @@ -36,6 +38,24 @@ _KILL_TIMEOUT = 3 def _kill_pgid_cmd(pgid, sig, busybox): return '{} kill -{} -{}'.format(busybox, sig.value, pgid) +def _popen_communicate(bg, popen, input, timeout): + try: + stdout, stderr = popen.communicate(input=input, timeout=timeout) + except subprocess.TimeoutExpired: + bg.cancel() + raise + + ret = popen.returncode + if ret: + raise subprocess.CalledProcessError( + ret, + popen.args, + stdout, + stderr, + ) + else: + return (stdout, stderr) + class ConnectionBase(InitCheckpoint): """ @@ -126,6 +146,21 @@ class BackgroundCommand(ABC): Block until the background command completes, and return its exit code. """ + def communicate(self, input=b'', timeout=None): + """ + Block until the background command completes while reading stdout and stderr. + Return ``tuple(stdout, stderr)``. If the return code is non-zero, + raises a :exc:`subprocess.CalledProcessError` exception. + """ + try: + return self._communicate(input=input, timeout=timeout) + finally: + self.close() + + @abstractmethod + def _communicate(self, input, timeout): + pass + @abstractmethod def poll(self): """ @@ -214,6 +249,9 @@ class PopenBackgroundCommand(BackgroundCommand): def wait(self): return self.popen.wait() + def _communicate(self, input, timeout): + return _popen_communicate(self, self.popen, input, timeout) + def poll(self): return self.popen.poll() @@ -273,6 +311,85 @@ class ParamikoBackgroundCommand(BackgroundCommand): self.redirect_thread.join() return status + def _communicate(self, input, timeout): + stdout = self._stdout + stderr = self._stderr + stdin = self._stdin + chan = self.chan + + # For some reason, file descriptors in the read-list of select() can + # still end up blocking in .read(), so make the non-blocking to avoid a + # deadlock. Since _communicate() will consume all input and all output + # until the command dies, we can do whatever we want with the pipe + # without affecting external users. + for s in (stdout, stderr): + fcntl.fcntl(s.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) + + out = {stdout: [], stderr: []} + ret = None + can_send = True + + select_timeout = 1 + if timeout is not None: + select_timeout = min(select_timeout, 1) + + def create_out(): + return ( + b''.join(out[stdout]), + b''.join(out[stderr]) + ) + + start = monotonic() + + while ret is None: + # Even if ret is not None anymore, we need to drain the streams + ret = self.poll() + + if timeout is not None and ret is None and monotonic() - start >= timeout: + self.cancel() + _stdout, _stderr = create_out() + raise subprocess.TimeoutExpired(self.cmd, timeout, _stdout, _stderr) + + can_send &= (not chan.closed) & bool(input) + wlist = [chan] if can_send else [] + + if can_send and chan.send_ready(): + try: + n = chan.send(input) + # stdin might have been closed already + except OSError: + can_send = False + chan.shutdown_write() + else: + input = input[n:] + if not input: + # Send EOF on stdin + chan.shutdown_write() + + rs, ws, _ = select.select( + [x for x in (stdout, stderr) if not x.closed], + wlist, + [], + select_timeout, + ) + + for r in rs: + chunk = r.read() + if chunk: + out[r].append(chunk) + + _stdout, _stderr = create_out() + + if ret: + raise subprocess.CalledProcessError( + ret, + self.cmd, + _stdout, + _stderr, + ) + else: + return (_stdout, _stderr) + def poll(self): # Wait for the redirection thread to finish, otherwise we would # indicate the caller that the command is finished and that the streams @@ -356,6 +473,10 @@ class AdbBackgroundCommand(BackgroundCommand): def wait(self): return self.adb_popen.wait() + def _communicate(self, input, timeout): + return _popen_communicate(self, self.adb_popen, input, timeout) + + def poll(self): return self.adb_popen.poll()