From 684121e2e71a3dc61121d9f1a48123b20978b469 Mon Sep 17 00:00:00 2001 From: Marc Bonnici Date: Fri, 26 Jun 2020 11:18:03 +0100 Subject: [PATCH] fw: Replace usage of file locking with atomic writes To prevent long timeouts occurring during to file locking on both reads and writes replace locking with atomic writes. While this may results in cache entries being overwritten, the amount of time used in duplicated retrievals will likely be saved with the prevention of stalls due to waiting to acquire the file lock. --- wa/framework/getters.py | 33 +++++++++++++------------ wa/framework/target/info.py | 9 +++---- wa/framework/workload.py | 49 ++++++++++++++++++------------------- wa/utils/android.py | 16 ++++++------ 4 files changed, 52 insertions(+), 55 deletions(-) diff --git a/wa/framework/getters.py b/wa/framework/getters.py index 875b3855..56cfb121 100644 --- a/wa/framework/getters.py +++ b/wa/framework/getters.py @@ -31,7 +31,7 @@ import requests from wa import Parameter, settings, __file__ as _base_filepath from wa.framework.resource import ResourceGetter, SourcePriority, NO_ONE from wa.framework.exception import ResourceError -from wa.utils.misc import (ensure_directory_exists as _d, lock_file, +from wa.utils.misc import (ensure_directory_exists as _d, atomic_write_path, ensure_file_directory_exists as _f, sha256, urljoin) from wa.utils.types import boolean, caseless_string @@ -254,21 +254,22 @@ class Http(ResourceGetter): url = urljoin(self.url, owner_name, asset['path']) local_path = _f(os.path.join(settings.dependencies_directory, '__remote', owner_name, asset['path'].replace('/', os.sep))) - with lock_file(local_path, timeout=5 * 60): - if os.path.exists(local_path) and not self.always_fetch: - local_sha = sha256(local_path) - if local_sha == asset['sha256']: - self.logger.debug('Local SHA256 matches; not re-downloading') - return local_path - self.logger.debug('Downloading {}'.format(url)) - response = self.geturl(url, stream=True) - if response.status_code != http.client.OK: - message = 'Could not download asset "{}"; recieved "{} {}"' - self.logger.warning(message.format(url, - response.status_code, - response.reason)) - return - with open(local_path, 'wb') as wfh: + + if os.path.exists(local_path) and not self.always_fetch: + local_sha = sha256(local_path) + if local_sha == asset['sha256']: + self.logger.debug('Local SHA256 matches; not re-downloading') + return local_path + self.logger.debug('Downloading {}'.format(url)) + response = self.geturl(url, stream=True) + if response.status_code != http.client.OK: + message = 'Could not download asset "{}"; received "{} {}"' + self.logger.warning(message.format(url, + response.status_code, + response.reason)) + return + with atomic_write_path(local_path) as at_path: + with open(at_path, 'wb') as wfh: for chunk in response.iter_content(chunk_size=self.chunk_size): wfh.write(chunk) return local_path diff --git a/wa/framework/target/info.py b/wa/framework/target/info.py index 7bf4ddc3..a40d6a76 100644 --- a/wa/framework/target/info.py +++ b/wa/framework/target/info.py @@ -23,7 +23,7 @@ from devlib.utils.android import AndroidProperties from wa.framework.configuration.core import settings from wa.framework.exception import ConfigError from wa.utils.serializer import read_pod, write_pod, Podable -from wa.utils.misc import lock_file +from wa.utils.misc import atomic_write_path def cpuinfo_from_pod(pod): @@ -281,15 +281,14 @@ def read_target_info_cache(): os.makedirs(settings.cache_directory) if not os.path.isfile(settings.target_info_cache_file): return {} - with lock_file(settings.target_info_cache_file): - return read_pod(settings.target_info_cache_file) + return read_pod(settings.target_info_cache_file) def write_target_info_cache(cache): if not os.path.exists(settings.cache_directory): os.makedirs(settings.cache_directory) - with lock_file(settings.target_info_cache_file): - write_pod(cache, settings.target_info_cache_file) + with atomic_write_path(settings.target_info_cache_file) as at_path: + write_pod(cache, at_path) def get_target_info_from_cache(system_id, cache=None): diff --git a/wa/framework/workload.py b/wa/framework/workload.py index feaf63a9..35287de2 100644 --- a/wa/framework/workload.py +++ b/wa/framework/workload.py @@ -32,7 +32,7 @@ from wa.framework.exception import WorkloadError, ConfigError from wa.utils.types import ParameterDict, list_or_string, version_tuple from wa.utils.revent import ReventRecorder from wa.utils.exec_control import once_per_instance -from wa.utils.misc import lock_file +from wa.utils.misc import atomic_write_path class Workload(TargetedPlugin): @@ -732,32 +732,31 @@ class PackageHandler(object): raise WorkloadError(msg) self.error_msg = None - with lock_file(os.path.join(self.owner.dependencies_directory, self.owner.name)): - if self.prefer_host_package: - self.resolve_package_from_host(context) - if not self.apk_file: - self.resolve_package_from_target() - else: + if self.prefer_host_package: + self.resolve_package_from_host(context) + if not self.apk_file: self.resolve_package_from_target() - if not self.apk_file: - self.resolve_package_from_host(context) + else: + self.resolve_package_from_target() + if not self.apk_file: + self.resolve_package_from_host(context) - if self.apk_file: - self.apk_info = get_cacheable_apk_info(self.apk_file) + if self.apk_file: + self.apk_info = get_cacheable_apk_info(self.apk_file) + else: + if self.error_msg: + raise WorkloadError(self.error_msg) else: - if self.error_msg: - raise WorkloadError(self.error_msg) + if self.package_name: + message = 'Package "{package}" not found for workload {name} '\ + 'on host or target.' + elif self.version: + message = 'No matching package found for workload {name} '\ + '(version {version}) on host or target.' else: - if self.package_name: - message = 'Package "{package}" not found for workload {name} '\ - 'on host or target.' - elif self.version: - message = 'No matching package found for workload {name} '\ - '(version {version}) on host or target.' - else: - message = 'No matching package found for workload {name} on host or target' - raise WorkloadError(message.format(name=self.owner.name, version=self.version, - package=self.package_name)) + message = 'No matching package found for workload {name} on host or target' + raise WorkloadError(message.format(name=self.owner.name, version=self.version, + package=self.package_name)) def resolve_package_from_host(self, context): self.logger.debug('Resolving package on host system') @@ -900,8 +899,8 @@ class PackageHandler(object): package_info = self.target.get_package_info(package) apk_name = self._get_package_name(package_info.apk_path) host_path = os.path.join(self.owner.dependencies_directory, apk_name) - with lock_file(host_path, timeout=self.install_timeout): - self.target.pull(package_info.apk_path, host_path, + with atomic_write_path(host_path) as at_path: + self.target.pull(package_info.apk_path, at_path, timeout=self.install_timeout) return host_path diff --git a/wa/utils/android.py b/wa/utils/android.py index b21f99f5..7c70f3aa 100644 --- a/wa/utils/android.py +++ b/wa/utils/android.py @@ -22,7 +22,7 @@ from devlib.utils.android import ApkInfo as _ApkInfo from wa.framework.configuration import settings from wa.utils.serializer import read_pod, write_pod, Podable from wa.utils.types import enum -from wa.utils.misc import lock_file +from wa.utils.misc import atomic_write_path LogcatLogLevel = enum(['verbose', 'debug', 'info', 'warn', 'error', 'assert'], start=2) @@ -155,9 +155,9 @@ class ApkInfoCache: if apk_id in self.cache and not overwrite: raise ValueError('ApkInfo for {} is already in cache.'.format(apk_info.path)) self.cache[apk_id] = apk_info.to_pod() - with lock_file(self.path): - write_pod(self.cache, self.path) - self.last_modified = os.stat(self.path) + with atomic_write_path(self.path) as at_path: + write_pod(self.cache, at_path) + self.last_modified = os.stat(self.path) def get_info(self, key): self._update_cache() @@ -171,9 +171,8 @@ class ApkInfoCache: return if self.last_modified != os.stat(self.path): apk_info_cache_logger.debug('Updating cache {}'.format(self.path)) - with lock_file(self.path): - self.cache = read_pod(self.path) - self.last_modified = os.stat(self.path) + self.cache = read_pod(self.path) + self.last_modified = os.stat(self.path) def get_cacheable_apk_info(path): @@ -189,8 +188,7 @@ def get_cacheable_apk_info(path): if info: msg = 'Using ApkInfo ({}) from cache'.format(info.package) else: - with lock_file(path): - info = ApkInfo(path) + info = ApkInfo(path) apk_info_cache.store(info, apk_id, overwrite=True) msg = 'Storing ApkInfo ({}) in cache'.format(info.package) apk_info_cache_logger.debug(msg)