diff --git a/wa/output_processors/sqlite.py b/wa/output_processors/sqlite.py
new file mode 100644
index 00000000..e925e1a3
--- /dev/null
+++ b/wa/output_processors/sqlite.py
@@ -0,0 +1,207 @@
+# Copyright 2013-2015 ARM Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pylint: disable=attribute-defined-outside-init
+
+import os
+import sqlite3
+import uuid
+from datetime import datetime, timedelta
+from contextlib import contextmanager
+
+from wa import OutputProcessor, Parameter, OutputProcessorError
+from wa.utils.serializer import json
+from wa.utils.types import boolean
+
+
+# IMPORTANT: when updating this schema, make sure to bump the version!
+SCHEMA_VERSION = '0.0.2'
+SCHEMA = [
+    '''CREATE TABLE runs (
+        uuid text,
+        start_time datetime,
+        end_time datetime,
+        duration integer
+    )''',
+    '''CREATE TABLE workload_specs (
+        id text,
+        run_oid text,
+        number_of_iterations integer,
+        label text,
+        workload_name text,
+        boot_parameters text,
+        runtime_parameters text,
+        workload_parameters text
+    )''',
+    '''CREATE TABLE metrics (
+        spec_oid int,
+        iteration integer,
+        metric text,
+        value text,
+        units text,
+        lower_is_better integer
+    )''',
+    '''CREATE VIEW results AS
+       SELECT uuid as run_uuid, spec_id, label as workload, iteration, metric, value, units, lower_is_better
+       FROM metrics AS m INNER JOIN (
+           SELECT ws.OID as spec_oid, ws.id as spec_id, uuid, label
+           FROM workload_specs AS ws INNER JOIN runs AS r ON ws.run_oid = r.OID
+       ) AS wsr ON wsr.spec_oid = m.spec_oid
+    ''',
+    '''CREATE TABLE __meta (
+        schema_version text
+    )''',
+    '''INSERT INTO __meta VALUES ("{}")'''.format(SCHEMA_VERSION),
+]
+
+
+# Store datetimes, durations and UUIDs in readily readable forms.
+sqlite3.register_adapter(datetime, lambda x: x.isoformat())
+sqlite3.register_adapter(timedelta, lambda x: x.total_seconds())
+sqlite3.register_adapter(uuid.UUID, str)
+
+
+class SqliteResultProcessor(OutputProcessor):
+
+    name = 'sqlite'
+    description = """
+    Stores results in an sqlite database.
+
+    This may be used to accumulate results of multiple runs in a single file.
+
+    """
+    parameters = [
+        Parameter('database', default=None,
+                  global_alias='sqlite_database',
+                  description="""
+                  Full path to the sqlite database to be used. If this is not
+                  specified, a new database file will be created in the output
+                  directory. This setting can be used to accumulate results
+                  from multiple runs in a single database. If the specified
+                  file does not exist, it will be created; however, the
+                  directory containing the file must exist.
+
+                  .. note:: The value must resolve to an absolute path;
+                            relative paths are not allowed. However, the
+                            value may contain environment variables and/or
+                            the home reference "~".
+                  """),
+        Parameter('overwrite', kind=boolean, default=False,
+                  global_alias='sqlite_overwrite',
+                  description="""
+                  If ``True``, the database file will be overwritten if it
+                  already exists. If ``False`` (the default), data will be
+                  added to the existing file (provided schema versions match
+                  -- otherwise an error will be raised).
+                  """),
+    ]
+
+    def initialize(self):
+        self._last_spec = None
+        self._run_oid = None
+        self._spec_oid = None
+        self._run_initialized = False
+
+    def export_job_output(self, job_output, target_info, run_output):
+        if not self._run_initialized:
+            self._init_run(run_output)
+        if self._last_spec != job_output.spec:
+            self._update_spec(job_output.spec)
+        metrics = [(self._spec_oid, job_output.iteration, m.name, str(m.value), m.units, int(m.lower_is_better))
+                   for m in job_output.metrics]
+        if metrics:
+            with self._open_connection() as conn:
+                conn.executemany('INSERT INTO metrics VALUES (?,?,?,?,?,?)', metrics)
+
+    def export_run_output(self, run_output, target_info):
+        if not self._run_initialized:
+            self._init_run(run_output)
+        metrics = [(self._spec_oid, run_output.iteration, m.name, str(m.value), m.units, int(m.lower_is_better))
+                   for m in run_output.metrics]
+        if metrics:
+            with self._open_connection() as conn:
+                conn.executemany('INSERT INTO metrics VALUES (?,?,?,?,?,?)', metrics)
+
+        info = run_output.info
+        with self._open_connection() as conn:
+            conn.execute('''UPDATE runs SET start_time=?, end_time=?, duration=?
+                            WHERE OID=?''',
+                         (info.start_time, info.end_time, info.duration, self._run_oid))
+
+    def _init_run(self, run_output):
+        if not self.database:  # pylint: disable=access-member-before-definition
+            self.database = os.path.join(run_output.basepath, 'results.sqlite')
+        self.database = os.path.expandvars(os.path.expanduser(self.database))
+
+        if not os.path.exists(self.database):
+            self._init_db()
+        elif self.overwrite:  # pylint: disable=no-member
+            os.remove(self.database)
+            self._init_db()
+        else:
+            self._validate_schema_version()
+        self._update_run(run_output.info.uuid)
+
+        # If the database file happens to be in the output directory, add it as an
+        # artifact; if it isn't, then RunOutput doesn't need to keep track of it.
+        if not os.path.relpath(self.database, run_output.basepath).startswith('..'):
+            run_output.add_artifact('sqlitedb', self.database, kind='export')
+
+        self._run_initialized = True
+
+    def _init_db(self):
+        with self._open_connection() as conn:
+            for command in SCHEMA:
+                conn.execute(command)
+
+    def _validate_schema_version(self):
+        with self._open_connection() as conn:
+            try:
+                c = conn.execute('SELECT schema_version FROM __meta')
+                found_version = c.fetchone()[0]
+            except sqlite3.OperationalError:
+                message = '{} does not appear to be a valid WA results database.'.format(self.database)
+                raise OutputProcessorError(message)
+            if found_version != SCHEMA_VERSION:
+                message = 'Schema version in {} ({}) does not match current version ({}).'
+                raise OutputProcessorError(message.format(self.database, found_version, SCHEMA_VERSION))
+
+    def _update_run(self, run_uuid):
+        with self._open_connection() as conn:
+            conn.execute('INSERT INTO runs (uuid) VALUES (?)', (run_uuid,))
+            conn.commit()
+            c = conn.execute('SELECT OID FROM runs WHERE uuid=?', (run_uuid,))
+            self._run_oid = c.fetchone()[0]
+
+    def _update_spec(self, spec):
+        self._last_spec = spec
+        spec_tuple = (spec.id, self._run_oid, spec.iterations, spec.label, spec.workload_name,
+                      json.dumps(spec.boot_parameters), json.dumps(spec.runtime_parameters),
+                      json.dumps(spec.workload_parameters.to_pod()))
+        with self._open_connection() as conn:
+            conn.execute('INSERT INTO workload_specs VALUES (?,?,?,?,?,?,?,?)', spec_tuple)
+            conn.commit()
+            c = conn.execute('SELECT OID FROM workload_specs WHERE run_oid=? AND id=?',
+                             (self._run_oid, spec.id))
+            self._spec_oid = c.fetchone()[0]
+
+    @contextmanager
+    def _open_connection(self):
+        # Commits on exit, so callers only need to execute their statements
+        # inside the ``with`` block.
+        conn = sqlite3.connect(self.database)
+        try:
+            yield conn
+        finally:
+            conn.commit()
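
As an aside on how the stored data can be consumed: the results view created by the schema above already joins runs, workload_specs and metrics, so an accumulated database can be read directly with the standard sqlite3 module. The snippet below is a minimal sketch; the database path is hypothetical and assumed to point at a file produced by this output processor.

import os
import sqlite3

# Hypothetical path to a database accumulated over several WA runs.
db_path = os.path.expanduser('~/wa_results.sqlite')

conn = sqlite3.connect(db_path)
try:
    # The "results" view exposes one row per metric per iteration, already
    # joined against the owning run and workload spec.
    query = '''SELECT run_uuid, workload, iteration, metric, value, units
               FROM results
               ORDER BY run_uuid, workload, iteration'''
    for run_uuid, workload, iteration, metric, value, units in conn.execute(query):
        print('{} {} #{}: {} = {} {}'.format(run_uuid, workload, iteration,
                                             metric, value, units or ''))
finally:
    conn.close()

Because metric values are stored as text (they are inserted via str(m.value) into a value text column), any numeric post-processing such as averaging across iterations needs an explicit conversion, e.g. CAST(value AS REAL) in SQL or float(value) in Python.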