1
0
mirror of https://github.com/ARM-software/devlib.git synced 2025-01-31 02:00:45 +00:00

android: Add adb_port connection setting

Allow specifying the port of the adb server in use.
This commit is contained in:
Douglas Raillard 2023-08-04 10:40:59 +01:00 committed by Marc Bonnici
parent 0579a814f1
commit 9f71c818c4

View File

@ -287,6 +287,7 @@ class AdbConnection(ConnectionBase):
timeout=None, timeout=None,
platform=None, platform=None,
adb_server=None, adb_server=None,
adb_port=None,
adb_as_root=False, adb_as_root=False,
connection_attempts=MAX_ATTEMPTS, connection_attempts=MAX_ATTEMPTS,
@ -303,9 +304,10 @@ class AdbConnection(ConnectionBase):
) )
self.timeout = timeout if timeout is not None else self.default_timeout self.timeout = timeout if timeout is not None else self.default_timeout
if device is None: if device is None:
device = adb_get_device(timeout=timeout, adb_server=adb_server) device = adb_get_device(timeout=timeout, adb_server=adb_server, adb_port=adb_port)
self.device = device self.device = device
self.adb_server = adb_server self.adb_server = adb_server
self.adb_port = adb_port
self.adb_as_root = adb_as_root self.adb_as_root = adb_as_root
lock, nr_active = AdbConnection.active_connections lock, nr_active = AdbConnection.active_connections
with lock: with lock:
@ -320,7 +322,7 @@ class AdbConnection(ConnectionBase):
# lead to commands hanging forever in some situations. # lead to commands hanging forever in some situations.
except AdbRootError: except AdbRootError:
pass pass
adb_connect(self.device, adb_server=self.adb_server, attempts=connection_attempts) adb_connect(self.device, adb_server=self.adb_server, adb_port=self.adb_port, attempts=connection_attempts)
self._setup_ls() self._setup_ls()
self._setup_su() self._setup_su()
@ -340,13 +342,14 @@ class AdbConnection(ConnectionBase):
command = "{} {}".format(action, paths) command = "{} {}".format(action, paths)
if timeout: if timeout:
adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server) adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server, adb_port=self.adb_port)
else: else:
bg_cmd = adb_command_background( bg_cmd = adb_command_background(
device=self.device, device=self.device,
conn=self, conn=self,
command=command, command=command,
adb_server=self.adb_server adb_server=self.adb_server,
adb_port=self.adb_port,
) )
handle = PopenTransferHandle( handle = PopenTransferHandle(
@ -365,7 +368,7 @@ class AdbConnection(ConnectionBase):
as_root = False as_root = False
try: try:
return adb_shell(self.device, command, timeout, check_exit_code, return adb_shell(self.device, command, timeout, check_exit_code,
as_root, adb_server=self.adb_server, su_cmd=self.su_cmd) as_root, adb_server=self.adb_server, adb_port=self.adb_port, su_cmd=self.su_cmd)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
cls = TargetTransientCalledProcessError if will_succeed else TargetStableCalledProcessError cls = TargetTransientCalledProcessError if will_succeed else TargetStableCalledProcessError
raise cls( raise cls(
@ -407,7 +410,7 @@ class AdbConnection(ConnectionBase):
if disconnect: if disconnect:
if self.adb_as_root: if self.adb_as_root:
self.adb_root(enable=False) self.adb_root(enable=False)
adb_disconnect(self.device, self.adb_server) adb_disconnect(self.device, self.adb_server, self.adb_port)
def cancel_running_command(self): def cancel_running_command(self):
# adbd multiplexes commands so that they don't interfer with each # adbd multiplexes commands so that they don't interfer with each
@ -425,7 +428,7 @@ class AdbConnection(ConnectionBase):
cmd = 'root' if enable else 'unroot' cmd = 'root' if enable else 'unroot'
try: try:
output = adb_command(self.device, cmd, timeout=30, adb_server=self.adb_server) output = adb_command(self.device, cmd, timeout=30, adb_server=self.adb_server, adb_port=self.adb_port)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
# Ignore if we're already root # Ignore if we're already root
if 'adbd is already running as root' in e.output: if 'adbd is already running as root' in e.output:
@ -439,10 +442,10 @@ class AdbConnection(ConnectionBase):
AdbConnection._connected_as_root[self.device] = enable AdbConnection._connected_as_root[self.device] = enable
def wait_for_device(self, timeout=30): def wait_for_device(self, timeout=30):
adb_command(self.device, 'wait-for-device', timeout, self.adb_server) adb_command(self.device, 'wait-for-device', timeout, self.adb_server, self.adb_port)
def reboot_bootloader(self, timeout=30): def reboot_bootloader(self, timeout=30):
adb_command(self.device, 'reboot-bootloader', timeout, self.adb_server) adb_command(self.device, 'reboot-bootloader', timeout, self.adb_server, self.adb_port)
# Again, we need to handle boards where the default output format from ls is # Again, we need to handle boards where the default output format from ls is
# single column *and* boards where the default output is multi-column. # single column *and* boards where the default output is multi-column.
@ -451,7 +454,7 @@ class AdbConnection(ConnectionBase):
def _setup_ls(self): def _setup_ls(self):
command = "shell '(ls -1); echo \"\n$?\"'" command = "shell '(ls -1); echo \"\n$?\"'"
try: try:
output = adb_command(self.device, command, timeout=self.timeout, adb_server=self.adb_server) output = adb_command(self.device, command, timeout=self.timeout, adb_server=self.adb_server, adb_port=self.adb_port)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise HostError( raise HostError(
'Failed to set up ls command on Android device. Output:\n' 'Failed to set up ls command on Android device. Output:\n'
@ -493,7 +496,7 @@ def fastboot_flash_partition(partition, path_to_image):
fastboot_command(command) fastboot_command(command)
def adb_get_device(timeout=None, adb_server=None): def adb_get_device(timeout=None, adb_server=None, adb_port=None):
""" """
Returns the serial number of a connected android device. Returns the serial number of a connected android device.
@ -504,7 +507,7 @@ def adb_get_device(timeout=None, adb_server=None):
# Ensure server is started so the 'daemon started successfully' message # Ensure server is started so the 'daemon started successfully' message
# doesn't confuse the parsing below # doesn't confuse the parsing below
adb_command(None, 'start-server', adb_server=adb_server) adb_command(None, 'start-server', adb_server=adb_server, adb_port=adb_port)
# The output of calling adb devices consists of a heading line then # The output of calling adb devices consists of a heading line then
# a list of the devices sperated by new line # a list of the devices sperated by new line
@ -512,7 +515,7 @@ def adb_get_device(timeout=None, adb_server=None):
# then the output length is 2 + (1 for each device) # then the output length is 2 + (1 for each device)
start = time.time() start = time.time()
while True: while True:
output = adb_command(None, "devices", adb_server=adb_server).splitlines() # pylint: disable=E1103 output = adb_command(None, "devices", adb_server=adb_server, adb_port=adb_port).splitlines() # pylint: disable=E1103
output_length = len(output) output_length = len(output)
if output_length == 3: if output_length == 3:
# output[1] is the 2nd line in the output which has the device name # output[1] is the 2nd line in the output which has the device name
@ -529,7 +532,7 @@ def adb_get_device(timeout=None, adb_server=None):
time.sleep(1) time.sleep(1)
def adb_connect(device, timeout=None, attempts=MAX_ATTEMPTS, adb_server=None): def adb_connect(device, timeout=None, attempts=MAX_ATTEMPTS, adb_server=None, adb_port=None):
_check_env() _check_env()
tries = 0 tries = 0
output = None output = None
@ -542,12 +545,12 @@ def adb_connect(device, timeout=None, attempts=MAX_ATTEMPTS, adb_server=None):
# adb connection may have gone "stale", resulting in adb blocking # adb connection may have gone "stale", resulting in adb blocking
# indefinitely when making calls to the device. To avoid this, # indefinitely when making calls to the device. To avoid this,
# always disconnect first. # always disconnect first.
adb_disconnect(device, adb_server) adb_disconnect(device, adb_server, adb_port)
adb_cmd = get_adb_command(None, 'connect', adb_server) adb_cmd = get_adb_command(None, 'connect', adb_server, adb_port)
command = '{} {}'.format(adb_cmd, quote(device)) command = '{} {}'.format(adb_cmd, quote(device))
logger.debug(command) logger.debug(command)
output, _ = check_output(command, shell=True, timeout=timeout) output, _ = check_output(command, shell=True, timeout=timeout)
if _ping(device, adb_server): if _ping(device, adb_server, adb_port):
break break
time.sleep(10) time.sleep(10)
else: # did not connect to the device else: # did not connect to the device
@ -557,12 +560,12 @@ def adb_connect(device, timeout=None, attempts=MAX_ATTEMPTS, adb_server=None):
raise HostError(message) raise HostError(message)
def adb_disconnect(device, adb_server=None): def adb_disconnect(device, adb_server=None, adb_port=None):
_check_env() _check_env()
if not device: if not device:
return return
if ":" in device and device in adb_list_devices(adb_server): if ":" in device and device in adb_list_devices(adb_server, adb_port):
adb_cmd = get_adb_command(None, 'disconnect', adb_server) adb_cmd = get_adb_command(None, 'disconnect', adb_server, adb_port)
command = "{} {}".format(adb_cmd, device) command = "{} {}".format(adb_cmd, device)
logger.debug(command) logger.debug(command)
retval = subprocess.call(command, stdout=open(os.devnull, 'wb'), shell=True) retval = subprocess.call(command, stdout=open(os.devnull, 'wb'), shell=True)
@ -570,9 +573,9 @@ def adb_disconnect(device, adb_server=None):
raise TargetTransientError('"{}" returned {}'.format(command, retval)) raise TargetTransientError('"{}" returned {}'.format(command, retval))
def _ping(device, adb_server=None): def _ping(device, adb_server=None, adb_port=None):
_check_env() _check_env()
adb_cmd = get_adb_command(device, 'shell', adb_server) adb_cmd = get_adb_command(device, 'shell', adb_server, adb_port)
command = "{} {}".format(adb_cmd, quote('ls /data/local/tmp > /dev/null')) command = "{} {}".format(adb_cmd, quote('ls /data/local/tmp > /dev/null'))
logger.debug(command) logger.debug(command)
result = subprocess.call(command, stderr=subprocess.PIPE, shell=True) result = subprocess.call(command, stderr=subprocess.PIPE, shell=True)
@ -584,7 +587,7 @@ def _ping(device, adb_server=None):
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
def adb_shell(device, command, timeout=None, check_exit_code=False, def adb_shell(device, command, timeout=None, check_exit_code=False,
as_root=False, adb_server=None, su_cmd='su -c {}'): # NOQA as_root=False, adb_server=None, adb_port=None, su_cmd='su -c {}'): # NOQA
_check_env() _check_env()
# On older combinations of ADB/Android versions, the adb host command always # On older combinations of ADB/Android versions, the adb host command always
@ -594,17 +597,14 @@ def adb_shell(device, command, timeout=None, check_exit_code=False,
# code of the executed command itself. # code of the executed command itself.
command = r'({}); echo "\n$?"'.format(command) command = r'({}); echo "\n$?"'.format(command)
parts = ['adb'] command = su_cmd.format(quote(command)) if as_root else command
if adb_server is not None: command = ('shell', command)
parts += ['-H', adb_server] parts, env = _get_adb_parts(command, device, adb_server, adb_port, quote_adb=False)
if device is not None: env = {**os.environ, **env}
parts += ['-s', device]
parts += ['shell',
command if not as_root else su_cmd.format(quote(command))]
logger.debug(' '.join(quote(part) for part in parts)) logger.debug(' '.join(quote(part) for part in parts))
try: try:
raw_output, error = check_output(parts, timeout, shell=False) raw_output, error = check_output(parts, timeout, shell=False, env=env)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise TargetStableError(str(e)) raise TargetStableError(str(e))
@ -654,6 +654,7 @@ def adb_background_shell(conn, command,
"""Runs the specified command in a subprocess, returning the the Popen object.""" """Runs the specified command in a subprocess, returning the the Popen object."""
device = conn.device device = conn.device
adb_server = conn.adb_server adb_server = conn.adb_server
adb_port = conn.adb_port
busybox = conn.busybox busybox = conn.busybox
_check_env() _check_env()
@ -678,7 +679,7 @@ def adb_background_shell(conn, command,
command = f"{busybox} kill -STOP $$ && exec {busybox} sh -c {quote(command)}" command = f"{busybox} kill -STOP $$ && exec {busybox} sh -c {quote(command)}"
command_uuid, command = with_uuid(command) command_uuid, command = with_uuid(command)
adb_cmd = get_adb_command(device, 'shell', adb_server) adb_cmd = get_adb_command(device, 'shell', adb_server, adb_port)
full_command = f'{adb_cmd} {quote(command)}' full_command = f'{adb_cmd} {quote(command)}'
logger.debug(full_command) logger.debug(full_command)
p = subprocess.Popen(full_command, stdout=stdout, stderr=stderr, stdin=subprocess.PIPE, shell=True) p = subprocess.Popen(full_command, stdout=stdout, stderr=stderr, stdin=subprocess.PIPE, shell=True)
@ -715,11 +716,11 @@ def adb_background_shell(conn, command,
return (p, pid) return (p, pid)
def adb_kill_server(timeout=30, adb_server=None): def adb_kill_server(timeout=30, adb_server=None, adb_port=None):
adb_command(None, 'kill-server', timeout, adb_server) adb_command(None, 'kill-server', timeout, adb_server, adb_port)
def adb_list_devices(adb_server=None): def adb_list_devices(adb_server=None, adb_port=None):
output = adb_command(None, 'devices', adb_server=adb_server) output = adb_command(None, 'devices', adb_server=adb_server, adb_port=adb_port)
devices = [] devices = []
for line in output.splitlines(): for line in output.splitlines():
parts = [p.strip() for p in line.split()] parts = [p.strip() for p in line.split()]
@ -728,24 +729,36 @@ def adb_list_devices(adb_server=None):
return devices return devices
def get_adb_command(device, command, adb_server=None): def _get_adb_parts(command, device=None, adb_server=None, adb_port=None, quote_adb=True):
_quote = quote if quote_adb else lambda x: x
parts = (
'adb',
*(('-H', _quote(adb_server)) if adb_server is not None else ()),
*(('-P', _quote(str(adb_port))) if adb_port is not None else ()),
*(('-s', _quote(device)) if device is not None else ()),
*command,
)
env = {'LC_ALL': 'C'}
return (parts, env)
def get_adb_command(device, command, adb_server=None, adb_port=None):
_check_env() _check_env()
device_string = "" parts, env = _get_adb_parts((command,), device, adb_server, adb_port, quote_adb=True)
if adb_server != None: env = [quote(f'{name}={val}') for name, val in sorted(env.items())]
device_string = ' -H {}'.format(adb_server) parts = [*env, *parts]
device_string += ' -s {}'.format(device) if device else '' return ' '.join(parts)
return "LC_ALL=C adb{} {}".format(device_string, command)
def adb_command(device, command, timeout=None, adb_server=None): def adb_command(device, command, timeout=None, adb_server=None, adb_port=None):
full_command = get_adb_command(device, command, adb_server) full_command = get_adb_command(device, command, adb_server, adb_port)
logger.debug(full_command) logger.debug(full_command)
output, _ = check_output(full_command, timeout, shell=True) output, _ = check_output(full_command, timeout, shell=True)
return output return output
def adb_command_background(device, conn, command, adb_server=None): def adb_command_background(device, conn, command, adb_server=None, adb_port=None):
full_command = get_adb_command(device, command, adb_server) full_command = get_adb_command(device, command, adb_server, adb_port)
logger.debug(full_command) logger.debug(full_command)
popen = get_subprocess(full_command, shell=True) popen = get_subprocess(full_command, shell=True)
cmd = PopenBackgroundCommand(conn=conn, popen=popen) cmd = PopenBackgroundCommand(conn=conn, popen=popen)
@ -946,7 +959,7 @@ class LogcatMonitor(object):
if self._logcat_format: if self._logcat_format:
logcat_cmd = "{} -v {}".format(logcat_cmd, quote(self._logcat_format)) logcat_cmd = "{} -v {}".format(logcat_cmd, quote(self._logcat_format))
logcat_cmd = get_adb_command(self.target.conn.device, logcat_cmd, self.target.adb_server) logcat_cmd = get_adb_command(self.target.conn.device, logcat_cmd, self.target.adb_server, self.target.adb_port)
logger.debug('logcat command ="{}"'.format(logcat_cmd)) logger.debug('logcat command ="{}"'.format(logcat_cmd))
self._logcat = pexpect.spawn(logcat_cmd, logfile=self._logfile, encoding='utf-8') self._logcat = pexpect.spawn(logcat_cmd, logfile=self._logfile, encoding='utf-8')