From 1b6c8069bd7a902ed0d39be632e8bde9ec37d057 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Wed, 20 Nov 2024 17:17:07 +0000 Subject: [PATCH] target: Asyncify Target._prepare_xfer() _prepare_xfer() deals with all the paths resulting from glob expansion, so it can benefit from async capabilities in order to process multiple files concurrently. Convert the internals to async/await so that map_concurrently() can be put to good use. --- devlib/target.py | 65 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/devlib/target.py b/devlib/target.py index f1005d3..adceac9 100644 --- a/devlib/target.py +++ b/devlib/target.py @@ -663,10 +663,22 @@ class Target(object): transfering multiple sources. """ - once = functools.lru_cache(maxsize=None) + def once(f): + cache = dict() + + @functools.wraps(f) + async def wrapper(path): + try: + return cache[path] + except KeyError: + x = await f(path) + cache[path] = x + return x + + return wrapper _target_cache = {} - def target_paths_kind(paths, as_root=False): + async def target_paths_kind(paths, as_root=False): def process(x): x = x.strip() if x == 'notexist': @@ -686,7 +698,7 @@ class Target(object): ) for path in _paths ) - res = self.execute(cmd, as_root=as_root) + res = await self.execute.asyn(cmd, as_root=as_root) _target_cache.update(zip(_paths, map(process, res.split()))) return [ @@ -695,7 +707,7 @@ class Target(object): ] _host_cache = {} - def host_paths_kind(paths, as_root=False): + async def host_paths_kind(paths, as_root=False): def path_kind(path): if os.path.isdir(path): return 'dir' @@ -722,47 +734,55 @@ class Target(object): src_excep = HostError src_path_kind = host_paths_kind - _dst_mkdir = once(self.makedirs) + _dst_mkdir = once(self.makedirs.asyn) dst_path_join = self.path.join dst_paths_kind = target_paths_kind - dst_remove_file = once(functools.partial(self.remove, as_root=as_root)) + + @once + async def dst_remove_file(path): + return await 
self.remove.asyn(path, as_root=as_root) elif action == 'pull': src_excep = TargetStableError src_path_kind = target_paths_kind - _dst_mkdir = once(functools.partial(os.makedirs, exist_ok=True)) + @once + async def _dst_mkdir(path): + return os.makedirs(path, exist_ok=True) dst_path_join = os.path.join dst_paths_kind = host_paths_kind - dst_remove_file = once(os.remove) + + @once + async def dst_remove_file(path): + return os.remove(path) else: raise ValueError('Unknown action "{}"'.format(action)) # Handle the case where path is None - def dst_mkdir(path): + async def dst_mkdir(path): if path: - _dst_mkdir(path) + await _dst_mkdir(path) - def rewrite_dst(src, dst): + async def rewrite_dst(src, dst): new_dst = dst_path_join(dst, os.path.basename(src)) - src_kind, = src_path_kind([src], as_root) + src_kind, = await src_path_kind([src], as_root) # Batch both checks to avoid a costly extra execute() - dst_kind, new_dst_kind = dst_paths_kind([dst, new_dst], as_root) + dst_kind, new_dst_kind = await dst_paths_kind([dst, new_dst], as_root) if src_kind == 'file': if dst_kind == 'dir': if new_dst_kind == 'dir': raise IsADirectoryError(new_dst) if new_dst_kind == 'file': - dst_remove_file(new_dst) + await dst_remove_file(new_dst) return new_dst else: return new_dst elif dst_kind == 'file': - dst_remove_file(dst) + await dst_remove_file(dst) return dst else: - dst_mkdir(os.path.dirname(dst)) + await dst_mkdir(os.path.dirname(dst)) return dst elif src_kind == 'dir': if dst_kind == 'dir': @@ -776,7 +796,7 @@ class Target(object): elif dst_kind == 'file': raise FileExistsError(dst_kind) else: - dst_mkdir(os.path.dirname(dst)) + await dst_mkdir(os.path.dirname(dst)) return dst else: raise FileNotFoundError(src) @@ -785,18 +805,19 @@ class Target(object): if not sources: raise src_excep('No file matching source pattern: {}'.format(pattern)) - if dst_paths_kind([dest]) != ['dir']: + if (await dst_paths_kind([dest])) != ['dir']: raise NotADirectoryError('A folder dest is required 
for multiple matches but destination is a file: {}'.format(dest)) + async def f(src): + return await rewrite_dst(src, dest) + mapping = await self.async_manager.map_concurrently(f, sources) + # TODO: since rewrite_dst() will currently return a different path for # each source, it will not bring anything. In order to be useful, # connections need to be able to understand that if the destination is # an empty folder, the source is supposed to be transfered into it with # the same basename. - return groupby_value({ - src: rewrite_dst(src, dest) - for src in sources - }) + return groupby_value(mapping) @asyn.asyncf @call_conn