Skip to content

Commit

Permalink
add new docstring
Browse files Browse the repository at this point in the history
  • Loading branch information
jopadi committed Nov 22, 2023
1 parent be7e79a commit f1ae318
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 4 deletions.
207 changes: 203 additions & 4 deletions postgres-appliance/major_upgrade/inplace_upgrade.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python
"""spilo ``--update-version`` machinery."""
import json
import logging
import os
Expand Down Expand Up @@ -40,6 +41,16 @@ def patch_wale_prefix(value, new_version):


def update_configs(new_version):
"""
Update the Patroni configuration file by setting the new Postgres version and updating the shared_preload_libraries.
Update the WAL-E/WAL-G envdir files by patching the WAL prefix with the new version if the old version is valid and different from the new version.
Checks if the extwlist.extensions parameter is set in the Patroni configuration file and appends the new extensions to it.
Writes the updated Patroni configuration file and WAL-E/WAL-G envdir files.
:param new_version: The new Postgres version to upgrade to.
Returns: The path to the WAL-E/WAL-G envdir files if they exist, None otherwise.
"""
from spilo_commons import append_extensions, get_bin_dir, get_patroni_config, write_file, write_patroni_config

config = get_patroni_config()
Expand Down Expand Up @@ -151,8 +162,7 @@ def get_desired_version():
If that fails, it retrieves the binary directory from the PGVERSION environment variable.
Finally, it returns the version of the PostgreSQL binary located in the binary directory.
Returns:
str: The version of the PostgreSQL binary to be used for the upgrade.
Returns: str: The version of the PostgreSQL binary to be used for the upgrade.
"""
from spilo_commons import get_bin_dir, get_binary_version

Expand All @@ -173,8 +183,7 @@ def check_patroni_api(self, member):
:param member: The member to check the API for.
Returns:
True if the API request was successful and returned a 200 status code, False otherwise.
Returns: True if the API request was successful and returned a 200 status code, False otherwise.
"""
try:
response = self.request(member, timeout=2, retries=0)
Expand All @@ -183,6 +192,16 @@ def check_patroni_api(self, member):
return logger.error('API request to %s name failed: %r', member.name, e)

def toggle_pause(self, paused):
"""
It's responsible for enabling or disabling maintenance mode.
If the cluster is currently paused, this method will attempt to disable maintenance mode and
resume normal operation. If the cluster is not currently paused, this method will attempt to
enable maintenance mode and pause normal operation.
:param paused: A boolean value indicating whether to enable or disable maintenance mode.
Returns: bool: True if the maintenance mode was successfully enabled or disabled, False otherwise.
"""
from patroni.config import get_global_config
from patroni.utils import polling_loop

Expand Down Expand Up @@ -211,6 +230,13 @@ def toggle_pause(self, paused):
return logger.error("%s members didn't recognized pause state after %s seconds", remaining, ttl)

def resume_cluster(self):
"""
Resumes the cluster by disabling maintenance mode.
If the cluster is currently paused, this method will attempt to disable
maintenance mode and resume normal operation.
Raises: Exception: If an error occurs while resuming the cluster.
"""
if self.paused:
try:
logger.info('Disabling maintenance mode')
Expand All @@ -222,6 +248,10 @@ def ensure_replicas_state(self, cluster):
"""
This method checks the status of all replicas and also tries to open connections
to all of them and puts them into the `self.replica_connections` dict for future use.
:param cluster: cluster object representing the PostgreSQL cluster.
Returns: bool: True if all replicas are streaming from the primary and are healthy, False otherwise.
"""
self.replica_connections = {}
streaming = {a: l for a, l in self.postgresql.query(
Expand All @@ -230,6 +260,16 @@ def ensure_replicas_state(self, cluster):
.format(self.postgresql.wal_name, self.postgresql.lsn_name))}

def ensure_replica_state(member):
"""
Checks the state of a single replica and opens a connection to it.
Checks if the replication lag on the replica is too high (more than 16 MB).
If the replica is streaming from the primary and is healthy, it opens a connection to it and
puts it into the `self.replica_connections` dictionary for future use.
:param member: A member object representing the replica.
Returns: bool: True if the replica is streaming from the primary and is healthy, False otherwise.
"""
ip = member.conn_kwargs().get('host')
lag = streaming.get(ip)
if lag is None:
Expand All @@ -256,6 +296,13 @@ def ensure_replica_state(member):
return all(ensure_replica_state(member) for member in cluster.members if member.name != self.postgresql.name)

def sanity_checks(self, cluster):
"""
Perform sanity checks before triggering an upgrade.
:param cluster: cluster object representing the PostgreSQL cluster.
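Returns: the result of `ensure_replicas_state(cluster)` once the preceding checks have passed.
Raises: Exception: If any of the sanity checks fail (NOTE(review): confirm against the full method body).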
"""
from patroni.config import get_global_config

if not cluster.initialize:
Expand All @@ -274,6 +321,13 @@ def sanity_checks(self, cluster):
return self.ensure_replicas_state(cluster)

def remove_initialize_key(self):
"""
Removes the initialize key from the cluster.
This method checks if the cluster has an initialize key set, and if so, attempts to remove it by canceling the
initialization process. It uses a polling loop to check the cluster's state multiple times before giving up.
Returns: bool: True if the initialize key was successfully removed, False otherwise.
"""
from patroni.utils import polling_loop

for _ in polling_loop(10):
Expand All @@ -286,6 +340,16 @@ def remove_initialize_key(self):
logger.error('Failed to remove initialize key')

def wait_for_replicas(self, checkpoint_lsn):
"""
Ensure that all replica nodes of a PostgreSQL database have caught up with the primary node to a specific
Log Sequence Number (LSN).
The `polling_loop` function from the `patroni.utils` module is imported locally; it is used to create a
loop that polls for a certain condition.
:param checkpoint_lsn: Log Sequence Number (LSN) up to which the replica nodes need to catch up.
Returns: bool: True if all replicas have caught up to the checkpoint_lsn. False otherwise.
"""
from patroni.utils import polling_loop

logger.info('Waiting for replica nodes to catch up with primary')
Expand Down Expand Up @@ -319,6 +383,18 @@ def wait_for_replicas(self, checkpoint_lsn):
logger.error('Node %s did not catched up. Lag=%s', name, checkpoint_lsn - lsn)

def create_rsyncd_configs(self):
"""
It is responsible for creating configuration files for rsyncd, a daemon for rsync, which is a tool used
for copying and synchronizing files across systems.
The method takes no arguments; it works with the following attributes and intermediate values:
- rsyncd_configs_created: A boolean attribute set to True indicating that the rsyncd configurations have been created.
- rsyncd_conf_dir: A string attribute holding the directory for the rsyncd configuration files (/run/rsync).
- rsyncd_feedback_dir: A string attribute holding a subdirectory for feedback (/run/rsync/feedback).
- rsyncd_conf: A string representing the path for the main rsyncd configuration file (rsyncd.conf).
- secrets_file: A string representing the path for the secrets file (rsyncd.secrets), which is located in the rsyncd configuration directory.
- auth_users: A string representing a comma-separated list of all the keys in the replica_connections dictionary.
- replica_ips: A string representing a comma-separated list of the first element of all the values in the replica_connections dictionary.
"""
self.rsyncd_configs_created = True
self.rsyncd_conf_dir = '/run/rsync'
self.rsyncd_feedback_dir = os.path.join(self.rsyncd_conf_dir, 'feedback')
Expand Down Expand Up @@ -354,11 +430,24 @@ def create_rsyncd_configs(self):
os.chmod(secrets_file, 0o600)

def start_rsyncd(self):
    """
    Launch the rsync daemon used for file synchronization.

    The rsyncd configuration is generated first via `create_rsyncd_configs()`; afterwards
    `rsync --daemon --no-detach` is spawned with that configuration file. The process handle
    is kept in `self.rsyncd` and `self.rsyncd_started` is set to True.
    """
    self.create_rsyncd_configs()
    # --no-detach keeps the daemon attached to the spawned process
    command = ['rsync', '--daemon', '--no-detach', '--config=' + self.rsyncd_conf]
    self.rsyncd = subprocess.Popen(command)
    self.rsyncd_started = True

def stop_rsyncd(self):
"""
Stops the rsync daemon and removes the rsync configuration directory.
If the rsync daemon is running, it will be killed. If the rsync configuration
directory exists, it will be removed.
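Failures are not propagated: an error while removing the rsync configuration directory is
caught and logged (NOTE(review): killing the daemon is wrapped in a `try` as well - confirm it is handled the same way).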
"""
if self.rsyncd_started:
logger.info('Stopping rsyncd')
try:
Expand All @@ -375,6 +464,13 @@ def stop_rsyncd(self):
logger.error('Failed to remove %s: %r', self.rsync_conf_dir, e)

def checkpoint(self, member):
"""
Perform a checkpoint on a specific member.
:param member: A tuple containing the name and the database connection of the member.
Returns: A tuple containing the name of the member and a boolean indicating whether the checkpoint was successful.
"""
name, (_, cur) = member
try:
cur.execute('CHECKPOINT')
Expand All @@ -384,6 +480,15 @@ def checkpoint(self, member):
return name, False

def rsync_replicas(self, primary_ip):
"""
It is responsible for synchronizing replicas using rsync, a tool used for copying and synchronizing files across systems.
:param primary_ip: A string representing the IP address of the primary node.
Internally it tracks `ret`, the overall success flag that is returned to the caller, and `status`,
a dictionary containing the status of the rsync operation; neither of them is a parameter.
Returns: bool: True if the rsync operation was successful, False otherwise.
"""
from patroni.utils import polling_loop

logger.info('Notifying replicas %s to start rsync', ','.join(self.replica_connections.keys()))
Expand Down Expand Up @@ -441,6 +546,13 @@ def rsync_replicas(self, primary_ip):
return ret

def wait_replica_restart(self, member):
"""
Waits for the replica to restart after a major upgrade.
:param member: The replica member object.
Returns: str: The name of the replica member if it has restarted successfully, None otherwise.
"""
from patroni.utils import polling_loop

for _ in polling_loop(10):
Expand All @@ -456,6 +568,13 @@ def wait_replica_restart(self, member):
logger.error('Patroni on replica %s was not restarted in 10 seconds', member.name)

def wait_replicas_restart(self, cluster):
"""
Waits for the restart of patroni on replicas.
:param cluster: The cluster object representing the Postgres cluster.
Returns: bool: True if all replicas successfully restarted, False otherwise.
"""
members = [member for member in cluster.members if member.name in self.replica_connections]
logger.info('Waiting for restart of patroni on replicas %s', ', '.join(m.name for m in members))
pool = ThreadPool(len(members))
Expand All @@ -466,6 +585,13 @@ def wait_replicas_restart(self, cluster):
return all(results)

def reset_custom_statistics_target(self):
"""
Resets the non-default statistics target before performing analyze.
This method retrieves the list of databases and their corresponding tables and columns
that have a custom statistics target set. It then resets the statistics target to -1
for each column, effectively disabling custom statistics for those columns.
Also this method requires the `patroni.postgresql.connection` module.
"""
from patroni.postgresql.connection import get_connection_cursor

logger.info('Resetting non-default statistics target before analyze')
Expand All @@ -485,6 +611,12 @@ def reset_custom_statistics_target(self):
self._statistics[d[0]][table][column] = target

def restore_custom_statistics_target(self):
"""
Restores the custom statistics targets after an upgrade.
This method connects to each database specified in the `_statistics` attribute and executes
an ALTER TABLE statement for each table and column specified in the `_statistics` attribute.
The ALTER TABLE statement sets the statistics target for the column to the value specified.
"""
from patroni.postgresql.connection import get_connection_cursor

if not self._statistics:
Expand All @@ -506,6 +638,14 @@ def restore_custom_statistics_target(self):
logger.error("Failed to execute '%s'", query)

def reanalyze(self):
"""
Reanalyzes the tables in the PostgreSQL database using the ANALYZE command.
This method iterates over the statistics stored in the `_statistics` attribute and executes the ANALYZE command
for each table in each database. It uses the `get_connection_cursor` function from the `patroni.postgresql.connection`
module to establish a connection to the local PostgreSQL instance.
Note: a failing statement is reported via `logger.error` ("Failed to execute ..."); NOTE(review): confirm whether any exception is propagated to the caller.
"""
from patroni.postgresql.connection import get_connection_cursor

if not self._statistics:
Expand All @@ -525,6 +665,14 @@ def reanalyze(self):
logger.error("Failed to execute '%s'", query)

def analyze(self):
"""
Analyzes the database by resetting and restoring custom statistics targets.
This method first resets the custom statistics targets, then performs a database analysis,
and finally restores the custom statistics targets. If any error occurs during the process,
it logs the error message.
Exceptions raised while resetting or restoring the custom statistics targets are caught and logged, not propagated.
"""
try:
self.reset_custom_statistics_target()
except Exception as e:
Expand All @@ -536,6 +684,18 @@ def analyze(self):
logger.error('Failed to restore custom statistics targets: %r', e)

def do_upgrade(self):
"""
This method is responsible for upgrading a PostgreSQL database cluster.
It performs the following steps: checks if the upgrade is required, checks if the PostgreSQL instance is running and there is a leader,
checks if the cluster is ready to be upgraded, prepares the new PGDATA directory, drops possibly incompatible extensions,
runs a pg_upgrade check, drops possibly incompatible objects, enables maintenance mode, stops the PostgreSQL instance,
starts rsyncd, waits for replicas to catch up, runs a CHECKPOINT on replicas, executes pg_upgrade, switches PGDATA directories,
removes the initialize key, kills Patroni, waits for Patroni to restart, starts the PostgreSQL instance,
updates the configuration files, performs a CHECKPOINT on replicas, rsyncs replicas, waits for replicas to restart,
runs a database analyze, updates the extensions, runs a post-upgrade cleanup, runs a backup and executes a post-cleanup.
Returns: bool: True if the upgrade was successful, False otherwise.
"""
from patroni.utils import polling_loop

if not self.upgrade_required:
Expand Down Expand Up @@ -703,6 +863,10 @@ def do_upgrade(self):
return ret

def post_cleanup(self):
"""
Performs post-cleanup tasks after the upgrade process.
This method stops the rsync daemon, resumes the cluster, and cleans up the new PGDATA directory if it was created.
"""
self.stop_rsyncd()
self.resume_cluster()

Expand All @@ -713,13 +877,26 @@ def post_cleanup(self):
logger.error('Failed to remove new PGDATA %r', e)

def try_upgrade(self, replica_count):
    """
    Run the upgrade with the given replica count and always clean up afterwards.

    Stores `replica_count` on the instance, delegates to `do_upgrade()` and, whether that
    call returns or raises, invokes `post_cleanup()`.

    :param replica_count: The number of replicas to record on the instance before upgrading.
    Returns: The value returned by `do_upgrade()`.
    """
    try:
        self.replica_count = replica_count
        outcome = self.do_upgrade()
    finally:
        # cleanup must happen on both the success and the failure path
        self.post_cleanup()
    return outcome

def start_backup(self, envdir):
"""
Initiates a new backup by calling the postgres_backup.sh script with the specified environment directory and data directory.
:param envdir: The path string to the environment directory.
"""
logger.info('Initiating a new backup...')
if not os.fork():
subprocess.call(['nohup', 'envdir', envdir, '/scripts/postgres_backup.sh', self.postgresql.data_dir],
Expand All @@ -728,6 +905,20 @@ def start_backup(self, envdir):

# this function will be running in a clean environment, therefore we can't rely on DCS connection
def rsync_replica(config, desired_version, primary_ip, pid):
"""
It is responsible for synchronizing the replica with the primary during the upgrade process.
It imports the PostgresqlUpgrade class from the pg_upgrade module and the polling_loop function from the patroni.utils module.
Checks if the PostgreSQL version on the replica matches the desired version, stops the PostgreSQL instance and switches the PGDATA directory.
Updates the configuration files and restarts Patroni, removes the recovery.conf file and restarts Patroni again.
Returns the result of the cleanup_old_pgdata method.
:param config: A Config object representing the Patroni configuration.
:param desired_version: A string representing the desired version of the PostgreSQL binary to be used for the upgrade.
:param primary_ip: A string representing the IP address of the primary node.
:param pid: An integer representing the process ID of the PostgreSQL backend process.
Returns: int: 0 if the rsync was successful, 1 otherwise.
"""
from pg_upgrade import PostgresqlUpgrade
from patroni.utils import polling_loop

Expand Down Expand Up @@ -813,6 +1004,14 @@ def rsync_replica(config, desired_version, primary_ip, pid):


def main():
"""
Starting point of the script.
Parses command line arguments and performs either an rsync_replica operation or an inplace upgrade.
Returns:
0 if the operation is successful,
1 if the operation fails,
2 if the command line arguments are invalid.
"""
from patroni.config import Config
from spilo_commons import PATRONI_CONFIG_FILE

Expand Down
Loading

0 comments on commit f1ae318

Please sign in to comment.