From f075283dd80da2667b63f4467cf9439108415e8f Mon Sep 17 00:00:00 2001 From: Maksim Novikov Date: Thu, 9 Dec 2021 15:07:56 +0100 Subject: [PATCH] Recreate reader if journal files is deleted To allow this changes we always keep journal reader registered with poller. For every even received we keep a lookup table of said events, that should be processed at later stage. There are 2 relevant types of events: * journal.APPEND - when lines are added to the end of the file * journal.INVALIDATE - log file rotated or removed For APPEND event we need to read all the data in the log until it's exhauisted For INVALIDATE event data should be also read until the end and the reader should be recreated to release fds of deleted files. This fixes resource leak which happened if log file was first renamed and then removed, which usually is the case when rotation happens. --- journalpump/journalpump.py | 278 +++++++++++---- journalpump/senders/base.py | 4 + pytest.ini | 3 + scripts/generate_logs.py | 204 +++++++++++ systest/__init__.py | 0 systest/data/rotated_logs/rotated_logs.txt | 45 +++ ...-0000000000000003-0005d4206c27cc3b.journal | Bin 0 -> 524288 bytes ...-000000000000000e-0005d4206c28150a.journal | Bin 0 -> 524288 bytes ...-0000000000000019-0005d4206c285fc5.journal | Bin 0 -> 524288 bytes ...-0000000000000024-0005d4206c28a26f.journal | Bin 0 -> 524288 bytes systest/test_rotation.py | 325 ++++++++++++++++++ systest/test_rsyslog.py | 18 +- systest/util.py | 16 + test/test_journalpump.py | 4 +- 14 files changed, 814 insertions(+), 83 deletions(-) create mode 100644 pytest.ini create mode 100644 scripts/generate_logs.py create mode 100644 systest/__init__.py create mode 100644 systest/data/rotated_logs/rotated_logs.txt create mode 100644 systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000003-0005d4206c27cc3b.journal create mode 100644 systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-000000000000000e-0005d4206c28150a.journal create mode 100644 systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000019-0005d4206c285fc5.journal create mode 100644 systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000024-0005d4206c28a26f.journal create mode 100644 systest/test_rotation.py create mode 100644 systest/util.py diff --git a/journalpump/journalpump.py b/journalpump/journalpump.py index 01ab427..992d488 100644 --- a/journalpump/journalpump.py +++ b/journalpump/journalpump.py @@ -12,8 +12,8 @@ from .types import GeoIPProtocol from .util import atomic_replace_file, default_json_serialization from functools import reduce -from systemd.journal import Reader -from typing import Type, Union +from systemd import journal +from typing import cast, NamedTuple, Optional, Type, Union import copy import datetime @@ -21,7 +21,6 @@ import logging import re import select -import systemd.journal import time import uuid @@ -31,6 +30,8 @@ except ImportError: GeoIPReader = None +_5_MB = 5 * 1024 * 1024 + def _convert_uuid(s): return str(uuid.UUID(s.decode())) @@ -64,13 +65,23 @@ def convert_realtime(t): } +class SingleMessageReadResult(NamedTuple): + has_more: bool + bytes_read: int + + +class MessagesReadResult(NamedTuple): + exhausted: bool + lines_read: int + + class JournalObject: def __init__(self, cursor=None, entry=None): self.cursor = cursor self.entry = entry or {} -class PumpReader(Reader): +class PumpReader(journal.Reader): def convert_entry(self, entry): """Faster journal lib _convert_entry replacement""" output = {} @@ -127,6 +138,7 @@ def __init__( tags=None, seek_to=None, msg_buffer_max_length=50000, + msg_buffer_max_bytes=_5_MB, searches=None, initial_position=None ): @@ -134,6 +146,7 @@ def __init__( self.log = logging.getLogger("JournalReader:{}".format(name)) self.name = name self.msg_buffer_max_length = msg_buffer_max_length + self.msg_buffer_max_bytes = msg_buffer_max_bytes self.initial_position = initial_position self.read_bytes = 0 self.read_lines = 0 @@ -147,7 +160,6 @@ def __init__( self.field_filters = field_filters self.stats = stats self.cursor = seek_to - self.registered_for_poll = False self.journald_reader = None self.last_journald_create_attempt = 0 self.running = True @@ -156,40 +168,53 @@ def __init__( self.last_stats_send_time = time.monotonic() self.last_journal_msg_time = time.monotonic() self.searches = list(self._build_searches(searches)) + self._is_ready = True + + def invalidate(self) -> None: + """ + Should be called when reader has stale information about files present + e.g. some journal files were deleted and we need to release file descriptors. + """ + if self.journald_reader: + self.journald_reader.close() + self.journald_reader = None - def create_journald_reader_if_missing(self): - if not self.journald_reader and time.monotonic() - self.last_journald_create_attempt > 2: + def fileno(self) -> Optional[int]: + return self.journald_reader and self.journald_reader.fileno() + + def create_journald_reader_if_missing(self) -> None: + if (not self.journald_reader and time.monotonic() - self.last_journald_create_attempt > 2): self.last_journald_create_attempt = time.monotonic() self.journald_reader = self.get_reader(seek_to=self.cursor) - def update_poll_registration_status(self, poller): + @property + def is_ready(self) -> bool: + return bool(self.senders and self.journald_reader and self._is_ready) + + def get_write_limit_bytes(self) -> int: + return self.msg_buffer_max_bytes - max(s.msg_buffer.buffer_size for s in self.senders.values()) + + def get_write_limit_message_count(self) -> int: + return self.msg_buffer_max_length - max(len(s.msg_buffer) for s in self.senders.values()) + + def update_status(self): self.create_journald_reader_if_missing() - if not self.journald_reader: - return + sender_over_limit = self.get_write_limit_message_count() <= 0 + if self.msg_buffer_max_bytes: + sender_over_limit |= self.get_write_limit_bytes() <= 0 - sender_over_limit = any(len(sender.msg_buffer) > self.msg_buffer_max_length for sender in self.senders.values()) - if not self.registered_for_poll and not sender_over_limit: + if not self._is_ready and not sender_over_limit: self.log.info( - "Message buffer size under threshold for all senders, starting processing journal for %r", self.name + "Message buffer size under threshold for all senders, ready to process journal for %r", + self.name, ) - self.register_for_poll(poller) - elif self.registered_for_poll and sender_over_limit: + self._is_ready = True + elif self._is_ready and sender_over_limit: self.log.info( - "Message buffer size for at least one sender over threshold, stopping processing journal for %r", self.name + "Message buffer size for at least one sender over threshold, pause processing journal for %r", + self.name, ) - self.unregister_from_poll(poller) - - def register_for_poll(self, poller): - if self.journald_reader: - poller.register(self.journald_reader, self.journald_reader.get_events()) - self.registered_for_poll = True - self.log.info("Registered reader %r with fd %r", self.name, self.journald_reader.fileno()) - - def unregister_from_poll(self, poller): - if self.journald_reader and self.registered_for_poll: - poller.unregister(self.journald_reader) - self.registered_for_poll = False - self.log.info("Unregistered reader %r with fd %r", self.name, self.journald_reader.fileno()) + self._is_ready = False def get_resume_cursor(self): """Find the sender cursor location where a new JournalReader instance should resume reading from""" @@ -201,7 +226,10 @@ def get_resume_cursor(self): state = sender.get_state() cursor = state["sent"]["cursor"] if cursor is None: - self.log.info("Sender %r needs a full catchup from beginning, resuming from journal start", sender_name) + self.log.warning( + "Sender %r needs a full catchup from beginning, resuming from journal start", + sender_name, + ) return None # TODO: pick oldest sent cursor @@ -235,7 +263,10 @@ def initialize_senders(self): extra_field_values = sender_config.get("extra_field_values", {}) if not isinstance(extra_field_values, dict): - self.log.warning("extra_field_values: %r not a dictionary object, ignoring", extra_field_values) + self.log.warning( + "extra_field_values: %r not a dictionary object, ignoring", + extra_field_values, + ) extra_field_values = {} try: sender = sender_class( @@ -289,7 +320,11 @@ def inc_line_stats(self, *, journal_lines, journal_bytes): self._sent_lines_diff += self.read_lines - self._last_sent_read_lines self._sent_bytes_diff += self.read_bytes - self._last_sent_read_bytes if now - self.last_stats_print_time > 120: - self.log.info("Processed %r journal lines (%r bytes)", self._sent_lines_diff, self._sent_bytes_diff) + self.log.info( + "Processed %r journal lines (%r bytes)", + self._sent_lines_diff, + self._sent_bytes_diff, + ) self._sent_lines_diff = 0 self._sent_bytes_diff = 0 self.last_stats_print_time = now @@ -301,7 +336,10 @@ def inc_line_stats(self, *, journal_lines, journal_bytes): sender.refresh_stats() def read_next(self): - jobject = next(self.journald_reader) + if not self.journald_reader: + return None + + jobject: JournalObject = cast(JournalObject, next(self.journald_reader)) if jobject.cursor: self.cursor = jobject.cursor self.last_journal_msg_time = time.monotonic() @@ -324,12 +362,14 @@ def get_reader(self, seek_to=None, reinit=False): journal_flags = reduce( lambda a, b: a | b, - [getattr(systemd.journal, flag.strip()) for flag in journal_flags], + [getattr(journal, flag.strip()) for flag in journal_flags], ) try: reader_kwargs = dict( - files=self.config.get("journal_files"), flags=journal_flags, path=self.config.get("journal_path") + files=self.config.get("journal_files"), + flags=journal_flags, + path=self.config.get("journal_path"), ) namespace = self.config.get("journal_namespace") if namespace: @@ -343,7 +383,12 @@ def get_reader(self, seek_to=None, reinit=False): self.journald_reader = None return None except FileNotFoundError as ex: - self.log.warning("journal for %r not available yet: %s: %s", self.name, ex.__class__.__name__, ex) + self.log.warning( + "journal for %r not available yet: %s: %s", + self.name, + ex.__class__.__name__, + ex, + ) return None if seek_to: @@ -487,7 +532,7 @@ def __init__(self, jobject, reader, pump): self.pump = pump self.reader = reader - def process(self): + def process(self) -> SingleMessageReadResult: new_entry = {} for key, value in self.jobject.entry.items(): if isinstance(value, bytes): @@ -497,24 +542,25 @@ def process(self): if self.jobject.cursor is None: self.log.debug("No more journal entries to read") - return False + return SingleMessageReadResult(has_more=False, bytes_read=0) if self.reader.searches: if not self.reader.perform_searches(self.jobject): - return True + return SingleMessageReadResult(has_more=True, bytes_read=0) if not self.pump.check_match(new_entry): - return True + return SingleMessageReadResult(has_more=True, bytes_read=0) for sender in self.reader.senders.values(): json_entry = self._get_or_generate_json(sender.field_filter, sender.extra_field_values, new_entry) sender.msg_buffer.add_item(item=json_entry, cursor=self.jobject.cursor) + max_bytes = 0 if self.json_objects: max_bytes = max(len(v) for v in self.json_objects.values()) self.reader.inc_line_stats(journal_bytes=max_bytes, journal_lines=1) - return True + return SingleMessageReadResult(has_more=True, bytes_read=max_bytes) def _get_or_generate_json(self, field_filter, extra_field_values, data): ff_name = "" if field_filter is None else field_filter.name @@ -545,10 +591,11 @@ def _truncate_long_message(self, json_entry): error = "too large message {} bytes vs maximum {} bytes".format(len(json_entry), MAX_KAFKA_MESSAGE_SIZE) if not self.error_reported: self.pump.stats.increase( - "journal.read_error", tags=self.pump.make_tags({ + "journal.read_error", + tags=self.pump.make_tags({ "error": "too_long", "reader": self.reader.name, - }) + }), ) self.log.warning("%s: %s ...", error, json_entry[:1024]) self.error_reported = True @@ -560,6 +607,8 @@ def _truncate_long_message(self, json_entry): class JournalPump(ServiceDaemon, Tagged): + _STALE_FD = object() + def __init__(self, config_path): Tagged.__init__(self) self.stats = None @@ -575,6 +624,8 @@ def __init__(self, config_path): self.configure_field_filters() self.configure_readers() self.stale_readers = set() + self.reader_by_fd = {} + self.poll_interval_ms = 1000 def configure_field_filters(self): filters = self.config.get("field_filters", {}) @@ -594,20 +645,29 @@ def configure_readers(self): self.readers = {} state = self.load_state() + for reader_name, reader_config in new_config.items(): reader_state = state.get("readers", {}).get(reader_name, {}) resume_cursor = None for sender_name, sender in reader_state.get("senders", {}).items(): sender_cursor = sender["sent"]["cursor"] if sender_cursor is None: - self.log.info("Sender %r for reader %r needs full sync from beginning", sender_name, reader_name) + self.log.info( + "Sender %r for reader %r needs full sync from beginning", + sender_name, + reader_name, + ) resume_cursor = None break # TODO: pick the OLDEST cursor resume_cursor = sender_cursor - self.log.info("Reader %r resuming from cursor position: %r", reader_name, resume_cursor) + self.log.info( + "Reader %r resuming from cursor position: %r", + reader_name, + resume_cursor, + ) initial_position = reader_config.get("initial_position") reader = JournalReader( name=reader_name, @@ -616,6 +676,7 @@ def configure_readers(self): geoip=self.geoip, stats=self.stats, msg_buffer_max_length=self.config.get("msg_buffer_max_length", 50000), + msg_buffer_max_bytes=self.config.get("msg_buffer_max_bytes", _5_MB), seek_to=resume_cursor, initial_position=initial_position, tags=self.make_tags(), @@ -652,7 +713,7 @@ def shutdown(self): for reader in self.readers.values(): reader.request_stop() - reader.unregister_from_poll(self.poller) + self.unregister_from_poll(reader) self.stale_readers.add(reader) def sigterm(self, signum, frame): @@ -677,31 +738,53 @@ def check_match(self, entry): return True return False - def read_single_message(self, reader): + def read_single_message(self, reader) -> SingleMessageReadResult: try: - jobject = reader.read_next() + jobject: Optional[JournalObject] = reader.read_next() if jobject is None or jobject.entry is None: - return False + return SingleMessageReadResult(has_more=False, bytes_read=0) return JournalObjectHandler(jobject, reader, self).process() except StopIteration: self.log.debug("No more journal entries to read") - return False + return SingleMessageReadResult(has_more=False, bytes_read=0) except Exception as ex: # pylint: disable=broad-except self.log.exception("Unexpected exception while handling entry for %s", reader.name) self.stats.unexpected_exception(ex=ex, where="mainloop", tags=self.make_tags({"app": "journalpump"})) time.sleep(0.5) - return False + return SingleMessageReadResult(has_more=False, bytes_read=0) - def read_all_available_messages(self, reader, hits): + def read_messages(self, reader, hits, chunk_size) -> MessagesReadResult: lines = 0 - while self.read_single_message(reader): + + exhausted = False + + limit_bytes = reader.get_write_limit_bytes() + limit_count = reader.get_write_limit_message_count() + + for _ in range(chunk_size): + if limit_bytes <= 0: + return MessagesReadResult(exhausted=False, lines_read=lines) + + if limit_count <= 0: + return MessagesReadResult(exhausted=False, lines_read=lines) + + has_more, bytes_read = self.read_single_message(reader) + + if bytes_read: + limit_bytes -= bytes_read + limit_count -= 1 + + if not has_more: + exhausted = True + break + lines += 1 for search in reader.searches: hits[search["name"]] = search.get("hits", 0) - return lines + return MessagesReadResult(exhausted=exhausted, lines_read=lines) def get_state_file_path(self): return self.config.get("json_state_file_path") @@ -727,28 +810,92 @@ def _close_stale_readers(self): while self.stale_readers: reader = self.stale_readers.pop() reader.close() + self.unregister_from_poll(reader) + + def register_for_poll(self, reader: JournalReader) -> bool: + fd = reader.fileno() + # fd in this case is anonymous inotify node + if fd is not None and fd not in self.reader_by_fd: + self.log.info("Registered reader %r with fd %r", self.name, fd) + self.poller.register(fd) + self.reader_by_fd[fd] = reader + return True + + return False + + def unregister_from_poll(self, reader: JournalReader) -> None: + fd = reader.fileno() + if fd is not None and fd in self.reader_by_fd: + self.poller.unregister(fd) + self.reader_by_fd[fd] = self._STALE_FD + self.log.info("Unregistered reader %r with fd %r", self.name, fd) def run(self): last_stats_time = 0 + poll_timeout = 0 + buffered_events = {} + while self.running: self._close_stale_readers() - results = self.poller.poll(1000) + self.log.debug("Waiting for %dms", poll_timeout) + results = self.poller.poll(poll_timeout) + iteration_start_time = time.monotonic_ns() + hits = {} lines = 0 - for fd, _event in results: - for reader in self.readers.values(): - jdr = reader.journald_reader - if not jdr or fd != jdr.fileno(): - continue - if jdr.process() == systemd.journal.APPEND: - lines += self.read_all_available_messages(reader, hits) - break - else: + # We keep valid events in case reader is busy with processing + # previous batch and not ready to act on new one + + for fd, _ in results: + reader = self.reader_by_fd.get(fd) + if reader is self._STALE_FD: + continue + + if reader is None: self.log.error("Could not find reader with fd %r", fd) + continue + + # Call to process clears state, consecutive calls won't return same value + process_evt = reader.journald_reader.process() + # Can be either 0 - journal.NOP, 1 - journal.APPEND or 2 - journal.INVALIDATE + current_evt = buffered_events.get(reader, journal.NOP) + buffered_events[reader] = max(current_evt, process_evt) + + # Remove stale reader, it should happen after we processed + # batch of events received from poll, because we could + # have received stale fds + for key, value in list(self.reader_by_fd.items()): + if value is self._STALE_FD: + del self.reader_by_fd[key] + + for reader, event in list(buffered_events.items()): + if not reader.is_ready: + continue + + # Read messages up to set reader limit + exhausted = False + if event in [journal.APPEND, journal.INVALIDATE]: + exhausted, line_count = self.read_messages(reader, hits, chunk_size=5000) + lines += line_count + + if event == journal.INVALIDATE: + self.unregister_from_poll(reader) + # If file was deleted we could lose some unread messages here + # but if we are so behind that we reading tail file, we probably + # will never catch up and we want to avoid holding descriptors to + # deleted files + reader.invalidate() + + # Only delete set event when we know that all messages were drained + if exhausted: + buffered_events.pop(reader) for reader in self.readers.values(): - reader.update_poll_registration_status(self.poller) + reader.update_status() + if self.register_for_poll(reader): + # Check if there is something to read in newly created reader + buffered_events[reader] = journal.APPEND if hits and time.monotonic() - last_stats_time > 60.0: self.log.info("search hits stats: %s", hits) @@ -767,6 +914,7 @@ def run(self): self.log.debug("No new journal lines received") self.ping_watchdog() + poll_timeout = max(0, self.poll_interval_ms - (time.monotonic_ns() - iteration_start_time) // 1000) self._close_stale_readers() diff --git a/journalpump/senders/base.py b/journalpump/senders/base.py index 02142d4..bd8d25b 100644 --- a/journalpump/senders/base.py +++ b/journalpump/senders/base.py @@ -2,6 +2,7 @@ import logging import random +import sys import time KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30 @@ -45,6 +46,7 @@ def __init__(self): self.total_size = 0 self.last_journal_msg_time = time.monotonic() self.cursor = None + self.buffer_size = 0 def __len__(self): return len(self.messages) @@ -55,6 +57,7 @@ def get_items(self): if self.messages: messages = self.messages self.messages = [] + self.buffer_size = 0 return messages def add_item(self, *, item, cursor): @@ -62,6 +65,7 @@ def add_item(self, *, item, cursor): self.messages.append((item, cursor)) self.last_journal_msg_time = time.monotonic() self.cursor = cursor + self.buffer_size += sys.getsizeof(item) self.entry_num += 1 self.total_size += len(item) diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..534a692 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +log_format = %(asctime)s.%(msecs)03d %(levelname)s %(message)s +log_date_format = %Y-%m-%d %H:%M:%S diff --git a/scripts/generate_logs.py b/scripts/generate_logs.py new file mode 100644 index 0000000..6168b60 --- /dev/null +++ b/scripts/generate_logs.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import multiprocessing +import os +import pathlib +import shutil +import socket +import subprocess +import sys +import tempfile +import time +from typing import Optional, Dict + +MESSAGE_DEFAULTS: Dict[str, str] = {"PRIORITY": "6", "SYSLOG_IDENTIFIER": "journald-gen-logs"} + + +def _encode_message(data: Dict[str, str]) -> bytes: + """Encode to journald native protocol message. + + >>> _encode_message({"MEESAGE": "Something happened"}) + b"PRIORITY=6\nSYSLOG_IDENTIFIER=journald-gen-logs\nMESSAGE=Something happened.\n" + """ + message = MESSAGE_DEFAULTS.copy() + message.update(data) + + result = [] + for key, value in message.items(): + result.append(b"%s=%s" % (key.encode("utf-8"), str(value).encode("utf-8"))) + + return b"\n".join(result) + b"\n" + + +def _message_sender(uid: int, message_socket_path: str, queue: multiprocessing.JoinableQueue): + """Send messages to journald using native protocol. + + NB. Message send in a separate process to be able to write to the socket as non-root user. + and get user-.journal entries instead of just syslog.journal ones + """ + os.setuid(uid) + + s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + s.connect(message_socket_path) + + while msg := queue.get(): + s.sendall(_encode_message(msg)) + queue.task_done() + + # ACK None + queue.task_done() + + +class JournalControlProcess: + CONTROL_SOCKET_NAME = "io.systemd.journal" + MESSAGE_SOCKET_NAME = "socket" + JOURNALD_BIN = "/usr/lib/systemd/systemd-journald" + + def __init__(self, *, logs_dir: pathlib.Path, uid: int) -> None: + self._logs_dir: pathlib.Path = logs_dir + self._runtime_dir: Optional[pathlib.Path] = None + self._journald_process: Optional[subprocess.Popen] = None + self._sender_process: Optional[multiprocessing.Process] = None + self._sender_queue = multiprocessing.JoinableQueue() + self._uid = uid + + @property + def _control_socket_path(self) -> str: + assert self._runtime_dir + return str(self._runtime_dir / self.CONTROL_SOCKET_NAME) + + @property + def _message_socket_path(self) -> str: + assert self._runtime_dir + return str(self._runtime_dir / self.MESSAGE_SOCKET_NAME) + + def _start_journald(self) -> subprocess.Popen: + assert self._runtime_dir + + environment = { + "LOGS_DIRECTORY": self._logs_dir, + "RUNTIME_DIRECTORY": self._runtime_dir, + } + journald_process = subprocess.Popen([self.JOURNALD_BIN, "test"], + env=environment, + stdout=sys.stdout, + stderr=sys.stdout) + + cur = time.monotonic() + deadline = cur + 3 + + while cur < deadline: + files = {f.name for f in self._runtime_dir.iterdir()} + if self.CONTROL_SOCKET_NAME in files and self.MESSAGE_SOCKET_NAME in files: + break + time.sleep(0.1) + cur = time.monotonic() + + return journald_process + + def rotate(self) -> None: + """Ask journald to rotate logs and wait for the result.""" + assert self._journald_process + + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(self._control_socket_path) + s.sendall(b'{"method": "io.systemd.Journal.Synchronize"}\0') + s.recv(100) + s.sendall(b'{"method": "io.systemd.Journal.Rotate"}\0') + s.recv(100) + + def send_message(self, message: Dict[str, str]) -> None: + """Send message to journald.""" + assert self._sender_process + + self._sender_queue.put(message) + self._sender_queue.join() + + def _start_sender_processs(self) -> multiprocessing.Process: + sender_process = multiprocessing.Process( + target=_message_sender, args=(self._uid, self._message_socket_path, self._sender_queue) + ) + sender_process.start() + return sender_process + + def __enter__(self) -> JournalControlProcess: + self._runtime_dir = pathlib.Path(tempfile.mkdtemp(prefix="journald_runtime_")) + os.chown(self._runtime_dir, self._uid, -1) + + self._journald_process = self._start_journald() + self._sender_process = self._start_sender_processs() + + return self + + def __exit__(self, *args) -> None: + assert self._runtime_dir + assert self._journald_process + assert self._sender_process + + self._sender_queue.put(None) + self._sender_process.join(timeout=3) + + self._journald_process.terminate() + self._journald_process.wait(timeout=3) + + shutil.rmtree(self._runtime_dir) + + +_PARSER = argparse.ArgumentParser( + usage="""Genrate journald log files. +This program reads messages from stdin in following format + +msg Test 1 +msg {"MESSAGE": "Test 1"} +rotate +msg Test 2 + +msg command argument be either plain message or json object +rotate command invokes journald rotation +""" +) +_PARSER.add_argument('--uid', type=int, default=1000, help='user id of log sender') + + +def main(): + args = _PARSER.parse_args() + + if os.geteuid() != 0: + raise Exception("Should be run as a root user to be able to rotate") + + logs_dir = pathlib.Path(tempfile.mkdtemp(prefix="journald_logs_")) + uid = args.uid + os.chown(logs_dir, uid, -1) + + with JournalControlProcess(logs_dir=logs_dir, uid=uid) as journald_process: + + while entry := input(): + action, *args = entry.strip().split(" ", 1) + + if action == "rotate": + journald_process.rotate() + + elif action == "msg": + if len(args) != 1: + raise ValueError(f"Not enough args for msg {args}") + + msg = args[0].strip() + + if msg.startswith("{"): + msg = json.loads(msg) + else: + msg = {"MESSAGE": msg} + + journald_process.send_message(msg) + + print(f"Logs avaialble in {logs_dir} directory") + print("To see generated logs use following command:") + print(f"journalctl -D {logs_dir}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/systest/__init__.py b/systest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/systest/data/rotated_logs/rotated_logs.txt b/systest/data/rotated_logs/rotated_logs.txt new file mode 100644 index 0000000..2b1bca7 --- /dev/null +++ b/systest/data/rotated_logs/rotated_logs.txt @@ -0,0 +1,45 @@ +msg Message 0 +msg Message 1 +msg Message 2 +msg Message 3 +msg Message 4 +msg Message 5 +msg Message 6 +msg Message 7 +msg Message 8 +msg Message 9 +rotate +msg Message 10 +msg Message 11 +msg Message 12 +msg Message 13 +msg Message 14 +msg Message 15 +msg Message 16 +msg Message 17 +msg Message 18 +msg Message 19 +rotate +msg Message 20 +msg Message 21 +msg Message 22 +msg Message 23 +msg Message 24 +msg Message 25 +msg Message 26 +msg Message 27 +msg Message 28 +msg Message 29 +rotate +msg Message 30 +msg Message 31 +msg Message 32 +msg Message 33 +msg Message 34 +msg Message 35 +msg Message 36 +msg Message 37 +msg Message 38 +msg Message 39 +rotate + diff --git a/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000003-0005d4206c27cc3b.journal b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000003-0005d4206c27cc3b.journal new file mode 100644 index 0000000000000000000000000000000000000000..2c50b08f167179f8c279d6f205c36d6af44c4fde GIT binary patch literal 524288 zcmeI*e{_}QeE{%_fKs&`{WYtzimgA|v!XX4B%tP0#6VKX50lv9hP#9i35bFOj`OS{ z8`Lp9p5l(SR!avuT3U2cEl4-pqCFme%qi>IBig!x)6II?jn!l4nVxk?-sc;Rx87q6 z5JLFm55BzjeV^y~zMtp&{1X%&&Bq_Ra>lh=j(lX< zIj_Aq|Bq|_Xw>-WyMA)ZvS((rJ^uGA&pKlE&z2uI{l*7hop;I;kH4_Cc>6u6pRvC^ zs`r1pKAb(U=eo3FL>6~^D2p4vo5fja_0cIFo8rh6^Xszzob%)v3&;IlXS+*mL6Phi}avh5=Fua&I^OQ80v*+>xwYh2?HAV7cs z0Rlrupy|t5E)j=LW-=Ly04?K_2M@6w z8k1fxi8$El*WR6t7;&)2Hy!~31PBlyK!Cu|7Fhj_EWe0DJ6qWiAV6UF3hYe3cZxWC z_jG9N*~{O|@`yMzvXTV>0t5&UAh0h2+t+2SBkoII+7cjecnj=Gua`wUyfy4S0RjXL zoIpOk9v1PyjihG+1PJV3fl29eZ6fa9-Uq+8$L`CvMI3JXXMYF`CxK6F z$@V7VaN0Nfvi}9f-JkUtv3P|11KA@Hi`QE|xB?r~-|I&_xY-z;0D;3vU`_h_lZc1Y zzS~y<1PBlyK!Ctd5Llf)e>vh%$Vmxa5xBzP3P~3hr>Sa%RcV7 zHESJlU;5IP009C72plYd73uw@5f4@##yvCycBb!Bi+E^cEz3hiAfM_h;-QkaJP8bj zKxg_syoiG_rZI+&KvVjD!H7dAvqK}3&UIOy5f6>5Wl4a*p&+m({k>$wLm_855+Fc; zKrXlAJDEa6CIko!T;TQed^O_0d(x7?{udaLy8KLvcb$IIisS!g&x+!a_N3R{7T=kz zIXZ37)LhrtT-SO@b!GOz5$W-Kig$f3xA?uM|If6vn2Yy~M7tQjJ#Db&wrMxD*1mjE zY#%Yk-&iz$%Ox!{8mAZPE~sf}t(#U?(_Fn^(Xu61wk@1HeqP6w;}c|Y`rB0x&#qY=?c;NzQu|!=*PJ$}TfMNQV%0BWx1w1t^4L}6RcLLV z($Lb>*xVXWS(C`4JoPYnecQ8ZKmEe==rF!7dT;w|P2^LTHr#aWpKRDY@0C^2E@I^K zk-M@-qJ9cZbr)2Zl~)yed@40uyy^1I|NZ*S=~J|Gx%eEOo_0N(6p#PFDQjMP;=z%B zm74d37{94_{EK_W&*w{xKWpku&yH&QkI~UlJg+Tux9fQ<#*YFoU-r2-Uw-KNGh_RR zG5+e}@uzo>-#vKv$9^w(=tm#CuO&zfAP|;v$~JkjwNkNI|@CA?wO0PNj*h= zF;81Yw{$#w!oqnmP<(%OVQSWI`v-+yy3NAtep zwX-;nmYQy+i)VJbH?-EwY_0CPrfX@(6@_JIr~fQDyK8ZKVd>djnM~vvh0ylYW8XR9 z_C1?pp!mIFp{U>RiaZ-y?m79PJ-_NZKSulTisfimXlcB-d1_6exn|0Y*1GzdLeD=f ztyAios>>=X@|9)z%6xfQMS0bPvfieV=lQ9lU!J}Ds?BdM?>o-o^G{3nn>CE_qYgVi zzvS0X|6pzUR`bX**XO&g*!TR##@6o3M|JsS`SQyA-1e%DvZ{)zNpt43UpAq#s=Tbd zt$lJw`Q*8&tH?X*@Xx+})rsex^vI$ZDc)CINc~2B3)2S6$NblEGavkyzT+y|h3n4Z z{OYGnt?fQedQzM?r()v7wtPkT+={A>{M?C?E2<{Wt*U6RENh=Ur)^TtVcd5d_bzlV z|K_#J=H2tJuk@Wq;&ZVJd;9H8A@cle?@}(;c;kriPo4JHf0q{fLgYC%y>F)Pc&Tk{ zX>FKNUsF9{PJ73k_Ohz{+>VO9`DC-6%Wc2rs;iG4Sy3Au#`o40qJ3Hs`81~GahtYI z{>>f#9E{PtZ(eJQ^XbW@rD@94n(Cz;T}%61;`-Kd)2&Zl_o+Wj?}pFi;`fP#zINFV zBhyIdrG#SnoF~s%IPS-zb0<94aOtS-ZO%Gk_Rp3dH~q#3U!8Z#6OX^J_J&`~8PV|5 zZ`~XBJ8@qX_Ze}Y6VD6cej={3ab1t=U0he=`V-f=xGuzXC(hGxJ&5ySoS)-773ae^ zAH;bn-q#Yxf7EjvS5ZGv$1#6VFX0&Tb!763^U-bb`9|q}GQU3Cp!VxoY}%N`nE$xn zT6N)j{@}fjk4RhRa^V*3(=dJ4rQ-XosK4kp&U4rO0k+HCJFq-w4zS(k0k#{H?hM|o%j^NR z+cv;d^ zdq-|<9`5JGct3K?Z+!l3%uCEy)WOlGzfynaEw}c)ZV>I`dK&W>?TYIl?iXWy)WK1A zw%vZ`KYo07w2wSW_1oV%C@;_Mpo{Au&f6cdOSrqdX?c=%_k4L-WI_SP-@BNAOQ3t0!_Wlza{;6w4TAhnL zO7+{{I;faX)!Sup9UPSg`ckTcr6ua1Y|$%6F6gZKx8Ys~@p(Tnzw!OkF)wl4%!~<2 zG3IAmdPcnF{%pl<>2nL$rO%t&_}!w0X))>`>g&xLPRljkbIhvPI`WGC#MRN8?m^)#-xEZruw?RM16h#qnCgDrA=pyAMW)P zpEn!xTbt@7=A~49#r(Y6x{7($ZzjQkLmU%5Q9T^kdWh>&T-PSA**U)H@%N`^(cP~X zZ_37x^IEpF_x1eZ>stI?Ao`8#TI~n^;-Vu@opWBakMnA&e*1e}n^fZd`8{c%AExWt z`V!Z*MR%?L#Gl`N@vy&ditinZ`Hj!rjd_XriaMD1wK0#bU-$VnF<;R>?w8{6XjfbZ z+1>2kI*B?coc)2yRnNEmZ?um*O7+{{I+$Gf_Rpi@b@^vf9c(R82TQ-Od&95KJ?FCF zet#41XN&ob&+CqPiTR8=xNk}4Uygj@eJ96!Mfn^2aUI0#-LXFE;I~_5{c!I5 zvwlC?M;@j6?PncSR1u;743_kFGaiJI*2!B#QLa%(H}eI-0wYe=2xQM$fH!h{jGzt zs<(f>b8H&ujZ_EEmZ*cSUtMxU!}=>eJlyLb-d7Xz8^3QZo|l*(kpKY#1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF s5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF3}u1;2cK!HX8-^I literal 0 HcmV?d00001 diff --git a/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-000000000000000e-0005d4206c28150a.journal b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-000000000000000e-0005d4206c28150a.journal new file mode 100644 index 0000000000000000000000000000000000000000..c3672546f4466253d767168f24022357350ada8b GIT binary patch literal 524288 zcmeI*e{_}QeE{&QLTi~7<}cLaETVRtPUI$r1aPTB3?wZfSrQ8t?IncpL#ZG?YH6op z?KZ6CNOx*yb#|z;qSmTa5I5ISdu*%S)KxvhbJU`>tGb?zY3(>?o^46q=Nr#%xW^C$ zlJMaVzPa~(pXd3$pXdAjxW8vMeX^yluDLGP`S0Ma|I)t)4qEp1$R``WbM-Bwuls7p zv(!~(-&}Ok`eWP`ri+B|Fz@j?2esRqy-0M@u{P-xapxRo}U(vNO5$Eho_j|nEf&0 z=y6NOJTp9Z%jhg#d1~VY!#Wp@P3w(HF~W)IkKrlqNaMzGv2OxnzLBjLv2U%cwXX!~ z{yG~e;=UT!I0Og~AV7e?z!7Nrt1Op@11B??3`BulH)L5w{Cv6{@xA7}J=J4R{ASkU z_7vZ1|3kg~#v8LOj5yS{*Y*u8fqB^9z%tr79v*5CTAY>kNTw*FvSo=@8s zu{i%Z={Oj%c=^7*y)`}G6LDXUza!OK#JxU#s@Gh^y*}XkJ3!ZSpxyu0rYH~~K!Cu2 z5ZIN@rx6E4Qj!?70&COb>4<}NLxxD=ho0HQS(wLu~VG4gmrL28_V=+p<(54w%FwLf`-t$fxJS zA|8MQ18S2u+>xadaX=*}nISK*J$?Qy;*j598@N{m7H`bDkGNL{(>VbG1PBlyuwMj5 zrOy*b+%HKOlmGz&1PBlyK!5-N0s~Os)%1E#!~vM%drE0j`n-O`_tcrj1PBlyP*UKh z-^&(7EV-fu1PBlyK!8A>1-5U=R*l%_hF0trSeL$kIpV%%S(m<#C*r;y-*|&dU_<)7 zuZV*yd)e-9fo&VJ3?@CazcYD%#vhfA-x1%RUWaZkPhFR7Tg0KerM8R!0RjXF5FkK+ zz}^?gr_U=!-1{RMfxzGqSe$+jFXG_IT&DdI7;}HtDq>UmJ*0^J=~Ej51P%;=wdwnr zA|4o9z5ljyL;AdA#Qm3vkqHbwfu|nGa*jCowm`-N2oNAZfWSZ(I4C9fi4<@B==!zC z{mss`#d+^c&$}(ZC0jBgts#wy{gYTfVvN73X#AG*T4pxRDAb=*JFB&RdVOtk^@Yn;uei8v>4Nc# zIxZf+blIYn-CaaplT(MAN3K}@<_ncCME~)8Qm_8f3Xx}Xy8ZmezBuyc7rwPV+DD8$ z<94)bsBLMPI-|C_p=0IBwnZId$`VlIH6b)i@Xt?=^&`gk>x##p(K&wS;Kkd0;SYa)v!E3 z<2Ixf9{kC~OCLG=(Mhp>#29y5@whdO4Gq=H*Q{E+?BX*zSB~+=r-qx)`-6|Xa$$3n zSuWn+To@XiN-JD@LBp^Ir(Jwi ztRFG*%l~b5PfuOeOl>OEPM=;|(^~(j+Uk6apAK`mYYzX_Q*A$g>8R)@J`Y^@f9>9J zx+twU`tARD;urUSEtsQu&;Gi&IFFXvPNs9`bh^)Kt)0_ay>iXURUMx#te%qoSTSYg z^7g{2DJwIX$TJF|?TMd0aQqEB?~Q@tb;Uwazu^^mUN>&UnJ+F|xGUO446nEy?Fuc8 z=Qh{W7Mg3P&TOr3s4aB;*U~z*p{cs8vLatumaoj0msOTklvQ*$jXbBNj(#)crAzL8 z`|_UsEZ$#Qy5Fo}j30Hl_)k~7@#GI~O1~l!S>}3tHx~O&Yiw-oJbhG`FU*%$<`=YA zb(B?AR85*czkT5ul~v_s?QQLoJIW_7NL@wVQHNjr_9Z9$@rjQvi;?1e)rHh=Pw37vAZ&JB7&eQ{A_6xyEY_8vn$p zfBD^XvnNEJqtf%LJ^M>tV@vC-sSUN&XUuQ!nBQJjm0!@&Io07+mU`H>=8{WC98ysi z9men06{3Aw5cxEw+hgw9GWmBm{&O%!^PYLFE6%4YmzJifHMP~NI##ag$s^8hE$grQ z$16_zqx5e0TrOTuEcCR?hBzdRbaqPU@N|2`(c_kmd1iR-wXuyC4C}ma!ol;NzkKwJ zYaV`c(MgX#_VP`$PyWihO>e%qCa!nlx+<#d$T(KXIOm z^Fo|=;y4}WgE%h6@i~rDaXgIUK^&Ljdy-@Sk9v;%D(WZdIOZ?vB^+bEK9IcPcyxXI z{!qG}%-@`?Q1|UDHf_pc%zx}Chiraj$(Uc(y_UMmg#bCPu{@4*XKi`= ziS3V+9~Q@(*#1)emOdU89p@%b>`tk;enzH5Kb`7l^2xiepRr?q`R0Uq=fC!D^|Qb0 z<-OL&;*A+d%%$66oYf4A5J#DwCpY3w@^e@jj{cLw{ zKieIet_=3nWnMqqZS7~fQTO%Fb8$c0ZR=;dW53tGJeT*eT^zsSxOnE%yH0%eu^V2_ zM(sS0ZOs0P{UYvyj~si8^13U^AhtFb+GBh&s{P5jddT1`HJ>&K8^dM zU9UPw7mTqy>foM#JG0~BpPm1iXdii$>bI|TP+5^ZWl?;7{QWf0Z&MvSU7`+-J8H|U zfB*a&4MV*S;&Z(*zdO=1Au%s8Ur`4&J3n;kl%t;Md0ZOp9FYc^a!hueee5Z_zfOAb_UWgu{=++l4fQ&R z-_MEpjqg{Bd5Py&S{EdAMc zuS(y7mdiz6(SP(3?RwQ$cKGjAU&lZ_48VMZ(z)Ce2yUICFU#Y;0se0 z{_D#%AM1KbxBFkTkL#tlKic)GgQ{$eSRQq7){m>6ZJajdvS=T9lR|2*cP{$E*{7`@>U9wBr;Pa>m#%+fUShtY4u1SkYsbF2X5^p6 zd`0`XUXJ^tU9UPwPk6`jsDs%%x86DO>Wj{b_K`=ae*0Jl<@x;X>)@C)(A-o9Q%>Fe zU9v`a~AWPPmh0MUShtY4pwY9?UOs3k4SrUE*I^KA1_6_UUiT? z{OI*~=SSB*`ioO9{b0}Yc9BP^e*0VpRl9$@Gdk75EvXK!EKvvdeEf)Q2QN6|xuISM z@xGdv-+29}cwS;Z6$ua^K!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs Q0RjXF5FkK+z(5xGKl-qt>i_@% literal 0 HcmV?d00001 diff --git a/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000019-0005d4206c285fc5.journal b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000019-0005d4206c285fc5.journal new file mode 100644 index 0000000000000000000000000000000000000000..da083b89d059377fe281d2734d70852d0f460d6e GIT binary patch literal 524288 zcmeI*e{fahnE>D`f?7vgcGg|0wHDl|b-JQAAt9h!O7H$3%)bu((W zKXCGY|5fd0Mo*l+^hic3z!U9FoOt$7FHsV_Cc~tv)iv6H`1k#e8@6 z$Ckp$i^e}cCUyqO_2R$;#^0K47ja;%Y_+ch z>b{S9{+kVqs*%lG^w*7&&KA-k4 zVsZX+(s3|i@%nvzdsBM8C*r;ye@Cjfh(kVps@Gh^As_Jl9iaC)&^`aINl_S90w-_C z(up{%l9LPp0t5&gFap=4$I}rHn8ZhDqDQ3XOCpZY-LzW-2oNA}fD4R&G~0oQ2l(y` zdHOpxWkW<9@&OG%fB*pk1PF|Hf$iyar-&ndhwUH%0tc^K1e1PBm# z7XoX(ooyWPT{In%O?RjF3q~B0L3BE71XkUjr4n)2Bqk98Jp$L3e-K2jTdp>j&3vp!%JXn`aE&O;gy|i2n;WQ?dkcmh{G#8 z*$^N=;NTV5p1uz%;=wC%aP0Y0k7WB9@!;6&{kNCfzL$+1asOpvG$v=4!8W|GqeIt>HC5r z4((`qCqUr+6qxc@)?38)vu}Ove}OgW^O6zwe?~GOK!5-N0t5&U7)b)#zn|?*#F4ab z`(j@XNrRl1;yowbe9f`n*m+Iy*gMnnZk^xE)*P8OXqr{uIIF(pg6hfHf>CLCKE-=J zo$EaMq+RExo4I)3NVJRb+tUVXZa??tmb#b!GPaKx5hxv6nhOF(5!B9HRa!_>Q5U%2zE zm!?OD@q5v4x6jr@KJ{tCbyt1%z8B}ea$U5G82Nnoo@`0fPob&)oa(ajs;HO97D{3!79<)41*Dw?@NC4+#la^ z|4&+?eY`)d(6fE-vKY4^ZLsml%FCbl)Rrl+eZ&}dbMd&-8XFp_JFi@}V9DZ1JsZdP z6H~(t7ku<%zr18tlvys`-(2Wz*RxHGzc6io{80;ESulF_rRip0i1By#8^7V48TB)3 zBZcu@?MoMSF6%n2=U&^fv~^iWq4%zPYUh=yr^qknX~UT2jwg;=G(QH4-_I^g%leJ{ zn$iYeyr^OH#W{POo^OZw_^T1``-_S|!8r?u3dUt67z@zY^0chd*|V_WNU zFCG*9#OHwvf6(r2r_ZGg$G!Ett^fAumx4K(_uXF?6z9=g+ru<_PLKP{mfATj)m>M1 zE$g_nu>AD&$I{cgI@=4&PVdTOBF`v<)~!!{@3^n+Tpt6)>xzYgW5|L!X* zUXH^-H1EqZ7DT&3bK~q;(`pN|YHMb+)Hl=?djD%~scC4cE}LADpInxooG&k%lCP|& z*wZxfJUeyt>gg|Dw*IY^efP6?ez0`ES;H7V>Ttn|rT_brAKsb1#V)eU_2u1N?0a@& zV@uEJqq_X!eEH=3y!NV&vZ{)zDQ#`-7f+g8RbJNK+CH_TeCoW^RpcFY`1x;LcKn$q zY*`W`#rvuYso%(NQQBbT5wDG#^Y}mY-LIlu)ZzBx{2FSe)%Dy@dQ+@ytEjAO%~zDq ztElS8&#RnTQB^sws-k^zS^Lzs)+xPr+nB2XW zHeNKkXW40^=Kg%;xal`N{`&k0&piFooj1<;hlwZs^#5ApdMB={;yNR)bK-X|;(8*^ zvvFRJ^Ie=*bLaq=ns!`lgqM8vf}zVE+zWGxmo?ZTB3et zKKN(9|Hca+pR%|5+28f@Uh88)cLoykyE)yCc`2@+-4ABl$NUu6&7ALK>x3$H+Z~av4BoBF+(EY6G{|;iA0C|Nf9Y8^_>nYFb2=`TEq>?o>ekO*F=5y9<-11uaWQ^>CgwN3Pc!Bv<}2#p zx64*cKL6G~>Wuk{_HjOq<wnJWBOD&^oB9%udL~ zb#Qzd=*y`NI!n~Sic^~(y{+cn>XBXt@jF>DzwtSMn3tHZsDs64TzKml+uwLL<}2FA zc`=qpyMA?$Jv{4sd>?i2FDpN=diLzH3!;7GQL5j8*1?prtniBK;G=1vZK)2vTcQru z&3mZsu^<2YbtAnF;`jPue&hSJVqW5UXHHB|iZMT%(j($Ek7g@w-;l-b^m+WXj}O+StHv45rd9dLbR z4~uif^>soTXk2S{9sXvC`f8oFy1VInnawIp#OM|3BuXRDH$#yxY2pdDd?x z!GS~ECwilL7}|P>^G}@Dx-P%*@P~JOXLZa$91r8T7R#euzw=spVIbDWdF{MkcP`s; z{i{z#`#7$a>UW^$wW%eppFfrcx*(m`rYw2q$5Eg7!KhDMpPySZ($8!0y~i=XZ$Ro{i4sqJ3O1$2>;6esz$Z@Q(FyJow^2p0cX(1D&Tw`^ckIzXPs=s&{@oJs}PB zT&jbIOVq)n2mk*1`zzPqHPY)K-X|3E8{b0~^AhtFb#PnT4<|gZ>4v`7iK2b+&?aU67wSxAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF W5FkK+009C72oNAZfB=EvEb#whO|QNH literal 0 HcmV?d00001 diff --git a/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000024-0005d4206c28a26f.journal b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000024-0005d4206c28a26f.journal new file mode 100644 index 0000000000000000000000000000000000000000..7a822282782d8254deb10c9c0bdbe2eeb7eab2b5 GIT binary patch literal 524288 zcmeI*e{huLodEDvp>>?La#{yJp%OL(7eEF03q zC}6^eKlo<%eSUtP?>_Ilo6YXjnhWbHDrzfo9siE*{4f0*bL&lQTXN^0*4{Y0=^yT& zy7bcWzZf!V%8q}zec``NZG8SmH=T3z6~DZ3_>^Uv_q3e;!t<}MdTrGDQ$BXn)Fm4~ z^31|7z4fcBcBgiR{r;o-e%f(TcE|2-qytA~aoZ=exaNs0UX~6Ym*S`t&rC7jp8YU) z^;vU9{CsF`QD(( zd_U_c;-TtSKLiL6AV7e?hex31JK0zwet1S^Bp*hBz2D7774d87cEtBP=ItpTyW-zw zEpAWo{kA{Y>$iU|n_@rn@c5y={dju4C*q;*e@DusK==Jynxb$}1$Nw@ zH6QVycBXLxy$FnWG%FCXm*Nf+ATWRhmZs;^5eM*82F1{iO|O?k92Ao>2?7KTiNN;9 zvThGSh1O}?Wxb!)85eMo#2J5(orN1YR zI9TUtUIYjXe1WqzW^;%*@MmNO1PBlyK!5;&!z-{R{k?v~!#f(i6Cgl<009C72oN~3 z1a3*6ry23cn!VW`Zh>3U=PyS*+~d*zz!q5X!)*K!Kj_)4NZdaD{HE+^#6MQH^VoBb zWF;c@R=&66`Sg8#5qm4|IDsQcV0QXGyog8A+|4zYt4W{t7m>-q6{uL3wIA`|_NH|L z1O~FeiunH3#|QE(jGX`h0!N;}_VoHg#3N6@j0q4RaAXJ^m8N+?ig%A(zVwv4b}ucQ z{qFR-+q~7;k>gT^AYT`{ew zmWy%hDDY~iubnupuBN)SKA^HHF^=NY!uTIF{$ka+uTO~vXQYnOcW z-q%`o-W>HJ#&}M?JG&?Hr=h0u{Ia6r(#V$>$JwdkytUV?|DV0LrFVJfa`8Dlo%K44 z6!yRA^c8Qtu=$v8hFfZ;%cG{S|BE{N&*yvU|FTKTe=(%-zlKIT@%*--quw9(9|>N( z@RE1l_{m>>G0I1b`QK95|CEmYJ323%_tii7*@?H`|5AO_kN2lFbd>+YeydV}P5(IN zhNr&r%(y5YG5Xz7*zct3s;aVii`r(-y>4_z;pl%ct)HW|@Y-?`l zeCU30-lEh}j4#${B`GV6KfjECr>V)R9|^*(d&UTjS4O`((5ee-GTSU%hvou1N)lzw>`v zUw!npV2aGzFRKE1xIbx~_u^R*2NC!`-1OlY0g)X+AeHIs>P zMj|wBeQv|4-`%}FI*Q*bHWaiQUNO$vFP*UX<1amTOw@}QUU56>HPlsKR6D7>p|*VD z)cVS*@`ld;>gp#})sz*DEy<59%8$(#7mY9N_+ekw80UGZp7%Up%6#$I{H&(Z=AzP)(s46q zHeES-Y-w>(Q)AQk=Hl_QQd2SRY!dT*gW^VKp@2hS|?Z)`#qyje{`{wZJ zoBz4%aTWE#b$j9XswPgV=y;rT4sp!Pk}+c%^CiWzN=lpav&M`sDIGJbw4`ZlQPcRD zjpI5W#$Au&eK$I8|K_bLTON32XV*RwpNrk_ZoB)25aYaf->qD(df8EZb!Sp6Yaa#M;Nk|K@u?4aTV6wXPL~23)t3+HxbK{!ulVJS!>26Uyr<># z7oLB8)y@r-Pj6bXx-G7E;<_rXGvYcYzPBo_C*nLC=k++>#d$T(KXIOm^Fo|=VxNxl zLF|jMKgT{5`(f+{u`k8*{&@UHKF8xK@+a~*)-UoU9AmvclDuL+S{dJ$ovtVIKgbGH z+?U0gHCc@Hj|a-b)8Dw`iEo}fJT;dKx2PZYN4;*>TPXqKc$@f0 zj&&)_pS=%c$T%N)}H+okBhiJ z>UG-}i?b4OJod%%@{i7%x%0xV{Wcz#J+<5CeQ{j25f|=@Bhx@nxjQ?rzTLyVSbXmN zw_bDFL%Rn1z8If>9_t(5=Nan~>lJzM_06k`#{S6<<8nF|_2YcnYaXNv#%MS4;M7-# z+`VRN)lZ{-jH9P^`)mmpW=rc`)hW17BZ#?YzsMt-SN8y9Rq6#NV~Y z`tC@tgv7eUdPN>=JHF+(_Rq8|iS>&5abAr3qh7cDpyQgo^Isf~JXpNwj6Ik9_QbB& z%VHcowcFP`7(e>Jd2m)XjNFEl2Y=l|9&Ehi$p=QPtX((Q^C12nIMz4b-yiD|*E`db zTZCAj$I~<76^~{|R;JI1ZQqpLUh_oM*gr-dM1H;b?2E_e7mbVVaxt!GKWaz4ZuvDP zD-p*dzpC#0&M~!(=gp7$F~6SL?PGq8%NJ!QxWawr&r(OfPx^ z*z;@m+6*MtH{S0T>k%#ru?9|aKDGxUHkO%Ko=c2*z;if zV;M-SZ~UEStV^s{><1^FF!TDCo0iAZ(OlFoe7+R*y5&K7XGR>4JXra_nBPrmd*YuW z4`LiWwcFP`7@a@x^XXBkqmlP!=Y=7U9{9X)$DQk5t-SbzUk~;?i1&xY`o`~@3)dyq zM user-1000@....journal + * Create new user-1000.journal + * Delete old file + """ + + def __init__(self, destination: Path) -> None: + orig_path = Path(__file__).parent / "data" / "rotated_logs" + self._orig_log_files: List[Path] = list(reversed(sorted(orig_path.glob("*.journal")))) + self._rotate_to: Optional[Path] = None + self._destination: Path = destination + self._current_log_files: List[Path] = [] + + @property + def log_files(self): + return self._current_log_files.copy() + + def rotate(self): + log_file = self._orig_log_files.pop() + tail_file = self._destination / "user-1000.journal" + + if self._rotate_to: + shutil.move(tail_file, self._rotate_to) + _LOG.info("Moved %s log file to %s", tail_file, self._rotate_to) + self._current_log_files.insert(1, self._rotate_to) + else: + assert not self._current_log_files + self._current_log_files.insert(0, self._destination / "user-1000.journal") + + dst_path = self._destination / "user-1000.journal" + _LOG.info("Added %s log file", dst_path) + shutil.copyfile(log_file, dst_path) + self._rotate_to = self._destination / log_file.name + + def remove(self, *, last: int): + for _ in range(last): + log_file = self._current_log_files.pop() + log_file.unlink() + _LOG.info("Removed %s log file", log_file) + if not self._current_log_files: + self._rotate_to = None + + +def test_log_rotator(tmp_path): + log_path = tmp_path / "logs" + log_path.mkdir() + log_file_handler = LogFiles(log_path) + + assert len(list(log_path.glob("*"))) == 0 + + log_file_handler.rotate() + + log_files = [p.name for p in log_path.glob("*")] + assert len(log_files) == 1 + assert "user-1000.journal" in log_files + + log_file_handler.rotate() + + log_files = [p.name for p in log_path.glob("*")] + assert len(log_files) == 2 + assert "user-1000.journal" in log_files + + log_file_handler.remove(last=1) + + log_files = [p.name for p in log_path.glob("*")] + assert len(log_files) == 1 + assert "user-1000.journal" in log_files + + +class _MsgBuffer(MsgBuffer): + """Wrapper around MsgBuffer which allows to wait for messages.""" + + def __init__(self) -> None: + super().__init__() + self.has_messages = threading.Event() + self.wait_threshold: Optional[int] = None + + def add_item(self, *, item, cursor): + super().add_item(item=item, cursor=cursor) + if self.wait_threshold and len(self.messages) >= self.wait_threshold: + self.has_messages.set() + + def get_items(self): + res = super().get_items() + self.wait_threshold = None + self.has_messages.clear() + return res + + def set_threshold(self, th: int) -> None: + self.wait_threshold = th + if len(self.messages) >= self.wait_threshold: + self.has_messages.set() + + +class StubSender: + field_filter = None + extra_field_values = None + + def __init__(self, *args, **kwargs): # pylint: disable=unused-argument + self.msg_buffer = _MsgBuffer() + + def start(self): + pass + + def request_stop(self): + pass + + def refresh_stats(self, *args, **kwargs): # pylint: disable=unused-argument + pass + + def __call__(self, *args, **kwargs): # pylint: disable=unused-argument + return self + + def get_messages(self, count: int, timeout: int): + self.msg_buffer.set_threshold(count) + assert self.msg_buffer.has_messages.wait(timeout), f"Timeout. Total messages received: {len(self.msg_buffer)}" + + messages = self.msg_buffer.get_items() + return [json.loads(m[0]) for m in messages] + + +@pytest.fixture(name="journal_log_dir") +def fixture_journal_log_dir(tmp_path): + log_path = tmp_path / "logs" + log_path.mkdir() + return log_path + + +@pytest.fixture(name="journalpump_factory") +def fixture_journalpump_factory(mocker, tmp_path, journal_log_dir): + pump_thread = None + pump = None + + def _start_journalpump(sender, *, pump_conf=None): + nonlocal pump_thread + nonlocal pump + pump_conf = pump_conf or {} + + mocker.patch.object(PumpReader, "has_persistent_files", return_value=True) + mocker.patch.object(PumpReader, "has_runtime_files", return_value=True) + mocker.patch.object(JournalReader, "sender_classes", {"stub_sender": sender}) + + config_path = tmp_path / "journalpump.json" + with open(config_path, "w") as fp: + json.dump( + { + "readers": { + "my-reader": { + "journal_path": str(journal_log_dir), + "initial_position": "head", + "senders": { + "test-sender": { + "output_type": "stub_sender" + }, + }, + "searches": [{ + "fields": { + "MESSAGE": "Message.*", + }, + "name": "test-messages", + }], + }, + }, + **pump_conf, + }, + fp, + ) + + pump = JournalPump(str(config_path)) + pump.poll_interval_ms = 100 + pump_thread = threading.Thread(target=pump.run) + pump_thread.start() + assert journalpump_initialized(pump) + return pump + + yield _start_journalpump + + if pump_thread and pump: + pump.running = False + pump_thread.join(timeout=3) + + +def test_journalpump_rotated_files(journalpump_factory, journal_log_dir): + stub_sender = StubSender() + journalpump_factory(stub_sender) + lf = LogFiles(journal_log_dir) + lf.rotate() + messages = stub_sender.get_messages(10, timeout=3) + assert set(m["MESSAGE"] for m in messages) == {f"Message {i}" for i in range(0, 10)} + + lf.rotate() + lf.rotate() + + messages = stub_sender.get_messages(20, timeout=3) + assert set(m["MESSAGE"] for m in messages) == {f"Message {i}" for i in range(10, 30)} + + +@pytest.mark.parametrize("msg_buffer_max_length", [3, 5, 10]) +def test_journalpump_rotated_files_threshold(journalpump_factory, journal_log_dir, msg_buffer_max_length): + stub_sender = StubSender() + + pump = journalpump_factory(stub_sender, pump_conf={"msg_buffer_max_length": msg_buffer_max_length}) + + lf = LogFiles(journal_log_dir) + lf.rotate() + + reader: JournalReader = next(iter(pump.readers.values())) + assert reader.msg_buffer_max_length == msg_buffer_max_length + + lf.rotate() + lf.rotate() + + for it in range(30 // msg_buffer_max_length): + messages = stub_sender.get_messages(msg_buffer_max_length, timeout=3) + expected = {f"Message {i}" for i in range(it * msg_buffer_max_length, msg_buffer_max_length * (it + 1))} + assert len(messages) == msg_buffer_max_length + assert set(m["MESSAGE"] for m in messages) == expected + + +@pytest.mark.parametrize("size,num_messages", [(50, 1), (1000, 2)]) +def test_journalpump_rotated_files_threshold_bytes(journalpump_factory, journal_log_dir, size, num_messages): + stub_sender = StubSender() + + pump = journalpump_factory(stub_sender, pump_conf={"msg_buffer_max_bytes": size}) + + lf = LogFiles(journal_log_dir) + lf.rotate() + lf.rotate() + lf.rotate() + + reader: JournalReader = next(iter(pump.readers.values())) + assert reader.msg_buffer_max_bytes == size + + for it in range(30 // num_messages): + messages = stub_sender.get_messages(num_messages, timeout=3) + expected = {f"Message {i}" for i in range(it * num_messages, num_messages * (it + 1))} + assert len(messages) == num_messages + assert set(m["MESSAGE"] for m in messages) == expected + + +def _lsof_is_file_open(filenames: List[str]) -> Dict[str, bool]: + """Check if file is open using lsof""" + # psutil doesn't show deleted files, but this exactly what we want to test + output = (subprocess.check_output(["lsof", "-p", str(os.getpid()), "-w"]).decode().split("\n")) + result = {fn: False for fn in filenames} + for line in output: + for fn in result: + if fn not in line: + continue + result[fn] = True + return result + + +def _wait_for(predicate: Callable[[], bool], timeout: int): + cur = time.monotonic() + deadline = cur + timeout + + while cur < deadline: + if predicate(): + return True + + cur = time.monotonic() + time.sleep(0.2) + + return False + + +@pytest.mark.skipif(not shutil.which("lsof"), reason="lsof is not available") +def test_journalpump_rotated_files_deletion(journalpump_factory, journal_log_dir): + stub_sender = StubSender() + journalpump_factory(stub_sender, pump_conf={"msg_buffer_max_length": 1}) + + def _get_message(): + messages = stub_sender.get_messages(1, timeout=3) + assert len(messages) == 1 + return messages[0]["MESSAGE"] + + lf = LogFiles(journal_log_dir) + lf.rotate() + + assert _get_message() == "Message 0" + + lf.rotate() + + # Loop once to get new files open + assert _get_message() == "Message 1" + + log_files = [str(f) for f in lf.log_files] + assert len(log_files) == 2 + + assert _wait_for(lambda: all(_lsof_is_file_open(log_files).values()), timeout=3) + + lf.remove(last=1) + + # Took buffered message + # Dropped messages 3-9 as file was deleted + assert _get_message() in ["Message 2", "Message 10"] + assert _get_message() in ["Message 10", "Message 11"] + + def _only_head_open(): + open_files = _lsof_is_file_open(log_files) + return open_files[log_files[0]] and not open_files[log_files[-1]] + + assert _wait_for(_only_head_open, timeout=3), f"Expected {log_files[-1]} to not be open" diff --git a/systest/test_rsyslog.py b/systest/test_rsyslog.py index 88c03cd..1bfa7a4 100644 --- a/systest/test_rsyslog.py +++ b/systest/test_rsyslog.py @@ -3,6 +3,7 @@ # This file is under the Apache License, Version 2.0. # See the file `LICENSE` for details. +from .util import journalpump_initialized from journalpump.journalpump import JournalPump from subprocess import Popen from time import sleep @@ -68,21 +69,6 @@ def stop(self): self.process = None -def _journalpump_initialized(journalpump): - retry = 0 - senders = [] - while retry < 3 and not senders: - sleep(1) - readers = [reader for _, reader in journalpump.readers.items()] - senders = [] - if readers: - for reader in readers: - senders.extend([sender for _, sender in reader.senders.items()]) - retry += 1 - - return journalpump.running and senders - - def _run_pump_test(*, config_path, logfile): journalpump = None threads = [] @@ -92,7 +78,7 @@ def _run_pump_test(*, config_path, logfile): pump.start() threads.append(pump) - assert _journalpump_initialized(journalpump), "Failed to initialize journalpump" + assert journalpump_initialized(journalpump), "Failed to initialize journalpump" identifier = "".join(random.sample(string.ascii_uppercase + string.digits, k=8)) logger = logging.getLogger("rsyslog-tester") logger.info("Info message for %s", identifier) diff --git a/systest/util.py b/systest/util.py new file mode 100644 index 0000000..dca0846 --- /dev/null +++ b/systest/util.py @@ -0,0 +1,16 @@ +from time import sleep + + +def journalpump_initialized(journalpump): + retry = 0 + senders = [] + while retry < 3 and not senders: + sleep(1) + readers = [reader for _, reader in journalpump.readers.items()] + senders = [] + if readers: + for reader in readers: + senders.extend([sender for _, sender in reader.senders.items()]) + retry += 1 + + return journalpump.running and senders diff --git a/test/test_journalpump.py b/test/test_journalpump.py index 6ea10cd..586c777 100644 --- a/test/test_journalpump.py +++ b/test/test_journalpump.py @@ -379,7 +379,7 @@ def setUp(self): def test_filtered_processing(self): jobject = JournalObject(entry=OrderedDict(a=1, b=2, c=3, REALTIME_TIMESTAMP=1), cursor=10) handler = JournalObjectHandler(jobject, self.reader, self.pump) - assert handler.process() is True + assert handler.process()[0] is True sender_a_msgs = [(json.loads(msg.decode("utf-8")), cursor) for msg, cursor in self.sender_a.msg_buffer.messages] assert ({"a": 1}, 10) in sender_a_msgs sender_b_msgs = [(json.loads(msg.decode("utf-8")), cursor) for msg, cursor in self.sender_b.msg_buffer.messages] @@ -397,7 +397,7 @@ def test_too_large_data(self): too_large = OrderedDict(a=1, b="x" * MAX_KAFKA_MESSAGE_SIZE) jobject = JournalObject(entry=too_large, cursor=10) handler = JournalObjectHandler(jobject, self.reader, self.pump) - assert handler.process() is True + assert handler.process()[0] is True sender_a_msgs = [(json.loads(msg.decode("utf-8")), cursor) for msg, cursor in self.sender_a.msg_buffer.messages] assert ({"a": 1}, 10) in sender_a_msgs assert "too large message" in str(self.sender_b.msg_buffer.messages)