
fw/output: Implement the Output API for using a database backend

Allow for the creation of a RunDatabaseOutput so that the WA output API
can be used with run data stored in a PostgreSQL database.
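
A minimal usage sketch (the connection values and '<run_uuid>' below are
illustrative assumptions, not requirements):

    from wa.framework.output import RunDatabaseOutput

    output = RunDatabaseOutput(password='postgres',
                               dbname='wa',
                               host='localhost',
                               port='5432',
                               user='postgres',
                               run_uuid='<run_uuid>')
    # Iterate over the jobs reconstructed from the database.
    for job in output.jobs:
        print(job, job.status)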
Marc Bonnici 2018-10-31 14:40:09 +00:00 committed by setrofim
parent 423882a8e6
commit 0fee3debea


@@ -13,23 +13,32 @@
# limitations under the License.
#
try:
import psycopg2
from psycopg2 import Error as Psycopg2Error
except ImportError:
psycopg2 = None
Psycopg2Error = None
import logging
import os
import shutil
from collections import OrderedDict, defaultdict
from copy import copy, deepcopy
from datetime import datetime
from io import StringIO
import devlib
from wa.framework.configuration.core import JobSpec, Status
from wa.framework.configuration.execution import CombinedConfig
from wa.framework.exception import HostError, SerializerSyntaxError, ConfigError
from wa.framework.run import RunState, RunInfo
from wa.framework.target.info import TargetInfo
from wa.framework.version import get_wa_version_with_commit
from wa.utils.doc import format_simple_table
from wa.utils.misc import touch, ensure_directory_exists, isiterable
from wa.utils.serializer import write_pod, read_pod, Podable, json
from wa.utils.types import enum, numeric
@@ -165,6 +174,7 @@ class Output(object):
def __str__(self):
return os.path.basename(self.basepath)
class RunOutputCommon(object):
''' Split out common functionality to form a second base for
the RunOutput classes
@@ -320,7 +330,6 @@ class RunOutput(Output, RunOutputCommon):
job_output.basepath = failed_path
class JobOutput(Output):
kind = 'job'
@@ -736,3 +745,459 @@ def _save_raw_config(meta_dir, state):
basename = os.path.basename(source)
dest_path = os.path.join(raw_config_dir, 'cfg{}-{}'.format(i, basename))
shutil.copy(source, dest_path)
class DatabaseOutput(Output):
kind = None
@property
def resultfile(self):
if self.conn is None or self.oid is None:
return {}
pod = self._get_pod_version()
pod['metrics'] = self._get_metrics()
pod['status'] = self._get_status()
pod['classifiers'] = self._get_classifiers(self.oid, 'run')
pod['events'] = self._get_events()
pod['artifacts'] = self._get_artifacts()
return pod
@staticmethod
def _build_command(columns, tables, conditions=None, joins=None):
cmd = '''SELECT\n\t{}\nFROM\n\t{}'''.format(',\n\t'.join(columns), ',\n\t'.join(tables))
if joins:
for join in joins:
cmd += '''\nLEFT JOIN {} ON {}'''.format(join[0], join[1])
if conditions:
cmd += '''\nWHERE\n\t{}'''.format('\nAND\n\t'.join(conditions))
return cmd + ';'
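# For illustration (the oid value is hypothetical), a call such as
#   _build_command(['metrics.name'], ['metrics'],
#                  conditions=["metrics.run_oid = '42'"],
#                  joins=[('classifiers',
#                          'classifiers.metric_oid = metrics.oid')])
# produces:
#   SELECT
#       metrics.name
#   FROM
#       metrics
#   LEFT JOIN classifiers ON classifiers.metric_oid = metrics.oid
#   WHERE
#       metrics.run_oid = '42';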
def __init__(self, conn, oid=None, reload=True): # pylint: disable=super-init-not-called
self.conn = conn
self.oid = oid
self.result = None
if reload:
self.reload()
def __repr__(self):
return '<{} {}>'.format(self.__class__.__name__, self.oid)
def __str__(self):
return self.oid
def reload(self):
try:
self.result = Result.from_pod(self.resultfile)
except Exception as e: # pylint: disable=broad-except
self.result = Result()
self.result.status = Status.UNKNOWN
self.add_event(str(e))
def get_artifact_path(self, name):
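# A database artifact's 'path' field holds the oid of a postgres
# large object (stored as a string by _get_artifacts), so the
# content is fetched via conn.lobject() rather than from disk.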
artifact = self.get_artifact(name)
artifact = StringIO(self.conn.lobject(int(artifact.path)).read())
self.conn.commit()
return artifact
# pylint: disable=too-many-locals
def _read_db(self, columns, tables, conditions=None, join=None, as_dict=True):
# Automatically strip the table name from a column when using column
# names as keys, or allow a (db_column_name, alias) tuple to be given
# so that the column is aliased in the retrieved data.
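# For example (hypothetical values):
#   _read_db([('runs.run_uuid', 'uuid'), 'runs.project'], ['runs'])
# honours the ('runs.run_uuid', 'uuid') alias and strips the table
# prefix from 'runs.project', yielding entries such as
#   [{'uuid': '...', 'project': 'example'}]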
db_columns = []
aliases_columns = []
for column in columns:
if isinstance(column, tuple):
db_columns.append(column[0])
aliases_columns.append(column[1])
else:
db_columns.append(column)
aliases_columns.append(column.rsplit('.', 1)[-1])
cmd = self._build_command(db_columns, tables, conditions, join)
logger.debug(cmd)
with self.conn.cursor() as cursor:
cursor.execute(cmd)
results = cursor.fetchall()
self.conn.commit()
if not as_dict:
return results
# Format the output dict using column names as keys
output = []
for result in results:
entry = {}
for k, v in zip(aliases_columns, result):
entry[k] = v
output.append(entry)
return output
def _get_pod_version(self):
columns = ['_pod_version', '_pod_serialization_version']
tables = ['{}s'.format(self.kind)]
conditions = ['{}s.oid = \'{}\''.format(self.kind, self.oid)]
results = self._read_db(columns, tables, conditions)
if results:
return results[0]
else:
return None
def _populate_classifiers(self, pod, kind):
for entry in pod:
oid = entry.pop('oid')
entry['classifiers'] = self._get_classifiers(oid, kind)
return pod
def _get_classifiers(self, oid, kind):
columns = ['classifiers.key', 'classifiers.value']
tables = ['classifiers']
conditions = ['{}_oid = \'{}\''.format(kind, oid)]
results = self._read_db(columns, tables, conditions, as_dict=False)
classifiers = {}
for (k, v) in results:
classifiers[k] = v
return classifiers
def _get_metrics(self):
columns = ['metrics.name', 'metrics.value', 'metrics.units',
'metrics.lower_is_better',
'metrics.oid', 'metrics._pod_version',
'metrics._pod_serialization_version']
tables = ['metrics']
joins = [('classifiers', 'classifiers.metric_oid = metrics.oid')]
conditions = ['metrics.{}_oid = \'{}\''.format(self.kind, self.oid)]
pod = self._read_db(columns, tables, conditions, joins)
return self._populate_classifiers(pod, 'metric')
def _get_status(self):
columns = ['{}s.status'.format(self.kind)]
tables = ['{}s'.format(self.kind)]
conditions = ['{}s.oid = \'{}\''.format(self.kind, self.oid)]
results = self._read_db(columns, tables, conditions, as_dict=False)
if results:
return results[0][0]
else:
return None
def _get_artifacts(self):
columns = ['artifacts.name', 'artifacts.description', 'artifacts.kind',
('largeobjects.lo_oid', 'path'), 'artifacts.oid',
'artifacts._pod_version', 'artifacts._pod_serialization_version']
tables = ['largeobjects', 'artifacts']
joins = [('classifiers', 'classifiers.artifact_oid = artifacts.oid')]
conditions = ['artifacts.{}_oid = \'{}\''.format(self.kind, self.oid),
'artifacts.large_object_uuid = largeobjects.oid',
'artifacts.job_oid IS NULL']
pod = self._read_db(columns, tables, conditions, joins)
for artifact in pod:
artifact['path'] = str(artifact['path'])
return self._populate_classifiers(pod, 'artifact')
def _get_events(self):
columns = ['events.message', 'events.timestamp']
tables = ['events']
conditions = ['events.{}_oid = \'{}\''.format(self.kind, self.oid)]
return self._read_db(columns, tables, conditions)
def kernel_config_from_db(raw):
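# The database stores a kernel config as two parallel arrays of keys
# and values; e.g. (hypothetical values)
#   raw = [['CONFIG_SMP', 'CONFIG_NUMA'], ['y', 'n']]
# yields {'CONFIG_SMP': 'y', 'CONFIG_NUMA': 'n'}.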
kernel_config = {}
for k, v in zip(raw[0], raw[1]):
kernel_config[k] = v
return kernel_config
class RunDatabaseOutput(DatabaseOutput, RunOutputCommon):
kind = 'run'
@property
def basepath(self):
return 'db:({})-{}@{}:{}'.format(self.dbname, self.user,
self.host, self.port)
@property
def augmentations(self):
columns = ['augmentations.name']
tables = ['augmentations']
conditions = ['augmentations.run_oid = \'{}\''.format(self.oid)]
results = self._read_db(columns, tables, conditions, as_dict=False)
return [a for augs in results for a in augs]
@property
def _db_infofile(self):
columns = ['start_time', 'project', ('run_uuid', 'uuid'), 'end_time',
'run_name', 'duration', '_pod_version', '_pod_serialization_version']
tables = ['runs']
conditions = ['runs.run_uuid = \'{}\''.format(self.run_uuid)]
pod = self._read_db(columns, tables, conditions)
if not pod:
return {}
return pod[0]
@property
def _db_targetfile(self):
columns = ['os', 'is_rooted', 'target', 'abi', 'cpus', 'os_version',
'hostid', 'hostname', 'kernel_version', 'kernel_release',
'kernel_sha1', 'kernel_config', 'sched_features',
'_pod_version', '_pod_serialization_version']
tables = ['targets']
conditions = ['targets.run_oid = \'{}\''.format(self.oid)]
pod = self._read_db(columns, tables, conditions)
if not pod:
return {}
pod = pod[0]
try:
pod['cpus'] = [json.loads(cpu) for cpu in pod.pop('cpus')]
except SerializerSyntaxError:
pod['cpus'] = []
logger.debug('Failed to deserialize target cpu information')
pod['kernel_config'] = kernel_config_from_db(pod['kernel_config'])
return pod
@property
def _db_statefile(self):
# Read overall run information
columns = ['runs.state']
tables = ['runs']
conditions = ['runs.run_uuid = \'{}\''.format(self.run_uuid)]
pod = self._read_db(columns, tables, conditions)
if not pod:
return {}
pod = pod[0].get('state')
if not pod:
return {}
# Read job information
columns = ['jobs.job_id', 'jobs.oid']
tables = ['jobs']
conditions = ['jobs.run_oid = \'{}\''.format(self.oid)]
job_oids = self._read_db(columns, tables, conditions)
# Match job oid with jobs from state file
for job in pod.get('jobs', []):
for job_oid in job_oids:
if job['id'] == job_oid['job_id']:
job['oid'] = job_oid['oid']
break
return pod
@property
def _db_jobsfile(self):
workload_params = self._get_parameters('workload')
runtime_params = self._get_parameters('runtime')
columns = [('jobs.job_id', 'id'), 'jobs.label', 'jobs.workload_name',
'jobs.oid', 'jobs._pod_version', 'jobs._pod_serialization_version']
tables = ['jobs']
conditions = ['jobs.run_oid = \'{}\''.format(self.oid)]
jobs = self._read_db(columns, tables, conditions)
for job in jobs:
job['workload_parameters'] = workload_params.pop(job['oid'], {})
job['runtime_parameters'] = runtime_params.pop(job['oid'], {})
job.pop('oid')
return jobs
@property
def _db_run_config(self):
pod = defaultdict(dict)
parameter_types = ['augmentation', 'resource_getter']
for parameter_type in parameter_types:
columns = ['parameters.name', 'parameters.value',
'parameters.value_type',
('{}s.name'.format(parameter_type), '{}'.format(parameter_type))]
tables = ['parameters', '{}s'.format(parameter_type)]
conditions = ['parameters.run_oid = \'{}\''.format(self.oid),
'parameters.type = \'{}\''.format(parameter_type),
'parameters.{0}_oid = {0}s.oid'.format(parameter_type)]
configs = self._read_db(columns, tables, conditions)
for config in configs:
entry = {config['name']: json.loads(config['value'])}
pod['{}s'.format(parameter_type)][config.pop(parameter_type)] = entry
# run config
columns = ['runs.max_retries', 'runs.allow_phone_home',
'runs.bail_on_init_failure', 'runs.retry_on_status']
tables = ['runs']
conditions = ['runs.oid = \'{}\''.format(self.oid)]
config = self._read_db(columns, tables, conditions)
if not config:
return {}
config = config[0]
# Convert back into a string representation of an enum list
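# e.g. a stored value of '{FAILED,PARTIAL}' (hypothetical) becomes
# ['FAILED', 'PARTIAL']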
config['retry_on_status'] = config['retry_on_status'][1:-1].split(',')
pod.update(config)
return pod
def __init__(self,
password=None,
dbname='wa',
host='localhost',
port='5432',
user='postgres',
run_uuid=None,
list_runs=False):
if psycopg2 is None:
msg = 'Please install psycopg2 in order to connect to Postgres databases'
raise HostError(msg)
self.dbname = dbname
self.host = host
self.port = port
self.user = user
self.password = password
self.run_uuid = run_uuid
self.conn = None
self.info = None
self.state = None
self.result = None
self.target_info = None
self._combined_config = None
self.jobs = []
self.job_specs = []
self.connect()
super(RunDatabaseOutput, self).__init__(conn=self.conn, reload=False)
if list_runs:
print('Available runs are:')
self._list_runs()
self.disconnect()
return
if not self.run_uuid:
print('Please specify a "run_uuid"')
self._list_runs()
self.disconnect()
return
if not self.oid:
self.oid = self._get_oid()
self.reload()
def read_job_specs(self):
job_specs = []
for job in self._db_jobsfile:
job_specs.append(JobSpec.from_pod(job))
return job_specs
def connect(self):
if self.conn and not self.conn.closed:
return
try:
self.conn = psycopg2.connect(dbname=self.dbname,
user=self.user,
host=self.host,
password=self.password,
port=self.port)
except Psycopg2Error as e:
raise HostError('Unable to connect to the database: "{}"'.format(e.args[0]))
def disconnect(self):
self.conn.commit()
self.conn.close()
def reload(self):
super(RunDatabaseOutput, self).reload()
info_pod = self._db_infofile
state_pod = self._db_statefile
if not info_pod or not state_pod:
msg = '"{}" does not appear to be a valid WA Database Output.'
raise ValueError(msg.format(self.oid))
self.info = RunInfo.from_pod(info_pod)
self.state = RunState.from_pod(state_pod)
self._combined_config = CombinedConfig.from_pod({'run_config': self._db_run_config})
self.target_info = TargetInfo.from_pod(self._db_targetfile)
self.job_specs = self.read_job_specs()
for job_state in self._db_statefile['jobs']:
job = JobDatabaseOutput(self.conn, job_state.get('oid'), job_state['id'],
job_state['label'], job_state['iteration'],
job_state['retries'])
job.status = job_state['status']
job.spec = self.get_job_spec(job.id)
if job.spec is None:
logger.warning('Could not find spec for job {}'.format(job.id))
self.jobs.append(job)
def _get_oid(self):
columns = ['{}s.oid'.format(self.kind)]
tables = ['{}s'.format(self.kind)]
conditions = ['runs.run_uuid = \'{}\''.format(self.run_uuid)]
oid = self._read_db(columns, tables, conditions, as_dict=False)
if not oid:
raise ConfigError('No matching run entries found for run_uuid {}'.format(self.run_uuid))
if len(oid) > 1:
raise ConfigError('Multiple entries found for run_uuid: {}'.format(self.run_uuid))
return oid[0][0]
def _get_parameters(self, param_type):
columns = ['parameters.job_oid', 'parameters.name', 'parameters.value']
tables = ['parameters']
conditions = ['parameters.type = \'{}\''.format(param_type),
'parameters.run_oid = \'{}\''.format(self.oid)]
params = self._read_db(columns, tables, conditions, as_dict=False)
param_dict = defaultdict(dict)
for (job_oid, k, v) in params:
try:
param_dict[job_oid][k] = json.loads(v)
except SerializerSyntaxError:
logger.debug('Failed to deserialize job_oid:{}-"{}":"{}"'.format(job_oid, k, v))
return param_dict
def _list_runs(self):
columns = ['runs.run_uuid', 'runs.run_name', 'runs.project',
'runs.project_stage', 'runs.status', 'runs.start_time', 'runs.end_time']
tables = ['runs']
pod = self._read_db(columns, tables)
if pod:
headers = ['Run Name', 'Project', 'Project Stage', 'Start Time', 'End Time',
'run_uuid']
run_list = []
for entry in pod:
# Format times for better display
start_time = entry['start_time']
end_time = entry['end_time']
if start_time:
start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
if end_time:
end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
run_list.append([
entry['run_name'],
entry['project'],
entry['project_stage'],
start_time,
end_time,
entry['run_uuid']])
print(format_simple_table(run_list, headers))
else:
print('No Runs Found')
class JobDatabaseOutput(DatabaseOutput):
kind = 'job'
def __init__(self, conn, oid, job_id, label, iteration, retry):
super(JobDatabaseOutput, self).__init__(conn, oid=oid)
self.id = job_id
self.label = label
self.iteration = iteration
self.retry = retry
self.result = None
self.spec = None
self.reload()
def __repr__(self):
return '<{} {}-{}-{}>'.format(self.__class__.__name__,
self.id, self.label, self.iteration)
def __str__(self):
return '{}-{}-{}'.format(self.id, self.label, self.iteration)