Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle a graceful shutdown on request #594

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pghoard/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class BaseCompressorEvent:
file_path: Path
backup_site_name: str
source_data: Union[BinaryIO, Path]
callback_queue: CallbackQueue
callback_queue: Optional[CallbackQueue]
metadata: Dict[str, str]


Expand Down
42 changes: 42 additions & 0 deletions pghoard/pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -983,8 +983,50 @@ def _get_all_threads(self):
all_threads.extend(self.transfer_agents)
return all_threads

def _wait_for_queue_to_be_emptied(
self,
queue: Queue,
queue_name: str,
timeout: Optional[int] = None,
) -> None:
start = time.monotonic()
while True:
if queue.empty():
self.log.info("%r queue has been emptied.", queue_name)
break

if timeout is not None and time.monotonic() - start > timeout:
self.log.warning("Exceeded waiting time for %r queue to be emptied", queue_name)
break

time.sleep(0.1)

def handle_exit_signal(self, _signal=None, _frame=None): # pylint: disable=unused-argument
self.log.warning("Quitting, signal: %r", _signal)
if _signal == signal.SIGTERM:
self.graceful_shutdown()
else:
self.quit()

def graceful_shutdown(self) -> None:
"""
Makes sure all missing files are compressed, uploaded and deleted before all threads are inactive.

Steps to follow:
- Shutdown receivexlogs and walreceivers threads
- Wait for compression, transfer and deletion queues to be empty
- Quit (stop remaining threads and write state file)
"""
self.log.info("Gracefully shutting down...")
self.running = False
for thread in [*self.receivexlogs.values(), *self.walreceivers.values()]:
thread.running = False

# wait for all queues to be emptied
self._wait_for_queue_to_be_emptied(self.compression_queue, "compression")
self._wait_for_queue_to_be_emptied(self.transfer_queue, "transfer")
self._wait_for_queue_to_be_emptied(self.wal_file_deletion_queue, "wal_file_deletion")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment wal_file_deletion_queue is not super high prio, but makes sure we do not re-compress or re-upload files when pghoard is restarted (startup_walk)


self.quit()

def quit(self):
Expand Down
56 changes: 54 additions & 2 deletions test/test_pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
import time
from pathlib import Path
from typing import Any, Dict
from unittest.mock import Mock, patch
from unittest.mock import MagicMock, Mock, patch

import pytest

import pghoard.pghoard as pghoard_module
from pghoard.common import (BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file)
from pghoard.common import (
BaseBackupFormat, FileType, FileTypePrefixes, create_alert_file, delete_alert_file, write_json_file
)
from pghoard.compressor import CompressionEvent
from pghoard.pghoard import PGHoard
from pghoard.pgutil import create_connection_string
from pghoard.receivexlog import PGReceiveXLog
from pghoard.transfer import TransferAgent

from .base import PGHoardTestCase
from .util import dict_to_tar_file, switch_wal, wait_for_xlog
Expand Down Expand Up @@ -819,6 +824,53 @@ def test_startup_walk_skip_compression_if_already_compressed(
# uncompressed timeline files are not added to deletion queue, they are immediately unlinked
assert self.pghoard.wal_file_deletion_queue.qsize() == 0

@patch("pghoard.compressor.wal.verify_wal", Mock())
@patch.object(PGReceiveXLog, "run", Mock())
@patch.object(TransferAgent, "get_object_storage")
def test_graceful_shutdown(
self,
mocked_get_object_storage: MagicMock,
) -> None:
compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
uncompressed_wal_path = compressed_wal_path + "_incoming"

file_name = "000000010000000000000008"
uncompressed_file_path = os.path.join(uncompressed_wal_path, file_name)
with open(uncompressed_file_path, "wb") as fp:
fp.write(b"foo")

self.pghoard.compression_queue.put(
CompressionEvent(
file_type=FileType.Wal,
file_path=FileTypePrefixes[FileType.Wal] / file_name,
delete_file_after_compression=True,
backup_site_name=self.test_site,
source_data=Path(uncompressed_file_path),
callback_queue=None,
metadata={}
)
)

# run compressors, transfer_agents and wal_file_deleter
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
thread.start()

self.pghoard.graceful_shutdown()

assert self.pghoard.compression_queue.qsize() == 0
assert self.pghoard.transfer_queue.qsize() == 0
assert self.pghoard.wal_file_deletion_queue.qsize() == 0

# called once for uploading renamed partial file
assert mocked_get_object_storage.call_count == 1

# uncompressed file should still exist since WALDeletionThread always keeps last file
assert os.path.exists(uncompressed_file_path)

# verify compressors, transfer_agents and wal_file_deleter are not running
for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]:
assert thread.is_alive() is False


class TestPGHoardWithPG:
def test_auth_alert_files(self, db, pghoard):
Expand Down