mirror of https://github.com/ARM-software/workload-automation.git synced 2025-02-07 13:41:24 +00:00

output_processors/postgresql: Refactor and fix uploading duplication

Previously, run-level artifacts would be added with a particular job_id,
and updated artifacts would be stored as new large objects each time.
Refactor to remove unnecessary instance variables, provide a job_id only
when required, and add an update capability for large objects so that
this no longer happens.
Marc Bonnici 2018-10-19 16:28:53 +01:00
parent 576df80379
commit 281eb6adf9
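
The job_id half of the fix replaces mutable instance state with an explicit parameter. A minimal sketch of the failure mode and the corrected calling convention, assuming only the standard library (the class and function names below are illustrative, not the actual output processor):

import uuid

# Buggy pattern: the uploader remembers the "current" job on the instance,
# so run-level items uploaded while a job is active inherit its job id.
class StatefulUploader(object):
    def __init__(self):
        self.current_job_uuid = None

    def export_job(self):
        self.current_job_uuid = uuid.uuid4()
        self.upload('job-artifact')   # correct: belongs to this job
        self.upload('run-artifact')   # wrong: run-level, but tagged anyway

    def upload(self, name):
        print(name, 'tagged with job', self.current_job_uuid)

# Fixed pattern: job_uuid is an explicit keyword argument defaulting to
# None, so run-level uploads are stored with a NULL job id unless a job
# id is deliberately passed, as export_job_output now does.
def upload(name, job_uuid=None):
    print(name, 'tagged with job', job_uuid)

job_uuid = uuid.uuid4()
upload('job-artifact', job_uuid=job_uuid)  # job-level
upload('run-artifact')                     # run-level; job id stays None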


@@ -100,8 +100,8 @@ class PostgresqlResultProcessor(OutputProcessor):
     # Lists to track which run-related items have already been added
     metrics_already_added = []
-    artifacts_already_added = []
-    # Dict needed so that jobs can look up the augmentation_uuid
+    # Dicts needed so that jobs can look up ids
+    artifacts_already_added = {}
     augmentations_already_added = {}
     # Status bits (flags)
@@ -111,20 +111,8 @@ class PostgresqlResultProcessor(OutputProcessor):
         super(PostgresqlResultProcessor, self).__init__(*args, **kwargs)
         self.conn = None
         self.cursor = None
-        self.current_lobj = None
-        self.current_loid = None
         self.run_uuid = None
-        self.current_job_uuid = None
         self.target_uuid = None
-        self.current_artifact_uuid = None
-        self.current_large_object_uuid = None
-        self.current_classifier_uuid = None
-        self.current_metric_uuid = None
-        self.current_augmentation_uuid = None
-        self.current_resource_getter_uuid = None
-        self.current_event_uuid = None
-        self.current_job_aug_uuid = None
-        self.current_parameter_uuid = None

     def initialize(self, context):
@@ -209,12 +197,12 @@ class PostgresqlResultProcessor(OutputProcessor):
         ''' Run once for each job to upload information that is
             updated on a job by job basis.
         '''
-        self.current_job_uuid = uuid.uuid4()
+        job_uuid = uuid.uuid4()
         # Create a new job
         self.cursor.execute(
             self.sql_command['create_job'],
             (
-                self.current_job_uuid,
+                job_uuid,
                 self.run_uuid,
                 job_output.status,
                 job_output.retry,
@@ -236,18 +224,18 @@ class PostgresqlResultProcessor(OutputProcessor):
         self.sql_upload_metrics(run_output, record_in_added=True)
         self.sql_upload_augmentations(run_output)
         self.sql_upload_resource_getters(run_output)
-        self.sql_upload_events(job_output)
-        self.sql_upload_artifacts(job_output)
-        self.sql_upload_metrics(job_output)
-        self.sql_upload_job_augmentations(job_output)
+        self.sql_upload_events(job_output, job_uuid=job_uuid)
+        self.sql_upload_artifacts(job_output, job_uuid=job_uuid)
+        self.sql_upload_metrics(job_output, job_uuid=job_uuid)
+        self.sql_upload_job_augmentations(job_output, job_uuid=job_uuid)
         self.sql_upload_parameters(
             "workload",
             job_output.spec.workload_parameters,
-            job_specific=True)
+            job_uuid=job_uuid)
         self.sql_upload_parameters(
             "runtime",
             job_output.spec.runtime_parameters,
-            job_specific=True)
+            job_uuid=job_uuid)
         self.conn.commit()

     def export_run_output(self, run_output, target_info):  # pylint: disable=unused-argument, too-many-locals
@@ -256,7 +244,6 @@ class PostgresqlResultProcessor(OutputProcessor):
         '''
         if not self.cursor:  # Database did not connect correctly.
             return
-        self.current_job_uuid = None
         # Update the job statuses following completion of the run
         for job in run_output.jobs:
             job_id = job.id
@@ -289,32 +276,31 @@ class PostgresqlResultProcessor(OutputProcessor):

     def sql_upload_resource_getters(self, output_object):
         for resource_getter in output_object.run_config.resource_getters:
-            self.current_resource_getter_uuid = uuid.uuid4()
+            resource_getter_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_resource_getter'],
                 (
-                    self.current_resource_getter_uuid,
+                    resource_getter_uuid,
                     self.run_uuid,
                     resource_getter))
             self.sql_upload_parameters(
                 'resource_getter',
                 output_object.run_config.resource_getters[resource_getter],
-                owner_id=self.current_resource_getter_uuid,
-                job_specific=False)
+                owner_id=resource_getter_uuid)

-    def sql_upload_events(self, output_object):
+    def sql_upload_events(self, output_object, job_uuid=None):
         for event in output_object.events:
-            self.current_event_uuid = uuid.uuid4()
+            event_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_event'],
                 (
-                    self.current_event_uuid,
+                    event_uuid,
                     self.run_uuid,
-                    self.current_job_uuid,
+                    job_uuid,
                     event.timestamp,
                     event.message))

-    def sql_upload_job_augmentations(self, output_object):
+    def sql_upload_job_augmentations(self, output_object, job_uuid=None):
         ''' This is a table which links the uuids of augmentations to jobs.
             Note that the augmentations table is prepopulated, leading to the necessity
             of an augmentaitions_already_added dictionary, which gives us the corresponding
@@ -325,136 +311,98 @@ class PostgresqlResultProcessor(OutputProcessor):
         for augmentation in output_object.spec.augmentations:
             if augmentation.startswith('~'):
                 continue
-            self.current_augmentation_uuid = self.augmentations_already_added[augmentation]
-            self.current_job_aug_uuid = uuid.uuid4()
+            augmentation_uuid = self.augmentations_already_added[augmentation]
+            job_aug_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_job_aug'],
                 (
-                    self.current_job_aug_uuid,
-                    self.current_job_uuid,
-                    self.current_augmentation_uuid))
+                    job_aug_uuid,
+                    job_uuid,
+                    augmentation_uuid))

     def sql_upload_augmentations(self, output_object):
         for augmentation in output_object.augmentations:
             if augmentation.startswith('~') or augmentation in self.augmentations_already_added:
                 continue
-            self.current_augmentation_uuid = uuid.uuid4()
+            augmentation_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_augmentation'],
                 (
-                    self.current_augmentation_uuid,
+                    augmentation_uuid,
                     self.run_uuid,
                     augmentation))
             self.sql_upload_parameters(
                 'augmentation',
                 output_object.run_config.augmentations[augmentation],
-                owner_id=self.current_augmentation_uuid,
-                job_specific=False)
-            self.augmentations_already_added[augmentation] = self.current_augmentation_uuid
+                owner_id=augmentation_uuid)
+            self.augmentations_already_added[augmentation] = augmentation_uuid

-    def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False):
+    def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False, job_uuid=None):
         for metric in output_object.metrics:
             if metric in self.metrics_already_added and check_uniqueness:
                 continue
-            self.current_metric_uuid = uuid.uuid4()
+            metric_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_metric'],
                 (
-                    self.current_metric_uuid,
+                    metric_uuid,
                     self.run_uuid,
-                    self.current_job_uuid,
+                    job_uuid,
                     metric.name,
                     metric.value,
                     metric.units,
                     metric.lower_is_better))
             for classifier in metric.classifiers:
-                self.current_classifier_uuid = uuid.uuid4()
+                classifier_uuid = uuid.uuid4()
                 self.cursor.execute(
                     self.sql_command['create_classifier'],
                     (
-                        self.current_classifier_uuid,
+                        classifier_uuid,
                         None,
-                        self.current_metric_uuid,
+                        metric_uuid,
                         classifier,
                         metric.classifiers[classifier]))
             if record_in_added:
                 self.metrics_already_added.append(metric)

-    def sql_upload_artifacts(self, output_object, record_in_added=False, check_uniqueness=False):
+    def sql_upload_artifacts(self, output_object, record_in_added=False, check_uniqueness=False, job_uuid=None):
         ''' Uploads artifacts to the database.
             record_in_added will record the artifacts added in artifacts_aleady_added
             check_uniqueness will ensure artifacts in artifacts_already_added do not get added again
         '''
         for artifact in output_object.artifacts:
             if artifact in self.artifacts_already_added and check_uniqueness:
+                self.logger.debug('Skipping uploading {} as already added'.format(artifact))
                 continue
-            self.current_artifact_uuid = uuid.uuid4()
-            self.current_lobj = self.conn.lobject()
-            self.current_loid = self.current_lobj.oid
-            self.current_large_object_uuid = uuid.uuid4()
-            with open(os.path.join(output_object.basepath, artifact.path)) as lobj_file:
-                lobj_data = lobj_file.read()
-                if len(lobj_data) > 50000000:  # Notify if LO inserts larger than 50MB
-                    self.logger.debug(
-                        "Inserting large object of size {}".format(lo_len))
-                lo_len = self.current_lobj.write(lobj_data)
-            self.cursor.execute(
-                self.sql_command['create_large_object'],
-                (
-                    self.current_large_object_uuid,
-                    self.current_loid))
-            self.cursor.execute(
-                self.sql_command['create_artifact'],
-                (
-                    self.current_artifact_uuid,
-                    self.run_uuid,
-                    self.current_job_uuid,
-                    artifact.name,
-                    self.current_large_object_uuid,
-                    artifact.description,
-                    artifact.kind))
-            for classifier in artifact.classifiers:
-                self.current_classifier_uuid = uuid.uuid4()
-                self.cursor.execute(
-                    self.sql_command['create_classifier'],
-                    (
-                        self.current_classifier_uuid,
-                        self.current_artifact_uuid,
-                        None,
-                        classifier,
-                        artifact.classifiers[classifier]))
-            if record_in_added:
-                self.artifacts_already_added.append(artifact)
+            if artifact in self.artifacts_already_added:
+                self._sql_update_artifact(artifact, output_object)
+            else:
+                self._sql_create_artifact(artifact, output_object, record_in_added, job_uuid)

-    def sql_upload_parameters(self, parameter_type, parameter_dict, owner_id=None, job_specific=False):
+    def sql_upload_parameters(self, parameter_type, parameter_dict, owner_id=None, job_uuid=None):
         # Note, currently no augmentation parameters are workload specific, but in the future
         # this may change
-        run_uuid = self.run_uuid
-        job_uuid = None  # Default initial value
         augmentation_id = None
         resource_getter_id = None
-        if parameter_type == "workload":
-            job_uuid = self.current_job_uuid
-        elif parameter_type == "resource_getter":
-            job_uuid = None
-            resource_getter_id = owner_id
-        elif parameter_type == "augmentation":
-            if job_specific:
-                job_uuid = self.current_job_uuid
-            augmentation_id = owner_id
-        elif parameter_type == "runtime":
-            job_uuid = self.current_job_uuid
-        else:
+
+        if parameter_type not in ['workload', 'resource_getter', 'augmentation', 'runtime']:
             # boot parameters are not yet implemented
             # device parameters are redundant due to the targets table
             raise NotImplementedError("{} is not a valid parameter type.".format(parameter_type))
+
+        if parameter_type == "resource_getter":
+            resource_getter_id = owner_id
+        elif parameter_type == "augmentation":
+            augmentation_id = owner_id
+
         for parameter in parameter_dict:
-            self.current_parameter_uuid = uuid.uuid4()
+            parameter_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_parameter'],
                 (
-                    self.current_parameter_uuid,
-                    run_uuid,
+                    parameter_uuid,
+                    self.run_uuid,
                     job_uuid,
                     augmentation_id,
                     resource_getter_id,
@@ -481,3 +429,53 @@ class PostgresqlResultProcessor(OutputProcessor):
         cursor.close()
         self.conn.commit()
         self.conn.reset()
+
+    def _sql_write_lobject(self, source, lobject):
+        with open(source) as lobj_file:
+            lobj_data = lobj_file.read()
+            if len(lobj_data) > 50000000:  # Notify if LO inserts larger than 50MB
+                self.logger.debug("Inserting large object of size {}".format(len(lobj_data)))
+            lobject.write(lobj_data)
+        self.conn.commit()
+
+    def _sql_update_artifact(self, artifact, output_object):
+        self.logger.debug('Updating artifact: {}'.format(artifact))
+        lobj = self.conn.lobject(oid=self.artifacts_already_added[artifact], mode='w')
+        self._sql_write_lobject(os.path.join(output_object.basepath, artifact.path), lobj)
+
+    def _sql_create_artifact(self, artifact, output_object, record_in_added=False, job_uuid=None):
+        self.logger.debug('Uploading artifact: {}'.format(artifact))
+        artifact_uuid = uuid.uuid4()
+        lobj = self.conn.lobject()
+        loid = lobj.oid
+        large_object_uuid = uuid.uuid4()
+        self._sql_write_lobject(os.path.join(output_object.basepath, artifact.path), lobj)
+        self.cursor.execute(
+            self.sql_command['create_large_object'],
+            (
+                large_object_uuid,
+                loid))
+        self.cursor.execute(
+            self.sql_command['create_artifact'],
+            (
+                artifact_uuid,
+                self.run_uuid,
+                job_uuid,
+                artifact.name,
+                large_object_uuid,
+                artifact.description,
+                artifact.kind))
+        for classifier in artifact.classifiers:
+            classifier_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_classifier'],
+                (
+                    classifier_uuid,
+                    artifact_uuid,
+                    None,
+                    classifier,
+                    artifact.classifiers[classifier]))
+        if record_in_added:
+            self.artifacts_already_added[artifact] = loid
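
The duplication half of the fix rests on psycopg2's large-object API: conn.lobject() with no arguments creates a new large object with a fresh OID, while conn.lobject(oid=..., mode='w') opens an existing one for writing. That is why _sql_create_artifact records the OID in artifacts_already_added and _sql_update_artifact reuses it. A minimal standalone sketch of that round trip, assuming a reachable PostgreSQL database and hypothetical connection settings:

import psycopg2

conn = psycopg2.connect(dbname='wa')  # hypothetical connection settings

def upload(data, oid=None):
    # First upload: create a large object and return its new OID.
    # Later uploads: open the recorded OID in write mode and overwrite it.
    if oid is None:
        lobj = conn.lobject()
    else:
        lobj = conn.lobject(oid=oid, mode='w')
    lobj.write(data)
    conn.commit()
    return lobj.oid

oid = upload('first version of the artifact')
upload('updated version of the artifact', oid=oid)  # rewrites in place, no duplicate

One caveat worth noting: opening in 'w' mode writes from the start of the object but does not truncate it, so rewriting with shorter data can leave stale bytes at the tail; psycopg2's lobject.truncate() covers that case if artifact files can shrink between uploads.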