forked from Aiven-Open/pghoard
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_walreceiver.py
129 lines (109 loc) · 4.96 KB
/
test_walreceiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import logging
import os.path
import time
from unittest import mock
import psycopg2
import pytest
from pghoard.wal import get_current_lsn
from .conftest import PGHoardForTest
from .util import switch_wal, wait_for_xlog
def get_transfer_agent_upload_xlog_state(pghoard: PGHoardForTest):
    """Return the transfer agent's xlog upload state for the test site.

    Falls back to an empty dict when no state has been recorded yet for
    the site (i.e. nothing has been uploaded).
    """
    state = pghoard.transfer_agent_state.get(pghoard.test_site)
    return {} if state is None else state["upload"]["xlog"]
def stop_walreceiver(pghoard: PGHoardForTest):
    """Stop the test site's walreceiver thread and wait for it to exit.

    The walreceiver is removed from ``pghoard.walreceivers`` so a later
    ``start_walreceiver`` call creates a fresh one. Returns the LSN the
    receiver last flushed, which callers use to resume streaming.
    """
    receiver = pghoard.walreceivers.pop(pghoard.test_site)
    receiver.running = False
    receiver.join()
    return receiver.last_flushed_lsn
class TestWalReceiver:
    """Integration tests for the walreceiver: streaming WAL from a live
    PostgreSQL instance and handing completed segments to the transfer
    agent for upload."""

    @pytest.mark.parametrize("replication_slot", [None, "foobar"])
    def test_walreceiver(self, db, pghoard_walreceiver, replication_slot):
        """
        Test the happy-path of the wal receiver.
        """
        log = logging.getLogger(__class__.__name__)
        conn = db.connect()
        conn.autocommit = True
        pghoard = pghoard_walreceiver
        node = pghoard.config["backup_sites"][pghoard.test_site]["nodes"][0]
        if "slot" in node:
            log.warning("using slot %s from config", node["slot"])
        else:
            node["slot"] = replication_slot
        # The transfer agent state will be used to check what
        # was uploaded
        # Before starting the walreceiver, get the current wal name.
        wal_name = get_current_lsn(node).walfile_name
        # Start streaming, force a wal rotation, and check the wal has been
        # archived
        pghoard.start_walreceiver(pghoard.test_site, node, None)
        switch_wal(conn)
        # Check that we uploaded one file, and it is the right one.
        wait_for_xlog(pghoard, 1)
        last_flushed_lsn = stop_walreceiver(pghoard)
        # Record the last flushed lsn
        state = get_transfer_agent_upload_xlog_state(pghoard)
        assert state.get("xlogs_since_basebackup") == 1
        assert state.get("latest_filename") == wal_name
        # Generate some more wal while the walreceiver is not running,
        # and check that we can fetch it once done using the recorded state
        for _ in range(3):
            switch_wal(conn)
        conn.close()
        # The last wal file is the previous one, as the current one is not
        # complete.
        lsn = get_current_lsn(node)
        previous_wal_name = lsn.previous_walfile_start_lsn.walfile_name
        pghoard.start_walreceiver(pghoard.test_site, node, last_flushed_lsn)
        wait_for_xlog(pghoard, 4)
        last_flushed_lsn = stop_walreceiver(pghoard)
        state = get_transfer_agent_upload_xlog_state(pghoard)
        assert state.get("xlogs_since_basebackup") == 4
        assert state.get("latest_filename") == previous_wal_name

    @pytest.mark.timeout(60)
    def test_walreceiver_database_error(self, db, pghoard_walreceiver):
        """Verify that we can recover from a DatabaseError exception
        """

        # Used for monkeypatching a psycopg2 Cursor object.
        # NOTE: the original used a stacked ``@classmethod`` + ``@property``
        # for the ``raised`` flag; chaining classmethod with other
        # descriptors was deprecated in Python 3.11 and removed in 3.13
        # (where the wait loop below would have read the property object —
        # truthy — and exited before the error ever fired). A plain class
        # attribute gives identical observable behavior on all versions.
        class FakeCursor:
            raised = False  # flipped to True the first time read_message raises

            @classmethod
            def read_message(cls):
                cls.raised = True
                raise psycopg2.DatabaseError

        conn = db.connect()
        conn.autocommit = True
        pghoard = pghoard_walreceiver
        node = pghoard.config["backup_sites"][pghoard.test_site]["nodes"][0]
        pghoard.start_walreceiver(pghoard.test_site, node, None)
        # Wait for a Cursor object to be created/assigned
        while pghoard.walreceivers[pghoard.test_site].c is None:
            time.sleep(0.5)
        # Monkeypatch method in order to raise an exception
        with mock.patch.object(pghoard.walreceivers[pghoard.test_site].c, "read_message", FakeCursor.read_message):
            while FakeCursor.raised is False:
                time.sleep(0.5)
        # After the patch is removed the walreceiver should have reconnected;
        # prove it by rotating WAL and seeing the segment get archived.
        switch_wal(conn)
        wait_for_xlog(pghoard, 1)
        conn.close()

    def test_walreceiver_multiple_timelines(self, recovery_db, pghoard_walreceiver_recovery):
        """As we want to fetch all timeline history files when starting up, promote a PG instance
        to bump the timeline and create a history file.
        """
        recovery_db.run_cmd("pg_ctl", "-D", recovery_db.pgdata, "promote")
        pghoard = pghoard_walreceiver_recovery
        node = pghoard.config["backup_sites"][pghoard.test_site]["nodes"][0]
        pghoard.start_walreceiver(pghoard.test_site, node, None)
        with recovery_db.connect() as conn:
            switch_wal(conn)
        wait_for_xlog(pghoard, 1)
        # The promotion bumps the timeline to 2; the walreceiver must have
        # uploaded the corresponding 00000002.history file at startup.
        storage = pghoard.get_or_create_site_storage(site=pghoard.test_site)
        files = storage.list_path(os.path.join("test_walreceiver_multiple_timelines", "timeline"))
        assert len(files) == 1
        assert files[0]["name"] == "test_walreceiver_multiple_timelines/timeline/00000002.history"