diff --git a/wa/commands/postgres_schema.sql b/wa/commands/postgres_schemas/postgres_schema.sql
similarity index 84%
rename from wa/commands/postgres_schema.sql
rename to wa/commands/postgres_schemas/postgres_schema.sql
index 26bca17a..7510c778 100644
--- a/wa/commands/postgres_schema.sql
+++ b/wa/commands/postgres_schemas/postgres_schema.sql
@@ -1,4 +1,4 @@
---!VERSION!1.1!ENDVERSION!
+--!VERSION!1.2!ENDVERSION!
 
 CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
 CREATE EXTENSION IF NOT EXISTS "lo";
@@ -13,6 +13,7 @@ DROP TABLE IF EXISTS Metrics;
 DROP TABLE IF EXISTS Augmentations;
 DROP TABLE IF EXISTS Jobs_Augs;
 DROP TABLE IF EXISTS ResourceGetters;
+DROP TABLE IF EXISTS Resource_Getters;
 DROP TABLE IF EXISTS Events;
 DROP TABLE IF EXISTS Targets;
 DROP TABLE IF EXISTS Jobs;
@@ -42,6 +43,7 @@ CREATE TABLE Runs (
     timestamp timestamp,
     run_name text,
     project text,
+    project_stage text,
     retry_on_status status_enum[],
     max_retries int,
     bail_on_init_failure boolean,
@@ -49,7 +51,11 @@ CREATE TABLE Runs (
     run_uuid uuid,
     start_time timestamp,
     end_time timestamp,
+    duration float,
     metadata jsonb,
+    _pod_version int,
+    _pod_serialization_version int,
+    state jsonb,
     PRIMARY KEY (oid)
 );
 
@@ -57,12 +63,14 @@ CREATE TABLE Jobs (
     oid uuid NOT NULL,
     run_oid uuid NOT NULL references Runs(oid),
     status status_enum,
-    retries int,
+    retry int,
     label text,
     job_id text,
     iterations int,
     workload_name text,
     metadata jsonb,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
 
@@ -82,6 +90,12 @@ CREATE TABLE Targets (
     kernel_sha1 text,
     kernel_config text[],
     sched_features text[],
+    page_size_kb int,
+    screen_resolution int[],
+    prop json,
+    android_id text,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
 
@@ -91,10 +105,12 @@ CREATE TABLE Events (
     oid uuid NOT NULL,
     run_oid uuid NOT NULL references Runs(oid),
     job_oid uuid references Jobs(oid),
     timestamp timestamp,
     message text,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
 
-CREATE TABLE ResourceGetters (
+CREATE TABLE Resource_Getters (
     oid uuid NOT NULL,
     run_oid uuid NOT NULL references Runs(oid),
     name text,
@@ -123,6 +139,8 @@ CREATE TABLE Metrics (
     value double precision,
     units text,
     lower_is_better boolean,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
 
@@ -144,6 +162,8 @@ CREATE TABLE Artifacts (
     large_object_uuid uuid NOT NULL references LargeObjects(oid),
     description text,
     kind text,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
 
@@ -151,6 +171,8 @@ CREATE TABLE Classifiers (
     oid uuid NOT NULL,
     artifact_oid uuid references Artifacts(oid),
     metric_oid uuid references Metrics(oid),
+    job_oid uuid references Jobs(oid),
+    run_oid uuid references Runs(oid),
     key text,
     value text,
     PRIMARY KEY (oid)
 );
@@ -161,7 +183,7 @@ CREATE TABLE Parameters (
     oid uuid NOT NULL,
     run_oid uuid NOT NULL references Runs(oid),
     job_oid uuid references Jobs(oid),
     augmentation_oid uuid references Augmentations(oid),
-    resource_getter_oid uuid references ResourceGetters(oid),
+    resource_getter_oid uuid references Resource_Getters(oid),
     name text,
     value text,
     value_type text,
diff --git a/wa/commands/postgres_schemas/postgres_schema_v1.2.sql b/wa/commands/postgres_schemas/postgres_schema_v1.2.sql
new file mode 100644
index 00000000..1c982226
--- /dev/null
+++ b/wa/commands/postgres_schemas/postgres_schema_v1.2.sql
@@ -0,0 +1,30 @@
+ALTER TABLE resourcegetters RENAME TO resource_getters;
+
+ALTER TABLE classifiers ADD COLUMN job_oid uuid references Jobs(oid);
+ALTER TABLE classifiers ADD COLUMN run_oid uuid references Runs(oid);
+
+ALTER TABLE targets ADD COLUMN page_size_kb int;
+ALTER TABLE targets ADD COLUMN screen_resolution int[];
+ALTER TABLE targets ADD COLUMN prop json;
+ALTER TABLE targets ADD COLUMN android_id text;
+ALTER TABLE targets ADD COLUMN _pod_version int;
+ALTER TABLE targets ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE jobs RENAME COLUMN retries TO retry;
+ALTER TABLE jobs ADD COLUMN _pod_version int;
+ALTER TABLE jobs ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE runs ADD COLUMN project_stage text;
+ALTER TABLE runs ADD COLUMN state jsonb;
+ALTER TABLE runs ADD COLUMN duration float;
+ALTER TABLE runs ADD COLUMN _pod_version int;
+ALTER TABLE runs ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE artifacts ADD COLUMN _pod_version int;
+ALTER TABLE artifacts ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE events ADD COLUMN _pod_version int;
+ALTER TABLE events ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE metrics ADD COLUMN _pod_version int;
+ALTER TABLE metrics ADD COLUMN _pod_serialization_version int;
diff --git a/wa/output_processors/postgresql.py b/wa/output_processors/postgresql.py
index cdf360b4..5c7059de 100644
--- a/wa/output_processors/postgresql.py
+++ b/wa/output_processors/postgresql.py
@@ -16,7 +16,6 @@
 import os
 import uuid
 import collections
-import inspect
 
 try:
     import psycopg2
@@ -27,10 +26,12 @@ except ImportError as e:
     import_error_msg = e.args[0] if e.args else str(e)
 
 from devlib.target import KernelVersion, KernelConfig
-import wa
 from wa import OutputProcessor, Parameter, OutputProcessorError
 from wa.framework.target.info import CpuInfo
-from wa.utils import postgres
+from wa.utils.postgres import (POSTGRES_SCHEMA_DIR, cast_level, cast_vanilla,
+                               adapt_vanilla, return_as_is, adapt_level,
+                               ListOfLevel, adapt_ListOfX, create_iterable_adapter,
+                               get_schema, get_database_schema_version)
 from wa.utils.serializer import json
 from wa.utils.types import level
 
@@ -44,10 +45,8 @@ class PostgresqlResultProcessor(OutputProcessor):
     The structure of this database can easily be understood by examining
     the postgres_schema.sql file (the schema used to generate it):
     {}
-    """.format(os.path.join(
-        os.path.dirname(inspect.getfile(wa)),
-        'commands',
-        'postgres_schema.sql'))
+    """.format(os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema.sql'))
+
     parameters = [
         Parameter('username', default='postgres',
                   description="""
@@ -85,19 +84,23 @@ class PostgresqlResultProcessor(OutputProcessor):
 
     # Commands
     sql_command = {
-        "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
-        "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s WHERE oid=%s;",
-        "create_job": "INSERT INTO Jobs (oid, run_oid, status, retries, label, job_id, iterations, workload_name, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);",
-        "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
-        "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message) VALUES (%s, %s, %s, %s, %s)",
-        "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind) VALUES (%s, %s, %s, %s, %s, %s, %s)",
%s)", - "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better) VALUES (%s, %s, %s, %s , %s, %s, %s)", + "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, project_stage, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata, state, _pod_version, _pod_serialization_version) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", + "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s, duration=%s, state=%s WHERE oid=%s;", + "create_job": "INSERT INTO Jobs (oid, run_oid, status, retry, label, job_id, iterations, workload_name, metadata, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);", + "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features, page_size_kb, screen_resolution, prop, android_id, _pod_version, _pod_serialization_version) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", + "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s", + "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", + "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s , %s, %s, %s)", "create_augmentation": "INSERT INTO Augmentations (oid, run_oid, name) VALUES (%s, %s, %s)", - "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, key, value) VALUES (%s, %s, %s, %s, %s)", - "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", - "create_resource_getter": "INSERT INTO ResourceGetters (oid, run_oid, name) VALUES (%s, %s, %s)", + "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, job_oid, run_oid, key, value) VALUES (%s, %s, %s, %s, %s, %s, %s)", + "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", + "create_resource_getter": "INSERT INTO Resource_Getters (oid, run_oid, name) VALUES (%s, %s, %s)", "create_job_aug": "INSERT INTO Jobs_Augs (oid, job_oid, augmentation_oid) VALUES (%s, %s, %s)", - "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)"} + "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)" + } # Lists to track which run-related items have already been added metrics_already_added = [] @@ -124,34 +127,37 @@ class PostgresqlResultProcessor(OutputProcessor): # N.B. 
         self.connect_to_database()
         self.cursor = self.conn.cursor()
+        self.check_schema_versions()
+
         # Register the adapters and typecasters for enum types
         self.cursor.execute("SELECT NULL::status_enum")
         status_oid = self.cursor.description[0][1]
         self.cursor.execute("SELECT NULL::param_enum")
         param_oid = self.cursor.description[0][1]
         LEVEL = psycopg2.extensions.new_type(
-            (status_oid,), "LEVEL", postgres.cast_level)
+            (status_oid,), "LEVEL", cast_level)
         psycopg2.extensions.register_type(LEVEL)
         PARAM = psycopg2.extensions.new_type(
-            (param_oid,), "PARAM", postgres.cast_vanilla)
+            (param_oid,), "PARAM", cast_vanilla)
         psycopg2.extensions.register_type(PARAM)
-        psycopg2.extensions.register_adapter(level, postgres.return_as_is(postgres.adapt_level))
+        psycopg2.extensions.register_adapter(level, return_as_is(adapt_level))
         psycopg2.extensions.register_adapter(
-            postgres.ListOfLevel, postgres.adapt_ListOfX(postgres.adapt_level))
-        psycopg2.extensions.register_adapter(KernelVersion, postgres.adapt_vanilla)
+            ListOfLevel, adapt_ListOfX(adapt_level))
+        psycopg2.extensions.register_adapter(KernelVersion, adapt_vanilla)
         psycopg2.extensions.register_adapter(
-            CpuInfo, postgres.adapt_vanilla)
+            CpuInfo, adapt_vanilla)
         psycopg2.extensions.register_adapter(
             collections.OrderedDict, extras.Json)
         psycopg2.extensions.register_adapter(dict, extras.Json)
         psycopg2.extensions.register_adapter(
-            KernelConfig, postgres.create_iterable_adapter(2, explicit_iterate=True))
+            KernelConfig, create_iterable_adapter(2, explicit_iterate=True))
         # Register ready-made UUID type adapter
         extras.register_uuid()
+        # Insert a run_uuid which will be globally accessible during the run
         self.run_uuid = uuid.UUID(str(uuid.uuid4()))
 
         run_output = context.run_output
-        retry_on_status = postgres.ListOfLevel(run_output.run_config.retry_on_status)
+        retry_on_status = ListOfLevel(run_output.run_config.retry_on_status)
         self.cursor.execute(
             self.sql_command['create_run'],
             (
@@ -162,13 +168,19 @@ class PostgresqlResultProcessor(OutputProcessor):
                 run_output.state.timestamp,
                 run_output.info.run_name,
                 run_output.info.project,
+                run_output.info.project_stage,
                 retry_on_status,
                 run_output.run_config.max_retries,
                 run_output.run_config.bail_on_init_failure,
                 run_output.run_config.allow_phone_home,
                 run_output.info.uuid,
                 run_output.info.start_time,
-                run_output.metadata))
+                run_output.metadata,
+                json.dumps(run_output.state.to_pod()),
+                run_output.result._pod_version,  # pylint: disable=protected-access
+                run_output.result._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
         self.target_uuid = uuid.uuid4()
         target_info = context.target_info
         target_pod = target_info.to_pod()
@@ -191,7 +203,17 @@ class PostgresqlResultProcessor(OutputProcessor):
                 target_pod['kernel_release'],
                 target_info.kernel_version.sha1,
                 target_info.kernel_config,
-                target_pod['sched_features']))
+                target_pod['sched_features'],
+                target_pod['page_size_kb'],
+                # Android Specific
+                target_pod.get('screen_resolution'),
+                target_pod.get('prop'),
+                target_pod.get('android_id'),
+                target_pod.get('_pod_version'),
+                target_pod.get('_pod_serialization_version'),
+            )
+        )
+
         # Commit cursor commands
         self.conn.commit()
 
@@ -212,7 +234,26 @@ class PostgresqlResultProcessor(OutputProcessor):
                 job_output.id,
                 job_output.iteration,
                 job_output.spec.workload_name,
-                job_output.metadata))
+                job_output.metadata,
+                job_output.spec._pod_version,  # pylint: disable=protected-access
+                job_output.spec._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
+
+        for classifier in job_output.classifiers:
+            classifier_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_classifier'],
+                (
+                    classifier_uuid,
+                    None,
+                    None,
+                    job_uuid,
+                    None,
+                    classifier,
+                    job_output.classifiers[classifier]
+                )
+            )
         # Update the run table and run-level parameters
         self.cursor.execute(
             self.sql_command['update_run'],
             (
@@ -221,7 +262,23 @@ class PostgresqlResultProcessor(OutputProcessor):
                 run_output.status,
                 run_output.state.timestamp,
                 run_output.info.end_time,
+                None,
+                json.dumps(run_output.state.to_pod()),
                 self.run_uuid))
+        for classifier in run_output.classifiers:
+            classifier_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_classifier'],
+                (
+                    classifier_uuid,
+                    None,
+                    None,
+                    None,
+                    self.run_uuid,
+                    classifier,
+                    run_output.classifiers[classifier]
+                )
+            )
         self.sql_upload_artifacts(run_output, record_in_added=True)
         self.sql_upload_metrics(run_output, record_in_added=True)
         self.sql_upload_augmentations(run_output)
@@ -255,19 +313,27 @@ class PostgresqlResultProcessor(OutputProcessor):
             (
                 job_status,
                 job_id,
-                self.run_uuid))
+                self.run_uuid
+            )
+        )
 
         run_uuid = self.run_uuid
         # Update the run entry after jobs have completed
+        run_info_pod = run_output.info.to_pod()
+        run_state_pod = run_output.state.to_pod()
         sql_command_update_run = self.sql_command['update_run']
         self.cursor.execute(
             sql_command_update_run,
             (
                 run_output.event_summary,
                 run_output.status,
-                run_output.state.timestamp,
-                run_output.info.end_time,
-                run_uuid))
+                run_info_pod['start_time'],
+                run_info_pod['end_time'],
+                run_info_pod['duration'],
+                json.dumps(run_state_pod),
+                run_uuid,
+            )
+        )
         self.sql_upload_events(run_output)
         self.sql_upload_artifacts(run_output, check_uniqueness=True)
         self.sql_upload_metrics(run_output, check_uniqueness=True)
@@ -284,11 +350,14 @@ class PostgresqlResultProcessor(OutputProcessor):
             (
                 resource_getter_uuid,
                 self.run_uuid,
-                resource_getter))
+                resource_getter,
+            )
+        )
         self.sql_upload_parameters(
             'resource_getter',
             output_object.run_config.resource_getters[resource_getter],
-            owner_id=resource_getter_uuid)
+            owner_id=resource_getter_uuid,
+        )
 
     def sql_upload_events(self, output_object, job_uuid=None):
         for event in output_object.events:
@@ -300,7 +369,11 @@ class PostgresqlResultProcessor(OutputProcessor):
                 self.run_uuid,
                 job_uuid,
                 event.timestamp,
-                event.message))
+                event.message,
+                event._pod_version,  # pylint: disable=protected-access
+                event._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
 
     def sql_upload_job_augmentations(self, output_object, job_uuid=None):
         ''' This is a table which links the uuids of augmentations to jobs.
@@ -320,7 +393,9 @@ class PostgresqlResultProcessor(OutputProcessor):
             (
                 job_aug_uuid,
                 job_uuid,
-                augmentation_uuid))
+                augmentation_uuid,
+            )
+        )
 
     def sql_upload_augmentations(self, output_object):
         for augmentation in output_object.augmentations:
@@ -332,11 +407,14 @@ class PostgresqlResultProcessor(OutputProcessor):
             (
                 augmentation_uuid,
                 self.run_uuid,
-                augmentation))
+                augmentation,
+            )
+        )
         self.sql_upload_parameters(
             'augmentation',
             output_object.run_config.augmentations[augmentation],
-            owner_id=augmentation_uuid)
+            owner_id=augmentation_uuid,
+        )
         self.augmentations_already_added[augmentation] = augmentation_uuid
 
     def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False, job_uuid=None):
@@ -353,7 +431,11 @@ class PostgresqlResultProcessor(OutputProcessor):
                 metric.name,
                 metric.value,
                 metric.units,
-                metric.lower_is_better))
+                metric.lower_is_better,
+                metric._pod_version,  # pylint: disable=protected-access
+                metric._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
         for classifier in metric.classifiers:
             classifier_uuid = uuid.uuid4()
             self.cursor.execute(
@@ -362,8 +444,12 @@ class PostgresqlResultProcessor(OutputProcessor):
                 classifier_uuid,
                 None,
                 metric_uuid,
+                None,
+                None,
                 classifier,
-                metric.classifiers[classifier]))
+                metric.classifiers[classifier],
+            )
+        )
         if record_in_added:
             self.metrics_already_added.append(metric)
 
@@ -374,7 +460,7 @@ class PostgresqlResultProcessor(OutputProcessor):
         '''
         for artifact in output_object.artifacts:
             if artifact in self.artifacts_already_added and check_uniqueness:
-                self.logger.debug('Skipping uploading {} as already added' .format(artifact))
+                self.logger.debug('Skipping uploading {} as already added'.format(artifact))
                 continue
 
             if artifact in self.artifacts_already_added:
@@ -411,7 +497,9 @@ class PostgresqlResultProcessor(OutputProcessor):
                 parameter,
                 json.dumps(parameter_dict[parameter]),
                 str(type(parameter_dict[parameter])),
-                parameter_type))
+                parameter_type,
+            )
+        )
 
     def connect_to_database(self):
         dsn = "dbname={} user={} password={} host={} port={}".format(
@@ -432,6 +520,21 @@ class PostgresqlResultProcessor(OutputProcessor):
         self.conn.commit()
         self.conn.reset()
 
+    def check_schema_versions(self):
+        schemafilepath = os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema.sql')
+        cur_major_version, cur_minor_version, _ = get_schema(schemafilepath)
+        db_schema_version = get_database_schema_version(self.cursor)
+        if (cur_major_version, cur_minor_version) != db_schema_version:
+            self.cursor.close()
+            self.cursor = None
+            self.conn.commit()
+            self.conn.reset()
+            msg = 'The current database schema is v{}, however the local ' \
+                  'schema version is v{}. Please update your database ' \
+                  'using the create command.'
+            raise OutputProcessorError(msg.format(db_schema_version,
+                                                  (cur_major_version, cur_minor_version)))
+
     def _sql_write_lobject(self, source, lobject):
         with open(source) as lobj_file:
             lobj_data = lobj_file.read()
@@ -458,7 +561,9 @@ class PostgresqlResultProcessor(OutputProcessor):
             self.sql_command['create_large_object'],
             (
                 large_object_uuid,
-                loid))
+                loid,
+            )
+        )
         self.cursor.execute(
             self.sql_command['create_artifact'],
             (
@@ -468,7 +573,11 @@ class PostgresqlResultProcessor(OutputProcessor):
                 artifact.name,
                 large_object_uuid,
                 artifact.description,
-                artifact.kind))
+                str(artifact.kind),
+                artifact._pod_version,  # pylint: disable=protected-access
+                artifact._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
         for classifier in artifact.classifiers:
             classifier_uuid = uuid.uuid4()
             self.cursor.execute(
@@ -477,7 +586,11 @@ class PostgresqlResultProcessor(OutputProcessor):
                 classifier_uuid,
                 artifact_uuid,
                 None,
+                None,
+                None,
                 classifier,
-                artifact.classifiers[classifier]))
+                artifact.classifiers[classifier],
+            )
+        )
         if record_in_added:
             self.artifacts_already_added[artifact] = loid
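
Note on the version handshake added above: check_schema_versions() compares the (major, minor) version parsed out of postgres_schema.sql's "--!VERSION!1.2!ENDVERSION!" marker against the version recorded in the target database, and refuses to upload if they differ. The real get_schema and get_database_schema_version helpers are imported from wa.utils.postgres; the sketch below only illustrates the mechanism, and the DatabaseMeta table with schema_major/schema_minor columns on the database side is a hypothetical name chosen for illustration, not necessarily what WA actually uses.

    import re

    # Matches the version marker at the top of a schema file,
    # e.g. "--!VERSION!1.2!ENDVERSION!".
    VERSION_REGEX = re.compile(r'--!VERSION!(\d+)\.(\d+)!ENDVERSION!')

    def get_schema(schemafilepath):
        # Read the schema file and pull (major, minor) out of its marker.
        with open(schemafilepath) as schema_file:
            sql = schema_file.read()
        match = VERSION_REGEX.search(sql)
        if match is None:
            raise ValueError('No !VERSION! marker in {}'.format(schemafilepath))
        return int(match.group(1)), int(match.group(2)), sql

    def get_database_schema_version(cursor):
        # Assumption: the create command records the version it applied in a
        # single-row metadata table (hypothetical DatabaseMeta shown here).
        cursor.execute('SELECT schema_major, schema_minor FROM DatabaseMeta')
        schema_major, schema_minor = cursor.fetchone()
        return (schema_major, schema_minor)

With these two helpers, a v1.1 database paired with the v1.2 schema file shipped in this change is caught before the first INSERT is attempted, which is why check_schema_versions() runs immediately after the cursor is created in initialize().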