From 281eb6adf917f80ee651491c61c968d3ad71e085 Mon Sep 17 00:00:00 2001
From: Marc Bonnici
Date: Fri, 19 Oct 2018 16:28:53 +0100
Subject: [PATCH] output_processors/postgresql: Refactor and fix upload duplication

Previously, run-level artifacts would be added with a particular
job_id, and updated artifacts would be stored as new large objects on
each update. Refactor to remove unnecessary instance variables, to
provide a job_id only when one is required, and to add an update
capability for large objects so that duplicate objects are no longer
created.
---
 wa/output_processors/postgresql.py | 208 ++++++++++++++---------------
 1 file changed, 103 insertions(+), 105 deletions(-)

diff --git a/wa/output_processors/postgresql.py b/wa/output_processors/postgresql.py
index 2b34be75..16d25667 100644
--- a/wa/output_processors/postgresql.py
+++ b/wa/output_processors/postgresql.py
@@ -100,8 +100,8 @@ class PostgresqlResultProcessor(OutputProcessor):
 
     # Lists to track which run-related items have already been added
     metrics_already_added = []
-    artifacts_already_added = []
-    # Dict needed so that jobs can look up the augmentation_uuid
+    # Dicts needed so that jobs can look up ids
+    artifacts_already_added = {}
     augmentations_already_added = {}
 
     # Status bits (flags)
@@ -111,20 +111,8 @@ class PostgresqlResultProcessor(OutputProcessor):
         super(PostgresqlResultProcessor, self).__init__(*args, **kwargs)
         self.conn = None
         self.cursor = None
-        self.current_lobj = None
-        self.current_loid = None
         self.run_uuid = None
-        self.current_job_uuid = None
         self.target_uuid = None
-        self.current_artifact_uuid = None
-        self.current_large_object_uuid = None
-        self.current_classifier_uuid = None
-        self.current_metric_uuid = None
-        self.current_augmentation_uuid = None
-        self.current_resource_getter_uuid = None
-        self.current_event_uuid = None
-        self.current_job_aug_uuid = None
-        self.current_parameter_uuid = None
 
 
     def initialize(self, context):
@@ -209,12 +197,12 @@ class PostgresqlResultProcessor(OutputProcessor):
         '''
        Run once for each job to upload information that is updated on a job by job basis.
         '''
-        self.current_job_uuid = uuid.uuid4()
+        job_uuid = uuid.uuid4()
         # Create a new job
         self.cursor.execute(
             self.sql_command['create_job'],
             (
-                self.current_job_uuid,
+                job_uuid,
                 self.run_uuid,
                 job_output.status,
                 job_output.retry,
@@ -236,18 +224,18 @@ class PostgresqlResultProcessor(OutputProcessor):
         self.sql_upload_metrics(run_output, record_in_added=True)
         self.sql_upload_augmentations(run_output)
         self.sql_upload_resource_getters(run_output)
-        self.sql_upload_events(job_output)
-        self.sql_upload_artifacts(job_output)
-        self.sql_upload_metrics(job_output)
-        self.sql_upload_job_augmentations(job_output)
+        self.sql_upload_events(job_output, job_uuid=job_uuid)
+        self.sql_upload_artifacts(job_output, job_uuid=job_uuid)
+        self.sql_upload_metrics(job_output, job_uuid=job_uuid)
+        self.sql_upload_job_augmentations(job_output, job_uuid=job_uuid)
         self.sql_upload_parameters(
             "workload",
             job_output.spec.workload_parameters,
-            job_specific=True)
+            job_uuid=job_uuid)
         self.sql_upload_parameters(
             "runtime",
             job_output.spec.runtime_parameters,
-            job_specific=True)
+            job_uuid=job_uuid)
         self.conn.commit()
 
     def export_run_output(self, run_output, target_info):  # pylint: disable=unused-argument, too-many-locals
@@ -256,7 +244,6 @@ class PostgresqlResultProcessor(OutputProcessor):
         '''
         if not self.cursor:  # Database did not connect correctly.
            return
-        self.current_job_uuid = None
         # Update the job statuses following completion of the run
         for job in run_output.jobs:
             job_id = job.id
@@ -289,32 +276,31 @@ class PostgresqlResultProcessor(OutputProcessor):
 
     def sql_upload_resource_getters(self, output_object):
         for resource_getter in output_object.run_config.resource_getters:
-            self.current_resource_getter_uuid = uuid.uuid4()
+            resource_getter_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_resource_getter'],
                 (
-                    self.current_resource_getter_uuid,
+                    resource_getter_uuid,
                     self.run_uuid,
                     resource_getter))
             self.sql_upload_parameters(
                 'resource_getter',
                 output_object.run_config.resource_getters[resource_getter],
-                owner_id=self.current_resource_getter_uuid,
-                job_specific=False)
+                owner_id=resource_getter_uuid)
 
-    def sql_upload_events(self, output_object):
+    def sql_upload_events(self, output_object, job_uuid=None):
         for event in output_object.events:
-            self.current_event_uuid = uuid.uuid4()
+            event_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_event'],
                 (
-                    self.current_event_uuid,
+                    event_uuid,
                     self.run_uuid,
-                    self.current_job_uuid,
+                    job_uuid,
                     event.timestamp,
                     event.message))
 
-    def sql_upload_job_augmentations(self, output_object):
+    def sql_upload_job_augmentations(self, output_object, job_uuid=None):
         ''' This is a table which links the uuids of augmentations to jobs. Note
         that the augmentations table is prepopulated, leading to the necessity
         of an augmentations_already_added dictionary, which gives us the corresponding
@@ -325,136 +311,98 @@ class PostgresqlResultProcessor(OutputProcessor):
         for augmentation in output_object.spec.augmentations:
             if augmentation.startswith('~'):
                 continue
-            self.current_augmentation_uuid = self.augmentations_already_added[augmentation]
-            self.current_job_aug_uuid = uuid.uuid4()
+            augmentation_uuid = self.augmentations_already_added[augmentation]
+            job_aug_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_job_aug'],
                 (
-                    self.current_job_aug_uuid,
-                    self.current_job_uuid,
-                    self.current_augmentation_uuid))
+                    job_aug_uuid,
+                    job_uuid,
+                    augmentation_uuid))
 
     def sql_upload_augmentations(self, output_object):
         for augmentation in output_object.augmentations:
             if augmentation.startswith('~') or augmentation in self.augmentations_already_added:
                 continue
-            self.current_augmentation_uuid = uuid.uuid4()
+            augmentation_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_augmentation'],
                 (
-                    self.current_augmentation_uuid,
+                    augmentation_uuid,
                     self.run_uuid,
                     augmentation))
             self.sql_upload_parameters(
                 'augmentation',
                 output_object.run_config.augmentations[augmentation],
-                owner_id=self.current_augmentation_uuid,
-                job_specific=False)
-            self.augmentations_already_added[augmentation] = self.current_augmentation_uuid
+                owner_id=augmentation_uuid)
+            self.augmentations_already_added[augmentation] = augmentation_uuid
 
-    def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False):
+    def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False, job_uuid=None):
         for metric in output_object.metrics:
             if metric in self.metrics_already_added and check_uniqueness:
                 continue
-            self.current_metric_uuid = uuid.uuid4()
+            metric_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_metric'],
                 (
-                    self.current_metric_uuid,
+                    metric_uuid,
                     self.run_uuid,
-                    self.current_job_uuid,
+                    job_uuid,
                     metric.name,
                     metric.value,
                     metric.units,
                     metric.lower_is_better))
             for classifier in metric.classifiers:
-                self.current_classifier_uuid = uuid.uuid4()
+                classifier_uuid = uuid.uuid4()
                 self.cursor.execute(
                     self.sql_command['create_classifier'],
                     (
-                        self.current_classifier_uuid,
+                        classifier_uuid,
                         None,
-                        self.current_metric_uuid,
+                        metric_uuid,
                         classifier,
                         metric.classifiers[classifier]))
             if record_in_added:
                 self.metrics_already_added.append(metric)
 
-    def sql_upload_artifacts(self, output_object, record_in_added=False, check_uniqueness=False):
+    def sql_upload_artifacts(self, output_object, record_in_added=False, check_uniqueness=False, job_uuid=None):
         '''
         Uploads artifacts to the database.
         record_in_added will record the artifacts added in artifacts_already_added
         check_uniqueness will ensure artifacts in artifacts_already_added do not get added again
         '''
         for artifact in output_object.artifacts:
             if artifact in self.artifacts_already_added and check_uniqueness:
+                self.logger.debug('Skipping uploading {} as already added'
+                                  .format(artifact))
                 continue
-            self.current_artifact_uuid = uuid.uuid4()
-            self.current_lobj = self.conn.lobject()
-            self.current_loid = self.current_lobj.oid
-            self.current_large_object_uuid = uuid.uuid4()
-            with open(os.path.join(output_object.basepath, artifact.path)) as lobj_file:
-                lobj_data = lobj_file.read()
-                if len(lobj_data) > 50000000:  # Notify if LO inserts larger than 50MB
-                    self.logger.debug(
-                        "Inserting large object of size {}".format(lo_len))
-                lo_len = self.current_lobj.write(lobj_data)
-            self.cursor.execute(
-                self.sql_command['create_large_object'],
-                (
-                    self.current_large_object_uuid,
-                    self.current_loid))
-            self.cursor.execute(
-                self.sql_command['create_artifact'],
-                (
-                    self.current_artifact_uuid,
-                    self.run_uuid,
-                    self.current_job_uuid,
-                    artifact.name,
-                    self.current_large_object_uuid,
-                    artifact.description,
-                    artifact.kind))
-            for classifier in artifact.classifiers:
-                self.current_classifier_uuid = uuid.uuid4()
-                self.cursor.execute(
-                    self.sql_command['create_classifier'],
-                    (
-                        self.current_classifier_uuid,
-                        self.current_artifact_uuid,
-                        None,
-                        classifier,
-                        artifact.classifiers[classifier]))
-            if record_in_added:
-                self.artifacts_already_added.append(artifact)
-
-    def sql_upload_parameters(self, parameter_type, parameter_dict, owner_id=None, job_specific=False):
+            if artifact in self.artifacts_already_added:
+                self._sql_update_artifact(artifact, output_object)
+            else:
+                self._sql_create_artifact(artifact, output_object, record_in_added, job_uuid)
+
+    def sql_upload_parameters(self, parameter_type, parameter_dict, owner_id=None, job_uuid=None):
         # Note, currently no augmentation parameters are workload specific, but in the future
         # this may change
-        run_uuid = self.run_uuid
-        job_uuid = None  # Default initial value
         augmentation_id = None
         resource_getter_id = None
-        if parameter_type == "workload":
-            job_uuid = self.current_job_uuid
-        elif parameter_type == "resource_getter":
-            job_uuid = None
-            resource_getter_id = owner_id
-        elif parameter_type == "augmentation":
-            if job_specific:
-                job_uuid = self.current_job_uuid
-            augmentation_id = owner_id
-        elif parameter_type == "runtime":
-            job_uuid = self.current_job_uuid
-        else:
+
+        if parameter_type not in ['workload', 'resource_getter', 'augmentation', 'runtime']:
             # boot parameters are not yet implemented
             # device parameters are redundant due to the targets table
             raise NotImplementedError("{} is not a valid parameter type.".format(parameter_type))
+
+        if parameter_type == "resource_getter":
+            resource_getter_id = owner_id
+        elif parameter_type == "augmentation":
+            augmentation_id = owner_id
+
         for parameter in parameter_dict:
-            self.current_parameter_uuid = uuid.uuid4()
+            parameter_uuid = uuid.uuid4()
             self.cursor.execute(
                 self.sql_command['create_parameter'],
                 (
-                    self.current_parameter_uuid,
-                    run_uuid,
+                    parameter_uuid,
+                    self.run_uuid,
                     job_uuid,
                     augmentation_id,
                     resource_getter_id,
@@ -481,3 +429,53 @@ class PostgresqlResultProcessor(OutputProcessor):
         cursor.close()
         self.conn.commit()
         self.conn.reset()
+
+    def _sql_write_lobject(self, source, lobject):
+        with open(source) as lobj_file:
+            lobj_data = lobj_file.read()
+            if len(lobj_data) > 50000000:  # Notify if LO inserts larger than 50MB
+                self.logger.debug("Inserting large object of size {}".format(len(lobj_data)))
+            lobject.write(lobj_data)
+        self.conn.commit()
+
+    def _sql_update_artifact(self, artifact, output_object):
+        self.logger.debug('Updating artifact: {}'.format(artifact))
+        lobj = self.conn.lobject(oid=self.artifacts_already_added[artifact], mode='w')
+        self._sql_write_lobject(os.path.join(output_object.basepath, artifact.path), lobj)
+
+    def _sql_create_artifact(self, artifact, output_object, record_in_added=False, job_uuid=None):
+        self.logger.debug('Uploading artifact: {}'.format(artifact))
+        artifact_uuid = uuid.uuid4()
+        lobj = self.conn.lobject()
+        loid = lobj.oid
+        large_object_uuid = uuid.uuid4()
+
+        self._sql_write_lobject(os.path.join(output_object.basepath, artifact.path), lobj)
+
+        self.cursor.execute(
+            self.sql_command['create_large_object'],
+            (
+                large_object_uuid,
+                loid))
+        self.cursor.execute(
+            self.sql_command['create_artifact'],
+            (
+                artifact_uuid,
+                self.run_uuid,
+                job_uuid,
+                artifact.name,
+                large_object_uuid,
+                artifact.description,
+                artifact.kind))
+        for classifier in artifact.classifiers:
+            classifier_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_classifier'],
+                (
+                    classifier_uuid,
+                    artifact_uuid,
+                    None,
+                    classifier,
+                    artifact.classifiers[classifier]))
+        if record_in_added:
+            self.artifacts_already_added[artifact] = loid
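
For reference, a minimal sketch (not part of the patch) of the
create-versus-update large-object flow that _sql_create_artifact and
_sql_update_artifact implement, reduced to bare psycopg2 calls. The DSN
and payload strings are hypothetical placeholders.

    import psycopg2

    conn = psycopg2.connect('dbname=wa_results')  # hypothetical DSN

    # First upload: allocate a new large object and remember its OID, as
    # _sql_create_artifact does via artifacts_already_added[artifact] = loid.
    lobj = conn.lobject()
    lobj.write('first version of the artifact')
    oid = lobj.oid
    conn.commit()

    # Subsequent upload of the same artifact: reopen the existing object by
    # OID in write mode and overwrite it in place, as _sql_update_artifact
    # does, instead of allocating a duplicate object each time.
    lobj = conn.lobject(oid=oid, mode='w')
    lobj.write('second version of the artifact')
    conn.commit()

One caveat with this approach: writing through an lobject opened in 'w'
mode overwrites from the start of the object but does not shrink it, so
if a replacement can be smaller than the original, a call to
lobject.truncate() after the write would be needed to drop the stale
tail.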