diff --git a/postgres-appliance/major_upgrade/inplace_upgrade.py b/postgres-appliance/major_upgrade/inplace_upgrade.py index 1e6e6d74f..fe095afca 100644 --- a/postgres-appliance/major_upgrade/inplace_upgrade.py +++ b/postgres-appliance/major_upgrade/inplace_upgrade.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +"""spilo ``--update-version`` machinery.""" import json import logging import os @@ -40,6 +41,16 @@ def patch_wale_prefix(value, new_version): def update_configs(new_version): + """ + Update the Patroni configuration file by setting the new Postgres version and updating the shared_preload_libraries. + Update the WAL-E/WAL-G envdir files by patching the WAL prefix with the new version if the old version is valid and different from the new version. + Checks if the extwlist.extensions parameter is set in the Patroni configuration file and appends the new extensions to it. + Writes the updated Patroni configuration file and WAL-E/WAL-G envdir files. + + :param new_version: The new Postgres version to upgrade to. + + Returns: The path to the WAL-E/WAL-G envdir files if they exist, None otherwise. + """ from spilo_commons import append_extensions, get_bin_dir, get_patroni_config, write_file, write_patroni_config config = get_patroni_config() @@ -151,8 +162,7 @@ def get_desired_version(): If that fails, it retrieves the binary directory from the PGVERSION environment variable. Finally, it returns the version of the PostgreSQL binary located in the binary directory. - Returns: - str: The version of the PostgreSQL binary to be used for the upgrade. + Returns: str: The version of the PostgreSQL binary to be used for the upgrade. """ from spilo_commons import get_bin_dir, get_binary_version @@ -173,8 +183,7 @@ def check_patroni_api(self, member): :param member: The member to check the API for. - Returns: - True if the API request was successful and returned a 200 status code, False otherwise. 
+ Returns: True if the API request was successful and returned a 200 status code, False otherwise. """ try: response = self.request(member, timeout=2, retries=0) @@ -183,6 +192,16 @@ def check_patroni_api(self, member): return logger.error('API request to %s name failed: %r', member.name, e) def toggle_pause(self, paused): + """ + It's responsible for enabling or disabling maintenance mode. + If the cluster is currently paused, this method will attempt to disable maintenance mode and + resume normal operation. If the cluster is not currently paused, this method will attempt to + enable maintenance mode and pause normal operation. + + :param paused: A boolean value indicating whether to enable or disable maintenance mode. + + Returns: bool: True if the maintenance mode was successfully enabled or disabled, False otherwise. + """ from patroni.config import get_global_config from patroni.utils import polling_loop @@ -211,6 +230,13 @@ def toggle_pause(self, paused): return logger.error("%s members didn't recognized pause state after %s seconds", remaining, ttl) def resume_cluster(self): + """ + Resumes the cluster by disabling maintenance mode. + If the cluster is currently paused, this method will attempt to disable + maintenance mode and resume normal operation. + + Raises: Exception: If an error occurs while resuming the cluster. + """ if self.paused: try: logger.info('Disabling maintenance mode') @@ -222,6 +248,10 @@ def ensure_replicas_state(self, cluster): """ This method checks the satatus of all replicas and also tries to open connections to all of them and puts into the `self.replica_connections` dict for a future usage. + + :param cluster: cluster object representing the PostgreSQL cluster. + + Returns: bool: True if all replicas are streaming from the primary and are healthy, False otherwise. 
""" self.replica_connections = {} streaming = {a: l for a, l in self.postgresql.query( @@ -230,6 +260,16 @@ def ensure_replicas_state(self, cluster): .format(self.postgresql.wal_name, self.postgresql.lsn_name))} def ensure_replica_state(member): + """ + Checks the state of a sinble replica and opens a connection to it. + checks if the replication lag on the replica is too high (more than 16 MB). + If the replica is streaming from the primary and is healthy, it opens a connection to it and + puts it into the `self.replica_connections` dictionary for a future usage. + + :param member: A member object representing the replica. + + Returns: bool: True if the replica is streaming from the primary and is healthy, False otherwise. + """ ip = member.conn_kwargs().get('host') lag = streaming.get(ip) if lag is None: @@ -256,6 +296,13 @@ def ensure_replica_state(member): return all(ensure_replica_state(member) for member in cluster.members if member.name != self.postgresql.name) def sanity_checks(self, cluster): + """ + Perform sanity checks before triggering an upgrade. + + :param cluster: cluster object representing the PostgreSQL cluster. + + Raises: Exception: If any of the sanity checks fail. + """ from patroni.config import get_global_config if not cluster.initialize: @@ -274,6 +321,13 @@ def sanity_checks(self, cluster): return self.ensure_replicas_state(cluster) def remove_initialize_key(self): + """ + Removes the initialize key from the cluster. + This method checks if the cluster has an initialize key set, and if so, attempts to remove it by canceling the + initialization process. It uses a polling loop to check the cluster's state multiple times before giving up. + + Returns: bool: True if the initialize key was successfully removed, False otherwise. 
+ """ from patroni.utils import polling_loop for _ in polling_loop(10): @@ -286,6 +340,16 @@ def remove_initialize_key(self): logger.error('Failed to remove initialize key') def wait_for_replicas(self, checkpoint_lsn): + """ + Ensure that all replica nodes of a PostgreSQL database have caught up with the primary node to a specific + Log Sequence Number (LSN). + Importing the polling_loop function from the patroni.utils module. This function is used to create a + loop that polls for a certain condition. + + :param checkpoint_lsn: Sequence Number (LSN) up to which the replica nodes need to catch up + + Returns: bool: True if all replicas have caught up to the checkpoint_lsn. False otherwise. + """ from patroni.utils import polling_loop logger.info('Waiting for replica nodes to catch up with primary') @@ -319,6 +383,18 @@ def wait_for_replicas(self, checkpoint_lsn): logger.error('Node %s did not catched up. Lag=%s', name, checkpoint_lsn - lsn) def create_rsyncd_configs(self): + """ + It is responsible for creating configuration files for rsyncd, a daemon for rsync, which is a tool used + for copying and synchronizing files across systems. + + :param rsyncd_configs_created: A boolean attribute set to True indicating that the rsyncd configurations have been created. + :param rsyncd_conf_dir: A string representing the directory for the rsyncd configuration files (/run/rsync). + :param rsyncd_feedback_dir: A string representing a subdirectory for feedback (/run/rsync/feedback). + :param rsyncd_conf: A string representing the path for the main rsyncd configuration file (rsyncd.conf). + :param secrets_file: A string representing the path for the secrets file (rsyncd.secrets), which is located in the rsyncd configuration directory. + :param auth_users: A string representing a comma-separated list of all the keys in the replica_connections dictionary. 
+ :param replica_ips: A string representing a comma-separated list of the first element of all the values in the replica_connections dictionary. + """ self.rsyncd_configs_created = True self.rsyncd_conf_dir = '/run/rsync' self.rsyncd_feedback_dir = os.path.join(self.rsyncd_conf_dir, 'feedback') @@ -354,11 +430,24 @@ def create_rsyncd_configs(self): os.chmod(secrets_file, 0o600) def start_rsyncd(self): + """ + Starts the rsync daemon for file synchronization. + This method creates the rsync daemon configuration file and starts the rsync daemon process. + The rsync daemon is started with the specified configuration file and runs in the foreground. + """ self.create_rsyncd_configs() self.rsyncd = subprocess.Popen(['rsync', '--daemon', '--no-detach', '--config=' + self.rsyncd_conf]) self.rsyncd_started = True def stop_rsyncd(self): + """ + Stops the rsync daemon and removes the rsync configuration directory. + If the rsync daemon is running, it will be killed. If the rsync configuration + directory exists, it will be removed. + + Raises: OSError: If there is an error killing the rsync daemon or removing the + rsync configuration directory. + """ if self.rsyncd_started: logger.info('Stopping rsyncd') try: @@ -375,6 +464,13 @@ def stop_rsyncd(self): logger.error('Failed to remove %s: %r', self.rsync_conf_dir, e) def checkpoint(self, member): + """ + Perform a checkpoint on a specific member. + + :param member: A tuple containing the name and the database connection of the member. + + Returns: A tuple containing the name of the member and a boolean indicating whether the checkpoint was successful. + """ name, (_, cur) = member try: cur.execute('CHECKPOINT') @@ -384,6 +480,15 @@ def checkpoint(self, member): return name, False def rsync_replicas(self, primary_ip): + """ + It is responsible for synchronizing replicas using rsync, a tool used for copying and synchronizing files across systems. + + :param primary_ip: A string representing the IP address of the primary node. 
+ :param ret: A boolean attribute set to True indicating that the rsync was successful. + :param status: A dictionary containing the status of the rsync operation. + + Returns: bool: True if the rsync operation was successful, False otherwise. + """ from patroni.utils import polling_loop logger.info('Notifying replicas %s to start rsync', ','.join(self.replica_connections.keys())) @@ -441,6 +546,13 @@ def rsync_replicas(self, primary_ip): return ret def wait_replica_restart(self, member): + """ + Waits for the replica to restart after a major upgrade. + + :param member: The replica member object. + + Returns: str: The name of the replica member if it has restarted successfully, None otherwise. + """ from patroni.utils import polling_loop for _ in polling_loop(10): @@ -456,6 +568,13 @@ def wait_replica_restart(self, member): logger.error('Patroni on replica %s was not restarted in 10 seconds', member.name) def wait_replicas_restart(self, cluster): + """ + Waits for the restart of patroni on replicas. + + :param cluster: The cluster object representing the Postgres cluster. + + Returns: bool: True if all replicas successfully restarted, False otherwise. + """ members = [member for member in cluster.members if member.name in self.replica_connections] logger.info('Waiting for restart of patroni on replicas %s', ', '.join(m.name for m in members)) pool = ThreadPool(len(members)) @@ -466,6 +585,13 @@ def wait_replicas_restart(self, cluster): return all(results) def reset_custom_statistics_target(self): + """ + Resets the non-default statistics target before performing analyze. + This method retrieves the list of databases and their corresponding tables and columns + that have a custom statistics target set. It then resets the statistics target to -1 + for each column, effectively disabling custom statistics for those columns. + Also this method requires the `patroni.postgresql.connection` module. 
+ """ from patroni.postgresql.connection import get_connection_cursor logger.info('Resetting non-default statistics target before analyze') @@ -485,6 +611,12 @@ def reset_custom_statistics_target(self): self._statistics[d[0]][table][column] = target def restore_custom_statistics_target(self): + """ + Restores the default statistics targets after an upgrade. + This method connects to each database specified in the `_statistics` attribute and executes + an ALTER TABLE statement for each table and column specified in the `_statistics` attribute. + The ALTER TABLE statement sets the statistics target for the column to the value specified. + """ from patroni.postgresql.connection import get_connection_cursor if not self._statistics: @@ -506,6 +638,14 @@ def restore_custom_statistics_target(self): logger.error("Failed to execute '%s'", query) def reanalyze(self): + """ + Reanalyzes the tables in the PostgreSQL database using the ANALYZE command. + This method iterates over the statistics stored in the `_statistics` attribute and executes the ANALYZE command + for each table in each database. It uses the `get_connection_cursor` function from the `patroni.postgresql.connection` + module to establish a connection to the local PostgreSQL instance. + + Raises: Exception: If there is an error executing the ANALYZE command for any table. + """ from patroni.postgresql.connection import get_connection_cursor if not self._statistics: @@ -525,6 +665,14 @@ def reanalyze(self): logger.error("Failed to execute '%s'", query) def analyze(self): + """ + Analyzes the database by resetting and restoring custom statistics targets. + This method first resets the custom statistics targets, then performs a database analysis, + and finally restores the custom statistics targets. If any error occurs during the process, + it logs the error message. + + Raises: Exception: If an error occurs during the reset or restore of custom statistics targets. 
+ """ try: self.reset_custom_statistics_target() except Exception as e: @@ -536,6 +684,18 @@ def analyze(self): logger.error('Failed to restore custom statistics targets: %r', e) def do_upgrade(self): + """ + This method is responsible for upgrading a PostgreSQL database cluster. + It performs the following steps, checks if the upgrade, checks if the PostgreSQL instance is running and there is a leader, + checks if the cluster is ready to be upgraded, prepares the new PGDATA directory, drops possibly incompatible extensions, + run a pg_upgrade check, drops possibly incompatible objects, enable maintenance mode, stops the PostgreSQL instance, + starts rsyncd, waits for replicas to catch up, run a CHECKPOINT on replicas, execute the pg_upgrade, switches PGDATA directories, + removes the initialize key, kills Patroni, waits for Patroni to restart, starts the PostgreSQL instance, + updates the configuration files, performs a CHECKPOINT on replicas, rsyncs replicas, wait for replicas to restart, + run a database analyze, updates the extensions, run a post-upgrade cleanup, run a backup and execute a post-cleanup. + + Returns: bool: True if the upgrade was successful, False otherwise. + """ from patroni.utils import polling_loop if not self.upgrade_required: @@ -703,6 +863,10 @@ def do_upgrade(self): return ret def post_cleanup(self): + """ + Performs post-cleanup tasks after the upgrade process. + This method stops the rsync daemon, resumes the cluster, and cleans up the new PGDATA directory if it was created. + """ self.stop_rsyncd() self.resume_cluster() @@ -713,6 +877,14 @@ def post_cleanup(self): logger.error('Failed to remove new PGDATA %r', e) def try_upgrade(self, replica_count): + """ + Tries to perform the upgrade by setting the replica count and calling the do_upgrade method. + Finally, it performs post-cleanup operations. + + :param replica_count: The number of replicas to set before performing the upgrade. + + Returns: The result of the do_upgrade method. 
+ """ try: self.replica_count = replica_count return self.do_upgrade() @@ -720,6 +892,11 @@ def try_upgrade(self, replica_count): self.post_cleanup() def start_backup(self, envdir): + """ + Initiates a new backup by calling the postgres_backup.sh script with the specified environment directory and data directory. + + :param envdir: The path string to the environment directory. + """ logger.info('Initiating a new backup...') if not os.fork(): subprocess.call(['nohup', 'envdir', envdir, '/scripts/postgres_backup.sh', self.postgresql.data_dir], @@ -728,6 +905,20 @@ def start_backup(self, envdir): # this function will be running in a clean environment, therefore we can't rely on DCS connection def rsync_replica(config, desired_version, primary_ip, pid): + """ + It is responsible for synch of the replica and primary during the upgrade process. + It imports the PostgresqlUpgrade class from the pg_upgrade module and the polling_loop function from the patroni.utils module. + Check if the PostgreSQL version in replica matches the desired version, stops PostgreSQL instance and switches the PGDATA directory. + Update the configuration files and restarts Patroni, remove the recovery.conf file and restarts Patroni again. + Returns the result of the cleanup_old_pgdata method. + + :param config: A Config object representing the Patroni configuration. + :param desired_version: A string representing the desired version of the PostgreSQL binary to be used for the upgrade. + :param primary_ip: A string representing the IP address of the primary node. + :param pid: An integer representing the process ID of the PostgreSQL backend process. + + Returns: int: 0 if the rsync was successful, 1 otherwise. + """ from pg_upgrade import PostgresqlUpgrade from patroni.utils import polling_loop @@ -813,6 +1004,14 @@ def rsync_replica(config, desired_version, primary_ip, pid): def main(): + """ + Starting point of the script. 
+ Parses command line arguments and performs either an rsync_replica operation or an inplace upgrade. + Returns: + 0 if the operation is successful, + 1 if the operation fails, + 2 if the command line arguments are invalid. + """ from patroni.config import Config from spilo_commons import PATRONI_CONFIG_FILE diff --git a/postgres-appliance/major_upgrade/pg_upgrade.py b/postgres-appliance/major_upgrade/pg_upgrade.py index dee894b99..ea0a0d7d5 100644 --- a/postgres-appliance/major_upgrade/pg_upgrade.py +++ b/postgres-appliance/major_upgrade/pg_upgrade.py @@ -10,10 +10,29 @@ class _PostgresqlUpgrade(Postgresql): + """ + A class representing the PostgreSQL upgrade process. + This class extends the `Postgresql` class and provides methods for adjusting shared_preload_libraries, + starting the old cluster, dropping incompatible extensions and objects, updating extensions, + cleaning up old and new pgdata directories, switching pgdata directories, performing pg_upgrade, + preparing new pgdata, and analyzing the database. + + :ivar _old_bin_dir: The old PostgreSQL binary directory. + :ivar _old_config_values: A dictionary of old configuration values. + :ivar _old_data_dir: The old PostgreSQL data directory. + :ivar _new_data_dir: The new PostgreSQL data directory. + :ivar _version_file: The PostgreSQL version file. + :ivar _INCOMPATIBLE_EXTENSIONS: A tuple of incompatible extensions. + """ _INCOMPATIBLE_EXTENSIONS = ('amcheck_next', 'pg_repack',) def adjust_shared_preload_libraries(self, version): + """ + Adjusts the shared_preload_libraries parameter based on the given version. + + :param version: The string version of PostgreSQL being upgraded to. 
+ """ from spilo_commons import adjust_extensions shared_preload_libraries = self.config.get('parameters').get('shared_preload_libraries') @@ -24,17 +43,38 @@ def adjust_shared_preload_libraries(self, version): adjust_extensions(shared_preload_libraries, version) def no_bg_mon(self): + """ + Remove 'bg_mon' from the 'shared_preload_libraries' configuration parameter. + Checks if the 'shared_preload_libraries' configuration parameter is set, and if it is, + remove the 'bg_mon' library from the list of libraries. + + """ shared_preload_libraries = self.config.get('parameters').get('shared_preload_libraries') if shared_preload_libraries: tmp = filter(lambda a: a != "bg_mon", map(lambda a: a.strip(), shared_preload_libraries.split(","))) self.config.get('parameters')['shared_preload_libraries'] = ",".join(tmp) def restore_shared_preload_libraries(self): + """ + Restores the value of shared_preload_libraries to its original value. + If the _old_shared_preload_libraries attribute is set, it restores the value of shared_preload_libraries + to the stored value in the _old_shared_preload_libraries attribute. + + Returns: bool: True if the shared_preload_libraries value was successfully restored, False otherwise. + """ if getattr(self, '_old_shared_preload_libraries'): self.config.get('parameters')['shared_preload_libraries'] = self._old_shared_preload_libraries return True def start_old_cluster(self, config, version): + """ + Starts the old cluster with the specified configuration and version. + + :param config (dict): The configuration for the old cluster. + :param version (float): The version of the old cluster. + + Returns: bool: True if the old cluster was successfully started, False otherwise. + """ self.set_bin_dir(version) # make sure we don't archive wals from the old version @@ -47,10 +87,20 @@ def start_old_cluster(self, config, version): return self.bootstrap.bootstrap(config) def get_cluster_version(self): + """ + Get the version of the cluster. 
+ + Returns: str: The version of the cluster. + """ with open(self._version_file) as f: return f.read().strip() def set_bin_dir(self, version): + """ + Sets the binary directory for the specified version. + + :param version: The string version of PostgreSQL. + """ from spilo_commons import get_bin_dir self._old_bin_dir = self._bin_dir @@ -58,15 +108,34 @@ def set_bin_dir(self, version): @property def local_conn_kwargs(self): + """ + Returns the connection kwargs for the local database. + + The returned kwargs include options for synchronous_commit, statement_timeout, and search_path. + The connect_timeout option is removed from the kwargs. + + Returns: dict: The connection kwargs for the local database. + """ conn_kwargs = self.config.local_connect_kwargs conn_kwargs['options'] = '-c synchronous_commit=local -c statement_timeout=0 -c search_path=' conn_kwargs.pop('connect_timeout', None) return conn_kwargs def _get_all_databases(self): + """ + Retrieve a list of all databases in the PostgreSQL cluster. + + Returns: list: A list of database names. + """ return [d[0] for d in self.query('SELECT datname FROM pg_catalog.pg_database WHERE datallowconn')] def drop_possibly_incompatible_extensions(self): + """ + Drops extensions from the cluster which could be incompatible. + It iterates over all databases in the cluster and drops the extensions + specified in the `_INCOMPATIBLE_EXTENSIONS` list if they exist. + It uses the `patroni.postgresql.connection.get_connection_cursor` function to establish a connection to each database. + """ from patroni.postgresql.connection import get_connection_cursor logger.info('Dropping extensions from the cluster which could be incompatible') @@ -80,6 +149,11 @@ def drop_possibly_incompatible_extensions(self): cur.execute("DROP EXTENSION IF EXISTS {0}".format(ext)) def drop_possibly_incompatible_objects(self): + """ + Drops objects from the cluster which could be incompatible. 
+ It iterates over all databases in the cluster and drops the objects from `_INCOMPATIBLE_EXTENSIONS`. + It uses the `patroni.postgresql.connection.get_connection_cursor` function to establish a connection to each database. + """ from patroni.postgresql.connection import get_connection_cursor logger.info('Dropping objects from the cluster which could be incompatible') @@ -110,6 +184,13 @@ def drop_possibly_incompatible_objects(self): logger.error('Failed: %r', e) def update_extensions(self): + """ + Updates the extensions in the PostgreSQL databases. + Connects to each database and executes the 'ALTER EXTENSION UPDATE' command + for each extension found in the database. + + Raises: Any exception raised during the execution of the 'ALTER EXTENSION UPDATE' command. + """ from patroni.postgresql.connection import get_connection_cursor conn_kwargs = self.local_conn_kwargs @@ -128,20 +209,38 @@ def update_extensions(self): @staticmethod def remove_new_data(d): + """ + Remove the new data directory. + + :param d: The string directory path to be removed. + """ if d.endswith('_new') and os.path.isdir(d): shutil.rmtree(d) def cleanup_new_pgdata(self): + """ + Cleans up the new PostgreSQL data directory. + If the `_new_data_dir` attribute is set, this method removes the new data directory. + """ if getattr(self, '_new_data_dir', None): self.remove_new_data(self._new_data_dir) def cleanup_old_pgdata(self): + """ + Removes the old data directory if it exists. + + Returns: bool: True if the old data directory was successfully removed, False otherwise. + """ if os.path.exists(self._old_data_dir): logger.info('Removing %s', self._old_data_dir) shutil.rmtree(self._old_data_dir) return True def switch_pgdata(self): + """ + Switches the PostgreSQL data directory by renaming the current data directory to a old directory, + and renaming the new data directory to the current data directory. 
+ """ self._old_data_dir = self._data_dir + '_old' self.cleanup_old_pgdata() os.rename(self._data_dir, self._old_data_dir) @@ -151,6 +250,10 @@ def switch_pgdata(self): return True def switch_back_pgdata(self): + """ + Switches back to the original data directory by renaming the new data directory to the original data directory name. + If the original data directory exists, it is renamed to a backup name before renaming the new data directory. + """ if os.path.exists(self._data_dir): self._new_data_dir = self._data_dir + '_new' self.cleanup_new_pgdata() @@ -158,6 +261,16 @@ def switch_back_pgdata(self): os.rename(self._old_data_dir, self._data_dir) def pg_upgrade(self, check=False): + """ + It performs the pg_upgrade process using the `pg_upgrade` command to perform the upgrade process. + The `psutil.cpu_count` set the number of CPUs to use in the upgrade, `shutil.rmtree` remove the upgrade directory, + `os.makedirs` creates the upgrade directory, `os.chdir` changes the current directory to the upgrade directory, + `subprocess.call` execute the `pg_upgrade` command. + + :param check: A boolean value indicating whether to perform a check or not. + + Returns: bool: True if the pg_upgrade process was successful, False otherwise. + """ upgrade_dir = self._data_dir + '_upgrade' if os.path.exists(upgrade_dir) and os.path.isdir(upgrade_dir): shutil.rmtree(upgrade_dir) @@ -247,6 +360,11 @@ def prepare_new_pgdata(self, version): return True def do_upgrade(self): + """ + Performs the upgrade process for the PostgreSQL appliance. + + Returns: bool: True if the upgrade process is successful, False otherwise. + """ return self.pg_upgrade() and self.restore_shared_preload_libraries()\ and self.switch_pgdata() and self.cleanup_old_pgdata() @@ -279,6 +397,13 @@ def analyze(self, in_stages=False): def PostgresqlUpgrade(config): + """ + Upgrade the PostgreSQL database using the provided configuration. + + :param config: A dictionary containing the PostgreSQL configuration. 
+ + Returns: _PostgresqlUpgrade: An instance of the _PostgresqlUpgrade class. + """ config['postgresql'].update({'callbacks': {}, 'pg_ctl_timeout': 3600*24*7}) # avoid unnecessary interactions with PGDATA and postgres