mirror of
				https://github.com/ARM-software/workload-automation.git
				synced 2025-10-29 22:24:51 +00:00 
			
		
		
		
	commands/create: Allow for upgrading database schema
Provide a method of upgrading existing postgres databases to a new schema version.
This commit is contained in:
		| @@ -40,10 +40,12 @@ from wa.framework.exception import ConfigError, CommandError | ||||
| from wa.instruments.energy_measurement import EnergyInstrumentBackend | ||||
| from wa.utils.misc import (ensure_directory_exists as _d, capitalize, | ||||
|                            ensure_file_directory_exists as _f) | ||||
| from wa.utils.postgres import get_schema | ||||
| from wa.utils.serializer import yaml | ||||
|  | ||||
|  | ||||
| TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), 'templates') | ||||
| POSTGRES_SCHEMA_DIR = os.path.join(os.path.dirname(__file__), 'postgres_schemas') | ||||
|  | ||||
|  | ||||
| class CreateDatabaseSubcommand(SubCommand): | ||||
| @@ -54,14 +56,21 @@ class CreateDatabaseSubcommand(SubCommand): | ||||
|     output processor. | ||||
|     """ | ||||
|  | ||||
|     schemafilepath = 'postgres_schema.sql' | ||||
|     schemafilepath = os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema.sql') | ||||
|     schemaupdatefilepath = os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema_update_v{}.{}.sql') | ||||
|  | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         super(CreateDatabaseSubcommand, self).__init__(*args, **kwargs) | ||||
|         self.sql_commands = None | ||||
|         self.schemaversion = None | ||||
|         self.schema_major = None | ||||
|         self.schema_minor = None | ||||
|         self.postgres_host = None | ||||
|         self.postgres_port = None | ||||
|         self.username = None | ||||
|         self.password = None | ||||
|         self.dbname = None | ||||
|         self.config_file = None | ||||
|         self.force = None | ||||
|  | ||||
|     def initialize(self, context): | ||||
|         self.parser.add_argument( | ||||
| @@ -91,25 +100,35 @@ class CreateDatabaseSubcommand(SubCommand): | ||||
|         self.parser.add_argument( | ||||
|             '-x', '--schema-version', action='store_true', | ||||
|             help='Display the current schema version.') | ||||
|         self.parser.add_argument( | ||||
|             '-U', '--upgrade', action='store_true', | ||||
|             help='Upgrade the database to use the latest schema version.') | ||||
|  | ||||
|     def execute(self, state, args):  # pylint: disable=too-many-branches | ||||
|         if not psycopg2: | ||||
|             raise CommandError( | ||||
|                 'The module psycopg2 is required for the wa ' + | ||||
|                 'create database command.') | ||||
|         self.get_schema(self.schemafilepath) | ||||
|  | ||||
|         # Display the version if needed and exit | ||||
|         if args.schema_version: | ||||
|             self.logger.info( | ||||
|                 'The current schema version is {}'.format(self.schemaversion)) | ||||
|             return | ||||
|  | ||||
|         if args.dbname == 'postgres': | ||||
|             raise ValueError('Databasename to create cannot be postgres.') | ||||
|  | ||||
|         self._parse_args(args) | ||||
|         self.schema_major, self.schema_minor, self.sql_commands = _get_schema(self.schemafilepath) | ||||
|  | ||||
|         # Display the version if needed and exit | ||||
|         if args.schema_version: | ||||
|             self.logger.info( | ||||
|                 'The current schema version is {}.{}'.format(self.schema_major, | ||||
|                                                              self.schema_minor)) | ||||
|             return | ||||
|  | ||||
|         if args.upgrade: | ||||
|             self.update_schema() | ||||
|             return | ||||
|  | ||||
|         # Open user configuration | ||||
|         with open(args.config_file, 'r') as config_file: | ||||
|         with open(self.config_file, 'r') as config_file: | ||||
|             config = yaml.load(config_file) | ||||
|             if 'postgres' in config and not args.force_update_config: | ||||
|                 raise CommandError( | ||||
| @@ -149,39 +168,158 @@ class CreateDatabaseSubcommand(SubCommand): | ||||
|  | ||||
|         # Attempt to create database | ||||
|         try: | ||||
|             self.create_database(args) | ||||
|             self.create_database() | ||||
|         except OperationalError as e: | ||||
|             for handle in possible_connection_errors: | ||||
|                 predicate(e, handle) | ||||
|             raise e | ||||
|  | ||||
|         # Update the configuration file | ||||
|         _update_configuration_file(args, config) | ||||
|         self._update_configuration_file(config) | ||||
|  | ||||
|     def create_database(self, args): | ||||
|         _validate_version(args) | ||||
|     def create_database(self): | ||||
|         self._validate_version() | ||||
|  | ||||
|         _check_database_existence(args) | ||||
|         self._check_database_existence() | ||||
|  | ||||
|         _create_database_postgres(args) | ||||
|         self._create_database_postgres() | ||||
|  | ||||
|         _apply_database_schema(args, self.sql_commands, self.schema_major, self.schema_minor) | ||||
|         self._apply_database_schema(self.sql_commands, self.schema_major, self.schema_minor) | ||||
|  | ||||
|         self.logger.debug( | ||||
|             "Successfully created the database {}".format(args.dbname)) | ||||
|         self.logger.info( | ||||
|             "Successfully created the database {}".format(self.dbname)) | ||||
|  | ||||
|     def get_schema(self, schemafilepath): | ||||
|         postgres_output_processor_dir = os.path.dirname(__file__) | ||||
|         sqlfile = open(os.path.join( | ||||
|             postgres_output_processor_dir, schemafilepath)) | ||||
|         self.sql_commands = sqlfile.read() | ||||
|         sqlfile.close() | ||||
|         # Extract schema version | ||||
|         if self.sql_commands.startswith('--!VERSION'): | ||||
|             splitcommands = self.sql_commands.split('!ENDVERSION!\n') | ||||
|             self.schemaversion = splitcommands[0].strip('--!VERSION!') | ||||
|             (self.schema_major, self.schema_minor) = self.schemaversion.split('.') | ||||
|             self.sql_commands = splitcommands[1] | ||||
|     def update_schema(self): | ||||
|         self._validate_version() | ||||
|         schema_major, schema_minor, _ = _get_schema(self.schemafilepath) | ||||
|         meta_oid, current_major, current_minor = self._get_database_schema_version() | ||||
|  | ||||
|         while not (schema_major == current_major and schema_minor == current_minor): | ||||
|             current_minor = self._update_schema_minors(current_major, current_minor, meta_oid) | ||||
|             current_major, current_minor = self._update_schema_major(current_major, current_minor, meta_oid) | ||||
|         msg = "Database schema update of '{}' to v{}.{} complete" | ||||
|         self.logger.info(msg.format(self.dbname, schema_major, schema_minor)) | ||||
|  | ||||
|     def _update_schema_minors(self, major, minor, meta_oid): | ||||
|         # Upgrade all available minor versions | ||||
|         while True: | ||||
|             minor += 1 | ||||
|             schema_update = os.path.join(POSTGRES_SCHEMA_DIR, | ||||
|                                          self.schemaupdatefilepath.format(major, minor)) | ||||
|             if not os.path.exists(schema_update): | ||||
|                 break | ||||
|  | ||||
|             _, _, sql_commands = _get_schema(schema_update) | ||||
|             self._apply_database_schema(sql_commands, major, minor, meta_oid) | ||||
|             msg = "Updated the database schema to v{}.{}" | ||||
|             self.logger.debug(msg.format(major, minor)) | ||||
|  | ||||
|         # Return last existing update file version | ||||
|         return minor - 1 | ||||
|  | ||||
|     def _update_schema_major(self, current_major, current_minor, meta_oid): | ||||
|         current_major += 1 | ||||
|         schema_update = os.path.join(POSTGRES_SCHEMA_DIR, | ||||
|                                      self.schemaupdatefilepath.format(current_major, 0)) | ||||
|         if not os.path.exists(schema_update): | ||||
|             return (current_major - 1, current_minor) | ||||
|  | ||||
|         # Reset minor to 0 with major version bump | ||||
|         current_minor = 0 | ||||
|         _, _, sql_commands = _get_schema(schema_update) | ||||
|         self._apply_database_schema(sql_commands, current_major, current_minor, meta_oid) | ||||
|         msg = "Updated the database schema to v{}.{}" | ||||
|         self.logger.debug(msg.format(current_major, current_minor)) | ||||
|         return (current_major, current_minor) | ||||
|  | ||||
|     def _validate_version(self): | ||||
|         conn = connect(user=self.username, | ||||
|                        password=self.password, host=self.postgres_host, port=self.postgres_port) | ||||
|         if conn.server_version < 90400: | ||||
|             msg = 'Postgres version too low. Please ensure that you are using atleast v9.4' | ||||
|             raise CommandError(msg) | ||||
|  | ||||
|     def _get_database_schema_version(self): | ||||
|         conn = connect(dbname=self.dbname, user=self.username, | ||||
|                        password=self.password, host=self.postgres_host, port=self.postgres_port) | ||||
|         cursor = conn.cursor() | ||||
|         cursor.execute('''SELECT | ||||
|                                 DatabaseMeta.oid, | ||||
|                                 DatabaseMeta.schema_major, | ||||
|                                 DatabaseMeta.schema_minor | ||||
|                           FROM | ||||
|                                 DatabaseMeta;''') | ||||
|         return cursor.fetchone() | ||||
|  | ||||
|     def _check_database_existence(self): | ||||
|         try: | ||||
|             connect(dbname=self.dbname, user=self.username, | ||||
|                     password=self.password, host=self.postgres_host, port=self.postgres_port) | ||||
|         except OperationalError as e: | ||||
|             # Expect an operational error (database's non-existence) | ||||
|             if not re.compile('FATAL:  database ".*" does not exist').match(str(e)): | ||||
|                 raise e | ||||
|         else: | ||||
|             if not self.force: | ||||
|                 raise CommandError( | ||||
|                     "Database {} already exists. ".format(self.dbname) + | ||||
|                     "Please specify the -f flag to create it from afresh." | ||||
|                 ) | ||||
|  | ||||
|     def _create_database_postgres(self): | ||||
|         conn = connect(dbname='postgres', user=self.username, | ||||
|                        password=self.password, host=self.postgres_host, port=self.postgres_port) | ||||
|         conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) | ||||
|         cursor = conn.cursor() | ||||
|         cursor.execute('DROP DATABASE IF EXISTS ' + self.dbname) | ||||
|         cursor.execute('CREATE DATABASE ' + self.dbname) | ||||
|         conn.commit() | ||||
|         cursor.close() | ||||
|         conn.close() | ||||
|  | ||||
|     def _apply_database_schema(self, sql_commands, schema_major, schema_minor, meta_uuid=None): | ||||
|         conn = connect(dbname=self.dbname, user=self.username, | ||||
|                        password=self.password, host=self.postgres_host, port=self.postgres_port) | ||||
|         cursor = conn.cursor() | ||||
|         cursor.execute(sql_commands) | ||||
|  | ||||
|         if not meta_uuid: | ||||
|             extras.register_uuid() | ||||
|             meta_uuid = uuid.uuid4() | ||||
|             cursor.execute("INSERT INTO DatabaseMeta VALUES (%s, %s, %s)", | ||||
|                            (meta_uuid, | ||||
|                             schema_major, | ||||
|                             schema_minor | ||||
|                             )) | ||||
|         else: | ||||
|             cursor.execute("UPDATE DatabaseMeta SET schema_major = %s, schema_minor = %s WHERE oid = %s;", | ||||
|                            (schema_major, | ||||
|                             schema_minor, | ||||
|                             meta_uuid | ||||
|                             )) | ||||
|  | ||||
|         conn.commit() | ||||
|         cursor.close() | ||||
|         conn.close() | ||||
|  | ||||
|     def _update_configuration_file(self, config): | ||||
|         ''' Update the user configuration file with the newly created database's | ||||
|             configuration. | ||||
|             ''' | ||||
|         config['postgres'] = OrderedDict( | ||||
|             [('host', self.postgres_host), ('port', self.postgres_port), | ||||
|              ('dbname', self.dbname), ('username', self.username), ('password', self.password)]) | ||||
|         with open(self.config_file, 'w+') as config_file: | ||||
|             yaml.dump(config, config_file) | ||||
|  | ||||
|     def _parse_args(self, args): | ||||
|         self.postgres_host = args.postgres_host | ||||
|         self.postgres_port = args.postgres_port | ||||
|         self.username = args.username | ||||
|         self.password = args.password | ||||
|         self.dbname = args.dbname | ||||
|         self.config_file = args.config_file | ||||
|         self.force = args.force | ||||
|  | ||||
|  | ||||
| class CreateAgendaSubcommand(SubCommand): | ||||
| @@ -431,68 +569,20 @@ def touch(path): | ||||
|         pass | ||||
|  | ||||
|  | ||||
| def _validate_version(args): | ||||
|     conn = connect(user=args.username, | ||||
|                    password=args.password, host=args.postgres_host, port=args.postgres_port) | ||||
|     if conn.server_version < 90400: | ||||
|         msg = 'Postgres version too low. Please ensure that you are using atleast v9.4' | ||||
|         raise CommandError(msg) | ||||
| def _get_schema(schemafilepath): | ||||
|     sqlfile_path = os.path.join( | ||||
|         POSTGRES_SCHEMA_DIR, schemafilepath) | ||||
|  | ||||
|     with open(sqlfile_path, 'r') as sqlfile: | ||||
|         sql_commands = sqlfile.read() | ||||
|  | ||||
| def _check_database_existence(args): | ||||
|     try: | ||||
|         connect(dbname=args.dbname, user=args.username, | ||||
|                 password=args.password, host=args.postgres_host, port=args.postgres_port) | ||||
|     except OperationalError as e: | ||||
|         # Expect an operational error (database's non-existence) | ||||
|         if not re.compile('FATAL:  database ".*" does not exist').match(str(e)): | ||||
|             raise e | ||||
|     else: | ||||
|         if not args.force: | ||||
|             raise CommandError( | ||||
|                 "Database {} already exists. ".format(args.dbname) + | ||||
|                 "Please specify the -f flag to create it from afresh." | ||||
|             ) | ||||
|  | ||||
|  | ||||
| def _create_database_postgres(args):  # pylint: disable=no-self-use | ||||
|     conn = connect(dbname='postgres', user=args.username, | ||||
|                    password=args.password, host=args.postgres_host, port=args.postgres_port) | ||||
|     conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) | ||||
|     cursor = conn.cursor() | ||||
|     cursor.execute('DROP DATABASE IF EXISTS ' + args.dbname) | ||||
|     cursor.execute('CREATE DATABASE ' + args.dbname) | ||||
|     conn.commit() | ||||
|     cursor.close() | ||||
|     conn.close() | ||||
|  | ||||
|  | ||||
| def _apply_database_schema(args, sql_commands, schema_major, schema_minor): | ||||
|     conn = connect(dbname=args.dbname, user=args.username, | ||||
|                    password=args.password, host=args.postgres_host, port=args.postgres_port) | ||||
|     cursor = conn.cursor() | ||||
|     cursor.execute(sql_commands) | ||||
|  | ||||
|     extras.register_uuid() | ||||
|     cursor.execute("INSERT INTO DatabaseMeta VALUES (%s, %s, %s)", | ||||
|                    ( | ||||
|                        uuid.uuid4(), | ||||
|                        schema_major, | ||||
|                        schema_minor | ||||
|                    ) | ||||
|                    ) | ||||
|  | ||||
|     conn.commit() | ||||
|     cursor.close() | ||||
|     conn.close() | ||||
|  | ||||
|  | ||||
| def _update_configuration_file(args, config): | ||||
|     ''' Update the user configuration file with the newly created database's | ||||
|         configuration. | ||||
|         ''' | ||||
|     config['postgres'] = OrderedDict( | ||||
|         [('host', args.postgres_host), ('port', args.postgres_port), | ||||
|          ('dbname', args.dbname), ('username', args.username), ('password', args.password)]) | ||||
|     with open(args.config_file, 'w+') as config_file: | ||||
|         yaml.dump(config, config_file) | ||||
|     schema_major = None | ||||
|     schema_minor = None | ||||
|     # Extract schema version if present | ||||
|     if sql_commands.startswith('--!VERSION'): | ||||
|         splitcommands = sql_commands.split('!ENDVERSION!\n') | ||||
|         schema_major, schema_minor = splitcommands[0].strip('--!VERSION!').split('.') | ||||
|         schema_major = int(schema_major) | ||||
|         schema_minor = int(schema_minor) | ||||
|         sql_commands = splitcommands[1] | ||||
|     return schema_major, schema_minor, sql_commands | ||||
|   | ||||
		Reference in New Issue
	
	Block a user