1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-18 20:11:20 +00:00
Marc Bonnici 281eb6adf9 output_processors/postgresql: Refactor and fix uploading duplication
Previously run level artifacts would be added with a particular job_id,
and updated artifacts would be stored as new objects each time. Refactor
to remove unnecessary instance variables, only provide a job_id when
required and add an update capability for largeobjects to ensure this
does not happen.
2018-10-24 10:42:28 +01:00

482 lines
21 KiB
Python

# Copyright 2018 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import uuid
import collections
import inspect
try:
import psycopg2
from psycopg2 import (connect, extras)
from psycopg2 import Error as Psycopg2Error
except ImportError as e:
psycopg2 = None
import_error_msg = e.args[0] if e.args else str(e)
from devlib.target import KernelVersion, KernelConfig
import wa
from wa.utils import postgres_convert
from wa import OutputProcessor, Parameter, OutputProcessorError
from wa.utils.types import level
from wa.framework.target.info import CpuInfo
class PostgresqlResultProcessor(OutputProcessor):
    """Output processor that uploads run and job results to Postgresql.

    ``initialize`` opens the connection, registers the psycopg2 adapters and
    typecasters and inserts the ``Runs`` and ``Targets`` rows.
    ``export_job_output`` uploads the data of one job (plus the run-level
    data available at that point) and ``export_run_output`` performs the
    final run-level upload.  Artifact files are stored as Postgresql large
    objects.
    """

    name = 'postgres'
    # NOTE: the text inside the multi-line string literals below is kept
    # flush-left on purpose so that the string contents are unchanged.
    description = """
Stores results in a Postgresql database.
The structure of this database can easily be understood by examining
the postgres_schema.sql file (the schema used to generate it):
{}
""".format(os.path.join(
        os.path.dirname(inspect.getfile(wa)),
        'commands',
        'postgres_schema.sql'))

    parameters = [
        Parameter('username', default='postgres',
                  description="""
This is the username that will be used to connect to the
Postgresql database. Note that depending on whether the user
has privileges to modify the database (normally only possible
on localhost), the user may only be able to append entries.
"""),
        Parameter('password', default=None,
                  description="""
The password to be used to connect to the specified database
with the specified username.
"""),
        Parameter('dbname', default='wa',
                  description="""
Name of the database that will be created or added to. Note,
to override this, you can specify a value in your user
wa configuration file.
"""),
        Parameter('host', kind=str, default='localhost',
                  description="""
The host where the Postgresql server is running. The default
is localhost (i.e. the machine that wa is running on).
This is useful for complex systems where multiple machines
may be executing workloads and uploading their results to
a remote, centralised database.
"""),
        Parameter('port', kind=str, default='5432',
                  description="""
The port the Postgresql server is running on, on the host.
The default is Postgresql's default, so do not change this
unless you have modified the default port for Postgresql.
"""),
    ]

    # Parameterised SQL statements, keyed by the action they perform.
    sql_command = {
        "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s WHERE oid=%s;",
        "create_job": "INSERT INTO Jobs (oid, run_oid, status, retries, label, job_id, iterations, workload_name, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);",
        "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message) VALUES (%s, %s, %s, %s, %s)",
        "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind) VALUES (%s, %s, %s, %s, %s, %s, %s)",
        "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better) VALUES (%s, %s, %s, %s , %s, %s, %s)",
        "create_augmentation": "INSERT INTO Augmentations (oid, run_oid, name) VALUES (%s, %s, %s)",
        "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, key, value) VALUES (%s, %s, %s, %s, %s)",
        "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
        "create_resource_getter": "INSERT INTO ResourceGetters (oid, run_oid, name) VALUES (%s, %s, %s)",
        "create_job_aug": "INSERT INTO Jobs_Augs (oid, job_oid, augmentation_oid) VALUES (%s, %s, %s)",
        "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)"}

    # Class-level defaults, kept so the names stay available on the class.
    # Every instance replaces them with its own containers in __init__ so
    # that tracking state is never shared between processors.
    # List tracking which run-level metrics have already been added
    metrics_already_added = []
    # Dicts needed so that jobs can look up ids
    # artifact -> oid of its large object
    artifacts_already_added = {}
    # augmentation name -> uuid of its Augmentations row
    augmentations_already_added = {}

    # Status bits (flags); not referenced anywhere else in this class.
    first_job_run = True

    def __init__(self, *args, **kwargs):
        super(PostgresqlResultProcessor, self).__init__(*args, **kwargs)
        self.conn = None
        self.cursor = None
        self.run_uuid = None
        self.target_uuid = None
        # Per-instance tracking state (shadows the class-level defaults).
        self.metrics_already_added = []
        self.artifacts_already_added = {}
        self.augmentations_already_added = {}

    def initialize(self, context):
        """Connect to the database and create the Runs and Targets rows.

        Raises ``ImportError`` when psycopg2 could not be imported.
        Connection failures surface as ``OutputProcessorError`` from
        ``connect_to_database``.
        """
        if not psycopg2:
            raise ImportError(
                ('The psycopg2 module is required for the '
                 'Postgresql Output Processor: {}').format(import_error_msg))

        self.connect_to_database()
        self.cursor = self.conn.cursor()
        self._register_adapters()

        # Insert a run_uuid which will be globally accessible during the run
        self.run_uuid = uuid.uuid4()
        run_output = context.run_output
        run_config = run_output.run_config
        retry_on_status = postgres_convert.ListOfLevel(run_config.retry_on_status)
        self.cursor.execute(
            self.sql_command['create_run'],
            (
                self.run_uuid,
                run_output.event_summary,
                run_output.basepath,
                run_output.status,
                run_output.state.timestamp,
                run_output.info.run_name,
                run_output.info.project,
                retry_on_status,
                run_config.max_retries,
                run_config.bail_on_init_failure,
                run_config.allow_phone_home,
                run_output.info.uuid,
                run_output.info.start_time,
                run_output.metadata))

        self.target_uuid = uuid.uuid4()
        target_info = context.target_info
        kernel = target_info.kernel_version
        self.cursor.execute(
            self.sql_command['create_target'],
            (
                self.target_uuid,
                self.run_uuid,
                target_info.target,
                target_info.cpus,
                target_info.os,
                target_info.os_version,
                target_info.hostid,
                target_info.hostname,
                target_info.abi,
                target_info.is_rooted,
                # Important caveat: kernel_version is the name of the column
                # in the Targets table. However, this refers to
                # kernel_version.version, not to kernel_version as a whole
                kernel.version,
                kernel.release,
                kernel.sha1,
                target_info.kernel_config,
                target_info.sched_features))

        # Commit cursor commands
        self.conn.commit()

    def _register_adapters(self):
        """Register the psycopg2 typecasters and adapters used by the uploads."""
        # N.B. Typecasters are for postgres->python and adapters the opposite
        extensions = psycopg2.extensions

        # The enum type oids are discovered by selecting a NULL of each type
        # and reading the type code from the cursor description.
        self.cursor.execute("SELECT NULL::status_enum")
        status_oid = self.cursor.description[0][1]
        self.cursor.execute("SELECT NULL::param_enum")
        param_oid = self.cursor.description[0][1]

        LEVEL = extensions.new_type(
            (status_oid,), "LEVEL", postgres_convert.cast_level)
        extensions.register_type(LEVEL)
        PARAM = extensions.new_type(
            (param_oid,), "PARAM", postgres_convert.cast_vanilla)
        extensions.register_type(PARAM)

        extensions.register_adapter(
            level, postgres_convert.return_as_is(postgres_convert.adapt_level))
        extensions.register_adapter(
            postgres_convert.ListOfLevel,
            postgres_convert.adapt_ListOfX(postgres_convert.adapt_level))
        extensions.register_adapter(KernelVersion, postgres_convert.adapt_vanilla)
        extensions.register_adapter(CpuInfo, postgres_convert.adapt_vanilla)
        extensions.register_adapter(collections.OrderedDict, extras.Json)
        extensions.register_adapter(dict, extras.Json)
        extensions.register_adapter(
            KernelConfig,
            postgres_convert.create_iterable_adapter(2, explicit_iterate=True))

        # Register ready-made UUID type adapter
        extras.register_uuid()

    def export_job_output(self, job_output, target_info, run_output):  # pylint: disable=too-many-branches, too-many-statements, too-many-locals, unused-argument
        ''' Run once for each job to upload information that is
            updated on a job by job basis.
        '''
        if not self.cursor:  # Database did not connect correctly.
            return

        job_uuid = uuid.uuid4()
        # Create a new job
        self.cursor.execute(
            self.sql_command['create_job'],
            (
                job_uuid,
                self.run_uuid,
                job_output.status,
                job_output.retry,
                job_output.label,
                job_output.id,
                job_output.iteration,
                job_output.spec.workload_name,
                job_output.metadata))

        # Update the run table and run-level parameters
        self._sql_update_run(run_output)
        self.sql_upload_artifacts(run_output, record_in_added=True)
        # check_uniqueness stops run-level metrics being inserted again for
        # every job that is exported.
        self.sql_upload_metrics(run_output, record_in_added=True, check_uniqueness=True)
        self.sql_upload_augmentations(run_output)
        self.sql_upload_resource_getters(run_output)

        # Job-level data, all tagged with this job's uuid
        self.sql_upload_events(job_output, job_uuid=job_uuid)
        self.sql_upload_artifacts(job_output, job_uuid=job_uuid)
        self.sql_upload_metrics(job_output, job_uuid=job_uuid)
        self.sql_upload_job_augmentations(job_output, job_uuid=job_uuid)
        self.sql_upload_parameters(
            "workload",
            job_output.spec.workload_parameters,
            job_uuid=job_uuid)
        self.sql_upload_parameters(
            "runtime",
            job_output.spec.runtime_parameters,
            job_uuid=job_uuid)
        self.conn.commit()

    def export_run_output(self, run_output, target_info):  # pylint: disable=unused-argument, too-many-locals
        ''' A final export of the RunOutput that updates existing parameters
            and uploads ones which are only generated after jobs have run.
        '''
        if not self.cursor:  # Database did not connect correctly.
            return

        # Update the job statuses following completion of the run
        for job in run_output.jobs:
            self.cursor.execute(
                "UPDATE Jobs SET status=%s WHERE job_id=%s and run_oid=%s",
                (
                    job.status,
                    job.id,
                    self.run_uuid))

        # Update the run entry after jobs have completed
        self._sql_update_run(run_output)
        self.sql_upload_events(run_output)
        # Anything recorded during the per-job exports is skipped here.
        self.sql_upload_artifacts(run_output, check_uniqueness=True)
        self.sql_upload_metrics(run_output, check_uniqueness=True)
        self.sql_upload_augmentations(run_output)
        self.conn.commit()

    def _sql_update_run(self, run_output):
        """Write the current summary, status and timestamps to the Runs row."""
        self.cursor.execute(
            self.sql_command['update_run'],
            (
                run_output.event_summary,
                run_output.status,
                run_output.state.timestamp,
                run_output.info.end_time,
                self.run_uuid))

    # Upload functions for use with both jobs and runs

    def sql_upload_resource_getters(self, output_object):
        """Insert a ResourceGetters row, plus its parameters, per getter."""
        resource_getters = output_object.run_config.resource_getters
        for resource_getter in resource_getters:
            resource_getter_uuid = uuid.uuid4()
            self.cursor.execute(
                self.sql_command['create_resource_getter'],
                (
                    resource_getter_uuid,
                    self.run_uuid,
                    resource_getter))
            self.sql_upload_parameters(
                'resource_getter',
                resource_getters[resource_getter],
                owner_id=resource_getter_uuid)

    def sql_upload_events(self, output_object, job_uuid=None):
        """Insert one Events row per event; job_uuid is None for run events."""
        for event in output_object.events:
            self.cursor.execute(
                self.sql_command['create_event'],
                (
                    uuid.uuid4(),
                    self.run_uuid,
                    job_uuid,
                    event.timestamp,
                    event.message))

    def sql_upload_job_augmentations(self, output_object, job_uuid=None):
        ''' This is a table which links the uuids of augmentations to jobs.
            Note that the augmentations table is prepopulated, leading to the
            necessity of an augmentations_already_added dictionary, which
            gives us the corresponding uuids.
            Augmentations which are prefixed by ~ are toggled off and not part
            of the job, therefore not added.
        '''
        for augmentation in output_object.spec.augmentations:
            if augmentation.startswith('~'):
                continue
            augmentation_uuid = self.augmentations_already_added[augmentation]
            self.cursor.execute(
                self.sql_command['create_job_aug'],
                (
                    uuid.uuid4(),
                    job_uuid,
                    augmentation_uuid))

    def sql_upload_augmentations(self, output_object):
        """Insert enabled augmentations that have not been uploaded yet.

        Names starting with ``~`` are skipped.  The uuid of each new row is
        remembered in ``augmentations_already_added``.
        """
        for augmentation in output_object.augmentations:
            if augmentation.startswith('~') or augmentation in self.augmentations_already_added:
                continue
            augmentation_uuid = uuid.uuid4()
            self.cursor.execute(
                self.sql_command['create_augmentation'],
                (
                    augmentation_uuid,
                    self.run_uuid,
                    augmentation))
            self.sql_upload_parameters(
                'augmentation',
                output_object.run_config.augmentations[augmentation],
                owner_id=augmentation_uuid)
            self.augmentations_already_added[augmentation] = augmentation_uuid

    def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False, job_uuid=None):
        ''' Uploads metrics and their classifiers to the database.
            record_in_added will record the metrics added in metrics_already_added
            check_uniqueness will ensure metrics in metrics_already_added do not get added again
        '''
        for metric in output_object.metrics:
            if check_uniqueness and metric in self.metrics_already_added:
                continue
            metric_uuid = uuid.uuid4()
            self.cursor.execute(
                self.sql_command['create_metric'],
                (
                    metric_uuid,
                    self.run_uuid,
                    job_uuid,
                    metric.name,
                    metric.value,
                    metric.units,
                    metric.lower_is_better))
            self._sql_upload_classifiers(metric.classifiers, metric_uuid=metric_uuid)
            if record_in_added:
                self.metrics_already_added.append(metric)

    def sql_upload_artifacts(self, output_object, record_in_added=False, check_uniqueness=False, job_uuid=None):
        ''' Uploads artifacts to the database.
            record_in_added will record the artifacts added in artifacts_already_added
            check_uniqueness will ensure artifacts in artifacts_already_added do not get added again
        '''
        for artifact in output_object.artifacts:
            already_added = artifact in self.artifacts_already_added
            if already_added and check_uniqueness:
                self.logger.debug('Skipping uploading {} as already added' .format(artifact))
            elif already_added:
                # Known artifact: overwrite its large object in place
                self._sql_update_artifact(artifact, output_object)
            else:
                self._sql_create_artifact(artifact, output_object, record_in_added, job_uuid)

    def sql_upload_parameters(self, parameter_type, parameter_dict, owner_id=None, job_uuid=None):
        """Insert one Parameters row for every entry of ``parameter_dict``.

        ``owner_id`` is stored as the resource getter or augmentation oid
        depending on ``parameter_type``.  Raises ``NotImplementedError`` for
        an unsupported ``parameter_type``.
        """
        # Note, currently no augmentation parameters are workload specific, but in the future
        # this may change
        if parameter_type not in ['workload', 'resource_getter', 'augmentation', 'runtime']:
            # boot parameters are not yet implemented
            # device parameters are redundant due to the targets table
            raise NotImplementedError("{} is not a valid parameter type.".format(parameter_type))

        augmentation_id = owner_id if parameter_type == "augmentation" else None
        resource_getter_id = owner_id if parameter_type == "resource_getter" else None

        for parameter in parameter_dict:
            value = parameter_dict[parameter]
            self.cursor.execute(
                self.sql_command['create_parameter'],
                (
                    uuid.uuid4(),
                    self.run_uuid,
                    job_uuid,
                    augmentation_id,
                    resource_getter_id,
                    parameter,
                    str(value),
                    str(type(value)),
                    parameter_type))

    def connect_to_database(self):
        """Open the connection described by the processor parameters.

        Raises ``OutputProcessorError`` if psycopg2 reports an error.
        """
        try:
            # Keyword arguments avoid hand-building a DSN string, which would
            # send a missing password as the literal text "None" and break on
            # values containing spaces.
            self.conn = connect(
                dbname=self.dbname,
                user=self.username,
                password=self.password,
                host=self.host,
                port=self.port)
        except Psycopg2Error as e:
            raise OutputProcessorError(
                "Database error, if the database doesn't exist, " +
                "please use 'wa create database' to create the database: {}".format(e))

    def execute_sql_line_by_line(self, sql):
        """Execute ``sql`` one ``;``-terminated statement at a time.

        Newlines are removed first, then the text is split after every
        ``;``.  Empty pieces and pieces starting with ``--`` are skipped.
        The connection is committed and reset afterwards.
        """
        cursor = self.conn.cursor()
        try:
            for line in sql.replace('\n', "").replace(";", ";\n").split("\n"):
                if line and not line.startswith('--'):
                    cursor.execute(line)
        finally:
            # Release the cursor even when a statement fails
            cursor.close()
        self.conn.commit()
        self.conn.reset()

    def _sql_write_lobject(self, source, lobject):
        """Write the file at ``source`` into ``lobject`` and commit."""
        # Read raw bytes: artifacts are not guaranteed to be text, and the
        # size check below is meant to be in bytes.
        with open(source, 'rb') as lobj_file:
            lobj_data = lobj_file.read()
        if len(lobj_data) > 50000000:  # Notify if LO inserts larger than 50MB
            self.logger.debug("Inserting large object of size {}".format(len(lobj_data)))
        lobject.write(lobj_data)
        self.conn.commit()

    def _sql_update_artifact(self, artifact, output_object):
        """Overwrite the large object of an already uploaded artifact."""
        self.logger.debug('Updating artifact: {}'.format(artifact))
        lobj = self.conn.lobject(oid=self.artifacts_already_added[artifact], mode='w')
        # Opening in 'w' mode does not discard the old content, so a shorter
        # file would leave stale trailing bytes.  truncate() is only present
        # when psycopg2 was built against a libpq that supports it.
        if hasattr(lobj, 'truncate'):
            lobj.truncate()
        self._sql_write_lobject(os.path.join(output_object.basepath, artifact.path), lobj)

    def _sql_create_artifact(self, artifact, output_object, record_in_added=False, job_uuid=None):
        """Upload a new artifact: large object, rows and classifiers.

        When ``record_in_added`` is set the large object oid is remembered
        in ``artifacts_already_added``.
        """
        self.logger.debug('Uploading artifact: {}'.format(artifact))
        artifact_uuid = uuid.uuid4()
        lobj = self.conn.lobject()
        loid = lobj.oid
        large_object_uuid = uuid.uuid4()

        self._sql_write_lobject(os.path.join(output_object.basepath, artifact.path), lobj)

        self.cursor.execute(
            self.sql_command['create_large_object'],
            (
                large_object_uuid,
                loid))
        self.cursor.execute(
            self.sql_command['create_artifact'],
            (
                artifact_uuid,
                self.run_uuid,
                job_uuid,
                artifact.name,
                large_object_uuid,
                artifact.description,
                artifact.kind))
        self._sql_upload_classifiers(artifact.classifiers, artifact_uuid=artifact_uuid)

        if record_in_added:
            self.artifacts_already_added[artifact] = loid

    def _sql_upload_classifiers(self, classifiers, artifact_uuid=None, metric_uuid=None):
        """Insert one Classifiers row per key, linked to an artifact or metric."""
        for classifier in classifiers:
            self.cursor.execute(
                self.sql_command['create_classifier'],
                (
                    uuid.uuid4(),
                    artifact_uuid,
                    metric_uuid,
                    classifier,
                    classifiers[classifier]))