
postgres: Update schema to v1.2

Update the postgres database schema:
    - Rename "resourcegetters" schema to "resource_getters" for
      consistency
    - Rename "retreies" colum to "retry" to better relflect it purpose
    - Store additional information including:
        - POD serialization data
        - Missing target information
        - JSON formatted runstate
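
To make the effect of these changes concrete, here is a minimal psycopg2 sketch (not part of the commit; connection details are placeholders) reading the renamed and newly added fields from an upgraded v1.2 database:

import psycopg2

conn = psycopg2.connect(dbname='wa', user='postgres', host='localhost')
cur = conn.cursor()

# The JSON-formatted run state is now stored directly on the Runs table.
cur.execute("SELECT run_name, state, _pod_version FROM Runs")
for run_name, state, pod_version in cur.fetchall():
    print(run_name, pod_version, state)

# "retries" is now "retry", and ResourceGetters is now Resource_Getters.
cur.execute("SELECT label, retry FROM Jobs")
print(cur.fetchall())
cur.execute("SELECT name FROM Resource_Getters")
print(cur.fetchall())

cur.close()
conn.close()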
Marc Bonnici, 2018-10-30 17:08:14 +00:00 (committed by setrofim)
parent 64f7c2431e
commit 250bf61c4b
3 changed files with 215 additions and 50 deletions

postgres_schema.sql

@@ -1,4 +1,4 @@
---!VERSION!1.1!ENDVERSION!
+--!VERSION!1.2!ENDVERSION!
 CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
 CREATE EXTENSION IF NOT EXISTS "lo";
@@ -13,6 +13,7 @@ DROP TABLE IF EXISTS Metrics;
 DROP TABLE IF EXISTS Augmentations;
 DROP TABLE IF EXISTS Jobs_Augs;
 DROP TABLE IF EXISTS ResourceGetters;
+DROP TABLE IF EXISTS Resource_Getters;
 DROP TABLE IF EXISTS Events;
 DROP TABLE IF EXISTS Targets;
 DROP TABLE IF EXISTS Jobs;
@@ -42,6 +43,7 @@ CREATE TABLE Runs (
     timestamp timestamp,
     run_name text,
     project text,
+    project_stage text,
     retry_on_status status_enum[],
     max_retries int,
     bail_on_init_failure boolean,
@@ -49,7 +51,11 @@ CREATE TABLE Runs (
     run_uuid uuid,
     start_time timestamp,
     end_time timestamp,
+    duration float,
     metadata jsonb,
+    _pod_version int,
+    _pod_serialization_version int,
+    state jsonb,
     PRIMARY KEY (oid)
 );
@@ -57,12 +63,14 @@ CREATE TABLE Jobs (
     oid uuid NOT NULL,
     run_oid uuid NOT NULL references Runs(oid),
     status status_enum,
-    retries int,
+    retry int,
     label text,
     job_id text,
     iterations int,
     workload_name text,
     metadata jsonb,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
@@ -82,6 +90,12 @@ CREATE TABLE Targets (
     kernel_sha1 text,
     kernel_config text[],
     sched_features text[],
+    page_size_kb int,
+    screen_resolution int[],
+    prop json,
+    android_id text,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
@@ -91,10 +105,12 @@ CREATE TABLE Events (
     job_oid uuid references Jobs(oid),
     timestamp timestamp,
     message text,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
 
-CREATE TABLE ResourceGetters (
+CREATE TABLE Resource_Getters (
     oid uuid NOT NULL,
     run_oid uuid NOT NULL references Runs(oid),
     name text,
@@ -123,6 +139,8 @@ CREATE TABLE Metrics (
     value double precision,
     units text,
     lower_is_better boolean,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
@@ -144,6 +162,8 @@ CREATE TABLE Artifacts (
     large_object_uuid uuid NOT NULL references LargeObjects(oid),
     description text,
     kind text,
+    _pod_version int,
+    _pod_serialization_version int,
     PRIMARY KEY (oid)
 );
@@ -151,6 +171,8 @@ CREATE TABLE Classifiers (
     oid uuid NOT NULL,
     artifact_oid uuid references Artifacts(oid),
     metric_oid uuid references Metrics(oid),
+    job_oid uuid references Jobs(oid),
+    run_oid uuid references Runs(oid),
     key text,
     value text,
     PRIMARY KEY (oid)
@@ -161,7 +183,7 @@ CREATE TABLE Parameters (
     run_oid uuid NOT NULL references Runs(oid),
     job_oid uuid references Jobs(oid),
     augmentation_oid uuid references Augmentations(oid),
-    resource_getter_oid uuid references ResourceGetters(oid),
+    resource_getter_oid uuid references Resource_Getters(oid),
     name text,
     value text,
     value_type text,
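
The two columns added to Classifiers above turn the table into a polymorphic link: each row now references exactly one of an artifact, a metric, a job, or a run. A sketch of reading job-level classifiers under that scheme (table and column names from the schema above; connection details are placeholders):

import psycopg2

conn = psycopg2.connect(dbname='wa', user='postgres', host='localhost')
cur = conn.cursor()
# Rows with a non-NULL job_oid are classifiers attached directly to a job.
cur.execute(
    "SELECT j.job_id, c.key, c.value "
    "FROM Classifiers c JOIN Jobs j ON c.job_oid = j.oid"
)
for job_id, key, value in cur.fetchall():
    print('{}: {}={}'.format(job_id, key, value))
conn.close()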

postgres_schema_update_v1.2.sql (new file)

@@ -0,0 +1,30 @@
+ALTER TABLE resourcegetters RENAME TO resource_getters;
+ALTER TABLE classifiers ADD COLUMN job_oid uuid references Jobs(oid);
+ALTER TABLE classifiers ADD COLUMN run_oid uuid references Runs(oid);
+ALTER TABLE targets ADD COLUMN page_size_kb int;
+ALTER TABLE targets ADD COLUMN screen_resolution int[];
+ALTER TABLE targets ADD COLUMN prop text;
+ALTER TABLE targets ADD COLUMN android_id text;
+ALTER TABLE targets ADD COLUMN _pod_version int;
+ALTER TABLE targets ADD COLUMN _pod_serialization_version int;
+ALTER TABLE jobs RENAME COLUMN retries TO retry;
+ALTER TABLE jobs ADD COLUMN _pod_version int;
+ALTER TABLE jobs ADD COLUMN _pod_serialization_version int;
+ALTER TABLE runs ADD COLUMN project_stage text;
+ALTER TABLE runs ADD COLUMN state jsonb;
+ALTER TABLE runs ADD COLUMN duration float;
+ALTER TABLE runs ADD COLUMN _pod_version int;
+ALTER TABLE runs ADD COLUMN _pod_serialization_version int;
+ALTER TABLE artifacts ADD COLUMN _pod_version int;
+ALTER TABLE artifacts ADD COLUMN _pod_serialization_version int;
+ALTER TABLE events ADD COLUMN _pod_version int;
+ALTER TABLE events ADD COLUMN _pod_serialization_version int;
+ALTER TABLE metrics ADD COLUMN _pod_version int;
+ALTER TABLE metrics ADD COLUMN _pod_serialization_version int;
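
This script is the in-place upgrade path for existing v1.1 databases. A sketch of applying it with psycopg2 (the script name and location are assumptions; a single execute() call runs the semicolon-separated statements as one batch):

import psycopg2

UPDATE_SCRIPT = 'postgres_schema_update_v1.2.sql'  # assumed filename

conn = psycopg2.connect(dbname='wa', user='postgres', host='localhost')
with conn, conn.cursor() as cur:  # commits on success, rolls back on error
    with open(UPDATE_SCRIPT) as f:
        cur.execute(f.read())
conn.close()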

wa/output_processors/postgres.py

@@ -16,7 +16,6 @@
 import os
 import uuid
 import collections
-import inspect
 
 try:
     import psycopg2
@@ -27,10 +26,12 @@ except ImportError as e:
     import_error_msg = e.args[0] if e.args else str(e)
 
 from devlib.target import KernelVersion, KernelConfig
 
-import wa
 from wa import OutputProcessor, Parameter, OutputProcessorError
 from wa.framework.target.info import CpuInfo
-from wa.utils import postgres
+from wa.utils.postgres import (POSTGRES_SCHEMA_DIR, cast_level, cast_vanilla,
+                               adapt_vanilla, return_as_is, adapt_level,
+                               ListOfLevel, adapt_ListOfX, create_iterable_adapter,
+                               get_schema, get_database_schema_version)
 from wa.utils.serializer import json
 from wa.utils.types import level
@@ -44,10 +45,8 @@ class PostgresqlResultProcessor(OutputProcessor):
         The structure of this database can easily be understood by examining
         the postgres_schema.sql file (the schema used to generate it):
         {}
-    """.format(os.path.join(
-        os.path.dirname(inspect.getfile(wa)),
-        'commands',
-        'postgres_schema.sql'))
+    """.format(os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema.sql'))
 
     parameters = [
         Parameter('username', default='postgres',
                   description="""
@@ -85,19 +84,23 @@ class PostgresqlResultProcessor(OutputProcessor):
     # Commands
     sql_command = {
-        "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
-        "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s WHERE oid=%s;",
-        "create_job": "INSERT INTO Jobs (oid, run_oid, status, retries, label, job_id, iterations, workload_name, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);",
-        "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
-        "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message) VALUES (%s, %s, %s, %s, %s)",
-        "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind) VALUES (%s, %s, %s, %s, %s, %s, %s)",
-        "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better) VALUES (%s, %s, %s, %s, %s, %s, %s)",
+        "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, project_stage, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata, state, _pod_version, _pod_serialization_version) "
+                      "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
+        "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s, duration=%s, state=%s WHERE oid=%s;",
+        "create_job": "INSERT INTO Jobs (oid, run_oid, status, retry, label, job_id, iterations, workload_name, metadata, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);",
+        "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features, page_size_kb, screen_resolution, prop, android_id, _pod_version, _pod_serialization_version) "
+                         "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
+        "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s)",
+        "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
+        "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
         "create_augmentation": "INSERT INTO Augmentations (oid, run_oid, name) VALUES (%s, %s, %s)",
-        "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, key, value) VALUES (%s, %s, %s, %s, %s)",
-        "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
-        "create_resource_getter": "INSERT INTO ResourceGetters (oid, run_oid, name) VALUES (%s, %s, %s)",
+        "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, job_oid, run_oid, key, value) VALUES (%s, %s, %s, %s, %s, %s, %s)",
+        "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) "
+                            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
+        "create_resource_getter": "INSERT INTO Resource_Getters (oid, run_oid, name) VALUES (%s, %s, %s)",
         "create_job_aug": "INSERT INTO Jobs_Augs (oid, job_oid, augmentation_oid) VALUES (%s, %s, %s)",
-        "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)"}
+        "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)"
+    }
 
     # Lists to track which run-related items have already been added
     metrics_already_added = []
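
Each entry above is a parameterized statement: the %s placeholders are bound from a tuple at execute time, so psycopg2 handles all quoting and type adaptation. A minimal sketch of the pattern with a hypothetical helper (not from this file):

import uuid


def create_augmentation(cursor, run_uuid, name):
    # psycopg2 fills each %s from the tuple; uuid.UUID values work because
    # initialize() calls extras.register_uuid(), as shown below.
    aug_uuid = uuid.uuid4()
    cursor.execute(
        "INSERT INTO Augmentations (oid, run_oid, name) VALUES (%s, %s, %s)",
        (aug_uuid, run_uuid, name),
    )
    return aug_uuid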
@@ -124,34 +127,37 @@ class PostgresqlResultProcessor(OutputProcessor):
         # N.B. Typecasters are for postgres->python and adapters the opposite
         self.connect_to_database()
         self.cursor = self.conn.cursor()
+        self.check_schema_versions()
+
         # Register the adapters and typecasters for enum types
         self.cursor.execute("SELECT NULL::status_enum")
         status_oid = self.cursor.description[0][1]
         self.cursor.execute("SELECT NULL::param_enum")
         param_oid = self.cursor.description[0][1]
         LEVEL = psycopg2.extensions.new_type(
-            (status_oid,), "LEVEL", postgres.cast_level)
+            (status_oid,), "LEVEL", cast_level)
         psycopg2.extensions.register_type(LEVEL)
         PARAM = psycopg2.extensions.new_type(
-            (param_oid,), "PARAM", postgres.cast_vanilla)
+            (param_oid,), "PARAM", cast_vanilla)
         psycopg2.extensions.register_type(PARAM)
-        psycopg2.extensions.register_adapter(level, postgres.return_as_is(postgres.adapt_level))
+        psycopg2.extensions.register_adapter(level, return_as_is(adapt_level))
         psycopg2.extensions.register_adapter(
-            postgres.ListOfLevel, postgres.adapt_ListOfX(postgres.adapt_level))
-        psycopg2.extensions.register_adapter(KernelVersion, postgres.adapt_vanilla)
+            ListOfLevel, adapt_ListOfX(adapt_level))
+        psycopg2.extensions.register_adapter(KernelVersion, adapt_vanilla)
         psycopg2.extensions.register_adapter(
-            CpuInfo, postgres.adapt_vanilla)
+            CpuInfo, adapt_vanilla)
         psycopg2.extensions.register_adapter(
             collections.OrderedDict, extras.Json)
         psycopg2.extensions.register_adapter(dict, extras.Json)
         psycopg2.extensions.register_adapter(
-            KernelConfig, postgres.create_iterable_adapter(2, explicit_iterate=True))
+            KernelConfig, create_iterable_adapter(2, explicit_iterate=True))
         # Register ready-made UUID type adapter
         extras.register_uuid()
         # Insert a run_uuid which will be globally accessible during the run
         self.run_uuid = uuid.UUID(str(uuid.uuid4()))
         run_output = context.run_output
-        retry_on_status = postgres.ListOfLevel(run_output.run_config.retry_on_status)
+        retry_on_status = ListOfLevel(run_output.run_config.retry_on_status)
         self.cursor.execute(
             self.sql_command['create_run'],
             (
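
The block above wires WA's types into psycopg2 in both directions: new_type()/register_type() install typecasters (Postgres to Python on read), while register_adapter() installs adapters (Python to SQL on write). A generic sketch of the same pattern for a hypothetical wrapper type, not part of WA:

import psycopg2
import psycopg2.extensions


class Celsius(object):
    def __init__(self, value):
        self.value = value


def adapt_celsius(temp):
    # Python -> SQL: render the wrapped value as a literal.
    return psycopg2.extensions.AsIs(repr(float(temp.value)))


def cast_celsius(value, cursor):
    # SQL -> Python: psycopg2 hands us the raw string (or None).
    return None if value is None else Celsius(float(value))


psycopg2.extensions.register_adapter(Celsius, adapt_celsius)
# 701 is the builtin OID of float8; the code above looks OIDs up dynamically
# with "SELECT NULL::<type>" instead of hard-coding them.
CELSIUS = psycopg2.extensions.new_type((701,), 'CELSIUS', cast_celsius)
psycopg2.extensions.register_type(CELSIUS)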
@@ -162,13 +168,19 @@ class PostgresqlResultProcessor(OutputProcessor):
                 run_output.state.timestamp,
                 run_output.info.run_name,
                 run_output.info.project,
+                run_output.info.project_stage,
                 retry_on_status,
                 run_output.run_config.max_retries,
                 run_output.run_config.bail_on_init_failure,
                 run_output.run_config.allow_phone_home,
                 run_output.info.uuid,
                 run_output.info.start_time,
-                run_output.metadata))
+                run_output.metadata,
+                json.dumps(run_output.state.to_pod()),
+                run_output.result._pod_version,  # pylint: disable=protected-access
+                run_output.result._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
         self.target_uuid = uuid.uuid4()
         target_info = context.target_info
         target_pod = target_info.to_pod()
@@ -191,7 +203,17 @@ class PostgresqlResultProcessor(OutputProcessor):
                 target_pod['kernel_release'],
                 target_info.kernel_version.sha1,
                 target_info.kernel_config,
-                target_pod['sched_features']))
+                target_pod['sched_features'],
+                target_pod['page_size_kb'],
+                # Android specific
+                target_pod.get('screen_resolution'),
+                target_pod.get('prop'),
+                target_pod.get('android_id'),
+                target_pod.get('pod_version'),
+                target_pod.get('pod_serialization_version'),
+            )
+        )
 
         # Commit cursor commands
         self.conn.commit()
@@ -212,7 +234,26 @@ class PostgresqlResultProcessor(OutputProcessor):
                 job_output.id,
                 job_output.iteration,
                 job_output.spec.workload_name,
-                job_output.metadata))
+                job_output.metadata,
+                job_output.spec._pod_version,  # pylint: disable=protected-access
+                job_output.spec._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
+        for classifier in job_output.classifiers:
+            classifier_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_classifier'],
+                (
+                    classifier_uuid,
+                    None,
+                    None,
+                    job_uuid,
+                    None,
+                    classifier,
+                    job_output.classifiers[classifier]
+                )
+            )
         # Update the run table and run-level parameters
         self.cursor.execute(
             self.sql_command['update_run'],
@@ -221,7 +262,24 @@ class PostgresqlResultProcessor(OutputProcessor):
                 run_output.status,
                 run_output.state.timestamp,
                 run_output.info.end_time,
+                None,
+                json.dumps(run_output.state.to_pod()),
                 self.run_uuid))
+        for classifier in run_output.classifiers:
+            classifier_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_classifier'],
+                (
+                    classifier_uuid,
+                    None,
+                    None,
+                    None,
+                    self.run_uuid,
+                    classifier,
+                    run_output.classifiers[classifier]
+                )
+            )
         self.sql_upload_artifacts(run_output, record_in_added=True)
         self.sql_upload_metrics(run_output, record_in_added=True)
         self.sql_upload_augmentations(run_output)
@@ -255,19 +313,27 @@ class PostgresqlResultProcessor(OutputProcessor):
                 (
                     job_status,
                     job_id,
-                    self.run_uuid))
+                    self.run_uuid
+                )
+            )
         run_uuid = self.run_uuid
         # Update the run entry after jobs have completed
+        run_info_pod = run_output.info.to_pod()
+        run_state_pod = run_output.state.to_pod()
         sql_command_update_run = self.sql_command['update_run']
         self.cursor.execute(
             sql_command_update_run,
             (
                 run_output.event_summary,
                 run_output.status,
-                run_output.state.timestamp,
-                run_output.info.end_time,
-                run_uuid))
+                run_info_pod['start_time'],
+                run_info_pod['end_time'],
+                run_info_pod['duration'],
+                json.dumps(run_state_pod),
+                run_uuid,
+            )
+        )
         self.sql_upload_events(run_output)
         self.sql_upload_artifacts(run_output, check_uniqueness=True)
         self.sql_upload_metrics(run_output, check_uniqueness=True)
@@ -284,11 +350,14 @@ class PostgresqlResultProcessor(OutputProcessor):
             (
                 resource_getter_uuid,
                 self.run_uuid,
-                resource_getter))
+                resource_getter,
+            )
+        )
         self.sql_upload_parameters(
             'resource_getter',
             output_object.run_config.resource_getters[resource_getter],
-            owner_id=resource_getter_uuid)
+            owner_id=resource_getter_uuid,
+        )
 
     def sql_upload_events(self, output_object, job_uuid=None):
         for event in output_object.events:
@@ -300,7 +369,11 @@ class PostgresqlResultProcessor(OutputProcessor):
                     self.run_uuid,
                     job_uuid,
                     event.timestamp,
-                    event.message))
+                    event.message,
+                    event._pod_version,  # pylint: disable=protected-access
+                    event._pod_serialization_version,  # pylint: disable=protected-access
+                )
+            )
 
     def sql_upload_job_augmentations(self, output_object, job_uuid=None):
         ''' This is a table which links the uuids of augmentations to jobs.
@@ -320,7 +393,9 @@ class PostgresqlResultProcessor(OutputProcessor):
                 (
                     job_aug_uuid,
                     job_uuid,
-                    augmentation_uuid))
+                    augmentation_uuid,
+                )
+            )
 
     def sql_upload_augmentations(self, output_object):
         for augmentation in output_object.augmentations:
@@ -332,11 +407,14 @@ class PostgresqlResultProcessor(OutputProcessor):
                 (
                     augmentation_uuid,
                     self.run_uuid,
-                    augmentation))
+                    augmentation,
+                )
+            )
             self.sql_upload_parameters(
                 'augmentation',
                 output_object.run_config.augmentations[augmentation],
-                owner_id=augmentation_uuid)
+                owner_id=augmentation_uuid,
+            )
             self.augmentations_already_added[augmentation] = augmentation_uuid
 
     def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False, job_uuid=None):
@@ -353,7 +431,11 @@ class PostgresqlResultProcessor(OutputProcessor):
                     metric.name,
                     metric.value,
                     metric.units,
-                    metric.lower_is_better))
+                    metric.lower_is_better,
+                    metric._pod_version,  # pylint: disable=protected-access
+                    metric._pod_serialization_version,  # pylint: disable=protected-access
+                )
+            )
             for classifier in metric.classifiers:
                 classifier_uuid = uuid.uuid4()
                 self.cursor.execute(
@@ -362,8 +444,12 @@ class PostgresqlResultProcessor(OutputProcessor):
                         classifier_uuid,
                         None,
                         metric_uuid,
+                        None,
+                        None,
                         classifier,
-                        metric.classifiers[classifier]))
+                        metric.classifiers[classifier],
+                    )
+                )
             if record_in_added:
                 self.metrics_already_added.append(metric)
@@ -374,7 +460,7 @@ class PostgresqlResultProcessor(OutputProcessor):
         '''
         for artifact in output_object.artifacts:
             if artifact in self.artifacts_already_added and check_uniqueness:
-                self.logger.debug('Skipping uploading {} as already added' .format(artifact))
+                self.logger.debug('Skipping uploading {} as already added'.format(artifact))
                 continue
 
             if artifact in self.artifacts_already_added:
@@ -411,7 +497,9 @@ class PostgresqlResultProcessor(OutputProcessor):
                 parameter,
                 json.dumps(parameter_dict[parameter]),
                 str(type(parameter_dict[parameter])),
-                parameter_type))
+                parameter_type,
+            )
+        )
 
     def connect_to_database(self):
         dsn = "dbname={} user={} password={} host={} port={}".format(
@@ -432,6 +520,21 @@ class PostgresqlResultProcessor(OutputProcessor):
         self.conn.commit()
         self.conn.reset()
 
+    def check_schema_versions(self):
+        schemafilepath = os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema.sql')
+        cur_major_version, cur_minor_version, _ = get_schema(schemafilepath)
+        db_schema_version = get_database_schema_version(self.cursor)
+        if (cur_major_version, cur_minor_version) != db_schema_version:
+            self.cursor.close()
+            self.cursor = None
+            self.conn.commit()
+            self.conn.reset()
+            msg = 'The current database schema is v{} however the local ' \
+                  'schema version is v{}. Please update your database ' \
+                  'with the create command'
+            raise OutputProcessorError(msg.format(db_schema_version,
+                                                  (cur_major_version, cur_minor_version)))
+
     def _sql_write_lobject(self, source, lobject):
         with open(source) as lobj_file:
             lobj_data = lobj_file.read()
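
get_schema() and get_database_schema_version() are imported from wa.utils.postgres and are not shown in this diff. A plausible sketch of the marker parsing, assuming the --!VERSION!x.y!ENDVERSION! header seen at the top of postgres_schema.sql (the real implementation may differ):

import re

# Hypothetical stand-in for the version handling in wa.utils.postgres.
VERSION_RE = re.compile(r'!VERSION!(\d+)\.(\d+)!ENDVERSION!')


def read_schema_version(schemafilepath):
    with open(schemafilepath) as f:
        match = VERSION_RE.search(f.readline())
    if match is None:
        raise ValueError('No version marker in {}'.format(schemafilepath))
    return int(match.group(1)), int(match.group(2))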
@@ -458,7 +561,9 @@ class PostgresqlResultProcessor(OutputProcessor):
             self.sql_command['create_large_object'],
             (
                 large_object_uuid,
-                loid))
+                loid,
+            )
+        )
         self.cursor.execute(
             self.sql_command['create_artifact'],
             (
@@ -468,7 +573,11 @@ class PostgresqlResultProcessor(OutputProcessor):
                 artifact.name,
                 large_object_uuid,
                 artifact.description,
-                artifact.kind))
+                str(artifact.kind),
+                artifact._pod_version,  # pylint: disable=protected-access
+                artifact._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
         for classifier in artifact.classifiers:
             classifier_uuid = uuid.uuid4()
             self.cursor.execute(
@@ -477,7 +586,11 @@ class PostgresqlResultProcessor(OutputProcessor):
                 classifier_uuid,
                 artifact_uuid,
                 None,
+                None,
+                None,
                 classifier,
-                artifact.classifiers[classifier]))
+                artifact.classifiers[classifier],
+            )
+        )
         if record_in_added:
             self.artifacts_already_added[artifact] = loid
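
Artifacts themselves are stored as Postgres large objects, with the LargeObjects table mapping an artifact's uuid to the lo_oid written by _sql_write_lobject(). A sketch of reading one back through psycopg2's large-object interface (connection details are placeholders):

import psycopg2

conn = psycopg2.connect(dbname='wa', user='postgres', host='localhost')
cur = conn.cursor()
cur.execute(
    "SELECT a.name, lo.lo_oid FROM Artifacts a "
    "JOIN LargeObjects lo ON a.large_object_uuid = lo.oid"
)
for name, lo_oid in cur.fetchall():
    lobj = conn.lobject(lo_oid, 'rb')  # must run inside a transaction
    data = lobj.read()
    lobj.close()
    print('{}: {} bytes'.format(name, len(data)))
conn.close()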