diff --git a/wa/output_processors/postgresql.py b/wa/output_processors/postgresql.py
new file mode 100644
index 00000000..5218e7b7
--- /dev/null
+++ b/wa/output_processors/postgresql.py
@@ -0,0 +1,481 @@
+# Copyright 2018 ARM Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import uuid
+import collections
+import inspect
+
+try:
+    import psycopg2
+    from psycopg2 import (connect, extras)
+    from psycopg2 import Error as Psycopg2Error
+except ImportError as e:
+    psycopg2 = None
+    import_error_msg = e.args[0] if e.args else str(e)
+from devlib.target import KernelVersion, KernelConfig
+
+import wa
+from wa.utils import postgres_convert
+from wa import OutputProcessor, Parameter, OutputProcessorError
+from wa.utils.types import level
+from wa.framework.target.info import CpuInfo
+
+
+class PostgresqlResultProcessor(OutputProcessor):
+
+    name = 'postgres'
+    description = """
+    Stores results in a Postgresql database.
+
+    The structure of this database can easily be understood by examining
+    the postgres_schema.sql file (the schema used to generate it):
+    {}
+    """.format(os.path.join(
+        os.path.dirname(inspect.getfile(wa)),
+        'commands',
+        'postgres_schema.sql'))
+    parameters = [
+        Parameter('username', default='postgres',
+                  description="""
+                  This is the username that will be used to connect to the
+                  Postgresql database. Note that depending on whether the user
+                  has privileges to modify the database (normally only possible
+                  on localhost), the user may only be able to append entries.
+                  """),
+        Parameter('password', default=None,
+                  description="""
+                  The password to be used to connect to the specified database
+                  with the specified username.
+                  """),
+        Parameter('dbname', default='wa',
+                  description="""
+                  Name of the database that will be created or added to. Note,
+                  to override this, you can specify a value in your user
+                  wa configuration file.
+                  """),
+        Parameter('host', kind=str, default='localhost',
+                  description="""
+                  The host where the Postgresql server is running. The default
+                  is localhost (i.e. the machine that wa is running on).
+                  This is useful for complex systems where multiple machines
+                  may be executing workloads and uploading their results to
+                  a remote, centralised database.
+                  """),
+        Parameter('port', kind=str, default='5432',
+                  description="""
+                  The port the Postgresql server is running on, on the host.
+                  The default is Postgresql's default, so do not change this
+                  unless you have modified the default port for Postgresql.
+                  """),
+    ]
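+
+    # Illustrative sketch only (not part of the processor): these parameters
+    # map directly onto a psycopg2 connection. With the defaults above (and a
+    # hypothetical password), the equivalent manual connection would be:
+    #
+    #     import psycopg2
+    #     conn = psycopg2.connect(dbname='wa', user='postgres', password='secret',
+    #                             host='localhost', port='5432')
+    #
+    # connect_to_database() below builds the same connection from a DSN string.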
+ """), + ] + + # Commands + sql_command = { + "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", + "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s WHERE oid=%s;", + "create_job": "INSERT INTO Jobs (oid, run_oid, status, retries, label, job_id, iterations, workload_name, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);", + "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", + "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message) VALUES (%s, %s, %s, %s, %s)", + "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind) VALUES (%s, %s, %s, %s, %s, %s, %s)", + "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better) VALUES (%s, %s, %s, %s , %s, %s, %s)", + "create_augmentation": "INSERT INTO Augmentations (oid, run_oid, name) VALUES (%s, %s, %s)", + "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, key, value) VALUES (%s, %s, %s, %s, %s)", + "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", + "create_resource_getter": "INSERT INTO ResourceGetters (oid, run_oid, name) VALUES (%s, %s, %s)", + "create_job_aug": "INSERT INTO Jobs_Augs (oid, job_oid, augmentation_oid) VALUES (%s, %s, %s)", + "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)"} + + # Lists to track which run-related items have already been added + metrics_already_added = [] + artifacts_already_added = [] + # Dict needed so that jobs can look up the augmentation_uuid + augmentations_already_added = {} + + # Status bits (flags) + first_job_run = True + + def __init__(self, *args, **kwargs): + super(PostgresqlResultProcessor, self).__init__(*args, **kwargs) + self.conn = None + self.cursor = None + self.current_lobj = None + self.current_loid = None + self.run_uuid = None + self.current_job_uuid = None + self.target_uuid = None + self.current_artifact_uuid = None + self.current_large_object_uuid = None + self.current_classifier_uuid = None + self.current_metric_uuid = None + self.current_augmentation_uuid = None + self.current_resource_getter_uuid = None + self.current_event_uuid = None + self.current_job_aug_uuid = None + self.current_parameter_uuid = None + + def initialize(self, context): + + if not psycopg2: + raise ImportError( + 'The psycopg2 module is required for the ' + + 'Postgresql Output Processor: {}'.format(import_error_msg)) + # N.B. 
+
+    def export_job_output(self, job_output, target_info, run_output):  # pylint: disable=too-many-branches, too-many-statements, too-many-locals, unused-argument
+        ''' Run once for each job to upload information that is
+            updated on a job by job basis.
+        '''
+        self.current_job_uuid = uuid.uuid4()
+        # Create a new job
+        self.cursor.execute(
+            self.sql_command['create_job'],
+            (
+                self.current_job_uuid,
+                self.run_uuid,
+                job_output.status,
+                job_output.retry,
+                job_output.label,
+                job_output.id,
+                job_output.iteration,
+                job_output.spec.workload_name,
+                job_output.metadata))
+        # Update the run table and run-level parameters
+        self.cursor.execute(
+            self.sql_command['update_run'],
+            (
+                run_output.event_summary,
+                run_output.status,
+                run_output.state.timestamp,
+                run_output.info.end_time,
+                self.run_uuid))
+        self.sql_upload_artifacts(run_output, record_in_added=True)
+        self.sql_upload_metrics(run_output, record_in_added=True)
+        self.sql_upload_augmentations(run_output)
+        self.sql_upload_resource_getters(run_output)
+        self.sql_upload_events(job_output)
+        self.sql_upload_artifacts(job_output)
+        self.sql_upload_metrics(job_output)
+        self.sql_upload_job_augmentations(job_output)
+        self.sql_upload_parameters(
+            "workload",
+            job_output.spec.workload_parameters,
+            job_specific=True)
+        self.sql_upload_parameters(
+            "runtime",
+            job_output.spec.runtime_parameters,
+            job_specific=True)
+        self.conn.commit()
+
+    def export_run_output(self, run_output, target_info):  # pylint: disable=unused-argument, too-many-locals
+        ''' A final export of the RunOutput that updates existing parameters
+            and uploads ones which are only generated after jobs have run.
+        '''
+        self.current_job_uuid = None
+        # Update the job statuses following completion of the run
+        for job in run_output.jobs:
+            job_id = job.id
+            job_status = job.status
+            self.cursor.execute(
+                "UPDATE Jobs SET status=%s WHERE job_id=%s and run_oid=%s",
+                (
+                    job_status,
+                    job_id,
+                    self.run_uuid))
+
+        run_uuid = self.run_uuid
+        # Update the run entry after jobs have completed
+        sql_command_update_run = self.sql_command['update_run']
+        self.cursor.execute(
+            sql_command_update_run,
+            (
+                run_output.event_summary,
+                run_output.status,
+                run_output.state.timestamp,
+                run_output.info.end_time,
+                run_uuid))
+        self.sql_upload_events(run_output)
+        self.sql_upload_artifacts(run_output, check_uniqueness=True)
+        self.sql_upload_metrics(run_output, check_uniqueness=True)
+        self.sql_upload_augmentations(run_output)
+        self.conn.commit()
+
+    # Upload functions for use with both jobs and runs
+
+    def sql_upload_resource_getters(self, output_object):
+        for resource_getter in output_object.run_config.resource_getters:
+            self.current_resource_getter_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_resource_getter'],
+                (
+                    self.current_resource_getter_uuid,
+                    self.run_uuid,
+                    resource_getter))
+            self.sql_upload_parameters(
+                'resource_getter',
+                output_object.run_config.resource_getters[resource_getter],
+                owner_id=self.current_resource_getter_uuid,
+                job_specific=False)
+
+    def sql_upload_events(self, output_object):
+        for event in output_object.events:
+            self.current_event_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_event'],
+                (
+                    self.current_event_uuid,
+                    self.run_uuid,
+                    self.current_job_uuid,
+                    event.timestamp,
+                    event.message))
+
+    def sql_upload_job_augmentations(self, output_object):
+        ''' This is a table which links the uuids of augmentations to jobs.
+            Note that the augmentations table is prepopulated, leading to the necessity
+            of an augmentations_already_added dictionary, which gives us the corresponding
+            uuids.
+            Augmentations which are prefixed by ~ are toggled off and are not part of the
+            job, and are therefore not added.
+        '''
+        for augmentation in output_object.spec.augmentations:
+            if augmentation.startswith('~'):
+                continue
+            self.current_augmentation_uuid = self.augmentations_already_added[augmentation]
+            self.current_job_aug_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_job_aug'],
+                (
+                    self.current_job_aug_uuid,
+                    self.current_job_uuid,
+                    self.current_augmentation_uuid))
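+
+    # Illustrative sketch only (hypothetical values): given
+    #
+    #     output_object.spec.augmentations == ['trace-cmd', '~cpufreq']
+    #     self.augmentations_already_added == {'trace-cmd': UUID('...')}
+    #
+    # the loop above inserts a single Jobs_Augs row linking the job to the
+    # 'trace-cmd' uuid; the tilde-prefixed 'cpufreq' is toggled off and skipped.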
+
+    def sql_upload_augmentations(self, output_object):
+        for augmentation in output_object.augmentations:
+            if augmentation.startswith('~') or augmentation in self.augmentations_already_added:
+                continue
+            self.current_augmentation_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_augmentation'],
+                (
+                    self.current_augmentation_uuid,
+                    self.run_uuid,
+                    augmentation))
+            self.sql_upload_parameters(
+                'augmentation',
+                output_object.run_config.augmentations[augmentation],
+                owner_id=self.current_augmentation_uuid,
+                job_specific=False)
+            self.augmentations_already_added[augmentation] = self.current_augmentation_uuid
+
+    def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False):
+        for metric in output_object.metrics:
+            if metric in self.metrics_already_added and check_uniqueness:
+                continue
+            self.current_metric_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_metric'],
+                (
+                    self.current_metric_uuid,
+                    self.run_uuid,
+                    self.current_job_uuid,
+                    metric.name,
+                    metric.value,
+                    metric.units,
+                    metric.lower_is_better))
+            for classifier in metric.classifiers:
+                self.current_classifier_uuid = uuid.uuid4()
+                self.cursor.execute(
+                    self.sql_command['create_classifier'],
+                    (
+                        self.current_classifier_uuid,
+                        None,
+                        self.current_metric_uuid,
+                        classifier,
+                        metric.classifiers[classifier]))
+            if record_in_added:
+                self.metrics_already_added.append(metric)
+
+    def sql_upload_artifacts(self, output_object, record_in_added=False, check_uniqueness=False):
+        ''' Uploads artifacts to the database.
+            record_in_added will record the artifacts added in artifacts_already_added;
+            check_uniqueness will ensure artifacts in artifacts_already_added do not get added again.
+        '''
+        for artifact in output_object.artifacts:
+            if artifact in self.artifacts_already_added and check_uniqueness:
+                continue
+            self.current_artifact_uuid = uuid.uuid4()
+            self.current_lobj = self.conn.lobject()
+            self.current_loid = self.current_lobj.oid
+            self.current_large_object_uuid = uuid.uuid4()
+            with open(os.path.join(output_object.basepath, artifact.path)) as lobj_file:
+                lobj_data = lobj_file.read()
+            lo_len = self.current_lobj.write(lobj_data)
+            if lo_len > 50000000:  # Notify if LO inserts larger than 50MB
+                self.logger.debug(
+                    "Inserting large object of size {}".format(lo_len))
+            self.cursor.execute(
+                self.sql_command['create_large_object'],
+                (
+                    self.current_large_object_uuid,
+                    self.current_loid))
+            self.cursor.execute(
+                self.sql_command['create_artifact'],
+                (
+                    self.current_artifact_uuid,
+                    self.run_uuid,
+                    self.current_job_uuid,
+                    artifact.name,
+                    self.current_large_object_uuid,
+                    artifact.description,
+                    artifact.kind))
+            for classifier in artifact.classifiers:
+                self.current_classifier_uuid = uuid.uuid4()
+                self.cursor.execute(
+                    self.sql_command['create_classifier'],
+                    (
+                        self.current_classifier_uuid,
+                        self.current_artifact_uuid,
+                        None,
+                        classifier,
+                        artifact.classifiers[classifier]))
+            if record_in_added:
+                self.artifacts_already_added.append(artifact)
+
+    def sql_upload_parameters(self, parameter_type, parameter_dict, owner_id=None, job_specific=False):
+        # Note, currently no augmentation parameters are workload specific, but in the future
+        # this may change
+        run_uuid = self.run_uuid
+        job_uuid = None  # Default initial value
+        augmentation_id = None
+        resource_getter_id = None
+        if parameter_type == "workload":
+            job_uuid = self.current_job_uuid
+        elif parameter_type == "resource_getter":
+            job_uuid = None
+            resource_getter_id = owner_id
+        elif parameter_type == "augmentation":
+            if job_specific:
+                job_uuid = self.current_job_uuid
+            augmentation_id = owner_id
+        elif parameter_type == "runtime":
+            job_uuid = self.current_job_uuid
+        else:
+            # boot parameters are not yet implemented
+            # device parameters are redundant due to the targets table
+            raise NotImplementedError("{} is not a valid parameter type.".format(parameter_type))
+        for parameter in parameter_dict:
+            self.current_parameter_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_parameter'],
+                (
+                    self.current_parameter_uuid,
+                    run_uuid,
+                    job_uuid,
+                    augmentation_id,
+                    resource_getter_id,
+                    parameter,
+                    str(parameter_dict[parameter]),
+                    str(type(parameter_dict[parameter])),
+                    parameter_type))
+
+    def connect_to_database(self):
+        dsn = "dbname={} user={} password={} host={} port={}".format(
+            self.dbname, self.username, self.password, self.host, self.port)
+        try:
+            self.conn = connect(dsn=dsn)
+        except Psycopg2Error as e:
+            raise OutputProcessorError(
+                "Database error, if the database doesn't exist, " +
+                "please use 'wa create database' to create the database: {}".format(e))
+
+    def execute_sql_line_by_line(self, sql):
+        cursor = self.conn.cursor()
+        for line in sql.replace('\n', "").replace(";", ";\n").split("\n"):
+            if line and not line.startswith('--'):
+                cursor.execute(line)
+        cursor.close()
+        self.conn.commit()
+        self.conn.reset()
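+
+    # Illustrative sketch only: execute_sql_line_by_line() is intended for
+    # multi-statement scripts such as postgres_schema.sql. A string like
+    #
+    #     "CREATE TABLE Runs (oid UUID PRIMARY KEY);\nCREATE TABLE Jobs (oid UUID);"
+    #
+    # (hypothetical, simplified statements) is first stripped of newlines and
+    # then re-split on ';' so that each statement is executed by a separate
+    # cursor.execute() call, with comment lines starting '--' skipped.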
diff --git a/wa/utils/postgres_convert.py b/wa/utils/postgres_convert.py
new file mode 100644
index 00000000..0f25b612
--- /dev/null
+++ b/wa/utils/postgres_convert.py
@@ -0,0 +1,219 @@
+# Copyright 2018 ARM Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module contains additional casting and adaptation functions for several
+different datatypes and metadata types for use with the psycopg2 module. The
+casting functions will transform Postgresql data types into Python objects, and
+the adapters do the reverse. They are named this way according to the psycopg2
+conventions.
+
+For more information about the available adapters and casters in the standard
+psycopg2 module, please see:
+
+http://initd.org/psycopg/docs/extensions.html#sql-adaptation-protocol-objects
+
+"""
+
+import re
+
+try:
+    from psycopg2 import InterfaceError
+    from psycopg2.extensions import AsIs
+except ImportError:
+    InterfaceError = None
+    AsIs = None
+
+from wa.utils.types import level
+
+
+def cast_level(value, cur):  # pylint: disable=unused-argument
+    """Generic Level caster for psycopg2"""
+    if not InterfaceError:
+        raise ImportError('There was a problem importing psycopg2.')
+    if value is None:
+        return None
+
+    m = re.match(r"([^\()]*)\((\d*)\)", value)
+    if not m:
+        raise InterfaceError("Bad level representation: {}".format(value))
+    name = str(m.group(1))
+    number = int(m.group(2))
+
+    return level(name, number)
+
+
+def cast_vanilla(value, cur):  # pylint: disable=unused-argument
+    """Vanilla Type caster for psycopg2
+
+    Simply returns the string representation.
+    """
+    if value is None:
+        return None
+    else:
+        return str(value)
+
+
+# List functions and classes for adapting
+
+def adapt_level(a_level):
+    """Generic Level Adapter for psycopg2"""
+    return "{}({})".format(a_level.name, a_level.value)
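+
+
+# Illustrative sketch only: cast_level() and adapt_level() are inverses of one
+# another. For a hypothetical level named "OK" with numeric value 1:
+#
+#     adapt_level(level("OK", 1))     # -> 'OK(1)'   (Python -> Postgres text)
+#     cast_level("OK(1)", None)       # -> level("OK", 1)   (Postgres text -> Python)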
+ """ + def adapter_function(param): + if not AsIs: + raise ImportError('There was a problem importing psycopg2.') + param = param.value + result_list = [] + for element in param: # Where param will be a list of X's + result_list.append(adapt_X(element)) + test_element = result_list[0] + num_items = len(test_element.split(",")) + master_list = [] + for x in range(num_items): + master_list.append("") + for element in result_list: + element = element.strip("{").strip("}") + element = element.split(",") + for x in range(num_items): + master_list[x] = master_list[x] + element[x] + "," + if num_items > 1: + master_sql_string = "{" + else: + master_sql_string = "" + for x in range(num_items): + # Remove trailing comma + master_list[x] = master_list[x].strip(",") + master_list[x] = "{" + master_list[x] + "}" + master_sql_string = master_sql_string + master_list[x] + "," + master_sql_string = master_sql_string.strip(",") + if num_items > 1: + master_sql_string = master_sql_string + "}" + return AsIs("'{}'".format(master_sql_string)) + return adapter_function + + +def return_as_is(adapt_X): + """Returns the AsIs appended function of the function passed + + This is useful for adapter functions intended to be used with the + adapt_ListOfX function, which must return strings, as it allows them + to be standalone adapters. + """ + if not AsIs: + raise ImportError('There was a problem importing psycopg2.') + + def adapter_function(param): + return AsIs("'{}'".format(adapt_X(param))) + return adapter_function + + +def adapt_vanilla(param): + """Vanilla adapter: simply returns the string representation""" + if not AsIs: + raise ImportError('There was a problem importing psycopg2.') + return AsIs("'{}'".format(param)) + + +def create_iterable_adapter(array_columns, explicit_iterate=False): + """Create an iterable adapter of a specified dimension + + If explicit_iterate is True, then it will be assumed that the param needs + to be iterated upon via param.iteritems(). Otherwise it will simply be + iterated vanilla. + The value of array_columns will be equal to the number of indexed elements + per item in the param iterable. E.g. a list of 3-element-long lists has + 3 elements per item in the iterable (the master list) and therefore + array_columns should be equal to 3. + If array_columns is 0, then this indicates that the iterable contains + single items. 
+ """ + if not AsIs: + raise ImportError('There was a problem importing psycopg2.') + + def adapt_iterable(param): + """Adapts an iterable object into an SQL array""" + final_string = "" # String stores a string representation of the array + if param: + if array_columns > 1: + for index in range(array_columns): + array_string = "" + for item in param.iteritems(): + array_string = array_string + str(item[index]) + "," + array_string = array_string.strip(",") + array_string = "{" + array_string + "}" + final_string = final_string + array_string + "," + final_string = final_string.strip(",") + final_string = "{" + final_string + "}" + else: + # Simply return each item in the array + if explicit_iterate: + for item in param.iteritems(): + final_string = final_string + str(item) + "," + else: + for item in param: + final_string = final_string + str(item) + "," + final_string = "{" + final_string + "}" + return AsIs("'{}'".format(final_string)) + return adapt_iterable + + +# For reference only and future use +def adapt_list(param): + """Adapts a list into an array""" + if not AsIs: + raise ImportError('There was a problem importing psycopg2.') + final_string = "" + if param: + for item in param: + final_string = final_string + str(item) + "," + final_string = "{" + final_string + "}" + return AsIs("'{}'".format(final_string))