Skip to content

Commit

Permalink
handle a graceful shutdown on request
Browse files Browse the repository at this point in the history
[BF-1358]
  • Loading branch information
kathia-barahona committed Jul 6, 2023
1 parent 87a01e9 commit b9e7bfb
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pghoard/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class BaseCompressorEvent:
file_path: Path
backup_site_name: str
source_data: Union[BinaryIO, Path]
callback_queue: CallbackQueue
callback_queue: Optional[CallbackQueue]
metadata: Dict[str, str]


Expand Down
42 changes: 42 additions & 0 deletions pghoard/pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -983,8 +983,50 @@ def _get_all_threads(self):
all_threads.extend(self.transfer_agents)
return all_threads

def _wait_for_queue_to_be_emptied(
self,
queue: Queue,
queue_name: str,
timeout: Optional[int] = None,
) -> None:
start = time.monotonic()
while True:
if queue.empty():
self.log.info("%r queue has been emptied.", queue_name)
break

if timeout is not None and time.monotonic() - start > timeout:
self.log.warning("Exceeded waiting time for %r queue to be emptied", queue_name)
break

time.sleep(0.1)

def handle_exit_signal(self, _signal=None, _frame=None): # pylint: disable=unused-argument
self.log.warning("Quitting, signal: %r", _signal)
if _signal == signal.SIGTERM:
self.graceful_shutdown()
else:
self.quit()

def graceful_shutdown(self) -> None:
"""
Makes sure all missing files are compressed, uploaded and deleted before all threads are inactive.
Steps to follow:
- Shutdown receivexlogs and walreceivers threads
- Wait for compression, transfer and deletion queues to be empty
- Quit (stop remaining threads and write state file)
"""
self.log.info("Gracefully shutting down...")
self.running = False
for thread in [*self.receivexlogs.values(), *self.walreceivers.values()]:
thread.running = False

# wait for all queues to be emptied
self._wait_for_queue_to_be_emptied(self.compression_queue, "compression")
self._wait_for_queue_to_be_emptied(self.transfer_queue, "transfer")
self._wait_for_queue_to_be_emptied(self.wal_file_deletion_queue, "wal_file_deletion")

self.quit()

def quit(self):
Expand Down
56 changes: 54 additions & 2 deletions test/test_pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
import time
from pathlib import Path
from typing import Any, Dict
from unittest.mock import Mock, patch
from unittest.mock import MagicMock, Mock, patch

import pytest

import pghoard.pghoard as pghoard_module
from pghoard.common import (BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file)
from pghoard.common import (
BaseBackupFormat, FileType, FileTypePrefixes, create_alert_file, delete_alert_file, write_json_file
)
from pghoard.compressor import CompressionEvent
from pghoard.pghoard import PGHoard
from pghoard.pgutil import create_connection_string
from pghoard.receivexlog import PGReceiveXLog
from pghoard.transfer import TransferAgent

from .base import PGHoardTestCase
from .util import dict_to_tar_file, switch_wal, wait_for_xlog
Expand Down Expand Up @@ -819,6 +824,53 @@ def test_startup_walk_skip_compression_if_already_compressed(
# uncompressed timeline files are not added to deletion queue, they are immediately unlinked
assert self.pghoard.wal_file_deletion_queue.qsize() == 0

@patch("pghoard.compressor.wal.verify_wal", Mock())
@patch.object(PGReceiveXLog, "run", Mock())
@patch.object(TransferAgent, "get_object_storage")
def test_graceful_shutdown(
self,
mocked_get_object_storage: MagicMock,
) -> None:
compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
uncompressed_wal_path = compressed_wal_path + "_incoming"

file_name = "000000010000000000000008"
uncompressed_file_path = os.path.join(uncompressed_wal_path, file_name)
with open(uncompressed_file_path, "wb") as fp:
fp.write(b"foo")

self.pghoard.compression_queue.put(
CompressionEvent(
file_type=FileType.Wal,
file_path=FileTypePrefixes[FileType.Wal] / file_name,
delete_file_after_compression=True,
backup_site_name=self.test_site,
source_data=Path(uncompressed_file_path),
callback_queue=None,
metadata={}
)
)

# run compressors, transfer_agents and wal_file_deleter
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
thread.start()

self.pghoard.graceful_shutdown()

assert self.pghoard.compression_queue.qsize() == 0
assert self.pghoard.transfer_queue.qsize() == 0
assert self.pghoard.wal_file_deletion_queue.qsize() == 0

# called once for uploading renamed partial file
assert mocked_get_object_storage.call_count == 1

# uncompressed file should still exist since WALDeletionThread always keeps last file
assert os.path.exists(uncompressed_file_path)

# verify compressors, transfer_agents and wal_file_deleter are not running
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
assert thread.is_alive() is False


class TestPGHoardWithPG:
def test_auth_alert_files(self, db, pghoard):
Expand Down

0 comments on commit b9e7bfb

Please sign in to comment.