mirror of
https://github.com/ARM-software/devlib.git
synced 2025-03-04 09:17:51 +00:00
target: Asyncify Target._prepare_xfer()
_prepare_xfer() deals with all the paths resulting from glob expansion, so it can benefit from async capabilities in order to process multiple files concurrently. Convert the internals to async/await to enable useful map_concurrently()
This commit is contained in:
parent
4431932e0d
commit
1b6c8069bd
@ -663,10 +663,22 @@ class Target(object):
|
||||
transfering multiple sources.
|
||||
"""
|
||||
|
||||
once = functools.lru_cache(maxsize=None)
|
||||
def once(f):
|
||||
cache = dict()
|
||||
|
||||
@functools.wraps(f)
|
||||
async def wrapper(path):
|
||||
try:
|
||||
return cache[path]
|
||||
except KeyError:
|
||||
x = await f(path)
|
||||
cache[path] = x
|
||||
return x
|
||||
|
||||
return wrapper
|
||||
|
||||
_target_cache = {}
|
||||
def target_paths_kind(paths, as_root=False):
|
||||
async def target_paths_kind(paths, as_root=False):
|
||||
def process(x):
|
||||
x = x.strip()
|
||||
if x == 'notexist':
|
||||
@ -686,7 +698,7 @@ class Target(object):
|
||||
)
|
||||
for path in _paths
|
||||
)
|
||||
res = self.execute(cmd, as_root=as_root)
|
||||
res = await self.execute.asyn(cmd, as_root=as_root)
|
||||
_target_cache.update(zip(_paths, map(process, res.split())))
|
||||
|
||||
return [
|
||||
@ -695,7 +707,7 @@ class Target(object):
|
||||
]
|
||||
|
||||
_host_cache = {}
|
||||
def host_paths_kind(paths, as_root=False):
|
||||
async def host_paths_kind(paths, as_root=False):
|
||||
def path_kind(path):
|
||||
if os.path.isdir(path):
|
||||
return 'dir'
|
||||
@ -722,47 +734,55 @@ class Target(object):
|
||||
src_excep = HostError
|
||||
src_path_kind = host_paths_kind
|
||||
|
||||
_dst_mkdir = once(self.makedirs)
|
||||
_dst_mkdir = once(self.makedirs.asyn)
|
||||
dst_path_join = self.path.join
|
||||
dst_paths_kind = target_paths_kind
|
||||
dst_remove_file = once(functools.partial(self.remove, as_root=as_root))
|
||||
|
||||
@once
|
||||
async def dst_remove_file(path):
|
||||
return await self.remove.asyn(path, as_root=as_root)
|
||||
elif action == 'pull':
|
||||
src_excep = TargetStableError
|
||||
src_path_kind = target_paths_kind
|
||||
|
||||
_dst_mkdir = once(functools.partial(os.makedirs, exist_ok=True))
|
||||
@once
|
||||
async def _dst_mkdir(path):
|
||||
return os.makedirs(path, exist_ok=True)
|
||||
dst_path_join = os.path.join
|
||||
dst_paths_kind = host_paths_kind
|
||||
dst_remove_file = once(os.remove)
|
||||
|
||||
@once
|
||||
async def dst_remove_file(path):
|
||||
return os.remove(path)
|
||||
else:
|
||||
raise ValueError('Unknown action "{}"'.format(action))
|
||||
|
||||
# Handle the case where path is None
|
||||
def dst_mkdir(path):
|
||||
async def dst_mkdir(path):
|
||||
if path:
|
||||
_dst_mkdir(path)
|
||||
await _dst_mkdir(path)
|
||||
|
||||
def rewrite_dst(src, dst):
|
||||
async def rewrite_dst(src, dst):
|
||||
new_dst = dst_path_join(dst, os.path.basename(src))
|
||||
|
||||
src_kind, = src_path_kind([src], as_root)
|
||||
src_kind, = await src_path_kind([src], as_root)
|
||||
# Batch both checks to avoid a costly extra execute()
|
||||
dst_kind, new_dst_kind = dst_paths_kind([dst, new_dst], as_root)
|
||||
dst_kind, new_dst_kind = await dst_paths_kind([dst, new_dst], as_root)
|
||||
|
||||
if src_kind == 'file':
|
||||
if dst_kind == 'dir':
|
||||
if new_dst_kind == 'dir':
|
||||
raise IsADirectoryError(new_dst)
|
||||
if new_dst_kind == 'file':
|
||||
dst_remove_file(new_dst)
|
||||
await dst_remove_file(new_dst)
|
||||
return new_dst
|
||||
else:
|
||||
return new_dst
|
||||
elif dst_kind == 'file':
|
||||
dst_remove_file(dst)
|
||||
await dst_remove_file(dst)
|
||||
return dst
|
||||
else:
|
||||
dst_mkdir(os.path.dirname(dst))
|
||||
await dst_mkdir(os.path.dirname(dst))
|
||||
return dst
|
||||
elif src_kind == 'dir':
|
||||
if dst_kind == 'dir':
|
||||
@ -776,7 +796,7 @@ class Target(object):
|
||||
elif dst_kind == 'file':
|
||||
raise FileExistsError(dst_kind)
|
||||
else:
|
||||
dst_mkdir(os.path.dirname(dst))
|
||||
await dst_mkdir(os.path.dirname(dst))
|
||||
return dst
|
||||
else:
|
||||
raise FileNotFoundError(src)
|
||||
@ -785,18 +805,19 @@ class Target(object):
|
||||
if not sources:
|
||||
raise src_excep('No file matching source pattern: {}'.format(pattern))
|
||||
|
||||
if dst_paths_kind([dest]) != ['dir']:
|
||||
if (await dst_paths_kind([dest])) != ['dir']:
|
||||
raise NotADirectoryError('A folder dest is required for multiple matches but destination is a file: {}'.format(dest))
|
||||
|
||||
async def f(src):
|
||||
return await rewrite_dst(src, dest)
|
||||
mapping = await self.async_manager.map_concurrently(f, sources)
|
||||
|
||||
# TODO: since rewrite_dst() will currently return a different path for
|
||||
# each source, it will not bring anything. In order to be useful,
|
||||
# connections need to be able to understand that if the destination is
|
||||
# an empty folder, the source is supposed to be transfered into it with
|
||||
# the same basename.
|
||||
return groupby_value({
|
||||
src: rewrite_dst(src, dest)
|
||||
for src in sources
|
||||
})
|
||||
return groupby_value(mapping)
|
||||
|
||||
@asyn.asyncf
|
||||
@call_conn
|
||||
|
Loading…
x
Reference in New Issue
Block a user