1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2024-10-06 19:01:15 +01:00
workload-automation/wa/utils/postgres.py
2019-07-19 16:36:11 +01:00

259 lines
9.0 KiB
Python

# Copyright 2018 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains additional casting and adaptation functions for several
different datatypes and metadata types for use with the psycopg2 module. The
casting functions will transform Postgresql data types into Python objects, and
the adapters the reverse. They are named this way according to the psycopg2
conventions.
For more information about the available adapters and casters in the standard
psycopg2 module, please see:
http://initd.org/psycopg/docs/extensions.html#sql-adaptation-protocol-objects
"""
import re
import os
try:
from psycopg2 import InterfaceError
from psycopg2.extensions import AsIs
except ImportError:
InterfaceError = None
AsIs = None
from wa.utils.types import level
POSTGRES_SCHEMA_DIR = os.path.join(os.path.dirname(__file__),
'..',
'commands',
'postgres_schemas')
def cast_level(value, cur): # pylint: disable=unused-argument
"""Generic Level caster for psycopg2"""
if not InterfaceError:
raise ImportError('There was a problem importing psycopg2.')
if value is None:
return None
m = re.match(r"([^\()]*)\((\d*)\)", value)
name = str(m.group(1))
number = int(m.group(2))
if m:
return level(name, number)
else:
raise InterfaceError("Bad level representation: {}".format(value))
def cast_vanilla(value, cur): # pylint: disable=unused-argument
"""Vanilla Type caster for psycopg2
Simply returns the string representation.
"""
if value is None:
return None
else:
return str(value)
# List functions and classes for adapting
def adapt_level(a_level):
"""Generic Level Adapter for psycopg2"""
return "{}({})".format(a_level.name, a_level.value)
class ListOfLevel(object):
value = None
def __init__(self, a_level):
self.value = a_level
def return_original(self):
return self.value
def adapt_ListOfX(adapt_X):
"""This will create a multi-column adapter for a particular type.
Note that the type must itself need to be in array form. Therefore
this function serves to seaprate out individual lists into multiple
big lists.
E.g. if the X adapter produces array (a,b,c)
then this adapter will take an list of Xs and produce a master array:
((a1,a2,a3),(b1,b2,b3),(c1,c2,c3))
Takes as its argument the adapter for the type which must produce an
SQL array string.
Note that you should NOT put the AsIs in the adapt_X function.
The need for this function arises from the fact that we may want to
actually handle list-creating types differently if they themselves
are in a list, as in the example above, we cannot simply adopt a
recursive strategy.
Note that master_list is the list representing the array. Each element
in the list will represent a subarray (column). If there is only one
subarray following processing then the outer {} are stripped to give a
1 dimensional array.
"""
def adapter_function(param):
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
param = param.value
result_list = []
for element in param: # Where param will be a list of X's
result_list.append(adapt_X(element))
test_element = result_list[0]
num_items = len(test_element.split(","))
master_list = []
for x in range(num_items):
master_list.append("")
for element in result_list:
element = element.strip("{").strip("}")
element = element.split(",")
for x in range(num_items):
master_list[x] = master_list[x] + element[x] + ","
if num_items > 1:
master_sql_string = "{"
else:
master_sql_string = ""
for x in range(num_items):
# Remove trailing comma
master_list[x] = master_list[x].strip(",")
master_list[x] = "{" + master_list[x] + "}"
master_sql_string = master_sql_string + master_list[x] + ","
master_sql_string = master_sql_string.strip(",")
if num_items > 1:
master_sql_string = master_sql_string + "}"
return AsIs("'{}'".format(master_sql_string))
return adapter_function
def return_as_is(adapt_X):
"""Returns the AsIs appended function of the function passed
This is useful for adapter functions intended to be used with the
adapt_ListOfX function, which must return strings, as it allows them
to be standalone adapters.
"""
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
def adapter_function(param):
return AsIs("'{}'".format(adapt_X(param)))
return adapter_function
def adapt_vanilla(param):
"""Vanilla adapter: simply returns the string representation"""
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
return AsIs("'{}'".format(param))
def create_iterable_adapter(array_columns, explicit_iterate=False):
"""Create an iterable adapter of a specified dimension
If explicit_iterate is True, then it will be assumed that the param needs
to be iterated upon via param.iteritems(). Otherwise it will simply be
iterated vanilla.
The value of array_columns will be equal to the number of indexed elements
per item in the param iterable. E.g. a list of 3-element-long lists has
3 elements per item in the iterable (the master list) and therefore
array_columns should be equal to 3.
If array_columns is 0, then this indicates that the iterable contains
single items.
"""
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
def adapt_iterable(param):
"""Adapts an iterable object into an SQL array"""
final_string = "" # String stores a string representation of the array
if param:
if array_columns > 1:
for index in range(array_columns):
array_string = ""
for item in param.iteritems():
array_string = array_string + str(item[index]) + ","
array_string = array_string.strip(",")
array_string = "{" + array_string + "}"
final_string = final_string + array_string + ","
final_string = final_string.strip(",")
else:
# Simply return each item in the array
if explicit_iterate:
for item in param.iteritems():
final_string = final_string + str(item) + ","
else:
for item in param:
final_string = final_string + str(item) + ","
return AsIs("'{{{}}}'".format(final_string))
return adapt_iterable
# For reference only and future use
def adapt_list(param):
"""Adapts a list into an array"""
if not AsIs:
raise ImportError('There was a problem importing psycopg2.')
final_string = ""
if param:
for item in param:
final_string = final_string + str(item) + ","
final_string = "{" + final_string + "}"
return AsIs("'{}'".format(final_string))
def get_schema(schemafilepath):
with open(schemafilepath, 'r') as sqlfile:
sql_commands = sqlfile.read()
schema_major = None
schema_minor = None
# Extract schema version if present
if sql_commands.startswith('--!VERSION'):
splitcommands = sql_commands.split('!ENDVERSION!\n')
schema_major, schema_minor = splitcommands[0].strip('--!VERSION!').split('.')
schema_major = int(schema_major)
schema_minor = int(schema_minor)
sql_commands = splitcommands[1]
return schema_major, schema_minor, sql_commands
def get_database_schema_version(conn):
with conn.cursor() as cursor:
cursor.execute('''SELECT
DatabaseMeta.schema_major,
DatabaseMeta.schema_minor
FROM
DatabaseMeta;''')
schema_major, schema_minor = cursor.fetchone()
return (schema_major, schema_minor)
def get_schema_versions(conn):
schemafilepath = os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema.sql')
cur_major_version, cur_minor_version, _ = get_schema(schemafilepath)
db_schema_version = get_database_schema_version(conn)
return (cur_major_version, cur_minor_version), db_schema_version