Skip to content

Commit

Permalink
Sync dev-16 branch with the latest changes (#973)
Browse files Browse the repository at this point in the history
* Change maintainer to team ACID
* Patroni 3.1.2 + Update Patroni to 3.2.2
* Return pod_ip param to k8s config in Patroni
* Add replay-lag function
* Add toggle to enable grouping log by date/instance in s3
* Remove redundant code
* No pg_partman update from <5.X during major upgrade
* Add function to monitor sequences
* Ensure correct bin_dir for pg params configuration
  • Loading branch information
hughcapet authored Feb 21, 2024
1 parent 2b2d42d commit a4fbfee
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 35 deletions.
1 change: 1 addition & 0 deletions ENVIRONMENT.rst
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Environment Configuration Settings
- **LOG_S3_ENDPOINT**: (optional) S3 Endpoint to use with Boto3
- **LOG_BUCKET_SCOPE_PREFIX**: (optional) using to build S3 file path like `/spilo/{LOG_BUCKET_SCOPE_PREFIX}{SCOPE}{LOG_BUCKET_SCOPE_SUFFIX}/log/`
- **LOG_BUCKET_SCOPE_SUFFIX**: (optional) same as above
- **LOG_GROUP_BY_DATE**: (optional) enable grouping log by date. Default is False - group the log files based on the instance ID.
- **DCS_ENABLE_KUBERNETES_API**: a non-empty value forces Patroni to use Kubernetes as a DCS. Default is empty.
- **KUBERNETES_USE_CONFIGMAPS**: a non-empty value makes Patroni store its metadata in ConfigMaps instead of Endpoints when running on Kubernetes. Default is empty.
- **KUBERNETES_ROLE_LABEL**: name of the label containing Postgres role when running on Kubernetens. Default is 'spilo-role'.
Expand Down
4 changes: 2 additions & 2 deletions postgres-appliance/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ COPY --from=dependencies-builder /builddeps/wal-g /usr/local/bin/
COPY build_scripts/patroni_wale.sh build_scripts/compress_build.sh /builddeps/

# Install patroni and wal-e
ENV PATRONIVERSION=3.0.2
ENV PATRONIVERSION=3.2.2
ENV WALE_VERSION=1.1.1

WORKDIR /
Expand All @@ -90,7 +90,7 @@ COPY --from=builder-false / /

FROM builder-${COMPRESS}

LABEL maintainer="Polina Bungina <polina.bungina@zalando.de>"
LABEL maintainer="Team ACID @ Zalando <team-acid@zalando.de>"

ARG PGVERSION
ARG TIMESCALEDB
Expand Down
1 change: 0 additions & 1 deletion postgres-appliance/bootstrap/clone_with_wale.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def fix_output(output):
started = None
for line in output.decode('utf-8').splitlines():
if not started:
started = re.match(r'^name\s+last_modified\s+', line)
started = re.match(r'^name\s+last_modified\s+', line) or re.match(r'^name\s+modified\s+', line)
if started:
line = line.replace(' modified ', ' last_modified ')
Expand Down
2 changes: 1 addition & 1 deletion postgres-appliance/bootstrap/maybe_pg_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def wait_end_of_recovery(postgresql):

for _ in polling_loop(postgresql.config.get('pg_ctl_timeout'), 10):
postgresql.reset_cluster_info_state(None)
if postgresql.is_leader():
if postgresql.is_primary():
break
logger.info('waiting for end of recovery of the old cluster')

Expand Down
16 changes: 8 additions & 8 deletions postgres-appliance/major_upgrade/inplace_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,22 @@ def check_patroni_api(self, member):
return logger.error('API request to %s name failed: %r', member.name, e)

def toggle_pause(self, paused):
from patroni.config import get_global_config
from patroni.utils import polling_loop

cluster = self.dcs.get_cluster()
config = cluster.config.data.copy()
if cluster.is_paused() == paused:
if get_global_config(cluster).is_paused == paused:
return logger.error('Cluster is %spaused, can not continue', ('' if paused else 'not '))

config['pause'] = paused
if not self.dcs.set_config_value(json.dumps(config, separators=(',', ':')), cluster.config.index):
if not self.dcs.set_config_value(json.dumps(config, separators=(',', ':')), cluster.config.version):
return logger.error('Failed to pause cluster, can not continue')

self.paused = paused

old = {m.name: m.index for m in cluster.members if m.api_url}
ttl = cluster.config.data.get('ttl', self.dcs.ttl)
ttl = config.get('ttl', self.dcs.ttl)
for _ in polling_loop(ttl + 1):
cluster = self.dcs.get_cluster()
if all(m.data.get('pause', False) == paused for m in cluster.members if m.name in old):
Expand Down Expand Up @@ -202,13 +203,15 @@ def ensure_replica_state(member):
return all(ensure_replica_state(member) for member in cluster.members if member.name != self.postgresql.name)

def sanity_checks(self, cluster):
from patroni.config import get_global_config

if not cluster.initialize:
return logger.error('Upgrade can not be triggered because the cluster is not initialized')

if len(cluster.members) != self.replica_count:
return logger.error('Upgrade can not be triggered because the number of replicas does not match (%s != %s)',
len(cluster.members), self.replica_count)
if cluster.is_paused():
if get_global_config(cluster).is_paused:
return logger.error('Upgrade can not be triggered because Patroni is in maintenance mode')

lock_owner = cluster.leader and cluster.leader.name
Expand Down Expand Up @@ -487,7 +490,7 @@ def do_upgrade(self):
self.cluster_version, self.desired_version)
return True

if not (self.postgresql.is_running() and self.postgresql.is_leader()):
if not (self.postgresql.is_running() and self.postgresql.is_primary()):
return logger.error('PostgreSQL is not running or in recovery')

cluster = self.dcs.get_cluster()
Expand Down Expand Up @@ -526,10 +529,7 @@ def do_upgrade(self):
if self.replica_connections:
from patroni.postgresql.misc import parse_lsn

# Make sure we use the pg_controldata from the correct major version
self.postgresql.set_bin_dir(self.cluster_version)
controldata = self.postgresql.controldata()
self.postgresql.set_bin_dir(self.desired_version)

checkpoint_lsn = controldata.get('Latest checkpoint location')
if controldata.get('Database cluster state') != 'shut down' or not checkpoint_lsn:
Expand Down
43 changes: 30 additions & 13 deletions postgres-appliance/major_upgrade/pg_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ def restore_shared_preload_libraries(self):
return True

def start_old_cluster(self, config, version):
self.set_bin_dir(version)
self._new_bin_dir = self._bin_dir
self.set_bin_dir_for_version(version)
self._old_bin_dir = self._bin_dir

# make sure we don't archive wals from the old version
self._old_config_values = {'archive_mode': self.config.get('parameters').get('archive_mode')}
Expand All @@ -50,15 +52,17 @@ def get_cluster_version(self):
with open(self._version_file) as f:
return f.read().strip()

def set_bin_dir(self, version):
def set_bin_dir_for_version(self, version):
from spilo_commons import get_bin_dir
self.set_bin_dir(get_bin_dir(version))

self._old_bin_dir = self._bin_dir
self._bin_dir = get_bin_dir(version)
def set_bin_dir(self, bin_dir):
self._bin_dir = bin_dir
self._available_gucs = None

@property
def local_conn_kwargs(self):
conn_kwargs = self.config.local_connect_kwargs
conn_kwargs = self.connection_pool.conn_kwargs
conn_kwargs['options'] = '-c synchronous_commit=local -c statement_timeout=0 -c search_path='
conn_kwargs.pop('connect_timeout', None)
return conn_kwargs
Expand Down Expand Up @@ -117,9 +121,15 @@ def update_extensions(self):
for d in self._get_all_databases():
conn_kwargs['dbname'] = d
with get_connection_cursor(**conn_kwargs) as cur:
cur.execute('SELECT quote_ident(extname) FROM pg_catalog.pg_extension')
for extname in cur.fetchall():
query = 'ALTER EXTENSION {0} UPDATE'.format(extname[0])
cur.execute('SELECT quote_ident(extname), extversion FROM pg_catalog.pg_extension')
for extname, version in cur.fetchall():
# require manual update to 5.X+
if extname == 'pg_partman' and int(version[0]) < 5:
logger.warning("Skipping update of '%s' in database=%s. "
"Extension version: %s. Consider manual update",
extname, d, version)
continue
query = 'ALTER EXTENSION {0} UPDATE'.format(extname)
logger.info("Executing '%s' in the database=%s", query, d)
try:
cur.execute(query)
Expand Down Expand Up @@ -168,7 +178,7 @@ def pg_upgrade(self, check=False):
os.chdir(upgrade_dir)

pg_upgrade_args = ['-k', '-j', str(psutil.cpu_count()),
'-b', self._old_bin_dir, '-B', self._bin_dir,
'-b', self._old_bin_dir, '-B', self._new_bin_dir,
'-d', self._data_dir, '-D', self._new_data_dir,
'-O', "-c timescaledb.restoring='on'",
'-O', "-c archive_mode='off'"]
Expand All @@ -180,19 +190,23 @@ def pg_upgrade(self, check=False):
else:
self.config.write_postgresql_conf()

self.set_bin_dir(self._new_bin_dir)

logger.info('Executing pg_upgrade%s', (' --check' if check else ''))
if subprocess.call([self.pgcommand('pg_upgrade')] + pg_upgrade_args) == 0:
if check:
self.set_bin_dir(self._old_bin_dir)
os.chdir(old_cwd)
shutil.rmtree(upgrade_dir)
return True

def prepare_new_pgdata(self, version):
from spilo_commons import append_extensions

locale = self.query('SHOW lc_collate').fetchone()[0]
encoding = self.query('SHOW server_encoding').fetchone()[0]
locale = self.query('SHOW lc_collate')[0][0]
encoding = self.query('SHOW server_encoding')[0][0]
initdb_config = [{'locale': locale}, {'encoding': encoding}]
if self.query("SELECT current_setting('data_checksums')::bool").fetchone()[0]:
if self.query("SELECT current_setting('data_checksums')::bool")[0][0]:
initdb_config.append('data-checksums')

logger.info('initdb config: %s', initdb_config)
Expand All @@ -206,7 +220,9 @@ def prepare_new_pgdata(self, version):
old_version_file = self._version_file
self._version_file = os.path.join(self._data_dir, 'PG_VERSION')

self.set_bin_dir(version)
self._old_bin_dir = self._bin_dir
self.set_bin_dir_for_version(version)
self._new_bin_dir = self._bin_dir

# shared_preload_libraries for the old cluster, cleaned from incompatible/missing libs
old_shared_preload_libraries = self.config.get('parameters').get('shared_preload_libraries')
Expand Down Expand Up @@ -239,6 +255,7 @@ def prepare_new_pgdata(self, version):
self._new_data_dir, self._data_dir = self._data_dir, self._new_data_dir
self.config._postgresql_conf = old_postgresql_conf
self._version_file = old_version_file
self.set_bin_dir(self._old_bin_dir)

if old_shared_preload_libraries:
self.config.get('parameters')['shared_preload_libraries'] = old_shared_preload_libraries
Expand Down
25 changes: 25 additions & 0 deletions postgres-appliance/scripts/_zmon_schema.dump
Original file line number Diff line number Diff line change
Expand Up @@ -477,4 +477,29 @@ CREATE OR REPLACE VIEW zmon_utils.last_status_active_cronjobs AS SELECT * FROM z
REVOKE ALL ON TABLE zmon_utils.last_status_active_cronjobs FROM public;
GRANT SELECT ON TABLE zmon_utils.last_status_active_cronjobs TO robot_zmon;

CREATE OR REPLACE FUNCTION zmon_utils.get_replay_lag(
OUT pid integer,
OUT usesysid oid,
OUT usename name,
OUT application_name text,
OUT replay_lag interval
) RETURNS SETOF record AS
$BODY$
SELECT pid,
usesysid,
usename,
application_name,
replay_lag
FROM pg_stat_replication
ORDER BY replay_lag DESC NULLS LAST;
$BODY$
LANGUAGE sql SECURITY DEFINER STRICT SET search_path to 'pg_catalog';

CREATE OR REPLACE VIEW zmon_utils.replay_lag AS SELECT * FROM zmon_utils.get_replay_lag();

REVOKE EXECUTE ON FUNCTION zmon_utils.get_replay_lag() FROM public;
REVOKE ALL ON TABLE zmon_utils.replay_lag FROM public;

GRANT SELECT ON TABLE zmon_utils.replay_lag TO robot_zmon;

GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA zmon_utils TO robot_zmon;
7 changes: 6 additions & 1 deletion postgres-appliance/scripts/configure_spilo.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ def get_placeholders(provider):
placeholders.setdefault('CLONE_TARGET_TIME', '')
placeholders.setdefault('CLONE_TARGET_INCLUSIVE', True)

placeholders.setdefault('LOG_GROUP_BY_DATE', False)
placeholders.setdefault('LOG_SHIP_SCHEDULE', '1 0 * * *')
placeholders.setdefault('LOG_S3_BUCKET', '')
placeholders.setdefault('LOG_S3_ENDPOINT', '')
Expand Down Expand Up @@ -728,7 +729,9 @@ def get_dcs_config(config, placeholders):
config['kubernetes']['labels'] = kubernetes_labels

if not config['kubernetes'].pop('use_configmaps'):
config['kubernetes'].update({'use_endpoints': True, 'ports': [{'port': 5432, 'name': 'postgresql'}]})
config['kubernetes'].update({'use_endpoints': True,
'pod_ip': placeholders['instance_data']['ip'],
'ports': [{'port': 5432, 'name': 'postgresql'}]})
if str(config['kubernetes'].pop('bypass_api_service', None)).lower() == 'true':
config['kubernetes']['bypass_api_service'] = True
else:
Expand Down Expand Up @@ -756,6 +759,8 @@ def write_log_environment(placeholders):
log_env['LOG_AWS_REGION'] = aws_region

log_s3_key = 'spilo/{LOG_BUCKET_SCOPE_PREFIX}{SCOPE}{LOG_BUCKET_SCOPE_SUFFIX}/log/'.format(**log_env)
if os.getenv('LOG_GROUP_BY_DATE'):
log_s3_key += '{DATE}/'
log_s3_key += placeholders['instance_data']['id']
log_env['LOG_S3_KEY'] = log_s3_key

Expand Down
30 changes: 30 additions & 0 deletions postgres-appliance/scripts/metric_helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,36 @@ $$ LANGUAGE sql IMMUTABLE SECURITY DEFINER STRICT;

CREATE OR REPLACE VIEW pg_stat_statements AS SELECT * FROM pg_stat_statements(true);

CREATE OR REPLACE FUNCTION get_nearly_exhausted_sequences(
IN threshold float,
OUT schemaname name,
OUT sequencename name,
OUT seq_percent_used numeric
) RETURNS SETOF record AS
$_$
SELECT *
FROM (
SELECT
schemaname,
sequencename,
round(abs(
ceil((abs(last_value::numeric - start_value) + 1) / increment_by) /
floor((CASE WHEN increment_by > 0
THEN (max_value::numeric - start_value)
ELSE (start_value::numeric - min_value)
END + 1) / increment_by
) * 100
),
2) AS seq_percent_used
FROM pg_sequences
WHERE NOT CYCLE AND last_value IS NOT NULL
) AS s
WHERE seq_percent_used >= threshold;
$_$
LANGUAGE sql SECURITY DEFINER STRICT SET search_path to 'pg_catalog';

CREATE OR REPLACE VIEW nearly_exhausted_sequences AS SELECT * FROM get_nearly_exhausted_sequences(0.8);

REVOKE ALL ON ALL TABLES IN SCHEMA metric_helpers FROM public;
GRANT SELECT ON ALL TABLES IN SCHEMA metric_helpers TO admin, robot_zmon;

Expand Down
2 changes: 2 additions & 0 deletions postgres-appliance/scripts/upload_pg_log_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def upload_to_s3(local_file_path):
bucket = s3.Bucket(bucket_name)

key_name = os.path.join(os.getenv('LOG_S3_KEY'), os.path.basename(local_file_path))
if os.getenv('LOG_GROUP_BY_DATE'):
key_name = key_name.format(**{'DATE': os.path.basename(local_file_path).split('.')[0]})

chunk_size = 52428800 # 50 MiB
config = TransferConfig(multipart_threshold=chunk_size, multipart_chunksize=chunk_size)
Expand Down
5 changes: 0 additions & 5 deletions postgres-appliance/tests/README

This file was deleted.

21 changes: 21 additions & 0 deletions postgres-appliance/tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Run tests

After building the image, you can test your image by:

1. Setting up the environment variable `SPILO_TEST_IMAGE` to test the specific image. If unset, the default will be `spilo`.
```
export SPILO_TEST_IMAGE=<your_spilo_image>
```
2. Run the test:
```
bash test_spilo.sh
```
To enable debugging for an entire script when it runs:
```
bash -x test_spilo.sh
```
The test will create multiple containers. They will be cleaned up by the last line before running `main` in `test_spilo.sh`. To keep and debug the containers after running the test, this part can be commented.
```
trap cleanup QUIT TERM EXIT
```
2 changes: 2 additions & 0 deletions postgres-appliance/tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ services:
loop_wait: 2
postgresql:
parameters:
wal_decode_buffer_size: '521kB'
wal_keep_segments: 8
jit: 'off'
postgresql:
parameters:
shared_buffers: 32MB
Expand Down
8 changes: 4 additions & 4 deletions postgres-appliance/tests/test_spilo.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ function test_spilo() {

# TEST SUITE 3
local clone16_container
clone16_container=$(start_clone_with_wale_16_container) # SCOPE=clone15 CLONE: _SCOPE=upgrade3 _PGVERSION=16 _TARGET_TIME=<next_hour
clone16_container=$(start_clone_with_wale_16_container) # SCOPE=clone15 CLONE: _SCOPE=upgrade3 _PGVERSION=16 _TARGET_TIME=<next_hour>
log_info "[TS3] Started $clone16_container for testing point-in-time recovery (clone with wal-e) with unreachable target on 13+"


Expand Down Expand Up @@ -351,7 +351,7 @@ function test_spilo() {

# TEST SUITE 5
local upgrade_replica_container
upgrade_replica_container=$(start_clone_with_wale_upgrade_replica_container) # SCOPE=upgrade
upgrade_replica_container=$(start_clone_with_wale_upgrade_replica_container) # SCOPE=upgrade
log_info "[TS5] Started $upgrade_replica_container for testing replica bootstrap with wal-e"


Expand All @@ -362,11 +362,11 @@ function test_spilo() {


# TEST SUITE 1
#run_test test_pg_upgrade_to_16_check_failed "$container" # pg_upgrade --check complains about timescaledb
# run_test test_pg_upgrade_to_16_check_failed "$container" # pg_upgrade --check complains about timescaledb

wait_backup "$container"

#drop_timescaledb "$container"
# drop_timescaledb "$container"
log_info "[TS1] Testing in-place major upgrade 15->16"
run_test test_successful_inplace_upgrade_to_16 "$container"
wait_all_streaming "$container"
Expand Down

0 comments on commit a4fbfee

Please sign in to comment.