1
0
mirror of https://github.com/ARM-software/devlib.git synced 2024-10-05 18:30:50 +01:00

target: Allow reuse of a connection once the owning thread is terminated

Once a thread exits, the connection instance it was using can be
returned to the pool so it can be reused by another thread.

Since there is no per-thread equivalent to atexit, this is achieved by
returning the connection to the pool after every top-level method call
that uses it directly, so the object the user can get by accessing
Target.conn can change after each call to Target method.
This commit is contained in:
Douglas Raillard 2024-05-09 14:28:19 +01:00 committed by Marc Bonnici
parent 1d6a007bad
commit 165b87f248

View File

@ -103,7 +103,8 @@ def call_conn(f):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
reentered = self.conn.is_in_use
conn = self.conn
reentered = conn.is_in_use
disconnect = False
try:
# If the connection was already in use we need to use a different
@ -113,8 +114,9 @@ def call_conn(f):
if reentered:
# Shallow copy so we can use another connection instance
_self = copy.copy(self)
_self.conn = _self.get_connection()
assert self.conn is not _self.conn
new_conn = _self.get_connection()
assert conn is not new_conn
_self.conn = new_conn
disconnect = True
else:
_self = self
@ -122,6 +124,13 @@ def call_conn(f):
finally:
if disconnect:
_self.disconnect()
elif not reentered:
# Return the connection to the pool, so if we end up exiting
# the thread the connection can then be reused by another
# thread.
del self.conn
with self._lock:
self._unused_conns.add(conn)
return wrapper
@ -284,6 +293,7 @@ class Target(object):
@tls_property
def _conn(self):
try:
with self._lock:
return self._unused_conns.pop()
except KeyError:
return self.get_connection()
@ -311,6 +321,8 @@ class Target(object):
is_container=False,
max_async=50,
):
self._lock = threading.RLock()
self._async_pool = None
self._async_pool_size = None
self._unused_conns = set()
@ -435,6 +447,7 @@ class Target(object):
ignored.update((
'_async_pool',
'_unused_conns',
'_lock',
))
return {
k: v
@ -450,6 +463,7 @@ class Target(object):
else:
self._async_pool = ThreadPoolExecutor(pool_size)
self._unused_conns = set()
self._lock = threading.RLock()
# connection and initialization
@ -542,6 +556,13 @@ class Target(object):
if pool is not None:
pool.__exit__(None, None, None)
with self._lock:
connections = self._conn.get_all_values()
for conn in itertools.chain(connections, self._unused_conns):
conn.close()
if self._async_pool is not None:
self._async_pool.__exit__(None, None, None)
def __enter__(self):
return self