handle a graceful shutdown on request
[BF-1358]
kathia-barahona committed Jul 4, 2023
1 parent 6dce5bf commit 6a78754
Showing 3 changed files with 140 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pghoard/compressor.py
@@ -43,7 +43,7 @@ class BaseCompressorEvent:
file_path: Path
backup_site_name: str
source_data: Union[BinaryIO, Path]
- callback_queue: CallbackQueue
+ callback_queue: Optional[CallbackQueue]
metadata: Dict[str, str]


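Note: callback_queue becomes Optional because the graceful-shutdown path added in pghoard.py enqueues a CompressionEvent with callback_queue=None, i.e. with nothing waiting for the result. A minimal sketch of the guard a consumer of such an event would need, assuming the CallbackEvent type from pghoard.common; notify_completion is an illustrative helper, not part of this diff:

    # Sketch only: skip the completion callback when the event has no waiter,
    # as is the case for compression events enqueued during graceful shutdown.
    from pghoard.common import CallbackEvent

    def notify_completion(event, success: bool) -> None:
        if event.callback_queue is None:
            return  # shutdown-time events carry callback_queue=None
        event.callback_queue.put(CallbackEvent(success=success))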
90 changes: 90 additions & 0 deletions pghoard/pghoard.py
@@ -46,6 +46,7 @@
)
from pghoard.receivexlog import PGReceiveXLog
from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent
from pghoard.wal import WAL_SEG_SIZE
from pghoard.walreceiver import WALReceiver
from pghoard.webserver import WebServer

@@ -934,8 +935,97 @@ def _get_all_threads(self):
all_threads.extend(self.transfer_agents)
return all_threads

def _rename_partial_files_if_completed(self, site: str, wal_location: str) -> None:
"""
Check for a remaining partial WAL file generated by pg_receivewal; if the partial file is complete,
rename it.
"""
# consider only the last partial file (pg_receivewal should only generate one)
last_partial_file = None
for partial_file in os.listdir(wal_location):
if not partial_file.endswith(".partial"):
continue

if not last_partial_file or partial_file > last_partial_file:
last_partial_file = partial_file

if last_partial_file is None:
return

# check if the partial file needs to be renamed
partial_file_path = os.path.join(wal_location, last_partial_file)
renamed_partial_file_path = partial_file_path.replace(".partial", "")
file_stats = os.stat(partial_file_path)

if file_stats.st_size != WAL_SEG_SIZE:
# TODO: handle partial files that are incomplete
return

# this will not trigger inotify
os.rename(partial_file_path, renamed_partial_file_path)

compression_event = CompressionEvent(
file_type=FileType.Wal,
file_path=FileTypePrefixes[FileType.Wal] / last_partial_file.replace(".partial", ""),
delete_file_after_compression=True,
backup_site_name=site,
source_data=Path(renamed_partial_file_path),
callback_queue=None,
metadata={}
)
self.compression_queue.put(compression_event)

def _wait_for_queue_to_be_emptied(
self,
queue: Queue,
queue_name: str,
timeout: Optional[int] = None,
) -> None:
start = time.monotonic()
while True:
if queue.empty():
self.log.info("%r queue has been emptied.", queue_name)
break

if timeout is not None and time.monotonic() - start > timeout:
self.log.warning("Exceeded waiting time for %r queue to be emptied", queue_name)
break

time.sleep(0.1)

def handle_exit_signal(self, _signal=None, _frame=None): # pylint: disable=unused-argument
self.log.warning("Quitting, signal: %r", _signal)
if _signal == signal.SIGTERM:
self.graceful_shutdown()
else:
self.quit()

def graceful_shutdown(self) -> None:
"""
Makes sure all missing files are compressed, uploaded and deleted. Also handles completed partial files
that might have not been renamed after shutting down receivexlogs.
Steps to follow:
- Shutdown receivexlogs and walreceivers threads
- Check for partial segments and rename them (if completed)
- Wait for compression, transfer and deletion queues to be empty
- Quit (stop remaining threads and write state file)
"""
self.log.info("Gracefully shutting down...")
self.running = False
for site, thread in {**self.receivexlogs, **self.walreceivers}.items():
thread.running = False

if not isinstance(thread, PGReceiveXLog):
continue

self._rename_partial_files_if_completed(site=site, wal_location=thread.wal_location)

# wait for all queues to be emptied
self._wait_for_queue_to_be_emptied(self.compression_queue, "compression")
self._wait_for_queue_to_be_emptied(self.transfer_queue, "transfer")
self._wait_for_queue_to_be_emptied(self.wal_file_deletion_queue, "wal_file_deletion")

self.quit()

def quit(self):
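With handle_exit_signal routing SIGTERM to graceful_shutdown() and every other handled signal to the plain quit() path, a graceful shutdown is requested simply by delivering SIGTERM to the daemon. A small illustration; pghoard_pid is hypothetical, standing in for the daemon's process id:

    # Sketch: ask a running pghoard daemon to shut down gracefully.
    # SIGTERM is what service managers such as systemd send on stop.
    import os
    import signal

    os.kill(pghoard_pid, signal.SIGTERM)  # triggers graceful_shutdown() via handle_exit_signal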
50 changes: 49 additions & 1 deletion test/test_pghoard.py
@@ -11,14 +11,17 @@
import time
from pathlib import Path
from typing import Any, Dict
- from unittest.mock import Mock, patch
+ from unittest.mock import MagicMock, Mock, patch

import pytest

import pghoard.pghoard as pghoard_module
from pghoard.common import (BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file)
from pghoard.pghoard import PGHoard
from pghoard.pgutil import create_connection_string
from pghoard.receivexlog import PGReceiveXLog
from pghoard.transfer import TransferAgent
from pghoard.wal import WAL_SEG_SIZE

from .base import PGHoardTestCase
from .util import dict_to_tar_file, switch_wal, wait_for_xlog
@@ -755,6 +758,51 @@ def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType,
upload_event = self.pghoard.transfer_queue.get(timeout=1.0)
assert upload_event.file_type == file_type

@patch("pghoard.compressor.wal.verify_wal", Mock())
@patch.object(PGReceiveXLog, "run", Mock())
@patch.object(TransferAgent, "get_object_storage")
@pytest.mark.parametrize("is_completed", (True, False))
def test_graceful_shutdown_with_partial_wal_file(
self,
mocked_get_object_storage: MagicMock,
is_completed: bool,
) -> None:
compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
uncompressed_wal_path = compressed_wal_path + "_incoming"

self.config["backup_sites"][self.test_site]["active_backup_mode"] = "pg_receivexlog"

self.pghoard.receivexlog_listener(
self.test_site, self.config["backup_sites"][self.test_site]["nodes"][0], uncompressed_wal_path
)

assert len(self.pghoard.receivexlogs) == 1

partial_file_name = "000000010000000000000008.partial"

# generate a .partial file
with open(os.path.join(uncompressed_wal_path, partial_file_name), "wb") as fp:
if is_completed:
fp.seek(WAL_SEG_SIZE - 1)
fp.write(b"\0")

# run compressors, transfer_agents and wal_file_deleter
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
thread.start()

self.pghoard.graceful_shutdown()

assert self.pghoard.compression_queue.qsize() == 0
assert self.pghoard.transfer_queue.qsize() == 0
assert self.pghoard.wal_file_deletion_queue.qsize() == 0

# called once for uploading renamed partial file
assert mocked_get_object_storage.call_count == (1 if is_completed else 0)

# verify compressors, transfer_agents and wal_file_deleter are not running
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
assert thread.is_alive() is False


class TestPGHoardWithPG:
def test_auth_alert_files(self, db, pghoard):
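The test builds its "completed" partial file by seeking to WAL_SEG_SIZE - 1 and writing a single byte, producing a file of exactly one WAL segment without writing real WAL data; that is the same size check _rename_partial_files_if_completed relies on. A standalone sketch of that criterion, where is_partial_segment_complete is an illustrative helper and not part of pghoard:

    # Sketch: a pg_receivewal ".partial" file counts as complete only once it
    # has grown to the full WAL segment size.
    import os
    from pghoard.wal import WAL_SEG_SIZE  # 16 MiB, the default PostgreSQL WAL segment size

    def is_partial_segment_complete(path: str) -> bool:
        return path.endswith(".partial") and os.stat(path).st_size == WAL_SEG_SIZE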
