# Copyright 2018 ARM Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import os import uuid import collections import inspect try: import psycopg2 from psycopg2 import (connect, extras) from psycopg2 import Error as Psycopg2Error except ImportError as e: psycopg2 = None import_error_msg = e.args[0] if e.args else str(e) from devlib.target import KernelVersion, KernelConfig import wa from wa.utils import postgres_convert from wa import OutputProcessor, Parameter, OutputProcessorError from wa.utils.types import level from wa.framework.target.info import CpuInfo class PostgresqlResultProcessor(OutputProcessor): name = 'postgres' description = """ Stores results in a Postgresql database. The structure of this database can easily be understood by examining the postgres_schema.sql file (the schema used to generate it): {} """.format(os.path.join( os.path.dirname(inspect.getfile(wa)), 'commands', 'postgres_schema.sql')) parameters = [ Parameter('username', default='postgres', description=""" This is the username that will be used to connect to the Postgresql database. Note that depending on whether the user has privileges to modify the database (normally only possible on localhost), the user may only be able to append entries. """), Parameter('password', default=None, description=""" The password to be used to connect to the specified database with the specified username. """), Parameter('dbname', default='wa', description=""" Name of the database that will be created or added to. Note, to override this, you can specify a value in your user wa configuration file. """), Parameter('host', kind=str, default='localhost', description=""" The host where the Postgresql server is running. The default is localhost (i.e. the machine that wa is running on). This is useful for complex systems where multiple machines may be executing workloads and uploading their results to a remote, centralised database. """), Parameter('port', kind=str, default='5432', description=""" The port the Postgresql server is running on, on the host. The default is Postgresql's default, so do not change this unless you have modified the default port for Postgresql. """), ] # Commands sql_command = { "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s WHERE oid=%s;", "create_job": "INSERT INTO Jobs (oid, run_oid, status, retries, label, job_id, iterations, workload_name, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);", "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message) VALUES (%s, %s, %s, %s, %s)", "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind) VALUES (%s, %s, %s, %s, %s, %s, %s)", "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better) VALUES (%s, %s, %s, %s , %s, %s, %s)", "create_augmentation": "INSERT INTO Augmentations (oid, run_oid, name) VALUES (%s, %s, %s)", "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, key, value) VALUES (%s, %s, %s, %s, %s)", "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", "create_resource_getter": "INSERT INTO ResourceGetters (oid, run_oid, name) VALUES (%s, %s, %s)", "create_job_aug": "INSERT INTO Jobs_Augs (oid, job_oid, augmentation_oid) VALUES (%s, %s, %s)", "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)"} # Lists to track which run-related items have already been added metrics_already_added = [] artifacts_already_added = [] # Dict needed so that jobs can look up the augmentation_uuid augmentations_already_added = {} # Status bits (flags) first_job_run = True def __init__(self, *args, **kwargs): super(PostgresqlResultProcessor, self).__init__(*args, **kwargs) self.conn = None self.cursor = None self.current_lobj = None self.current_loid = None self.run_uuid = None self.current_job_uuid = None self.target_uuid = None self.current_artifact_uuid = None self.current_large_object_uuid = None self.current_classifier_uuid = None self.current_metric_uuid = None self.current_augmentation_uuid = None self.current_resource_getter_uuid = None self.current_event_uuid = None self.current_job_aug_uuid = None self.current_parameter_uuid = None def initialize(self, context): if not psycopg2: raise ImportError( 'The psycopg2 module is required for the ' + 'Postgresql Output Processor: {}'.format(import_error_msg)) # N.B. Typecasters are for postgres->python and adapters the opposite self.connect_to_database() self.cursor = self.conn.cursor() # Register the adapters and typecasters for enum types self.cursor.execute("SELECT NULL::status_enum") status_oid = self.cursor.description[0][1] self.cursor.execute("SELECT NULL::param_enum") param_oid = self.cursor.description[0][1] LEVEL = psycopg2.extensions.new_type( (status_oid,), "LEVEL", postgres_convert.cast_level) psycopg2.extensions.register_type(LEVEL) PARAM = psycopg2.extensions.new_type( (param_oid,), "PARAM", postgres_convert.cast_vanilla) psycopg2.extensions.register_type(PARAM) psycopg2.extensions.register_adapter(level, postgres_convert.return_as_is(postgres_convert.adapt_level)) psycopg2.extensions.register_adapter( postgres_convert.ListOfLevel, postgres_convert.adapt_ListOfX(postgres_convert.adapt_level)) psycopg2.extensions.register_adapter(KernelVersion, postgres_convert.adapt_vanilla) psycopg2.extensions.register_adapter( CpuInfo, postgres_convert.adapt_vanilla) psycopg2.extensions.register_adapter( collections.OrderedDict, extras.Json) psycopg2.extensions.register_adapter(dict, extras.Json) psycopg2.extensions.register_adapter( KernelConfig, postgres_convert.create_iterable_adapter(2, explicit_iterate=True)) # Register ready-made UUID type adapter extras.register_uuid() # Insert a run_uuid which will be globally accessible during the run self.run_uuid = uuid.UUID(str(uuid.uuid4())) run_output = context.run_output retry_on_status = postgres_convert.ListOfLevel(run_output.run_config.retry_on_status) self.cursor.execute( self.sql_command['create_run'], ( self.run_uuid, run_output.event_summary, run_output.basepath, run_output.status, run_output.state.timestamp, run_output.info.run_name, run_output.info.project, retry_on_status, run_output.run_config.max_retries, run_output.run_config.bail_on_init_failure, run_output.run_config.allow_phone_home, run_output.info.uuid, run_output.info.start_time, run_output.metadata)) self.target_uuid = uuid.uuid4() target_info = context.target_info self.cursor.execute( self.sql_command['create_target'], ( self.target_uuid, self.run_uuid, target_info.target, target_info.cpus, target_info.os, target_info.os_version, target_info.hostid, target_info.hostname, target_info.abi, target_info.is_rooted, # Important caveat: kernel_version is the name of the column in the Targets table # However, this refers to kernel_version.version, not to kernel_version as a whole target_info.kernel_version.version, target_info.kernel_version.release, target_info.kernel_version.sha1, target_info.kernel_config, target_info.sched_features)) # Commit cursor commands self.conn.commit() def export_job_output(self, job_output, target_info, run_output): # pylint: disable=too-many-branches, too-many-statements, too-many-locals, unused-argument ''' Run once for each job to upload information that is updated on a job by job basis. ''' self.current_job_uuid = uuid.uuid4() # Create a new job self.cursor.execute( self.sql_command['create_job'], ( self.current_job_uuid, self.run_uuid, job_output.status, job_output.retry, job_output.label, job_output.id, job_output.iteration, job_output.spec.workload_name, job_output.metadata)) # Update the run table and run-level parameters self.cursor.execute( self.sql_command['update_run'], ( run_output.event_summary, run_output.status, run_output.state.timestamp, run_output.info.end_time, self.run_uuid)) self.sql_upload_artifacts(run_output, record_in_added=True) self.sql_upload_metrics(run_output, record_in_added=True) self.sql_upload_augmentations(run_output) self.sql_upload_resource_getters(run_output) self.sql_upload_events(job_output) self.sql_upload_artifacts(job_output) self.sql_upload_metrics(job_output) self.sql_upload_job_augmentations(job_output) self.sql_upload_parameters( "workload", job_output.spec.workload_parameters, job_specific=True) self.sql_upload_parameters( "runtime", job_output.spec.runtime_parameters, job_specific=True) self.conn.commit() def export_run_output(self, run_output, target_info): # pylint: disable=unused-argument, too-many-locals ''' A final export of the RunOutput that updates existing parameters and uploads ones which are only generated after jobs have run. ''' self.current_job_uuid = None # Update the job statuses following completion of the run for job in run_output.jobs: job_id = job.id job_status = job.status self.cursor.execute( "UPDATE Jobs SET status=%s WHERE job_id=%s and run_oid=%s", ( job_status, job_id, self.run_uuid)) run_uuid = self.run_uuid # Update the run entry after jobs have completed sql_command_update_run = self.sql_command['update_run'] self.cursor.execute( sql_command_update_run, ( run_output.event_summary, run_output.status, run_output.state.timestamp, run_output.info.end_time, run_uuid)) self.sql_upload_events(run_output) self.sql_upload_artifacts(run_output, check_uniqueness=True) self.sql_upload_metrics(run_output, check_uniqueness=True) self.sql_upload_augmentations(run_output) self.conn.commit() # Upload functions for use with both jobs and runs def sql_upload_resource_getters(self, output_object): for resource_getter in output_object.run_config.resource_getters: self.current_resource_getter_uuid = uuid.uuid4() self.cursor.execute( self.sql_command['create_resource_getter'], ( self.current_resource_getter_uuid, self.run_uuid, resource_getter)) self.sql_upload_parameters( 'resource_getter', output_object.run_config.resource_getters[resource_getter], owner_id=self.current_resource_getter_uuid, job_specific=False) def sql_upload_events(self, output_object): for event in output_object.events: self.current_event_uuid = uuid.uuid4() self.cursor.execute( self.sql_command['create_event'], ( self.current_event_uuid, self.run_uuid, self.current_job_uuid, event.timestamp, event.message)) def sql_upload_job_augmentations(self, output_object): ''' This is a table which links the uuids of augmentations to jobs. Note that the augmentations table is prepopulated, leading to the necessity of an augmentaitions_already_added dictionary, which gives us the corresponding uuids. Augmentations which are prefixed by ~ are toggled off and not part of the job, therefore not added. ''' for augmentation in output_object.spec.augmentations: if augmentation.startswith('~'): continue self.current_augmentation_uuid = self.augmentations_already_added[augmentation] self.current_job_aug_uuid = uuid.uuid4() self.cursor.execute( self.sql_command['create_job_aug'], ( self.current_job_aug_uuid, self.current_job_uuid, self.current_augmentation_uuid)) def sql_upload_augmentations(self, output_object): for augmentation in output_object.augmentations: if augmentation.startswith('~') or augmentation in self.augmentations_already_added: continue self.current_augmentation_uuid = uuid.uuid4() self.cursor.execute( self.sql_command['create_augmentation'], ( self.current_augmentation_uuid, self.run_uuid, augmentation)) self.sql_upload_parameters( 'augmentation', output_object.run_config.augmentations[augmentation], owner_id=self.current_augmentation_uuid, job_specific=False) self.augmentations_already_added[augmentation] = self.current_augmentation_uuid def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False): for metric in output_object.metrics: if metric in self.metrics_already_added and check_uniqueness: continue self.current_metric_uuid = uuid.uuid4() self.cursor.execute( self.sql_command['create_metric'], ( self.current_metric_uuid, self.run_uuid, self.current_job_uuid, metric.name, metric.value, metric.units, metric.lower_is_better)) for classifier in metric.classifiers: self.current_classifier_uuid = uuid.uuid4() self.cursor.execute( self.sql_command['create_classifier'], ( self.current_classifier_uuid, None, self.current_metric_uuid, classifier, metric.classifiers[classifier])) if record_in_added: self.metrics_already_added.append(metric) def sql_upload_artifacts(self, output_object, record_in_added=False, check_uniqueness=False): ''' Uploads artifacts to the database. record_in_added will record the artifacts added in artifacts_aleady_added check_uniqueness will ensure artifacts in artifacts_already_added do not get added again ''' for artifact in output_object.artifacts: if artifact in self.artifacts_already_added and check_uniqueness: continue self.current_artifact_uuid = uuid.uuid4() self.current_lobj = self.conn.lobject() self.current_loid = self.current_lobj.oid self.current_large_object_uuid = uuid.uuid4() with open(os.path.join(output_object.basepath, artifact.path)) as lobj_file: lobj_data = lobj_file.read() lo_len = self.current_lobj.write(lobj_data) if lo_len > 50000000: # Notify if LO inserts larger than 50MB self.logger.debug( "Inserting large object of size {}".format(lo_len)) self.cursor.execute( self.sql_command['create_large_object'], ( self.current_large_object_uuid, self.current_loid)) self.cursor.execute( self.sql_command['create_artifact'], ( self.current_artifact_uuid, self.run_uuid, self.current_job_uuid, artifact.name, self.current_large_object_uuid, artifact.description, artifact.kind)) for classifier in artifact.classifiers: self.current_classifier_uuid = uuid.uuid4() self.cursor.execute( self.sql_command['create_classifier'], ( self.current_classifier_uuid, self.current_artifact_uuid, None, classifier, artifact.classifiers[classifier])) if record_in_added: self.artifacts_already_added.append(artifact) def sql_upload_parameters(self, parameter_type, parameter_dict, owner_id=None, job_specific=False): # Note, currently no augmentation parameters are workload specific, but in the future # this may change run_uuid = self.run_uuid job_uuid = None # Default initial value augmentation_id = None resource_getter_id = None if parameter_type == "workload": job_uuid = self.current_job_uuid elif parameter_type == "resource_getter": job_uuid = None resource_getter_id = owner_id elif parameter_type == "augmentation": if job_specific: job_uuid = self.current_job_uuid augmentation_id = owner_id elif parameter_type == "runtime": job_uuid = self.current_job_uuid else: # boot parameters are not yet implemented # device parameters are redundant due to the targets table raise NotImplementedError("{} is not a valid parameter type.".format(parameter_type)) for parameter in parameter_dict: self.current_parameter_uuid = uuid.uuid4() self.cursor.execute( self.sql_command['create_parameter'], ( self.current_parameter_uuid, run_uuid, job_uuid, augmentation_id, resource_getter_id, parameter, str(parameter_dict[parameter]), str(type(parameter_dict[parameter])), parameter_type)) def connect_to_database(self): dsn = "dbname={} user={} password={} host={} port={}".format( self.dbname, self.username, self.password, self.host, self.port) try: self.conn = connect(dsn=dsn) except Psycopg2Error as e: raise OutputProcessorError( "Database error, if the database doesn't exist, " + "please use 'wa create database' to create the database: {}".format(e)) def execute_sql_line_by_line(self, sql): cursor = self.conn.cursor() for line in sql.replace('\n', "").replace(";", ";\n").split("\n"): if line and not line.startswith('--'): cursor.execute(line) cursor.close() self.conn.commit() self.conn.reset()