diff --git a/devlib/bin/scripts/devlib-signal-target b/devlib/bin/scripts/devlib-signal-target new file mode 100644 index 0000000..26ac6fe --- /dev/null +++ b/devlib/bin/scripts/devlib-signal-target @@ -0,0 +1,20 @@ +( + # If there is no data dir, it means we are not running as a background + # command so we just do nothing + if [ -e "$_DEVLIB_BG_CMD_DATA_DIR" ]; then + pid_file="$_DEVLIB_BG_CMD_DATA_DIR/pid" + # Atomically check if the PID file already exist and make the write + # fail if it already does. This way we don't have any race condition + # with the Python API, as there is either no PID or the same PID for + # the duration of the command + set -o noclobber + if ! printf "%u\n" $$ > "$pid_file"; then + echo "$0 was already called for this command" >&2 + exit 1 + fi + fi +) || exit $? + +# Use exec so that the PID of the command we run is the same as the current $$ +# PID that we just registered +exec "$@" diff --git a/devlib/connection.py b/devlib/connection.py index 4609975..99055a3 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -17,6 +17,7 @@ from abc import ABC, abstractmethod from contextlib import contextmanager, nullcontext from shlex import quote import os +from pathlib import Path import signal import subprocess import threading @@ -25,14 +26,11 @@ import logging import select import fcntl -from devlib.utils.misc import InitCheckpoint +from devlib.utils.misc import InitCheckpoint, memoized _KILL_TIMEOUT = 3 -def _kill_pgid_cmd(pgid, sig, busybox): - return '{} kill -{} -{}'.format(busybox, sig.value, pgid) - def _popen_communicate(bg, popen, input, timeout): try: stdout, stderr = popen.communicate(input=input, timeout=timeout) @@ -130,8 +128,11 @@ class BackgroundCommand(ABC): semantic as :class:`subprocess.Popen`. """ - def __init__(self, conn): + def __init__(self, conn, data_dir, cmd, as_root): self.conn = conn + self._data_dir = data_dir + self.as_root = as_root + self.cmd = cmd # Poll currently opened background commands on that connection to make # them deregister themselves if they are completed. This avoids @@ -147,15 +148,65 @@ class BackgroundCommand(ABC): conn._current_bg_cmds.add(self) + @classmethod + def from_factory(cls, conn, cmd, as_root, make_init_kwargs): + cmd, data_dir = cls._with_data_dir(conn, cmd) + return cls( + conn=conn, + data_dir=data_dir, + cmd=cmd, + as_root=as_root, + **make_init_kwargs(cmd), + ) + def _deregister(self): try: self.conn._current_bg_cmds.remove(self) except KeyError: pass - @abstractmethod - def _send_signal(self, sig): - pass + @property + def _pid_file(self): + return str(Path(self._data_dir, 'pid')) + + @property + @memoized + def _targeted_pid(self): + """ + PID of the process pointed at by ``devlib-signal-target`` command. + """ + path = quote(self._pid_file) + busybox = quote(self.conn.busybox) + + def execute(cmd): + return self.conn.execute(cmd, as_root=self.as_root) + + while self.poll() is None: + try: + pid = execute(f'{busybox} cat {path}') + except subprocess.CalledProcessError: + time.sleep(0.01) + else: + if pid.endswith('\n'): + return int(pid.strip()) + else: + # We got a partial write in the PID file + continue + + raise ValueError(f'The background commmand did not use devlib-signal-target wrapper to designate which command should be the target of signals') + + @classmethod + def _with_data_dir(cls, conn, cmd): + busybox = quote(conn.busybox) + data_dir = conn.execute(f'{busybox} mktemp -d').strip() + cmd = f'_DEVLIB_BG_CMD_DATA_DIR={data_dir} exec {busybox} sh -c {quote(cmd)}' + return cmd, data_dir + + def _cleanup_data_dir(self): + path = quote(self._data_dir) + busybox = quote(self.conn.busybox) + cmd = f'{busybox} rm -r {path} || true' + self.conn.execute(cmd, as_root=self.as_root) def send_signal(self, sig): """ @@ -165,8 +216,29 @@ class BackgroundCommand(ABC): :param signal: Signal to send. :type signal: signal.Signals """ + + def execute(cmd): + return self.conn.execute(cmd, as_root=self.as_root) + + def send(sig): + busybox = quote(self.conn.busybox) + # If the command has already completed, we don't want to send a + # signal to another process that might have gotten that PID in the + # meantime. + if self.poll() is None: + if sig in (signal.SIGTERM, signal.SIGQUIT, signal.SIGKILL): + # Use -PGID to target a process group rather than just the + # process itself. This will work in any condition and will + # not require cooperation from the command. + execute(f'{busybox} kill -{sig.value} -{self.pid}') + else: + # Other signals require cooperation from the shell command + # so that it points to a specific process using + # devlib-signal-target + pid = self._targeted_pid + execute(f'{busybox} kill -{sig.value} {pid}') try: - return self._send_signal(sig) + return send(sig) finally: # Deregister if the command has finished self.poll() @@ -287,6 +359,7 @@ class BackgroundCommand(ABC): return self._close() finally: self._deregister() + self._cleanup_data_dir() def __enter__(self): return self @@ -300,13 +373,15 @@ class PopenBackgroundCommand(BackgroundCommand): :class:`subprocess.Popen`-based background command. """ - def __init__(self, conn, popen): - super().__init__(conn=conn) + def __init__(self, conn, data_dir, cmd, as_root, popen): + super().__init__( + conn=conn, + data_dir=data_dir, + cmd=cmd, + as_root=as_root, + ) self.popen = popen - def _send_signal(self, sig): - return os.killpg(self.popen.pid, sig) - @property def stdin(self): return self.popen.stdin @@ -354,26 +429,20 @@ class ParamikoBackgroundCommand(BackgroundCommand): """ :mod:`paramiko`-based background command. """ - def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread): - super().__init__(conn=conn) + def __init__(self, conn, data_dir, cmd, as_root, chan, pid, stdin, stdout, stderr, redirect_thread): + super().__init__( + conn=conn, + data_dir=data_dir, + cmd=cmd, + as_root=as_root, + ) + self.chan = chan - self.as_root = as_root self._pid = pid self._stdin = stdin self._stdout = stdout self._stderr = stderr self.redirect_thread = redirect_thread - self.cmd = cmd - - def _send_signal(self, sig): - # If the command has already completed, we don't want to send a signal - # to another process that might have gotten that PID in the meantime. - if self.poll() is not None: - return - # Use -PGID to target a process group rather than just the process - # itself - cmd = _kill_pgid_cmd(self.pid, sig, self.conn.busybox) - self.conn.execute(cmd, as_root=self.as_root) @property def pid(self): @@ -517,18 +586,16 @@ class AdbBackgroundCommand(BackgroundCommand): ``adb``-based background command. """ - def __init__(self, conn, adb_popen, pid, as_root): - super().__init__(conn=conn) - self.as_root = as_root + def __init__(self, conn, data_dir, cmd, as_root, adb_popen, pid): + super().__init__( + conn=conn, + data_dir=data_dir, + cmd=cmd, + as_root=as_root, + ) self.adb_popen = adb_popen self._pid = pid - def _send_signal(self, sig): - self.conn.execute( - _kill_pgid_cmd(self.pid, sig, self.conn.busybox), - as_root=self.as_root, - ) - @property def stdin(self): return self.adb_popen.stdin @@ -638,7 +705,7 @@ class TransferHandleBase(ABC): class PopenTransferHandle(TransferHandleBase): - def __init__(self, bg_cmd, dest, direction, *args, **kwargs): + def __init__(self, popen, dest, direction, *args, **kwargs): super().__init__(*args, **kwargs) if direction == 'push': @@ -650,7 +717,7 @@ class PopenTransferHandle(TransferHandleBase): self.sample_size = lambda: sample_size(dest) - self.bg_cmd = bg_cmd + self.popen = popen self.last_sample = 0 @staticmethod @@ -671,7 +738,7 @@ class PopenTransferHandle(TransferHandleBase): return int(out.split()[0]) def cancel(self): - self.bg_cmd.cancel() + self.popen.terminate() def isactive(self): try: diff --git a/devlib/host.py b/devlib/host.py index a9958a3..a65b00f 100644 --- a/devlib/host.py +++ b/devlib/host.py @@ -147,16 +147,25 @@ class LocalConnection(ConnectionBase): def preexec_fn(): os.setpgrp() - popen = subprocess.Popen( - command, - stdout=stdout, - stderr=stderr, - stdin=subprocess.PIPE, - shell=True, - preexec_fn=preexec_fn, + def make_init_kwargs(command): + popen = subprocess.Popen( + command, + stdout=stdout, + stderr=stderr, + stdin=subprocess.PIPE, + shell=True, + preexec_fn=preexec_fn, + ) + return dict( + popen=popen, + ) + + return PopenBackgroundCommand.from_factory( + conn=self, + cmd=command, + as_root=as_root, + make_init_kwargs=make_init_kwargs, ) - bg_cmd = PopenBackgroundCommand(self, popen) - return bg_cmd def _close(self): pass diff --git a/devlib/target.py b/devlib/target.py index adceac9..110d1ad 100644 --- a/devlib/target.py +++ b/devlib/target.py @@ -281,7 +281,7 @@ class Target(object): @property def shutils(self): if self._shutils is None: - self._setup_shutils() + self._setup_scripts() return self._shutils def is_running(self, comm): @@ -591,7 +591,7 @@ class Target(object): @asyn.asyncf async def setup(self, executables=None): - await self._setup_shutils.asyn() + await self._setup_scripts.asyn() for host_exe in (executables or []): # pylint: disable=superfluous-parens await self.install.asyn(host_exe) @@ -1586,8 +1586,9 @@ fi # internal methods @asyn.asyncf - async def _setup_shutils(self): - shutils_ifile = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts', 'shutils.in') + async def _setup_scripts(self): + scripts = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts') + shutils_ifile = os.path.join(scripts, 'shutils.in') with open(shutils_ifile) as fh: lines = fh.readlines() with tempfile.TemporaryDirectory() as folder: @@ -1598,6 +1599,8 @@ fi ofile.write(line) self._shutils = await self.install.asyn(shutils_ofile) + await self.install.asyn(os.path.join(scripts, 'devlib-signal-target')) + @asyn.asyncf @call_conn async def _execute_util(self, command, timeout=None, check_exit_code=True, as_root=False): diff --git a/devlib/utils/android.py b/devlib/utils/android.py index c77a864..001cb93 100755 --- a/devlib/utils/android.py +++ b/devlib/utils/android.py @@ -40,7 +40,7 @@ from shlex import quote from devlib.exception import TargetTransientError, TargetStableError, HostError, TargetTransientCalledProcessError, TargetStableCalledProcessError, AdbRootError from devlib.utils.misc import check_output, which, ABI_MAP, redirect_streams, get_subprocess -from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenBackgroundCommand, PopenTransferHandle +from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenTransferHandle logger = logging.getLogger('android') @@ -341,7 +341,7 @@ class AdbConnection(ConnectionBase): if timeout: adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server, adb_port=self.adb_port) else: - bg_cmd = adb_command_background( + popen = adb_command_popen( device=self.device, conn=self, command=command, @@ -351,12 +351,12 @@ class AdbConnection(ConnectionBase): handle = PopenTransferHandle( manager=self.transfer_manager, - bg_cmd=bg_cmd, + popen=popen, dest=dest, direction=action ) - with bg_cmd, self.transfer_manager.manage(sources, dest, action, handle): - bg_cmd.communicate() + with popen, self.transfer_manager.manage(sources, dest, action, handle): + popen.communicate() # pylint: disable=unused-argument def execute(self, command, timeout=None, check_exit_code=False, @@ -387,12 +387,18 @@ class AdbConnection(ConnectionBase): return bg_cmd def _background(self, command, stdout, stderr, as_root): - adb_shell, pid = adb_background_shell(self, command, stdout, stderr, as_root) - bg_cmd = AdbBackgroundCommand( + def make_init_kwargs(command): + adb_popen, pid = adb_background_shell(self, command, stdout, stderr, as_root) + return dict( + adb_popen=adb_popen, + pid=pid, + ) + + bg_cmd = AdbBackgroundCommand.from_factory( conn=self, - adb_popen=adb_shell, - pid=pid, - as_root=as_root + cmd=command, + as_root=as_root, + make_init_kwargs=make_init_kwargs, ) return bg_cmd @@ -756,12 +762,11 @@ def adb_command(device, command, timeout=None, adb_server=None, adb_port=None): return output -def adb_command_background(device, conn, command, adb_server=None, adb_port=None): - full_command = get_adb_command(device, command, adb_server, adb_port) - logger.debug(full_command) - popen = get_subprocess(full_command, shell=True) - cmd = PopenBackgroundCommand(conn=conn, popen=popen) - return cmd +def adb_command_popen(device, conn, command, adb_server=None, adb_port=None): + command = get_adb_command(device, command, adb_server, adb_port) + logger.debug(command) + popen = get_subprocess(command, shell=True) + return popen def grant_app_permissions(target, package): diff --git a/devlib/utils/ssh.py b/devlib/utils/ssh.py index 81aa405..caa0f44 100644 --- a/devlib/utils/ssh.py +++ b/devlib/utils/ssh.py @@ -613,138 +613,142 @@ class SshConnection(SshConnectionBase): return self._background(command, stdout, stderr, as_root) def _background(self, command, stdout, stderr, as_root): - orig_command = command - stdout, stderr, command = redirect_streams(stdout, stderr, command) + def make_init_kwargs(command): + _stdout, _stderr, _command = redirect_streams(stdout, stderr, command) - command = "printf '%s\n' $$; exec sh -c {}".format(quote(command)) - channel = self._make_channel() + _command = "printf '%s\n' $$; exec sh -c {}".format(quote(_command)) + channel = self._make_channel() - def executor(cmd, timeout): - channel.exec_command(cmd) - # Read are not buffered so we will always get the data as soon as - # they arrive - return ( - channel.makefile_stdin('w', 0), - channel.makefile(), - channel.makefile_stderr(), + def executor(cmd, timeout): + channel.exec_command(cmd) + # Read are not buffered so we will always get the data as soon as + # they arrive + return ( + channel.makefile_stdin('w', 0), + channel.makefile(), + channel.makefile_stderr(), + ) + + stdin, stdout_in, stderr_in = self._execute_command( + _command, + as_root=as_root, + log=False, + timeout=None, + executor=executor, ) + pid = stdout_in.readline() + if not pid: + _stderr = stderr_in.read() + if channel.exit_status_ready(): + ret = channel.recv_exit_status() + else: + ret = 126 + raise subprocess.CalledProcessError( + ret, + _command, + b'', + _stderr, + ) + pid = int(pid) - stdin, stdout_in, stderr_in = self._execute_command( - command, - as_root=as_root, - log=False, - timeout=None, - executor=executor, - ) - pid = stdout_in.readline() - if not pid: - stderr = stderr_in.read() - if channel.exit_status_ready(): - ret = channel.recv_exit_status() - else: - ret = 126 - raise subprocess.CalledProcessError( - ret, - command, - b'', - stderr, - ) - pid = int(pid) + def create_out_stream(stream_in, stream_out): + """ + Create a pair of file-like objects. The first one is used to read + data and the second one to write. + """ - def create_out_stream(stream_in, stream_out): - """ - Create a pair of file-like objects. The first one is used to read - data and the second one to write. - """ + if stream_out == subprocess.DEVNULL: + r, w = None, None + # When asked for a pipe, we just give the file-like object as the + # reading end and no writing end, since paramiko already writes to + # it + elif stream_out == subprocess.PIPE: + r, w = os.pipe() + r = os.fdopen(r, 'rb') + w = os.fdopen(w, 'wb') + # Turn a file descriptor into a file-like object + elif isinstance(stream_out, int) and stream_out >= 0: + r = os.fdopen(stream_in, 'rb') + w = os.fdopen(stream_out, 'wb') + # file-like object + else: + r = stream_in + w = stream_out - if stream_out == subprocess.DEVNULL: - r, w = None, None - # When asked for a pipe, we just give the file-like object as the - # reading end and no writing end, since paramiko already writes to - # it - elif stream_out == subprocess.PIPE: - r, w = os.pipe() - r = os.fdopen(r, 'rb') - w = os.fdopen(w, 'wb') - # Turn a file descriptor into a file-like object - elif isinstance(stream_out, int) and stream_out >= 0: - r = os.fdopen(stream_in, 'rb') - w = os.fdopen(stream_out, 'wb') - # file-like object - else: - r = stream_in - w = stream_out + return (r, w) - return (r, w) + out_streams = { + name: create_out_stream(stream_in, stream_out) + for stream_in, stream_out, name in ( + (stdout_in, _stdout, 'stdout'), + (stderr_in, _stderr, 'stderr'), + ) + } - out_streams = { - name: create_out_stream(stream_in, stream_out) - for stream_in, stream_out, name in ( - (stdout_in, stdout, 'stdout'), - (stderr_in, stderr, 'stderr'), - ) - } + def redirect_thread_f(stdout_in, stderr_in, out_streams, select_timeout): + def callback(out_streams, name, chunk): + try: + r, w = out_streams[name] + except KeyError: + return out_streams + + try: + w.write(chunk) + # Write failed + except ValueError: + # Since that stream is now closed, stop trying to write to it + del out_streams[name] + # If that was the last open stream, we raise an + # exception so the thread can terminate. + if not out_streams: + raise - def redirect_thread_f(stdout_in, stderr_in, out_streams, select_timeout): - def callback(out_streams, name, chunk): - try: - r, w = out_streams[name] - except KeyError: return out_streams try: - w.write(chunk) - # Write failed + _read_paramiko_streams(stdout_in, stderr_in, select_timeout, callback, copy.copy(out_streams)) + # The streams closed while we were writing to it, the job is done here except ValueError: - # Since that stream is now closed, stop trying to write to it - del out_streams[name] - # If that was the last open stream, we raise an - # exception so the thread can terminate. - if not out_streams: - raise + pass - return out_streams + # Make sure the writing end are closed proper since we are not + # going to write anything anymore + for r, w in out_streams.values(): + w.flush() + if r is not w and w is not None: + w.close() - try: - _read_paramiko_streams(stdout_in, stderr_in, select_timeout, callback, copy.copy(out_streams)) - # The streams closed while we were writing to it, the job is done here - except ValueError: - pass + # If there is anything we need to redirect to, spawn a thread taking + # care of that + select_timeout = 1 + thread_out_streams = { + name: (r, w) + for name, (r, w) in out_streams.items() + if w is not None + } + redirect_thread = threading.Thread( + target=redirect_thread_f, + args=(stdout_in, stderr_in, thread_out_streams, select_timeout), + # The thread will die when the main thread dies + daemon=True, + ) + redirect_thread.start() - # Make sure the writing end are closed proper since we are not - # going to write anything anymore - for r, w in out_streams.values(): - w.flush() - if r is not w and w is not None: - w.close() + return dict( + chan=channel, + pid=pid, + stdin=stdin, + # We give the reading end to the consumer of the data + stdout=out_streams['stdout'][0], + stderr=out_streams['stderr'][0], + redirect_thread=redirect_thread, + ) - # If there is anything we need to redirect to, spawn a thread taking - # care of that - select_timeout = 1 - thread_out_streams = { - name: (r, w) - for name, (r, w) in out_streams.items() - if w is not None - } - redirect_thread = threading.Thread( - target=redirect_thread_f, - args=(stdout_in, stderr_in, thread_out_streams, select_timeout), - # The thread will die when the main thread dies - daemon=True, - ) - redirect_thread.start() - - return ParamikoBackgroundCommand( + return ParamikoBackgroundCommand.from_factory( conn=self, + cmd=command, as_root=as_root, - chan=channel, - pid=pid, - stdin=stdin, - # We give the reading end to the consumer of the data - stdout=out_streams['stdout'][0], - stderr=out_streams['stderr'][0], - redirect_thread=redirect_thread, - cmd=orig_command, + make_init_kwargs=make_init_kwargs, ) def _close(self):