1
0
mirror of https://github.com/ARM-software/devlib.git synced 2025-03-04 17:27:51 +00:00

connection: Support all signals in BackgroundCommand.send_signal()

Support sending any signal to background commands, instead of only
supporting properly SIGKILL/SIGTERM/SIGQUIT.

The main issue is figuring out what PID to send the signal to, as the
devlib API allows running a whole snippet of shell script that typically
is wrapped under many layers of sh -c and sudo calls. In order to lift
the ambiguity, the user has access to a "devlib-signal-target" command
that points devlib at what process should be the target of signals:

    # Run a "setup" command, then the main command that will receive the
    # signals
    cmd = 'echo setup; devlib-signal-target echo hello world'
    with target.background(cmd) as bg:
	bg.communicate()

The devlib-signal-target script can only be used once per background
command, so that it is never ambiguous what process is targeted, and so
that the Python code can cache the target PID.  Subsequent invocations
of devlib-signal-target will fail.
This commit is contained in:
Douglas Raillard 2024-05-30 10:41:50 +01:00 committed by Marc Bonnici
parent eb9e0c9870
commit 9ec36e9040
6 changed files with 290 additions and 182 deletions

View File

@ -0,0 +1,20 @@
(
# If there is no data dir, it means we are not running as a background
# command so we just do nothing
if [ -e "$_DEVLIB_BG_CMD_DATA_DIR" ]; then
pid_file="$_DEVLIB_BG_CMD_DATA_DIR/pid"
# Atomically check if the PID file already exist and make the write
# fail if it already does. This way we don't have any race condition
# with the Python API, as there is either no PID or the same PID for
# the duration of the command
set -o noclobber
if ! printf "%u\n" $$ > "$pid_file"; then
echo "$0 was already called for this command" >&2
exit 1
fi
fi
) || exit $?
# Use exec so that the PID of the command we run is the same as the current $$
# PID that we just registered
exec "$@"

View File

@ -17,6 +17,7 @@ from abc import ABC, abstractmethod
from contextlib import contextmanager, nullcontext from contextlib import contextmanager, nullcontext
from shlex import quote from shlex import quote
import os import os
from pathlib import Path
import signal import signal
import subprocess import subprocess
import threading import threading
@ -25,14 +26,11 @@ import logging
import select import select
import fcntl import fcntl
from devlib.utils.misc import InitCheckpoint from devlib.utils.misc import InitCheckpoint, memoized
_KILL_TIMEOUT = 3 _KILL_TIMEOUT = 3
def _kill_pgid_cmd(pgid, sig, busybox):
return '{} kill -{} -{}'.format(busybox, sig.value, pgid)
def _popen_communicate(bg, popen, input, timeout): def _popen_communicate(bg, popen, input, timeout):
try: try:
stdout, stderr = popen.communicate(input=input, timeout=timeout) stdout, stderr = popen.communicate(input=input, timeout=timeout)
@ -130,8 +128,11 @@ class BackgroundCommand(ABC):
semantic as :class:`subprocess.Popen`. semantic as :class:`subprocess.Popen`.
""" """
def __init__(self, conn): def __init__(self, conn, data_dir, cmd, as_root):
self.conn = conn self.conn = conn
self._data_dir = data_dir
self.as_root = as_root
self.cmd = cmd
# Poll currently opened background commands on that connection to make # Poll currently opened background commands on that connection to make
# them deregister themselves if they are completed. This avoids # them deregister themselves if they are completed. This avoids
@ -147,15 +148,65 @@ class BackgroundCommand(ABC):
conn._current_bg_cmds.add(self) conn._current_bg_cmds.add(self)
@classmethod
def from_factory(cls, conn, cmd, as_root, make_init_kwargs):
cmd, data_dir = cls._with_data_dir(conn, cmd)
return cls(
conn=conn,
data_dir=data_dir,
cmd=cmd,
as_root=as_root,
**make_init_kwargs(cmd),
)
def _deregister(self): def _deregister(self):
try: try:
self.conn._current_bg_cmds.remove(self) self.conn._current_bg_cmds.remove(self)
except KeyError: except KeyError:
pass pass
@abstractmethod @property
def _send_signal(self, sig): def _pid_file(self):
pass return str(Path(self._data_dir, 'pid'))
@property
@memoized
def _targeted_pid(self):
"""
PID of the process pointed at by ``devlib-signal-target`` command.
"""
path = quote(self._pid_file)
busybox = quote(self.conn.busybox)
def execute(cmd):
return self.conn.execute(cmd, as_root=self.as_root)
while self.poll() is None:
try:
pid = execute(f'{busybox} cat {path}')
except subprocess.CalledProcessError:
time.sleep(0.01)
else:
if pid.endswith('\n'):
return int(pid.strip())
else:
# We got a partial write in the PID file
continue
raise ValueError(f'The background commmand did not use devlib-signal-target wrapper to designate which command should be the target of signals')
@classmethod
def _with_data_dir(cls, conn, cmd):
busybox = quote(conn.busybox)
data_dir = conn.execute(f'{busybox} mktemp -d').strip()
cmd = f'_DEVLIB_BG_CMD_DATA_DIR={data_dir} exec {busybox} sh -c {quote(cmd)}'
return cmd, data_dir
def _cleanup_data_dir(self):
path = quote(self._data_dir)
busybox = quote(self.conn.busybox)
cmd = f'{busybox} rm -r {path} || true'
self.conn.execute(cmd, as_root=self.as_root)
def send_signal(self, sig): def send_signal(self, sig):
""" """
@ -165,8 +216,29 @@ class BackgroundCommand(ABC):
:param signal: Signal to send. :param signal: Signal to send.
:type signal: signal.Signals :type signal: signal.Signals
""" """
def execute(cmd):
return self.conn.execute(cmd, as_root=self.as_root)
def send(sig):
busybox = quote(self.conn.busybox)
# If the command has already completed, we don't want to send a
# signal to another process that might have gotten that PID in the
# meantime.
if self.poll() is None:
if sig in (signal.SIGTERM, signal.SIGQUIT, signal.SIGKILL):
# Use -PGID to target a process group rather than just the
# process itself. This will work in any condition and will
# not require cooperation from the command.
execute(f'{busybox} kill -{sig.value} -{self.pid}')
else:
# Other signals require cooperation from the shell command
# so that it points to a specific process using
# devlib-signal-target
pid = self._targeted_pid
execute(f'{busybox} kill -{sig.value} {pid}')
try: try:
return self._send_signal(sig) return send(sig)
finally: finally:
# Deregister if the command has finished # Deregister if the command has finished
self.poll() self.poll()
@ -287,6 +359,7 @@ class BackgroundCommand(ABC):
return self._close() return self._close()
finally: finally:
self._deregister() self._deregister()
self._cleanup_data_dir()
def __enter__(self): def __enter__(self):
return self return self
@ -300,13 +373,15 @@ class PopenBackgroundCommand(BackgroundCommand):
:class:`subprocess.Popen`-based background command. :class:`subprocess.Popen`-based background command.
""" """
def __init__(self, conn, popen): def __init__(self, conn, data_dir, cmd, as_root, popen):
super().__init__(conn=conn) super().__init__(
conn=conn,
data_dir=data_dir,
cmd=cmd,
as_root=as_root,
)
self.popen = popen self.popen = popen
def _send_signal(self, sig):
return os.killpg(self.popen.pid, sig)
@property @property
def stdin(self): def stdin(self):
return self.popen.stdin return self.popen.stdin
@ -354,26 +429,20 @@ class ParamikoBackgroundCommand(BackgroundCommand):
""" """
:mod:`paramiko`-based background command. :mod:`paramiko`-based background command.
""" """
def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread): def __init__(self, conn, data_dir, cmd, as_root, chan, pid, stdin, stdout, stderr, redirect_thread):
super().__init__(conn=conn) super().__init__(
conn=conn,
data_dir=data_dir,
cmd=cmd,
as_root=as_root,
)
self.chan = chan self.chan = chan
self.as_root = as_root
self._pid = pid self._pid = pid
self._stdin = stdin self._stdin = stdin
self._stdout = stdout self._stdout = stdout
self._stderr = stderr self._stderr = stderr
self.redirect_thread = redirect_thread self.redirect_thread = redirect_thread
self.cmd = cmd
def _send_signal(self, sig):
# If the command has already completed, we don't want to send a signal
# to another process that might have gotten that PID in the meantime.
if self.poll() is not None:
return
# Use -PGID to target a process group rather than just the process
# itself
cmd = _kill_pgid_cmd(self.pid, sig, self.conn.busybox)
self.conn.execute(cmd, as_root=self.as_root)
@property @property
def pid(self): def pid(self):
@ -517,18 +586,16 @@ class AdbBackgroundCommand(BackgroundCommand):
``adb``-based background command. ``adb``-based background command.
""" """
def __init__(self, conn, adb_popen, pid, as_root): def __init__(self, conn, data_dir, cmd, as_root, adb_popen, pid):
super().__init__(conn=conn) super().__init__(
self.as_root = as_root conn=conn,
data_dir=data_dir,
cmd=cmd,
as_root=as_root,
)
self.adb_popen = adb_popen self.adb_popen = adb_popen
self._pid = pid self._pid = pid
def _send_signal(self, sig):
self.conn.execute(
_kill_pgid_cmd(self.pid, sig, self.conn.busybox),
as_root=self.as_root,
)
@property @property
def stdin(self): def stdin(self):
return self.adb_popen.stdin return self.adb_popen.stdin
@ -638,7 +705,7 @@ class TransferHandleBase(ABC):
class PopenTransferHandle(TransferHandleBase): class PopenTransferHandle(TransferHandleBase):
def __init__(self, bg_cmd, dest, direction, *args, **kwargs): def __init__(self, popen, dest, direction, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
if direction == 'push': if direction == 'push':
@ -650,7 +717,7 @@ class PopenTransferHandle(TransferHandleBase):
self.sample_size = lambda: sample_size(dest) self.sample_size = lambda: sample_size(dest)
self.bg_cmd = bg_cmd self.popen = popen
self.last_sample = 0 self.last_sample = 0
@staticmethod @staticmethod
@ -671,7 +738,7 @@ class PopenTransferHandle(TransferHandleBase):
return int(out.split()[0]) return int(out.split()[0])
def cancel(self): def cancel(self):
self.bg_cmd.cancel() self.popen.terminate()
def isactive(self): def isactive(self):
try: try:

View File

@ -147,16 +147,25 @@ class LocalConnection(ConnectionBase):
def preexec_fn(): def preexec_fn():
os.setpgrp() os.setpgrp()
popen = subprocess.Popen( def make_init_kwargs(command):
command, popen = subprocess.Popen(
stdout=stdout, command,
stderr=stderr, stdout=stdout,
stdin=subprocess.PIPE, stderr=stderr,
shell=True, stdin=subprocess.PIPE,
preexec_fn=preexec_fn, shell=True,
preexec_fn=preexec_fn,
)
return dict(
popen=popen,
)
return PopenBackgroundCommand.from_factory(
conn=self,
cmd=command,
as_root=as_root,
make_init_kwargs=make_init_kwargs,
) )
bg_cmd = PopenBackgroundCommand(self, popen)
return bg_cmd
def _close(self): def _close(self):
pass pass

View File

@ -281,7 +281,7 @@ class Target(object):
@property @property
def shutils(self): def shutils(self):
if self._shutils is None: if self._shutils is None:
self._setup_shutils() self._setup_scripts()
return self._shutils return self._shutils
def is_running(self, comm): def is_running(self, comm):
@ -591,7 +591,7 @@ class Target(object):
@asyn.asyncf @asyn.asyncf
async def setup(self, executables=None): async def setup(self, executables=None):
await self._setup_shutils.asyn() await self._setup_scripts.asyn()
for host_exe in (executables or []): # pylint: disable=superfluous-parens for host_exe in (executables or []): # pylint: disable=superfluous-parens
await self.install.asyn(host_exe) await self.install.asyn(host_exe)
@ -1586,8 +1586,9 @@ fi
# internal methods # internal methods
@asyn.asyncf @asyn.asyncf
async def _setup_shutils(self): async def _setup_scripts(self):
shutils_ifile = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts', 'shutils.in') scripts = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts')
shutils_ifile = os.path.join(scripts, 'shutils.in')
with open(shutils_ifile) as fh: with open(shutils_ifile) as fh:
lines = fh.readlines() lines = fh.readlines()
with tempfile.TemporaryDirectory() as folder: with tempfile.TemporaryDirectory() as folder:
@ -1598,6 +1599,8 @@ fi
ofile.write(line) ofile.write(line)
self._shutils = await self.install.asyn(shutils_ofile) self._shutils = await self.install.asyn(shutils_ofile)
await self.install.asyn(os.path.join(scripts, 'devlib-signal-target'))
@asyn.asyncf @asyn.asyncf
@call_conn @call_conn
async def _execute_util(self, command, timeout=None, check_exit_code=True, as_root=False): async def _execute_util(self, command, timeout=None, check_exit_code=True, as_root=False):

View File

@ -40,7 +40,7 @@ from shlex import quote
from devlib.exception import TargetTransientError, TargetStableError, HostError, TargetTransientCalledProcessError, TargetStableCalledProcessError, AdbRootError from devlib.exception import TargetTransientError, TargetStableError, HostError, TargetTransientCalledProcessError, TargetStableCalledProcessError, AdbRootError
from devlib.utils.misc import check_output, which, ABI_MAP, redirect_streams, get_subprocess from devlib.utils.misc import check_output, which, ABI_MAP, redirect_streams, get_subprocess
from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenBackgroundCommand, PopenTransferHandle from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenTransferHandle
logger = logging.getLogger('android') logger = logging.getLogger('android')
@ -341,7 +341,7 @@ class AdbConnection(ConnectionBase):
if timeout: if timeout:
adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server, adb_port=self.adb_port) adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server, adb_port=self.adb_port)
else: else:
bg_cmd = adb_command_background( popen = adb_command_popen(
device=self.device, device=self.device,
conn=self, conn=self,
command=command, command=command,
@ -351,12 +351,12 @@ class AdbConnection(ConnectionBase):
handle = PopenTransferHandle( handle = PopenTransferHandle(
manager=self.transfer_manager, manager=self.transfer_manager,
bg_cmd=bg_cmd, popen=popen,
dest=dest, dest=dest,
direction=action direction=action
) )
with bg_cmd, self.transfer_manager.manage(sources, dest, action, handle): with popen, self.transfer_manager.manage(sources, dest, action, handle):
bg_cmd.communicate() popen.communicate()
# pylint: disable=unused-argument # pylint: disable=unused-argument
def execute(self, command, timeout=None, check_exit_code=False, def execute(self, command, timeout=None, check_exit_code=False,
@ -387,12 +387,18 @@ class AdbConnection(ConnectionBase):
return bg_cmd return bg_cmd
def _background(self, command, stdout, stderr, as_root): def _background(self, command, stdout, stderr, as_root):
adb_shell, pid = adb_background_shell(self, command, stdout, stderr, as_root) def make_init_kwargs(command):
bg_cmd = AdbBackgroundCommand( adb_popen, pid = adb_background_shell(self, command, stdout, stderr, as_root)
return dict(
adb_popen=adb_popen,
pid=pid,
)
bg_cmd = AdbBackgroundCommand.from_factory(
conn=self, conn=self,
adb_popen=adb_shell, cmd=command,
pid=pid, as_root=as_root,
as_root=as_root make_init_kwargs=make_init_kwargs,
) )
return bg_cmd return bg_cmd
@ -756,12 +762,11 @@ def adb_command(device, command, timeout=None, adb_server=None, adb_port=None):
return output return output
def adb_command_background(device, conn, command, adb_server=None, adb_port=None): def adb_command_popen(device, conn, command, adb_server=None, adb_port=None):
full_command = get_adb_command(device, command, adb_server, adb_port) command = get_adb_command(device, command, adb_server, adb_port)
logger.debug(full_command) logger.debug(command)
popen = get_subprocess(full_command, shell=True) popen = get_subprocess(command, shell=True)
cmd = PopenBackgroundCommand(conn=conn, popen=popen) return popen
return cmd
def grant_app_permissions(target, package): def grant_app_permissions(target, package):

View File

@ -613,138 +613,142 @@ class SshConnection(SshConnectionBase):
return self._background(command, stdout, stderr, as_root) return self._background(command, stdout, stderr, as_root)
def _background(self, command, stdout, stderr, as_root): def _background(self, command, stdout, stderr, as_root):
orig_command = command def make_init_kwargs(command):
stdout, stderr, command = redirect_streams(stdout, stderr, command) _stdout, _stderr, _command = redirect_streams(stdout, stderr, command)
command = "printf '%s\n' $$; exec sh -c {}".format(quote(command)) _command = "printf '%s\n' $$; exec sh -c {}".format(quote(_command))
channel = self._make_channel() channel = self._make_channel()
def executor(cmd, timeout): def executor(cmd, timeout):
channel.exec_command(cmd) channel.exec_command(cmd)
# Read are not buffered so we will always get the data as soon as # Read are not buffered so we will always get the data as soon as
# they arrive # they arrive
return ( return (
channel.makefile_stdin('w', 0), channel.makefile_stdin('w', 0),
channel.makefile(), channel.makefile(),
channel.makefile_stderr(), channel.makefile_stderr(),
)
stdin, stdout_in, stderr_in = self._execute_command(
_command,
as_root=as_root,
log=False,
timeout=None,
executor=executor,
) )
pid = stdout_in.readline()
if not pid:
_stderr = stderr_in.read()
if channel.exit_status_ready():
ret = channel.recv_exit_status()
else:
ret = 126
raise subprocess.CalledProcessError(
ret,
_command,
b'',
_stderr,
)
pid = int(pid)
stdin, stdout_in, stderr_in = self._execute_command( def create_out_stream(stream_in, stream_out):
command, """
as_root=as_root, Create a pair of file-like objects. The first one is used to read
log=False, data and the second one to write.
timeout=None, """
executor=executor,
)
pid = stdout_in.readline()
if not pid:
stderr = stderr_in.read()
if channel.exit_status_ready():
ret = channel.recv_exit_status()
else:
ret = 126
raise subprocess.CalledProcessError(
ret,
command,
b'',
stderr,
)
pid = int(pid)
def create_out_stream(stream_in, stream_out): if stream_out == subprocess.DEVNULL:
""" r, w = None, None
Create a pair of file-like objects. The first one is used to read # When asked for a pipe, we just give the file-like object as the
data and the second one to write. # reading end and no writing end, since paramiko already writes to
""" # it
elif stream_out == subprocess.PIPE:
r, w = os.pipe()
r = os.fdopen(r, 'rb')
w = os.fdopen(w, 'wb')
# Turn a file descriptor into a file-like object
elif isinstance(stream_out, int) and stream_out >= 0:
r = os.fdopen(stream_in, 'rb')
w = os.fdopen(stream_out, 'wb')
# file-like object
else:
r = stream_in
w = stream_out
if stream_out == subprocess.DEVNULL: return (r, w)
r, w = None, None
# When asked for a pipe, we just give the file-like object as the
# reading end and no writing end, since paramiko already writes to
# it
elif stream_out == subprocess.PIPE:
r, w = os.pipe()
r = os.fdopen(r, 'rb')
w = os.fdopen(w, 'wb')
# Turn a file descriptor into a file-like object
elif isinstance(stream_out, int) and stream_out >= 0:
r = os.fdopen(stream_in, 'rb')
w = os.fdopen(stream_out, 'wb')
# file-like object
else:
r = stream_in
w = stream_out
return (r, w) out_streams = {
name: create_out_stream(stream_in, stream_out)
for stream_in, stream_out, name in (
(stdout_in, _stdout, 'stdout'),
(stderr_in, _stderr, 'stderr'),
)
}
out_streams = { def redirect_thread_f(stdout_in, stderr_in, out_streams, select_timeout):
name: create_out_stream(stream_in, stream_out) def callback(out_streams, name, chunk):
for stream_in, stream_out, name in ( try:
(stdout_in, stdout, 'stdout'), r, w = out_streams[name]
(stderr_in, stderr, 'stderr'), except KeyError:
) return out_streams
}
try:
w.write(chunk)
# Write failed
except ValueError:
# Since that stream is now closed, stop trying to write to it
del out_streams[name]
# If that was the last open stream, we raise an
# exception so the thread can terminate.
if not out_streams:
raise
def redirect_thread_f(stdout_in, stderr_in, out_streams, select_timeout):
def callback(out_streams, name, chunk):
try:
r, w = out_streams[name]
except KeyError:
return out_streams return out_streams
try: try:
w.write(chunk) _read_paramiko_streams(stdout_in, stderr_in, select_timeout, callback, copy.copy(out_streams))
# Write failed # The streams closed while we were writing to it, the job is done here
except ValueError: except ValueError:
# Since that stream is now closed, stop trying to write to it pass
del out_streams[name]
# If that was the last open stream, we raise an
# exception so the thread can terminate.
if not out_streams:
raise
return out_streams # Make sure the writing end are closed proper since we are not
# going to write anything anymore
for r, w in out_streams.values():
w.flush()
if r is not w and w is not None:
w.close()
try: # If there is anything we need to redirect to, spawn a thread taking
_read_paramiko_streams(stdout_in, stderr_in, select_timeout, callback, copy.copy(out_streams)) # care of that
# The streams closed while we were writing to it, the job is done here select_timeout = 1
except ValueError: thread_out_streams = {
pass name: (r, w)
for name, (r, w) in out_streams.items()
if w is not None
}
redirect_thread = threading.Thread(
target=redirect_thread_f,
args=(stdout_in, stderr_in, thread_out_streams, select_timeout),
# The thread will die when the main thread dies
daemon=True,
)
redirect_thread.start()
# Make sure the writing end are closed proper since we are not return dict(
# going to write anything anymore chan=channel,
for r, w in out_streams.values(): pid=pid,
w.flush() stdin=stdin,
if r is not w and w is not None: # We give the reading end to the consumer of the data
w.close() stdout=out_streams['stdout'][0],
stderr=out_streams['stderr'][0],
redirect_thread=redirect_thread,
)
# If there is anything we need to redirect to, spawn a thread taking return ParamikoBackgroundCommand.from_factory(
# care of that
select_timeout = 1
thread_out_streams = {
name: (r, w)
for name, (r, w) in out_streams.items()
if w is not None
}
redirect_thread = threading.Thread(
target=redirect_thread_f,
args=(stdout_in, stderr_in, thread_out_streams, select_timeout),
# The thread will die when the main thread dies
daemon=True,
)
redirect_thread.start()
return ParamikoBackgroundCommand(
conn=self, conn=self,
cmd=command,
as_root=as_root, as_root=as_root,
chan=channel, make_init_kwargs=make_init_kwargs,
pid=pid,
stdin=stdin,
# We give the reading end to the consumer of the data
stdout=out_streams['stdout'][0],
stderr=out_streams['stderr'][0],
redirect_thread=redirect_thread,
cmd=orig_command,
) )
def _close(self): def _close(self):