From 0fee3debea0adfe03de7cef7b476f16dcb391009 Mon Sep 17 00:00:00 2001
From: Marc Bonnici
Date: Wed, 31 Oct 2018 14:40:09 +0000
Subject: [PATCH] fw/output: Implement the Output API for using a database
 backend

Allow for the creation of a RunDatabaseOutput, enabling the WA output
API to be used with run data stored in a PostgreSQL database.
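
As a rough usage sketch (assuming a database populated by WA's postgres
output processor and reachable with the default connection settings;
the password and run UUID below are placeholders):

    from wa.framework.output import RunDatabaseOutput

    # Passing list_runs=True prints the available runs and returns
    # without loading anything.
    RunDatabaseOutput(password='<password>', list_runs=True)

    # Load a specific run; parameters not given fall back to the
    # defaults declared in RunDatabaseOutput.__init__.
    ro = RunDatabaseOutput(password='<password>', run_uuid='<run uuid>')
    print(ro.info.run_name)
    for job in ro.jobs:
        for metric in job.metrics:
            print(job.id, metric.name, metric.value)
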
---
 wa/framework/output.py | 473 ++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 469 insertions(+), 4 deletions(-)

diff --git a/wa/framework/output.py b/wa/framework/output.py
index aa2d66c2..a613ef98 100644
--- a/wa/framework/output.py
+++ b/wa/framework/output.py
@@ -13,23 +13,32 @@
 # limitations under the License.
 #
 
+try:
+    import psycopg2
+    from psycopg2 import Error as Psycopg2Error
+except ImportError:
+    psycopg2 = None
+    Psycopg2Error = None
+
 import logging
 import os
 import shutil
 
-from collections import OrderedDict
+from collections import OrderedDict, defaultdict
 from copy import copy, deepcopy
 from datetime import datetime
+from io import StringIO
 
 import devlib
 
 from wa.framework.configuration.core import JobSpec, Status
 from wa.framework.configuration.execution import CombinedConfig
-from wa.framework.exception import HostError
+from wa.framework.exception import HostError, SerializerSyntaxError, ConfigError
 from wa.framework.run import RunState, RunInfo
 from wa.framework.target.info import TargetInfo
 from wa.framework.version import get_wa_version_with_commit
+from wa.utils.doc import format_simple_table
 from wa.utils.misc import touch, ensure_directory_exists, isiterable
-from wa.utils.serializer import write_pod, read_pod, Podable
+from wa.utils.serializer import write_pod, read_pod, Podable, json
 from wa.utils.types import enum, numeric
 
@@ -165,6 +174,7 @@ class Output(object):
     def __str__(self):
         return os.path.basename(self.basepath)
 
+
 class RunOutputCommon(object):
     ''' Split out common functionality to form a second base of
         the RunOutput classes
@@ -320,7 +330,6 @@ class RunOutput(Output, RunOutputCommon):
         job_output.basepath = failed_path
 
 
-
 class JobOutput(Output):
 
     kind = 'job'
@@ -736,3 +745,459 @@ def _save_raw_config(meta_dir, state):
         basename = os.path.basename(source)
         dest_path = os.path.join(raw_config_dir, 'cfg{}-{}'.format(i, basename))
         shutil.copy(source, dest_path)
+
+
+class DatabaseOutput(Output):
+
+    kind = None
+
+    @property
+    def resultfile(self):
+        if self.conn is None or self.oid is None:
+            return {}
+        pod = self._get_pod_version()
+        pod['metrics'] = self._get_metrics()
+        pod['status'] = self._get_status()
+        pod['classifiers'] = self._get_classifiers(self.oid, 'run')
+        pod['events'] = self._get_events()
+        pod['artifacts'] = self._get_artifacts()
+        return pod
+
+    @staticmethod
+    def _build_command(columns, tables, conditions=None, joins=None):
+        cmd = '''SELECT\n\t{}\nFROM\n\t{}'''.format(',\n\t'.join(columns), ',\n\t'.join(tables))
+        if joins:
+            for join in joins:
+                cmd += '''\nLEFT JOIN {} ON {}'''.format(join[0], join[1])
+        if conditions:
+            cmd += '''\nWHERE\n\t{}'''.format('\nAND\n\t'.join(conditions))
+        return cmd + ';'
+
+    def __init__(self, conn, oid=None, reload=True):  # pylint: disable=super-init-not-called
+        self.conn = conn
+        self.oid = oid
+        self.result = None
+        if reload:
+            self.reload()
+
+    def __repr__(self):
+        return '<{} {}>'.format(self.__class__.__name__, self.oid)
+
+    def __str__(self):
+        return self.oid
+
+    def reload(self):
+        try:
+            self.result = Result.from_pod(self.resultfile)
+        except Exception as e:  # pylint: disable=broad-except
+            self.result = Result()
+            self.result.status = Status.UNKNOWN
+            self.add_event(str(e))
+
+    def get_artifact_path(self, name):
+        artifact = self.get_artifact(name)
+        artifact = StringIO(self.conn.lobject(int(artifact.path)).read())
+        self.conn.commit()
+        return artifact
+
+    # pylint: disable=too-many-locals
+    def _read_db(self, columns, tables, conditions=None, joins=None, as_dict=True):
+        # Automatically strip the table name when column names are used as
+        # keys, or allow columns to be given as (db_column_name, alias)
+        # tuples so they are aliased in the retrieved data.
+        db_columns = []
+        aliased_columns = []
+        for column in columns:
+            if isinstance(column, tuple):
+                db_columns.append(column[0])
+                aliased_columns.append(column[1])
+            else:
+                db_columns.append(column)
+                aliased_columns.append(column.rsplit('.', 1)[-1])
+
+        cmd = self._build_command(db_columns, tables, conditions, joins)
+
+        logger.debug(cmd)
+        with self.conn.cursor() as cursor:
+            cursor.execute(cmd)
+            results = cursor.fetchall()
+            self.conn.commit()
+
+        if not as_dict:
+            return results
+
+        # Format the output dict using column names as keys
+        output = []
+        for result in results:
+            entry = {}
+            for k, v in zip(aliased_columns, result):
+                entry[k] = v
+            output.append(entry)
+        return output
+
+    def _get_pod_version(self):
+        columns = ['_pod_version', '_pod_serialization_version']
+        tables = ['{}s'.format(self.kind)]
+        conditions = ['{}s.oid = \'{}\''.format(self.kind, self.oid)]
+        results = self._read_db(columns, tables, conditions)
+        if results:
+            return results[0]
+        else:
+            return None
+
+    def _populate_classifiers(self, pod, kind):
+        for entry in pod:
+            oid = entry.pop('oid')
+            entry['classifiers'] = self._get_classifiers(oid, kind)
+        return pod
+
+    def _get_classifiers(self, oid, kind):
+        columns = ['classifiers.key', 'classifiers.value']
+        tables = ['classifiers']
+        conditions = ['{}_oid = \'{}\''.format(kind, oid)]
+        results = self._read_db(columns, tables, conditions, as_dict=False)
+        classifiers = {}
+        for (k, v) in results:
+            classifiers[k] = v
+        return classifiers
+
+    def _get_metrics(self):
+        columns = ['metrics.name', 'metrics.value', 'metrics.units',
+                   'metrics.lower_is_better',
+                   'metrics.oid', 'metrics._pod_version',
+                   'metrics._pod_serialization_version']
+        tables = ['metrics']
+        joins = [('classifiers', 'classifiers.metric_oid = metrics.oid')]
+        conditions = ['metrics.{}_oid = \'{}\''.format(self.kind, self.oid)]
+        pod = self._read_db(columns, tables, conditions, joins)
+        return self._populate_classifiers(pod, 'metric')
+
+    def _get_status(self):
+        columns = ['{}s.status'.format(self.kind)]
+        tables = ['{}s'.format(self.kind)]
+        conditions = ['{}s.oid = \'{}\''.format(self.kind, self.oid)]
+        results = self._read_db(columns, tables, conditions, as_dict=False)
+        if results:
+            return results[0][0]
+        else:
+            return None
+
+    def _get_artifacts(self):
+        columns = ['artifacts.name', 'artifacts.description', 'artifacts.kind',
+                   ('largeobjects.lo_oid', 'path'), 'artifacts.oid',
+                   'artifacts._pod_version', 'artifacts._pod_serialization_version']
+        tables = ['largeobjects', 'artifacts']
+        joins = [('classifiers', 'classifiers.artifact_oid = artifacts.oid')]
+        conditions = ['artifacts.{}_oid = \'{}\''.format(self.kind, self.oid),
+                      'artifacts.large_object_uuid = largeobjects.oid',
+                      'artifacts.job_oid IS NULL']
+        pod = self._read_db(columns, tables, conditions, joins)
+        for artifact in pod:
+            artifact['path'] = str(artifact['path'])
+        return self._populate_classifiers(pod, 'artifact')
+
+    def _get_events(self):
+        columns = ['events.message', 'events.timestamp']
+        tables = ['events']
+        conditions = ['events.{}_oid = \'{}\''.format(self.kind, self.oid)]
+        return self._read_db(columns, tables, conditions)
+
+
+def kernel_config_from_db(raw):
+    kernel_config = {}
+    for k, v in zip(raw[0], raw[1]):
+        kernel_config[k] = v
+    return kernel_config
+
+
+class RunDatabaseOutput(DatabaseOutput, RunOutputCommon):
+
+    kind = 'run'
+
+    @property
+    def basepath(self):
+        return 'db:({})-{}@{}:{}'.format(self.dbname, self.user,
+                                         self.host, self.port)
+
+    @property
+    def augmentations(self):
+        columns = ['augmentations.name']
+        tables = ['augmentations']
+        conditions = ['augmentations.run_oid = \'{}\''.format(self.oid)]
+        results = self._read_db(columns, tables, conditions, as_dict=False)
+        return [a for augs in results for a in augs]
+
+    @property
+    def _db_infofile(self):
+        columns = ['start_time', 'project', ('run_uuid', 'uuid'), 'end_time',
+                   'run_name', 'duration', '_pod_version', '_pod_serialization_version']
+        tables = ['runs']
+        conditions = ['runs.run_uuid = \'{}\''.format(self.run_uuid)]
+        pod = self._read_db(columns, tables, conditions)
+        if not pod:
+            return {}
+        return pod[0]
+
+    @property
+    def _db_targetfile(self):
+        columns = ['os', 'is_rooted', 'target', 'abi', 'cpus', 'os_version',
+                   'hostid', 'hostname', 'kernel_version', 'kernel_release',
+                   'kernel_sha1', 'kernel_config', 'sched_features',
+                   '_pod_version', '_pod_serialization_version']
+        tables = ['targets']
+        conditions = ['targets.run_oid = \'{}\''.format(self.oid)]
+        pod = self._read_db(columns, tables, conditions)
+        if not pod:
+            return {}
+        pod = pod[0]
+        try:
+            pod['cpus'] = [json.loads(cpu) for cpu in pod.pop('cpus')]
+        except SerializerSyntaxError:
+            pod['cpus'] = []
+            logger.debug('Failed to deserialize target cpu information')
+        pod['kernel_config'] = kernel_config_from_db(pod['kernel_config'])
+        return pod
+
+    @property
+    def _db_statefile(self):
+        # Read overall run information
+        columns = ['runs.state']
+        tables = ['runs']
+        conditions = ['runs.run_uuid = \'{}\''.format(self.run_uuid)]
+        pod = self._read_db(columns, tables, conditions)
+        pod = pod[0].get('state') if pod else None
+        if not pod:
+            return {}
+
+        # Read job information
+        columns = ['jobs.job_id', 'jobs.oid']
+        tables = ['jobs']
+        conditions = ['jobs.run_oid = \'{}\''.format(self.oid)]
+        job_oids = self._read_db(columns, tables, conditions)
+
+        # Match each job oid with the corresponding job from the state file
+        for job in pod.get('jobs', []):
+            for job_oid in job_oids:
+                if job['id'] == job_oid['job_id']:
+                    job['oid'] = job_oid['oid']
+                    break
+        return pod
+
+    @property
+    def _db_jobsfile(self):
+        workload_params = self._get_parameters('workload')
+        runtime_params = self._get_parameters('runtime')
+
+        columns = [('jobs.job_id', 'id'), 'jobs.label', 'jobs.workload_name',
+                   'jobs.oid', 'jobs._pod_version', 'jobs._pod_serialization_version']
+        tables = ['jobs']
+        conditions = ['jobs.run_oid = \'{}\''.format(self.oid)]
+        jobs = self._read_db(columns, tables, conditions)
+
+        for job in jobs:
+            job['workload_parameters'] = workload_params.pop(job['oid'], {})
+            job['runtime_parameters'] = runtime_params.pop(job['oid'], {})
+            job.pop('oid')
+        return jobs
+
+    @property
+    def _db_run_config(self):
+        pod = defaultdict(dict)
+        parameter_types = ['augmentation', 'resource_getter']
+        for parameter_type in parameter_types:
+            columns = ['parameters.name', 'parameters.value',
+                       'parameters.value_type',
+                       ('{}s.name'.format(parameter_type), '{}'.format(parameter_type))]
+            tables = ['parameters', '{}s'.format(parameter_type)]
+            conditions = ['parameters.run_oid = \'{}\''.format(self.oid),
+                          'parameters.type = \'{}\''.format(parameter_type),
+                          'parameters.{0}_oid = {0}s.oid'.format(parameter_type)]
+            configs = self._read_db(columns, tables, conditions)
+            for config in configs:
+                entry = {config['name']: json.loads(config['value'])}
+                pod['{}s'.format(parameter_type)][config.pop(parameter_type)] = entry
+
+        # run config
+        columns = ['runs.max_retries', 'runs.allow_phone_home',
+                   'runs.bail_on_init_failure', 'runs.retry_on_status']
+        tables = ['runs']
+        conditions = ['runs.oid = \'{}\''.format(self.oid)]
+        config = self._read_db(columns, tables, conditions)
+        if not config:
+            return {}
+
+        config = config[0]
+        # Convert the stored string back into a list of enum names
+        config['retry_on_status'] = config['retry_on_status'][1:-1].split(',')
+        pod.update(config)
+        return pod
+
+    def __init__(self,
+                 password=None,
+                 dbname='wa',
+                 host='localhost',
+                 port='5432',
+                 user='postgres',
+                 run_uuid=None,
+                 list_runs=False):
+
+        if psycopg2 is None:
+            msg = 'Please install psycopg2 in order to connect to a PostgreSQL database'
+            raise HostError(msg)
+
+        self.dbname = dbname
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+        self.run_uuid = run_uuid
+        self.conn = None
+
+        self.info = None
+        self.state = None
+        self.result = None
+        self.target_info = None
+        self._combined_config = None
+        self.jobs = []
+        self.job_specs = []
+
+        self.connect()
+        super(RunDatabaseOutput, self).__init__(conn=self.conn, reload=False)
+
+        if list_runs:
+            print('Available runs are:')
+            self._list_runs()
+            self.disconnect()
+            return
+        if not self.run_uuid:
+            print('Please specify a "run_uuid"')
+            self._list_runs()
+            self.disconnect()
+            return
+
+        if not self.oid:
+            self.oid = self._get_oid()
+        self.reload()
+
+    def read_job_specs(self):
+        job_specs = []
+        for job in self._db_jobsfile:
+            job_specs.append(JobSpec.from_pod(job))
+        return job_specs
+
+    def connect(self):
+        if self.conn and not self.conn.closed:
+            return
+        try:
+            self.conn = psycopg2.connect(dbname=self.dbname,
+                                         user=self.user,
+                                         host=self.host,
+                                         password=self.password,
+                                         port=self.port)
+        except Psycopg2Error as e:
+            raise HostError('Unable to connect to the database: "{}"'.format(e.args[0]))
+
+    def disconnect(self):
+        self.conn.commit()
+        self.conn.close()
+
+    def reload(self):
+        super(RunDatabaseOutput, self).reload()
+        info_pod = self._db_infofile
+        state_pod = self._db_statefile
+        if not info_pod or not state_pod:
+            msg = '"{}" does not appear to be a valid WA Database Output.'
+            raise ValueError(msg.format(self.oid))
+
+        self.info = RunInfo.from_pod(info_pod)
+        self.state = RunState.from_pod(state_pod)
+        self._combined_config = CombinedConfig.from_pod({'run_config': self._db_run_config})
+        self.target_info = TargetInfo.from_pod(self._db_targetfile)
+        self.job_specs = self.read_job_specs()
+
+        for job_state in self._db_statefile['jobs']:
+            job = JobDatabaseOutput(self.conn, job_state.get('oid'), job_state['id'],
+                                    job_state['label'], job_state['iteration'],
+                                    job_state['retries'])
+            job.status = job_state['status']
+            job.spec = self.get_job_spec(job.id)
+            if job.spec is None:
+                logger.warning('Could not find spec for job {}'.format(job.id))
+            self.jobs.append(job)
+
+    def _get_oid(self):
+        columns = ['{}s.oid'.format(self.kind)]
+        tables = ['{}s'.format(self.kind)]
+        conditions = ['runs.run_uuid = \'{}\''.format(self.run_uuid)]
+        oid = self._read_db(columns, tables, conditions, as_dict=False)
+        if not oid:
+            raise ConfigError('No matching run entries found for run_uuid {}'.format(self.run_uuid))
+        if len(oid) > 1:
+            raise ConfigError('Multiple entries found for run_uuid: {}'.format(self.run_uuid))
+        return oid[0][0]
+
+    def _get_parameters(self, param_type):
+        columns = ['parameters.job_oid', 'parameters.name', 'parameters.value']
+        tables = ['parameters']
+        conditions = ['parameters.type = \'{}\''.format(param_type),
+                      'parameters.run_oid = \'{}\''.format(self.oid)]
+        params = self._read_db(columns, tables, conditions, as_dict=False)
+        param_dict = defaultdict(dict)
+        for (job_oid, k, v) in params:
+            try:
+                param_dict[job_oid][k] = json.loads(v)
+            except SerializerSyntaxError:
+                logger.debug('Failed to deserialize job_oid:{}-"{}":"{}"'.format(job_oid, k, v))
+        return param_dict
+
+    def _list_runs(self):
+        columns = ['runs.run_uuid', 'runs.run_name', 'runs.project',
+                   'runs.project_stage', 'runs.status', 'runs.start_time', 'runs.end_time']
+        tables = ['runs']
+        pod = self._read_db(columns, tables)
+        if pod:
+            headers = ['Run Name', 'Project', 'Project Stage', 'Start Time', 'End Time',
+                       'run_uuid']
+            run_list = []
+            for entry in pod:
+                # Format times for display
+                start_time = entry['start_time']
+                end_time = entry['end_time']
+                if start_time:
+                    start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
+                if end_time:
+                    end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
+
+                run_list.append([
+                    entry['run_name'],
+                    entry['project'],
+                    entry['project_stage'],
+                    start_time,
+                    end_time,
+                    entry['run_uuid']])
+
+            print(format_simple_table(run_list, headers))
+        else:
+            print('No runs found')
+
+
+class JobDatabaseOutput(DatabaseOutput):
+
+    kind = 'job'
+
+    def __init__(self, conn, oid, job_id, label, iteration, retry):
+        super(JobDatabaseOutput, self).__init__(conn, oid=oid, reload=False)
+        self.id = job_id
+        self.label = label
+        self.iteration = iteration
+        self.retry = retry
+        self.result = None
+        self.spec = None
+        self.reload()
+
+    def __repr__(self):
+        return '<{} {}-{}-{}>'.format(self.__class__.__name__,
+                                      self.id, self.label, self.iteration)
+
+    def __str__(self):
+        return '{}-{}-{}'.format(self.id, self.label, self.iteration)
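
For reference, the SQL emitted by DatabaseOutput._build_command has the
following shape (reproduced by hand from the formatting logic above;
the oid '1' is illustrative):

    >>> DatabaseOutput._build_command(
    ...     ['metrics.name', 'metrics.value'],
    ...     ['metrics'],
    ...     conditions=["metrics.run_oid = '1'"],
    ...     joins=[('classifiers', 'classifiers.metric_oid = metrics.oid')])
    "SELECT\n\tmetrics.name,\n\tmetrics.value\nFROM\n\tmetrics\nLEFT JOIN classifiers ON classifiers.metric_oid = metrics.oid\nWHERE\n\tmetrics.run_oid = '1';"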