From 9ec36e90402326803d3f7d44a097380f54713f47 Mon Sep 17 00:00:00 2001
From: Douglas Raillard <douglas.raillard@arm.com>
Date: Thu, 30 May 2024 10:41:50 +0100
Subject: [PATCH] connection: Support all signals in
 BackgroundCommand.send_signal()

Support sending any signal to background commands, instead of only
supporting properly SIGKILL/SIGTERM/SIGQUIT.

The main issue is figuring out what PID to send the signal to, as the
devlib API allows running a whole snippet of shell script that typically
is wrapped under many layers of sh -c and sudo calls. In order to lift
the ambiguity, the user has access to a "devlib-signal-target" command
that points devlib at what process should be the target of signals:

    # Run a "setup" command, then the main command that will receive the
    # signals
    cmd = 'echo setup; devlib-signal-target echo hello world'
    with target.background(cmd) as bg:
	bg.communicate()

The devlib-signal-target script can only be used once per background
command, so that it is never ambiguous what process is targeted, and so
that the Python code can cache the target PID.  Subsequent invocations
of devlib-signal-target will fail.
---
 devlib/bin/scripts/devlib-signal-target |  20 +++
 devlib/connection.py                    | 147 ++++++++++-----
 devlib/host.py                          |  27 ++-
 devlib/target.py                        |  11 +-
 devlib/utils/android.py                 |  37 ++--
 devlib/utils/ssh.py                     | 230 ++++++++++++------------
 6 files changed, 290 insertions(+), 182 deletions(-)
 create mode 100644 devlib/bin/scripts/devlib-signal-target

diff --git a/devlib/bin/scripts/devlib-signal-target b/devlib/bin/scripts/devlib-signal-target
new file mode 100644
index 0000000..26ac6fe
--- /dev/null
+++ b/devlib/bin/scripts/devlib-signal-target
@@ -0,0 +1,20 @@
+(
+    # If there is no data dir, it means we are not running as a background
+    # command so we just do nothing
+    if [ -e "$_DEVLIB_BG_CMD_DATA_DIR" ]; then
+        pid_file="$_DEVLIB_BG_CMD_DATA_DIR/pid"
+        # Atomically check if the PID file already exist and make the write
+        # fail if it already does. This way we don't have any race condition
+        # with the Python API, as there is either no PID or the same PID for
+        # the duration of the command
+        set -o noclobber
+        if ! printf "%u\n" $$ > "$pid_file"; then
+            echo "$0 was already called for this command" >&2
+            exit 1
+        fi
+    fi
+) || exit $?
+
+# Use exec so that the PID of the command we run is the same as the current $$
+# PID that we just registered
+exec "$@"
diff --git a/devlib/connection.py b/devlib/connection.py
index 4609975..99055a3 100644
--- a/devlib/connection.py
+++ b/devlib/connection.py
@@ -17,6 +17,7 @@ from abc import ABC, abstractmethod
 from contextlib import contextmanager, nullcontext
 from shlex import quote
 import os
+from pathlib import Path
 import signal
 import subprocess
 import threading
@@ -25,14 +26,11 @@ import logging
 import select
 import fcntl
 
-from devlib.utils.misc import InitCheckpoint
+from devlib.utils.misc import InitCheckpoint, memoized
 
 _KILL_TIMEOUT = 3
 
 
-def _kill_pgid_cmd(pgid, sig, busybox):
-    return '{} kill -{} -{}'.format(busybox, sig.value, pgid)
-
 def _popen_communicate(bg, popen, input, timeout):
     try:
         stdout, stderr = popen.communicate(input=input, timeout=timeout)
@@ -130,8 +128,11 @@ class BackgroundCommand(ABC):
     semantic as :class:`subprocess.Popen`.
     """
 
-    def __init__(self, conn):
+    def __init__(self, conn, data_dir, cmd, as_root):
         self.conn = conn
+        self._data_dir = data_dir
+        self.as_root = as_root
+        self.cmd = cmd
 
         # Poll currently opened background commands on that connection to make
         # them deregister themselves if they are completed. This avoids
@@ -147,15 +148,65 @@ class BackgroundCommand(ABC):
 
         conn._current_bg_cmds.add(self)
 
+    @classmethod
+    def from_factory(cls, conn, cmd, as_root, make_init_kwargs):
+        cmd, data_dir = cls._with_data_dir(conn, cmd)
+        return cls(
+            conn=conn,
+            data_dir=data_dir,
+            cmd=cmd,
+            as_root=as_root,
+            **make_init_kwargs(cmd),
+        )
+
     def _deregister(self):
         try:
             self.conn._current_bg_cmds.remove(self)
         except KeyError:
             pass
 
-    @abstractmethod
-    def _send_signal(self, sig):
-        pass
+    @property
+    def _pid_file(self):
+        return str(Path(self._data_dir, 'pid'))
+
+    @property
+    @memoized
+    def _targeted_pid(self):
+        """
+        PID of the process pointed at by ``devlib-signal-target`` command.
+        """
+        path = quote(self._pid_file)
+        busybox = quote(self.conn.busybox)
+
+        def execute(cmd):
+            return self.conn.execute(cmd, as_root=self.as_root)
+
+        while self.poll() is None:
+            try:
+                pid = execute(f'{busybox} cat {path}')
+            except subprocess.CalledProcessError:
+                time.sleep(0.01)
+            else:
+                if pid.endswith('\n'):
+                    return int(pid.strip())
+                else:
+                    # We got a partial write in the PID file
+                    continue
+
+        raise ValueError(f'The background commmand did not use devlib-signal-target wrapper to designate which command should be the target of signals')
+
+    @classmethod
+    def _with_data_dir(cls, conn, cmd):
+        busybox = quote(conn.busybox)
+        data_dir = conn.execute(f'{busybox} mktemp -d').strip()
+        cmd = f'_DEVLIB_BG_CMD_DATA_DIR={data_dir} exec {busybox} sh -c {quote(cmd)}'
+        return cmd, data_dir
+
+    def _cleanup_data_dir(self):
+        path = quote(self._data_dir)
+        busybox = quote(self.conn.busybox)
+        cmd = f'{busybox} rm -r {path} || true'
+        self.conn.execute(cmd, as_root=self.as_root)
 
     def send_signal(self, sig):
         """
@@ -165,8 +216,29 @@ class BackgroundCommand(ABC):
         :param signal: Signal to send.
         :type signal: signal.Signals
         """
+
+        def execute(cmd):
+            return self.conn.execute(cmd, as_root=self.as_root)
+
+        def send(sig):
+            busybox = quote(self.conn.busybox)
+            # If the command has already completed, we don't want to send a
+            # signal to another process that might have gotten that PID in the
+            # meantime.
+            if self.poll() is None:
+                if sig in (signal.SIGTERM, signal.SIGQUIT, signal.SIGKILL):
+                    # Use -PGID to target a process group rather than just the
+                    # process itself. This will work in any condition and will
+                    # not require cooperation from the command.
+                    execute(f'{busybox} kill -{sig.value} -{self.pid}')
+                else:
+                    # Other signals require cooperation from the shell command
+                    # so that it points to a specific process using
+                    # devlib-signal-target
+                    pid = self._targeted_pid
+                    execute(f'{busybox} kill -{sig.value} {pid}')
         try:
-            return self._send_signal(sig)
+            return send(sig)
         finally:
             # Deregister if the command has finished
             self.poll()
@@ -287,6 +359,7 @@ class BackgroundCommand(ABC):
             return self._close()
         finally:
             self._deregister()
+            self._cleanup_data_dir()
 
     def __enter__(self):
         return self
@@ -300,13 +373,15 @@ class PopenBackgroundCommand(BackgroundCommand):
     :class:`subprocess.Popen`-based background command.
     """
 
-    def __init__(self, conn, popen):
-        super().__init__(conn=conn)
+    def __init__(self, conn, data_dir, cmd, as_root, popen):
+        super().__init__(
+            conn=conn,
+            data_dir=data_dir,
+            cmd=cmd,
+            as_root=as_root,
+        )
         self.popen = popen
 
-    def _send_signal(self, sig):
-        return os.killpg(self.popen.pid, sig)
-
     @property
     def stdin(self):
         return self.popen.stdin
@@ -354,26 +429,20 @@ class ParamikoBackgroundCommand(BackgroundCommand):
     """
     :mod:`paramiko`-based background command.
     """
-    def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread):
-        super().__init__(conn=conn)
+    def __init__(self, conn, data_dir, cmd, as_root, chan, pid, stdin, stdout, stderr, redirect_thread):
+        super().__init__(
+            conn=conn,
+            data_dir=data_dir,
+            cmd=cmd,
+            as_root=as_root,
+        )
+
         self.chan = chan
-        self.as_root = as_root
         self._pid = pid
         self._stdin = stdin
         self._stdout = stdout
         self._stderr = stderr
         self.redirect_thread = redirect_thread
-        self.cmd = cmd
-
-    def _send_signal(self, sig):
-        # If the command has already completed, we don't want to send a signal
-        # to another process that might have gotten that PID in the meantime.
-        if self.poll() is not None:
-            return
-        # Use -PGID to target a process group rather than just the process
-        # itself
-        cmd = _kill_pgid_cmd(self.pid, sig, self.conn.busybox)
-        self.conn.execute(cmd, as_root=self.as_root)
 
     @property
     def pid(self):
@@ -517,18 +586,16 @@ class AdbBackgroundCommand(BackgroundCommand):
     ``adb``-based background command.
     """
 
-    def __init__(self, conn, adb_popen, pid, as_root):
-        super().__init__(conn=conn)
-        self.as_root = as_root
+    def __init__(self, conn, data_dir, cmd, as_root, adb_popen, pid):
+        super().__init__(
+            conn=conn,
+            data_dir=data_dir,
+            cmd=cmd,
+            as_root=as_root,
+        )
         self.adb_popen = adb_popen
         self._pid = pid
 
-    def _send_signal(self, sig):
-        self.conn.execute(
-            _kill_pgid_cmd(self.pid, sig, self.conn.busybox),
-            as_root=self.as_root,
-        )
-
     @property
     def stdin(self):
         return self.adb_popen.stdin
@@ -638,7 +705,7 @@ class TransferHandleBase(ABC):
 
 
 class PopenTransferHandle(TransferHandleBase):
-    def __init__(self, bg_cmd, dest, direction, *args, **kwargs):
+    def __init__(self, popen, dest, direction, *args, **kwargs):
         super().__init__(*args, **kwargs)
 
         if direction == 'push':
@@ -650,7 +717,7 @@ class PopenTransferHandle(TransferHandleBase):
 
         self.sample_size = lambda: sample_size(dest)
 
-        self.bg_cmd = bg_cmd
+        self.popen = popen
         self.last_sample = 0
 
     @staticmethod
@@ -671,7 +738,7 @@ class PopenTransferHandle(TransferHandleBase):
         return int(out.split()[0])
 
     def cancel(self):
-        self.bg_cmd.cancel()
+        self.popen.terminate()
 
     def isactive(self):
         try:
diff --git a/devlib/host.py b/devlib/host.py
index a9958a3..a65b00f 100644
--- a/devlib/host.py
+++ b/devlib/host.py
@@ -147,16 +147,25 @@ class LocalConnection(ConnectionBase):
         def preexec_fn():
             os.setpgrp()
 
-        popen = subprocess.Popen(
-            command,
-            stdout=stdout,
-            stderr=stderr,
-            stdin=subprocess.PIPE,
-            shell=True,
-            preexec_fn=preexec_fn,
+        def make_init_kwargs(command):
+            popen = subprocess.Popen(
+                command,
+                stdout=stdout,
+                stderr=stderr,
+                stdin=subprocess.PIPE,
+                shell=True,
+                preexec_fn=preexec_fn,
+            )
+            return dict(
+                popen=popen,
+            )
+
+        return PopenBackgroundCommand.from_factory(
+            conn=self,
+            cmd=command,
+            as_root=as_root,
+            make_init_kwargs=make_init_kwargs,
         )
-        bg_cmd = PopenBackgroundCommand(self, popen)
-        return bg_cmd
 
     def _close(self):
         pass
diff --git a/devlib/target.py b/devlib/target.py
index adceac9..110d1ad 100644
--- a/devlib/target.py
+++ b/devlib/target.py
@@ -281,7 +281,7 @@ class Target(object):
     @property
     def shutils(self):
         if self._shutils is None:
-            self._setup_shutils()
+            self._setup_scripts()
         return self._shutils
 
     def is_running(self, comm):
@@ -591,7 +591,7 @@ class Target(object):
 
     @asyn.asyncf
     async def setup(self, executables=None):
-        await self._setup_shutils.asyn()
+        await self._setup_scripts.asyn()
 
         for host_exe in (executables or []):  # pylint: disable=superfluous-parens
             await self.install.asyn(host_exe)
@@ -1586,8 +1586,9 @@ fi
     # internal methods
 
     @asyn.asyncf
-    async def _setup_shutils(self):
-        shutils_ifile = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts', 'shutils.in')
+    async def _setup_scripts(self):
+        scripts = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts')
+        shutils_ifile = os.path.join(scripts, 'shutils.in')
         with open(shutils_ifile) as fh:
             lines = fh.readlines()
         with tempfile.TemporaryDirectory() as folder:
@@ -1598,6 +1599,8 @@ fi
                     ofile.write(line)
             self._shutils = await self.install.asyn(shutils_ofile)
 
+        await self.install.asyn(os.path.join(scripts, 'devlib-signal-target'))
+
     @asyn.asyncf
     @call_conn
     async def _execute_util(self, command, timeout=None, check_exit_code=True, as_root=False):
diff --git a/devlib/utils/android.py b/devlib/utils/android.py
index c77a864..001cb93 100755
--- a/devlib/utils/android.py
+++ b/devlib/utils/android.py
@@ -40,7 +40,7 @@ from shlex import quote
 
 from devlib.exception import TargetTransientError, TargetStableError, HostError, TargetTransientCalledProcessError, TargetStableCalledProcessError, AdbRootError
 from devlib.utils.misc import check_output, which, ABI_MAP, redirect_streams, get_subprocess
-from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenBackgroundCommand, PopenTransferHandle
+from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenTransferHandle
 
 
 logger = logging.getLogger('android')
@@ -341,7 +341,7 @@ class AdbConnection(ConnectionBase):
         if timeout:
             adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server, adb_port=self.adb_port)
         else:
-            bg_cmd = adb_command_background(
+            popen = adb_command_popen(
                 device=self.device,
                 conn=self,
                 command=command,
@@ -351,12 +351,12 @@ class AdbConnection(ConnectionBase):
 
             handle = PopenTransferHandle(
                 manager=self.transfer_manager,
-                bg_cmd=bg_cmd,
+                popen=popen,
                 dest=dest,
                 direction=action
             )
-            with bg_cmd, self.transfer_manager.manage(sources, dest, action, handle):
-                bg_cmd.communicate()
+            with popen, self.transfer_manager.manage(sources, dest, action, handle):
+                popen.communicate()
 
     # pylint: disable=unused-argument
     def execute(self, command, timeout=None, check_exit_code=False,
@@ -387,12 +387,18 @@ class AdbConnection(ConnectionBase):
         return bg_cmd
 
     def _background(self, command, stdout, stderr, as_root):
-        adb_shell, pid = adb_background_shell(self, command, stdout, stderr, as_root)
-        bg_cmd = AdbBackgroundCommand(
+        def make_init_kwargs(command):
+            adb_popen, pid = adb_background_shell(self, command, stdout, stderr, as_root)
+            return dict(
+                adb_popen=adb_popen,
+                pid=pid,
+            )
+
+        bg_cmd = AdbBackgroundCommand.from_factory(
             conn=self,
-            adb_popen=adb_shell,
-            pid=pid,
-            as_root=as_root
+            cmd=command,
+            as_root=as_root,
+            make_init_kwargs=make_init_kwargs,
         )
         return bg_cmd
 
@@ -756,12 +762,11 @@ def adb_command(device, command, timeout=None, adb_server=None, adb_port=None):
     return output
 
 
-def adb_command_background(device, conn, command, adb_server=None, adb_port=None):
-    full_command = get_adb_command(device, command, adb_server, adb_port)
-    logger.debug(full_command)
-    popen = get_subprocess(full_command, shell=True)
-    cmd = PopenBackgroundCommand(conn=conn, popen=popen)
-    return cmd
+def adb_command_popen(device, conn, command, adb_server=None, adb_port=None):
+    command = get_adb_command(device, command, adb_server, adb_port)
+    logger.debug(command)
+    popen = get_subprocess(command, shell=True)
+    return popen
 
 
 def grant_app_permissions(target, package):
diff --git a/devlib/utils/ssh.py b/devlib/utils/ssh.py
index 81aa405..caa0f44 100644
--- a/devlib/utils/ssh.py
+++ b/devlib/utils/ssh.py
@@ -613,138 +613,142 @@ class SshConnection(SshConnectionBase):
             return self._background(command, stdout, stderr, as_root)
 
     def _background(self, command, stdout, stderr, as_root):
-        orig_command = command
-        stdout, stderr, command = redirect_streams(stdout, stderr, command)
+        def make_init_kwargs(command):
+            _stdout, _stderr, _command = redirect_streams(stdout, stderr, command)
 
-        command = "printf '%s\n' $$; exec sh -c {}".format(quote(command))
-        channel = self._make_channel()
+            _command = "printf '%s\n' $$; exec sh -c {}".format(quote(_command))
+            channel = self._make_channel()
 
-        def executor(cmd, timeout):
-            channel.exec_command(cmd)
-            # Read are not buffered so we will always get the data as soon as
-            # they arrive
-            return (
-                channel.makefile_stdin('w', 0),
-                channel.makefile(),
-                channel.makefile_stderr(),
+            def executor(cmd, timeout):
+                channel.exec_command(cmd)
+                # Read are not buffered so we will always get the data as soon as
+                # they arrive
+                return (
+                    channel.makefile_stdin('w', 0),
+                    channel.makefile(),
+                    channel.makefile_stderr(),
+                )
+
+            stdin, stdout_in, stderr_in = self._execute_command(
+                _command,
+                as_root=as_root,
+                log=False,
+                timeout=None,
+                executor=executor,
             )
+            pid = stdout_in.readline()
+            if not pid:
+                _stderr = stderr_in.read()
+                if channel.exit_status_ready():
+                    ret = channel.recv_exit_status()
+                else:
+                    ret = 126
+                raise subprocess.CalledProcessError(
+                    ret,
+                    _command,
+                    b'',
+                    _stderr,
+                )
+            pid = int(pid)
 
-        stdin, stdout_in, stderr_in = self._execute_command(
-            command,
-            as_root=as_root,
-            log=False,
-            timeout=None,
-            executor=executor,
-        )
-        pid = stdout_in.readline()
-        if not pid:
-            stderr = stderr_in.read()
-            if channel.exit_status_ready():
-                ret = channel.recv_exit_status()
-            else:
-                ret = 126
-            raise subprocess.CalledProcessError(
-                ret,
-                command,
-                b'',
-                stderr,
-            )
-        pid = int(pid)
+            def create_out_stream(stream_in, stream_out):
+                """
+                Create a pair of file-like objects. The first one is used to read
+                data and the second one to write.
+                """
 
-        def create_out_stream(stream_in, stream_out):
-            """
-            Create a pair of file-like objects. The first one is used to read
-            data and the second one to write.
-            """
+                if stream_out == subprocess.DEVNULL:
+                    r, w = None, None
+                # When asked for a pipe, we just give the file-like object as the
+                # reading end and no writing end, since paramiko already writes to
+                # it
+                elif stream_out == subprocess.PIPE:
+                    r, w = os.pipe()
+                    r = os.fdopen(r, 'rb')
+                    w = os.fdopen(w, 'wb')
+                # Turn a file descriptor into a file-like object
+                elif isinstance(stream_out, int) and stream_out >= 0:
+                    r = os.fdopen(stream_in, 'rb')
+                    w = os.fdopen(stream_out, 'wb')
+                # file-like object
+                else:
+                    r = stream_in
+                    w = stream_out
 
-            if stream_out == subprocess.DEVNULL:
-                r, w = None, None
-            # When asked for a pipe, we just give the file-like object as the
-            # reading end and no writing end, since paramiko already writes to
-            # it
-            elif stream_out == subprocess.PIPE:
-                r, w = os.pipe()
-                r = os.fdopen(r, 'rb')
-                w = os.fdopen(w, 'wb')
-            # Turn a file descriptor into a file-like object
-            elif isinstance(stream_out, int) and stream_out >= 0:
-                r = os.fdopen(stream_in, 'rb')
-                w = os.fdopen(stream_out, 'wb')
-            # file-like object
-            else:
-                r = stream_in
-                w = stream_out
+                return (r, w)
 
-            return (r, w)
+            out_streams = {
+                name: create_out_stream(stream_in, stream_out)
+                for stream_in, stream_out, name in (
+                    (stdout_in, _stdout, 'stdout'),
+                    (stderr_in, _stderr, 'stderr'),
+                )
+            }
 
-        out_streams = {
-            name: create_out_stream(stream_in, stream_out)
-            for stream_in, stream_out, name in (
-                (stdout_in, stdout, 'stdout'),
-                (stderr_in, stderr, 'stderr'),
-            )
-        }
+            def redirect_thread_f(stdout_in, stderr_in, out_streams, select_timeout):
+                def callback(out_streams, name, chunk):
+                    try:
+                        r, w = out_streams[name]
+                    except KeyError:
+                        return out_streams
+
+                    try:
+                        w.write(chunk)
+                    # Write failed
+                    except ValueError:
+                        # Since that stream is now closed, stop trying to write to it
+                        del out_streams[name]
+                        # If that was the last open stream, we raise an
+                        # exception so the thread can terminate.
+                        if not out_streams:
+                            raise
 
-        def redirect_thread_f(stdout_in, stderr_in, out_streams, select_timeout):
-            def callback(out_streams, name, chunk):
-                try:
-                    r, w = out_streams[name]
-                except KeyError:
                     return out_streams
 
                 try:
-                    w.write(chunk)
-                # Write failed
+                    _read_paramiko_streams(stdout_in, stderr_in, select_timeout, callback, copy.copy(out_streams))
+                # The streams closed while we were writing to it, the job is done here
                 except ValueError:
-                    # Since that stream is now closed, stop trying to write to it
-                    del out_streams[name]
-                    # If that was the last open stream, we raise an
-                    # exception so the thread can terminate.
-                    if not out_streams:
-                        raise
+                    pass
 
-                return out_streams
+                # Make sure the writing end are closed proper since we are not
+                # going to write anything anymore
+                for r, w in out_streams.values():
+                    w.flush()
+                    if r is not w and w is not None:
+                        w.close()
 
-            try:
-                _read_paramiko_streams(stdout_in, stderr_in, select_timeout, callback, copy.copy(out_streams))
-            # The streams closed while we were writing to it, the job is done here
-            except ValueError:
-                pass
+            # If there is anything we need to redirect to, spawn a thread taking
+            # care of that
+            select_timeout = 1
+            thread_out_streams = {
+                name: (r, w)
+                for name, (r, w) in out_streams.items()
+                if w is not None
+            }
+            redirect_thread = threading.Thread(
+                target=redirect_thread_f,
+                args=(stdout_in, stderr_in, thread_out_streams, select_timeout),
+                # The thread will die when the main thread dies
+                daemon=True,
+            )
+            redirect_thread.start()
 
-            # Make sure the writing end are closed proper since we are not
-            # going to write anything anymore
-            for r, w in out_streams.values():
-                w.flush()
-                if r is not w and w is not None:
-                    w.close()
+            return dict(
+                chan=channel,
+                pid=pid,
+                stdin=stdin,
+                # We give the reading end to the consumer of the data
+                stdout=out_streams['stdout'][0],
+                stderr=out_streams['stderr'][0],
+                redirect_thread=redirect_thread,
+            )
 
-        # If there is anything we need to redirect to, spawn a thread taking
-        # care of that
-        select_timeout = 1
-        thread_out_streams = {
-            name: (r, w)
-            for name, (r, w) in out_streams.items()
-            if w is not None
-        }
-        redirect_thread = threading.Thread(
-            target=redirect_thread_f,
-            args=(stdout_in, stderr_in, thread_out_streams, select_timeout),
-            # The thread will die when the main thread dies
-            daemon=True,
-        )
-        redirect_thread.start()
-
-        return ParamikoBackgroundCommand(
+        return ParamikoBackgroundCommand.from_factory(
             conn=self,
+            cmd=command,
             as_root=as_root,
-            chan=channel,
-            pid=pid,
-            stdin=stdin,
-            # We give the reading end to the consumer of the data
-            stdout=out_streams['stdout'][0],
-            stderr=out_streams['stderr'][0],
-            redirect_thread=redirect_thread,
-            cmd=orig_command,
+            make_init_kwargs=make_init_kwargs,
         )
 
     def _close(self):