
Add Postgres Output Processor

Adds an output processor which uploads the results found in the
wa_output folder to a Postgres database, whose schema is defined by the
WA create database command.
Waleed El-Geresy 2018-07-20 15:17:03 +01:00 committed by Marc Bonnici
parent bb255de9ad
commit 6d654157b2
2 changed files with 700 additions and 0 deletions
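
For context, once a run has been uploaded the results can be read back with plain psycopg2. A minimal sketch (connection details and password handling are assumed here; the column names come from the Runs insert statement in the processor below):

import psycopg2

# Connect using the processor's default parameters (dbname 'wa', user 'postgres').
conn = psycopg2.connect(dbname='wa', user='postgres', host='localhost', port='5432')
cursor = conn.cursor()

# Runs are stored in the Runs table created by 'wa create database'.
cursor.execute("SELECT run_name, project, status, start_time FROM Runs")
for run_name, project, status, start_time in cursor.fetchall():
    print(run_name, project, status, start_time)

cursor.close()
conn.close()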

@@ -0,0 +1,481 @@
# Copyright 2018 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import uuid
import collections
import inspect
try:
import psycopg2
from psycopg2 import (connect, extras)
from psycopg2 import Error as Psycopg2Error
except ImportError as e:
psycopg2 = None
import_error_msg = e.args[0] if e.args else str(e)
from devlib.target import KernelVersion, KernelConfig
import wa
from wa.utils import postgres_convert
from wa import OutputProcessor, Parameter, OutputProcessorError
from wa.utils.types import level
from wa.framework.target.info import CpuInfo
class PostgresqlResultProcessor(OutputProcessor):
name = 'postgres'
description = """
Stores results in a Postgresql database.
The structure of this database can easily be understood by examining
the postgres_schema.sql file (the schema used to generate it):
{}
""".format(os.path.join(
os.path.dirname(inspect.getfile(wa)),
'commands',
'postgres_schema.sql'))
parameters = [
Parameter('username', default='postgres',
description="""
This is the username that will be used to connect to the
Postgresql database. Note that depending on whether the user
has privileges to modify the database (normally only possible
on localhost), the user may only be able to append entries.
"""),
Parameter('password', default=None,
description="""
The password to be used to connect to the specified database
with the specified username.
"""),
Parameter('dbname', default='wa',
description="""
Name of the database that will be created or added to. Note,
to override this, you can specify a value in your user
wa configuration file.
"""),
Parameter('host', kind=str, default='localhost',
description="""
The host where the Postgresql server is running. The default
is localhost (i.e. the machine that wa is running on).
This is useful for complex systems where multiple machines
may be executing workloads and uploading their results to
a remote, centralised database.
"""),
Parameter('port', kind=str, default='5432',
description="""
The port the Postgresql server is running on, on the host.
The default is Postgresql's default, so do not change this
unless you have modified the default port for Postgresql.
"""),
]
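# For illustration only (the exact config layout is assumed, not defined in
# this file): with the parameters above, a user might configure the processor
# in their WA config along these lines:
#
#   postgres:
#       username: postgres
#       password: mypassword
#       dbname: wa
#       host: localhost
#       port: '5432'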
# Commands
sql_command = {
"create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
"update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s WHERE oid=%s;",
"create_job": "INSERT INTO Jobs (oid, run_oid, status, retries, label, job_id, iterations, workload_name, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);",
"create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
"create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message) VALUES (%s, %s, %s, %s, %s)",
"create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind) VALUES (%s, %s, %s, %s, %s, %s, %s)",
"create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better) VALUES (%s, %s, %s, %s , %s, %s, %s)",
"create_augmentation": "INSERT INTO Augmentations (oid, run_oid, name) VALUES (%s, %s, %s)",
"create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, key, value) VALUES (%s, %s, %s, %s, %s)",
"create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
"create_resource_getter": "INSERT INTO ResourceGetters (oid, run_oid, name) VALUES (%s, %s, %s)",
"create_job_aug": "INSERT INTO Jobs_Augs (oid, job_oid, augmentation_oid) VALUES (%s, %s, %s)",
"create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)"}
# Lists to track which run-related items have already been added
metrics_already_added = []
artifacts_already_added = []
# Dict needed so that jobs can look up the augmentation_uuid
augmentations_already_added = {}
# Status bits (flags)
first_job_run = True
def __init__(self, *args, **kwargs):
super(PostgresqlResultProcessor, self).__init__(*args, **kwargs)
self.conn = None
self.cursor = None
self.current_lobj = None
self.current_loid = None
self.run_uuid = None
self.current_job_uuid = None
self.target_uuid = None
self.current_artifact_uuid = None
self.current_large_object_uuid = None
self.current_classifier_uuid = None
self.current_metric_uuid = None
self.current_augmentation_uuid = None
self.current_resource_getter_uuid = None
self.current_event_uuid = None
self.current_job_aug_uuid = None
self.current_parameter_uuid = None
def initialize(self, context):
if not psycopg2:
raise ImportError(
'The psycopg2 module is required for the ' +
'Postgresql Output Processor: {}'.format(import_error_msg))
# N.B. Typecasters are for postgres->python and adapters the opposite
self.connect_to_database()
self.cursor = self.conn.cursor()
# Register the adapters and typecasters for enum types
self.cursor.execute("SELECT NULL::status_enum")
status_oid = self.cursor.description[0][1]
self.cursor.execute("SELECT NULL::param_enum")
param_oid = self.cursor.description[0][1]
LEVEL = psycopg2.extensions.new_type(
(status_oid,), "LEVEL", postgres_convert.cast_level)
psycopg2.extensions.register_type(LEVEL)
PARAM = psycopg2.extensions.new_type(
(param_oid,), "PARAM", postgres_convert.cast_vanilla)
psycopg2.extensions.register_type(PARAM)
psycopg2.extensions.register_adapter(level, postgres_convert.return_as_is(postgres_convert.adapt_level))
psycopg2.extensions.register_adapter(
postgres_convert.ListOfLevel, postgres_convert.adapt_ListOfX(postgres_convert.adapt_level))
psycopg2.extensions.register_adapter(KernelVersion, postgres_convert.adapt_vanilla)
psycopg2.extensions.register_adapter(
CpuInfo, postgres_convert.adapt_vanilla)
psycopg2.extensions.register_adapter(
collections.OrderedDict, extras.Json)
psycopg2.extensions.register_adapter(dict, extras.Json)
psycopg2.extensions.register_adapter(
KernelConfig, postgres_convert.create_iterable_adapter(2, explicit_iterate=True))
# Register ready-made UUID type adapter
extras.register_uuid()
# Insert a run_uuid which will be globally accessible during the run
self.run_uuid = uuid.UUID(str(uuid.uuid4()))
run_output = context.run_output
retry_on_status = postgres_convert.ListOfLevel(run_output.run_config.retry_on_status)
self.cursor.execute(
self.sql_command['create_run'],
(
self.run_uuid,
run_output.event_summary,
run_output.basepath,
run_output.status,
run_output.state.timestamp,
run_output.info.run_name,
run_output.info.project,
retry_on_status,
run_output.run_config.max_retries,
run_output.run_config.bail_on_init_failure,
run_output.run_config.allow_phone_home,
run_output.info.uuid,
run_output.info.start_time,
run_output.metadata))
self.target_uuid = uuid.uuid4()
target_info = context.target_info
self.cursor.execute(
self.sql_command['create_target'],
(
self.target_uuid,
self.run_uuid,
target_info.target,
target_info.cpus,
target_info.os,
target_info.os_version,
target_info.hostid,
target_info.hostname,
target_info.abi,
target_info.is_rooted,
# Important caveat: kernel_version is the name of the column in the Targets table.
# However, it stores kernel_version.version, not the kernel_version object as a whole.
target_info.kernel_version.version,
target_info.kernel_version.release,
target_info.kernel_version.sha1,
target_info.kernel_config,
target_info.sched_features))
# Commit cursor commands
self.conn.commit()
def export_job_output(self, job_output, target_info, run_output): # pylint: disable=too-many-branches, too-many-statements, too-many-locals, unused-argument
''' Run once for each job to upload information that is
updated on a job-by-job basis.
'''
self.current_job_uuid = uuid.uuid4()
# Create a new job
self.cursor.execute(
self.sql_command['create_job'],
(
self.current_job_uuid,
self.run_uuid,
job_output.status,
job_output.retry,
job_output.label,
job_output.id,
job_output.iteration,
job_output.spec.workload_name,
job_output.metadata))
# Update the run table and run-level parameters
self.cursor.execute(
self.sql_command['update_run'],
(
run_output.event_summary,
run_output.status,
run_output.state.timestamp,
run_output.info.end_time,
self.run_uuid))
self.sql_upload_artifacts(run_output, record_in_added=True)
self.sql_upload_metrics(run_output, record_in_added=True)
self.sql_upload_augmentations(run_output)
self.sql_upload_resource_getters(run_output)
self.sql_upload_events(job_output)
self.sql_upload_artifacts(job_output)
self.sql_upload_metrics(job_output)
self.sql_upload_job_augmentations(job_output)
self.sql_upload_parameters(
"workload",
job_output.spec.workload_parameters,
job_specific=True)
self.sql_upload_parameters(
"runtime",
job_output.spec.runtime_parameters,
job_specific=True)
self.conn.commit()
def export_run_output(self, run_output, target_info): # pylint: disable=unused-argument, too-many-locals
''' A final export of the RunOutput that updates existing parameters
and uploads ones which are only generated after jobs have run.
'''
self.current_job_uuid = None
# Update the job statuses following completion of the run
for job in run_output.jobs:
job_id = job.id
job_status = job.status
self.cursor.execute(
"UPDATE Jobs SET status=%s WHERE job_id=%s and run_oid=%s",
(
job_status,
job_id,
self.run_uuid))
run_uuid = self.run_uuid
# Update the run entry after jobs have completed
sql_command_update_run = self.sql_command['update_run']
self.cursor.execute(
sql_command_update_run,
(
run_output.event_summary,
run_output.status,
run_output.state.timestamp,
run_output.info.end_time,
run_uuid))
self.sql_upload_events(run_output)
self.sql_upload_artifacts(run_output, check_uniqueness=True)
self.sql_upload_metrics(run_output, check_uniqueness=True)
self.sql_upload_augmentations(run_output)
self.conn.commit()
# Upload functions for use with both jobs and runs
def sql_upload_resource_getters(self, output_object):
for resource_getter in output_object.run_config.resource_getters:
self.current_resource_getter_uuid = uuid.uuid4()
self.cursor.execute(
self.sql_command['create_resource_getter'],
(
self.current_resource_getter_uuid,
self.run_uuid,
resource_getter))
self.sql_upload_parameters(
'resource_getter',
output_object.run_config.resource_getters[resource_getter],
owner_id=self.current_resource_getter_uuid,
job_specific=False)
def sql_upload_events(self, output_object):
for event in output_object.events:
self.current_event_uuid = uuid.uuid4()
self.cursor.execute(
self.sql_command['create_event'],
(
self.current_event_uuid,
self.run_uuid,
self.current_job_uuid,
event.timestamp,
event.message))
def sql_upload_job_augmentations(self, output_object):
''' This is a table which links the uuids of augmentations to jobs.
Note that the augmentations table is prepopulated, which is why the
augmentations_already_added dictionary is needed to look up the
corresponding uuids.
Augmentations which are prefixed by ~ are toggled off and not part of the job,
so they are not added.
'''
for augmentation in output_object.spec.augmentations:
if augmentation.startswith('~'):
continue
self.current_augmentation_uuid = self.augmentations_already_added[augmentation]
self.current_job_aug_uuid = uuid.uuid4()
self.cursor.execute(
self.sql_command['create_job_aug'],
(
self.current_job_aug_uuid,
self.current_job_uuid,
self.current_augmentation_uuid))
def sql_upload_augmentations(self, output_object):
for augmentation in output_object.augmentations:
if augmentation.startswith('~') or augmentation in self.augmentations_already_added:
continue
self.current_augmentation_uuid = uuid.uuid4()
self.cursor.execute(
self.sql_command['create_augmentation'],
(
self.current_augmentation_uuid,
self.run_uuid,
augmentation))
self.sql_upload_parameters(
'augmentation',
output_object.run_config.augmentations[augmentation],
owner_id=self.current_augmentation_uuid,
job_specific=False)
self.augmentations_already_added[augmentation] = self.current_augmentation_uuid
def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False):
for metric in output_object.metrics:
if metric in self.metrics_already_added and check_uniqueness:
continue
self.current_metric_uuid = uuid.uuid4()
self.cursor.execute(
self.sql_command['create_metric'],
(
self.current_metric_uuid,
self.run_uuid,
self.current_job_uuid,
metric.name,
metric.value,
metric.units,
metric.lower_is_better))
for classifier in metric.classifiers:
self.current_classifier_uuid = uuid.uuid4()
self.cursor.execute(
self.sql_command['create_classifier'],
(
self.current_classifier_uuid,
None,
self.current_metric_uuid,
classifier,
metric.classifiers[classifier]))
if record_in_added:
self.metrics_already_added.append(metric)
def sql_upload_artifacts(self, output_object, record_in_added=False, check_uniqueness=False):
''' Uploads artifacts to the database.
record_in_added will record the artifacts added in artifacts_already_added
check_uniqueness will ensure artifacts in artifacts_already_added do not get added again
'''
for artifact in output_object.artifacts:
if artifact in self.artifacts_already_added and check_uniqueness:
continue
self.current_artifact_uuid = uuid.uuid4()
self.current_lobj = self.conn.lobject()
self.current_loid = self.current_lobj.oid
self.current_large_object_uuid = uuid.uuid4()
with open(os.path.join(output_object.basepath, artifact.path)) as lobj_file:
lobj_data = lobj_file.read()
lo_len = self.current_lobj.write(lobj_data)
if lo_len > 50000000: # Notify if LO inserts larger than 50MB
self.logger.debug(
"Inserting large object of size {}".format(lo_len))
self.cursor.execute(
self.sql_command['create_large_object'],
(
self.current_large_object_uuid,
self.current_loid))
self.cursor.execute(
self.sql_command['create_artifact'],
(
self.current_artifact_uuid,
self.run_uuid,
self.current_job_uuid,
artifact.name,
self.current_large_object_uuid,
artifact.description,
artifact.kind))
for classifier in artifact.classifiers:
self.current_classifier_uuid = uuid.uuid4()
self.cursor.execute(
self.sql_command['create_classifier'],
(
self.current_classifier_uuid,
self.current_artifact_uuid,
None,
classifier,
artifact.classifiers[classifier]))
if record_in_added:
self.artifacts_already_added.append(artifact)
def sql_upload_parameters(self, parameter_type, parameter_dict, owner_id=None, job_specific=False):
# Note, currently no augmentation parameters are workload specific, but in the future
# this may change
run_uuid = self.run_uuid
job_uuid = None # Default initial value
augmentation_id = None
resource_getter_id = None
if parameter_type == "workload":
job_uuid = self.current_job_uuid
elif parameter_type == "resource_getter":
job_uuid = None
resource_getter_id = owner_id
elif parameter_type == "augmentation":
if job_specific:
job_uuid = self.current_job_uuid
augmentation_id = owner_id
elif parameter_type == "runtime":
job_uuid = self.current_job_uuid
else:
# boot parameters are not yet implemented
# device parameters are redundant due to the targets table
raise NotImplementedError("{} is not a valid parameter type.".format(parameter_type))
for parameter in parameter_dict:
self.current_parameter_uuid = uuid.uuid4()
self.cursor.execute(
self.sql_command['create_parameter'],
(
self.current_parameter_uuid,
run_uuid,
job_uuid,
augmentation_id,
resource_getter_id,
parameter,
str(parameter_dict[parameter]),
str(type(parameter_dict[parameter])),
parameter_type))
def connect_to_database(self):
dsn = "dbname={} user={} password={} host={} port={}".format(
self.dbname, self.username, self.password, self.host, self.port)
try:
self.conn = connect(dsn=dsn)
except Psycopg2Error as e:
raise OutputProcessorError(
"Database error, if the database doesn't exist, " +
"please use 'wa create database' to create the database: {}".format(e))
def execute_sql_line_by_line(self, sql):
cursor = self.conn.cursor()
for line in sql.replace('\n', "").replace(";", ";\n").split("\n"):
if line and not line.startswith('--'):
cursor.execute(line)
cursor.close()
self.conn.commit()
self.conn.reset()
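As a companion to the artifact upload above, stored files can be retrieved again through the LargeObjects table. A brief sketch (connection details assumed; the table and column names are those used by the INSERT statements in this processor):

import psycopg2

conn = psycopg2.connect(dbname='wa', user='postgres', host='localhost', port='5432')
cursor = conn.cursor()

# Each Artifacts row references a LargeObjects row, which holds the lo_oid of
# the large object written via conn.lobject() during the upload.
cursor.execute(
    "SELECT Artifacts.name, LargeObjects.lo_oid FROM Artifacts "
    "JOIN LargeObjects ON Artifacts.large_object_uuid = LargeObjects.oid")
for name, lo_oid in cursor.fetchall():
    lobj = conn.lobject(lo_oid, mode='r')  # open the stored object read-only
    print(name, len(lobj.read()))
    lobj.close()

conn.close()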

@@ -0,0 +1,219 @@
# Copyright 2018 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains additional casting and adaptation functions for several
different datatypes and metadata types for use with the psycopg2 module. The
casting functions will transform Postgresql data types into Python objects, and
the adapters the reverse. They are named this way according to the psycopg2
conventions.
For more information about the available adapters and casters in the standard
psycopg2 module, please see:
http://initd.org/psycopg/docs/extensions.html#sql-adaptation-protocol-objects
"""
import re
try:
from psycopg2 import InterfaceError
from psycopg2.extensions import AsIs
except ImportError as e:
InterfaceError = None
AsIs = None
from wa.utils.types import level
def cast_level(value, cur): # pylint: disable=unused-argument
"""Generic Level caster for psycopg2"""
if not InterfaceError:
raise ImportError('There was a problem importing psycopg2.')
if value is None:
return None
m = re.match(r"([^\()]*)\((\d*)\)", value)
if m:
    name = str(m.group(1))
    number = int(m.group(2))
    return level(name, number)
else:
    raise InterfaceError("Bad level representation: {}".format(value))
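# Illustrative example (values assumed): cast_level("FAILED(5)", None) returns
# level("FAILED", 5), while cast_level(None, None) returns None.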
def cast_vanilla(value, cur): # pylint: disable=unused-argument
"""Vanilla Type caster for psycopg2
Simply returns the string representation.
"""
if value is None:
return None
else:
return str(value)
# List functions and classes for adapting
def adapt_level(a_level):
"""Generic Level Adapter for psycopg2"""
return "{}({})".format(a_level.name, a_level.value)
class ListOfLevel(object):
value = None
def __init__(self, a_level):
self.value = a_level
def return_original(self):
return self.value
def adapt_ListOfX(adapt_X):
"""This will create a multi-column adapter for a particular type.
Note that the type must itself be in array form. Therefore
this function serves to separate out individual lists into multiple
big lists.
E.g. if the X adapter produces array (a,b,c)
then this adapter will take a list of Xs and produce a master array:
((a1,a2,a3),(b1,b2,b3),(c1,c2,c3))
Takes as its argument the adapter for the type which must produce an
SQL array string.
Note that you should NOT put the AsIs in the adapt_X function.
The need for this function arises from the fact that we may want to
handle list-creating types differently if they themselves are in a
list; as in the example above, we cannot simply adopt a recursive
strategy.
Note that master_list is the list representing the array. Each element
in the list will represent a subarray (column). If there is only one
subarray following processing then the outer {} are stripped to give a
1 dimensional array.
"""
def adapter_function(param):
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
param = param.value
result_list = []
for element in param: # Where param will be a list of X's
result_list.append(adapt_X(element))
test_element = result_list[0]
num_items = len(test_element.split(","))
master_list = []
for x in range(num_items):
master_list.append("")
for element in result_list:
element = element.strip("{").strip("}")
element = element.split(",")
for x in range(num_items):
master_list[x] = master_list[x] + element[x] + ","
if num_items > 1:
master_sql_string = "{"
else:
master_sql_string = ""
for x in range(num_items):
# Remove trailing comma
master_list[x] = master_list[x].strip(",")
master_list[x] = "{" + master_list[x] + "}"
master_sql_string = master_sql_string + master_list[x] + ","
master_sql_string = master_sql_string.strip(",")
if num_items > 1:
master_sql_string = master_sql_string + "}"
return AsIs("'{}'".format(master_sql_string))
return adapter_function
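# Worked example (element values assumed): if adapt_X maps each element to a
# two-column string, e.g. adapt_X(x1) == "{a1,b1}" and adapt_X(x2) == "{a2,b2}",
# then the adapter returned by adapt_ListOfX(adapt_X), applied to a wrapper
# whose .value is [x1, x2] (such as ListOfLevel), yields
# AsIs("'{{a1,a2},{b1,b2}}'"). For a single-column adapter such as adapt_level
# the result is a one-dimensional array, e.g. AsIs("'{FAILED(5),PARTIAL(6)}'").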
def return_as_is(adapt_X):
"""Returns the AsIs appended function of the function passed
This is useful for adapter functions intended to be used with the
adapt_ListOfX function, which must return strings, as it allows them
to be standalone adapters.
"""
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
def adapter_function(param):
return AsIs("'{}'".format(adapt_X(param)))
return adapter_function
def adapt_vanilla(param):
"""Vanilla adapter: simply returns the string representation"""
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
return AsIs("'{}'".format(param))
def create_iterable_adapter(array_columns, explicit_iterate=False):
"""Create an iterable adapter of a specified dimension
If explicit_iterate is True, then it will be assumed that the param needs
to be iterated over via param.iteritems(). Otherwise it will simply be
iterated over directly.
The value of array_columns will be equal to the number of indexed elements
per item in the param iterable. E.g. a list of 3-element-long lists has
3 elements per item in the iterable (the master list) and therefore
array_columns should be equal to 3.
If array_columns is 0, then this indicates that the iterable contains
single items.
"""
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
def adapt_iterable(param):
"""Adapts an iterable object into an SQL array"""
final_string = "" # String stores a string representation of the array
if param:
if array_columns > 1:
for index in range(array_columns):
array_string = ""
for item in param.iteritems():
array_string = array_string + str(item[index]) + ","
array_string = array_string.strip(",")
array_string = "{" + array_string + "}"
final_string = final_string + array_string + ","
final_string = final_string.strip(",")
final_string = "{" + final_string + "}"
else:
# Simply return each item in the array
if explicit_iterate:
for item in param.iteritems():
final_string = final_string + str(item) + ","
else:
for item in param:
final_string = final_string + str(item) + ","
final_string = "{" + final_string + "}"
return AsIs("'{}'".format(final_string))
return adapt_iterable
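# Worked example (keys and values assumed): the output processor registers
# create_iterable_adapter(2, explicit_iterate=True) for KernelConfig, so a
# mapping whose iteritems() yields ('CONFIG_A', 'y') and ('CONFIG_B', 'n')
# is adapted to AsIs("'{{CONFIG_A,CONFIG_B},{y,n}}'"), i.e. one sub-array
# per column.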
# For reference only and future use
def adapt_list(param):
"""Adapts a list into an array"""
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
final_string = ""
if param:
for item in param:
final_string = final_string + str(item) + ","
final_string = "{" + final_string + "}"
return AsIs("'{}'".format(final_string))
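
For orientation, here is a minimal sketch of how the adapters above compose. The level names and numeric values are illustrative only, and the snippet assumes WA and psycopg2 are importable:

from wa.utils.types import level
from wa.utils import postgres_convert

# Wrap a list of levels, as the output processor does for retry_on_status.
statuses = postgres_convert.ListOfLevel([level('FAILED', 5), level('PARTIAL', 6)])

# adapt_ListOfX(adapt_level) flattens the list into a Postgres array literal,
# yielding AsIs("'{FAILED(5),PARTIAL(6)}'").
list_adapter = postgres_convert.adapt_ListOfX(postgres_convert.adapt_level)
adapted_list = list_adapter(statuses)

# return_as_is(adapt_level) makes the single-level adapter usable on its own,
# yielding AsIs("'OK(1)'").
single_adapter = postgres_convert.return_as_is(postgres_convert.adapt_level)
adapted_single = single_adapter(level('OK', 1))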