1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-18 12:06:08 +00:00

fw: Replace usage of file locking with atomic writes

To prevent long timeouts occurring during to file locking on
both reads and writes replace locking with
atomic writes.

While this may results in cache entries being overwritten,
the amount of time used in duplicated retrievals will likely
be saved with the prevention of stalls due to waiting to
acquire the file lock.
This commit is contained in:
Marc Bonnici 2020-06-26 11:18:03 +01:00 committed by setrofim
parent 0c1229df8c
commit 684121e2e7
4 changed files with 52 additions and 55 deletions

View File

@ -31,7 +31,7 @@ import requests
from wa import Parameter, settings, __file__ as _base_filepath
from wa.framework.resource import ResourceGetter, SourcePriority, NO_ONE
from wa.framework.exception import ResourceError
from wa.utils.misc import (ensure_directory_exists as _d, lock_file,
from wa.utils.misc import (ensure_directory_exists as _d, atomic_write_path,
ensure_file_directory_exists as _f, sha256, urljoin)
from wa.utils.types import boolean, caseless_string
@ -254,21 +254,22 @@ class Http(ResourceGetter):
url = urljoin(self.url, owner_name, asset['path'])
local_path = _f(os.path.join(settings.dependencies_directory, '__remote',
owner_name, asset['path'].replace('/', os.sep)))
with lock_file(local_path, timeout=5 * 60):
if os.path.exists(local_path) and not self.always_fetch:
local_sha = sha256(local_path)
if local_sha == asset['sha256']:
self.logger.debug('Local SHA256 matches; not re-downloading')
return local_path
self.logger.debug('Downloading {}'.format(url))
response = self.geturl(url, stream=True)
if response.status_code != http.client.OK:
message = 'Could not download asset "{}"; recieved "{} {}"'
self.logger.warning(message.format(url,
response.status_code,
response.reason))
return
with open(local_path, 'wb') as wfh:
if os.path.exists(local_path) and not self.always_fetch:
local_sha = sha256(local_path)
if local_sha == asset['sha256']:
self.logger.debug('Local SHA256 matches; not re-downloading')
return local_path
self.logger.debug('Downloading {}'.format(url))
response = self.geturl(url, stream=True)
if response.status_code != http.client.OK:
message = 'Could not download asset "{}"; received "{} {}"'
self.logger.warning(message.format(url,
response.status_code,
response.reason))
return
with atomic_write_path(local_path) as at_path:
with open(at_path, 'wb') as wfh:
for chunk in response.iter_content(chunk_size=self.chunk_size):
wfh.write(chunk)
return local_path

View File

@ -23,7 +23,7 @@ from devlib.utils.android import AndroidProperties
from wa.framework.configuration.core import settings
from wa.framework.exception import ConfigError
from wa.utils.serializer import read_pod, write_pod, Podable
from wa.utils.misc import lock_file
from wa.utils.misc import atomic_write_path
def cpuinfo_from_pod(pod):
@ -281,15 +281,14 @@ def read_target_info_cache():
os.makedirs(settings.cache_directory)
if not os.path.isfile(settings.target_info_cache_file):
return {}
with lock_file(settings.target_info_cache_file):
return read_pod(settings.target_info_cache_file)
return read_pod(settings.target_info_cache_file)
def write_target_info_cache(cache):
if not os.path.exists(settings.cache_directory):
os.makedirs(settings.cache_directory)
with lock_file(settings.target_info_cache_file):
write_pod(cache, settings.target_info_cache_file)
with atomic_write_path(settings.target_info_cache_file) as at_path:
write_pod(cache, at_path)
def get_target_info_from_cache(system_id, cache=None):

View File

@ -32,7 +32,7 @@ from wa.framework.exception import WorkloadError, ConfigError
from wa.utils.types import ParameterDict, list_or_string, version_tuple
from wa.utils.revent import ReventRecorder
from wa.utils.exec_control import once_per_instance
from wa.utils.misc import lock_file
from wa.utils.misc import atomic_write_path
class Workload(TargetedPlugin):
@ -732,32 +732,31 @@ class PackageHandler(object):
raise WorkloadError(msg)
self.error_msg = None
with lock_file(os.path.join(self.owner.dependencies_directory, self.owner.name)):
if self.prefer_host_package:
self.resolve_package_from_host(context)
if not self.apk_file:
self.resolve_package_from_target()
else:
if self.prefer_host_package:
self.resolve_package_from_host(context)
if not self.apk_file:
self.resolve_package_from_target()
if not self.apk_file:
self.resolve_package_from_host(context)
else:
self.resolve_package_from_target()
if not self.apk_file:
self.resolve_package_from_host(context)
if self.apk_file:
self.apk_info = get_cacheable_apk_info(self.apk_file)
if self.apk_file:
self.apk_info = get_cacheable_apk_info(self.apk_file)
else:
if self.error_msg:
raise WorkloadError(self.error_msg)
else:
if self.error_msg:
raise WorkloadError(self.error_msg)
if self.package_name:
message = 'Package "{package}" not found for workload {name} '\
'on host or target.'
elif self.version:
message = 'No matching package found for workload {name} '\
'(version {version}) on host or target.'
else:
if self.package_name:
message = 'Package "{package}" not found for workload {name} '\
'on host or target.'
elif self.version:
message = 'No matching package found for workload {name} '\
'(version {version}) on host or target.'
else:
message = 'No matching package found for workload {name} on host or target'
raise WorkloadError(message.format(name=self.owner.name, version=self.version,
package=self.package_name))
message = 'No matching package found for workload {name} on host or target'
raise WorkloadError(message.format(name=self.owner.name, version=self.version,
package=self.package_name))
def resolve_package_from_host(self, context):
self.logger.debug('Resolving package on host system')
@ -900,8 +899,8 @@ class PackageHandler(object):
package_info = self.target.get_package_info(package)
apk_name = self._get_package_name(package_info.apk_path)
host_path = os.path.join(self.owner.dependencies_directory, apk_name)
with lock_file(host_path, timeout=self.install_timeout):
self.target.pull(package_info.apk_path, host_path,
with atomic_write_path(host_path) as at_path:
self.target.pull(package_info.apk_path, at_path,
timeout=self.install_timeout)
return host_path

View File

@ -22,7 +22,7 @@ from devlib.utils.android import ApkInfo as _ApkInfo
from wa.framework.configuration import settings
from wa.utils.serializer import read_pod, write_pod, Podable
from wa.utils.types import enum
from wa.utils.misc import lock_file
from wa.utils.misc import atomic_write_path
LogcatLogLevel = enum(['verbose', 'debug', 'info', 'warn', 'error', 'assert'], start=2)
@ -155,9 +155,9 @@ class ApkInfoCache:
if apk_id in self.cache and not overwrite:
raise ValueError('ApkInfo for {} is already in cache.'.format(apk_info.path))
self.cache[apk_id] = apk_info.to_pod()
with lock_file(self.path):
write_pod(self.cache, self.path)
self.last_modified = os.stat(self.path)
with atomic_write_path(self.path) as at_path:
write_pod(self.cache, at_path)
self.last_modified = os.stat(self.path)
def get_info(self, key):
self._update_cache()
@ -171,9 +171,8 @@ class ApkInfoCache:
return
if self.last_modified != os.stat(self.path):
apk_info_cache_logger.debug('Updating cache {}'.format(self.path))
with lock_file(self.path):
self.cache = read_pod(self.path)
self.last_modified = os.stat(self.path)
self.cache = read_pod(self.path)
self.last_modified = os.stat(self.path)
def get_cacheable_apk_info(path):
@ -189,8 +188,7 @@ def get_cacheable_apk_info(path):
if info:
msg = 'Using ApkInfo ({}) from cache'.format(info.package)
else:
with lock_file(path):
info = ApkInfo(path)
info = ApkInfo(path)
apk_info_cache.store(info, apk_id, overwrite=True)
msg = 'Storing ApkInfo ({}) in cache'.format(info.package)
apk_info_cache_logger.debug(msg)