diff --git a/journalpump/journalpump.py b/journalpump/journalpump.py
index 01ab427..992d488 100644
--- a/journalpump/journalpump.py
+++ b/journalpump/journalpump.py
@@ -12,8 +12,8 @@
 from .types import GeoIPProtocol
 from .util import atomic_replace_file, default_json_serialization
 from functools import reduce
-from systemd.journal import Reader
-from typing import Type, Union
+from systemd import journal
+from typing import cast, NamedTuple, Optional, Type, Union

 import copy
 import datetime
@@ -21,7 +21,6 @@
 import logging
 import re
 import select
-import systemd.journal
 import time
 import uuid

@@ -31,6 +30,8 @@
 except ImportError:
     GeoIPReader = None

+_5_MB = 5 * 1024 * 1024
+

 def _convert_uuid(s):
     return str(uuid.UUID(s.decode()))
@@ -64,13 +65,23 @@ def convert_realtime(t):
     }


+class SingleMessageReadResult(NamedTuple):
+    has_more: bool
+    bytes_read: int
+
+
+class MessagesReadResult(NamedTuple):
+    exhausted: bool
+    lines_read: int
+
+
 class JournalObject:
     def __init__(self, cursor=None, entry=None):
         self.cursor = cursor
         self.entry = entry or {}


-class PumpReader(Reader):
+class PumpReader(journal.Reader):
     def convert_entry(self, entry):
         """Faster journal lib _convert_entry replacement"""
         output = {}
@@ -127,6 +138,7 @@ def __init__(
         tags=None,
         seek_to=None,
         msg_buffer_max_length=50000,
+        msg_buffer_max_bytes=_5_MB,
         searches=None,
         initial_position=None
     ):
@@ -134,6 +146,7 @@ def __init__(
         self.log = logging.getLogger("JournalReader:{}".format(name))
         self.name = name
         self.msg_buffer_max_length = msg_buffer_max_length
+        self.msg_buffer_max_bytes = msg_buffer_max_bytes
         self.initial_position = initial_position
         self.read_bytes = 0
         self.read_lines = 0
@@ -147,7 +160,6 @@ def __init__(
         self.field_filters = field_filters
         self.stats = stats
         self.cursor = seek_to
-        self.registered_for_poll = False
         self.journald_reader = None
         self.last_journald_create_attempt = 0
         self.running = True
@@ -156,40 +168,53 @@ def __init__(
         self.last_stats_send_time = time.monotonic()
         self.last_journal_msg_time = time.monotonic()
         self.searches = list(self._build_searches(searches))
+        self._is_ready = True
+
+    def invalidate(self) -> None:
+        """
+        Should be called when the reader has stale information about the files present,
+        e.g. some journal files were deleted and we need to release their file descriptors.
+ """ + if self.journald_reader: + self.journald_reader.close() + self.journald_reader = None - def create_journald_reader_if_missing(self): - if not self.journald_reader and time.monotonic() - self.last_journald_create_attempt > 2: + def fileno(self) -> Optional[int]: + return self.journald_reader and self.journald_reader.fileno() + + def create_journald_reader_if_missing(self) -> None: + if (not self.journald_reader and time.monotonic() - self.last_journald_create_attempt > 2): self.last_journald_create_attempt = time.monotonic() self.journald_reader = self.get_reader(seek_to=self.cursor) - def update_poll_registration_status(self, poller): + @property + def is_ready(self) -> bool: + return bool(self.senders and self.journald_reader and self._is_ready) + + def get_write_limit_bytes(self) -> int: + return self.msg_buffer_max_bytes - max(s.msg_buffer.buffer_size for s in self.senders.values()) + + def get_write_limit_message_count(self) -> int: + return self.msg_buffer_max_length - max(len(s.msg_buffer) for s in self.senders.values()) + + def update_status(self): self.create_journald_reader_if_missing() - if not self.journald_reader: - return + sender_over_limit = self.get_write_limit_message_count() <= 0 + if self.msg_buffer_max_bytes: + sender_over_limit |= self.get_write_limit_bytes() <= 0 - sender_over_limit = any(len(sender.msg_buffer) > self.msg_buffer_max_length for sender in self.senders.values()) - if not self.registered_for_poll and not sender_over_limit: + if not self._is_ready and not sender_over_limit: self.log.info( - "Message buffer size under threshold for all senders, starting processing journal for %r", self.name + "Message buffer size under threshold for all senders, ready to process journal for %r", + self.name, ) - self.register_for_poll(poller) - elif self.registered_for_poll and sender_over_limit: + self._is_ready = True + elif self._is_ready and sender_over_limit: self.log.info( - "Message buffer size for at least one sender over threshold, stopping processing journal for %r", self.name + "Message buffer size for at least one sender over threshold, pause processing journal for %r", + self.name, ) - self.unregister_from_poll(poller) - - def register_for_poll(self, poller): - if self.journald_reader: - poller.register(self.journald_reader, self.journald_reader.get_events()) - self.registered_for_poll = True - self.log.info("Registered reader %r with fd %r", self.name, self.journald_reader.fileno()) - - def unregister_from_poll(self, poller): - if self.journald_reader and self.registered_for_poll: - poller.unregister(self.journald_reader) - self.registered_for_poll = False - self.log.info("Unregistered reader %r with fd %r", self.name, self.journald_reader.fileno()) + self._is_ready = False def get_resume_cursor(self): """Find the sender cursor location where a new JournalReader instance should resume reading from""" @@ -201,7 +226,10 @@ def get_resume_cursor(self): state = sender.get_state() cursor = state["sent"]["cursor"] if cursor is None: - self.log.info("Sender %r needs a full catchup from beginning, resuming from journal start", sender_name) + self.log.warning( + "Sender %r needs a full catchup from beginning, resuming from journal start", + sender_name, + ) return None # TODO: pick oldest sent cursor @@ -235,7 +263,10 @@ def initialize_senders(self): extra_field_values = sender_config.get("extra_field_values", {}) if not isinstance(extra_field_values, dict): - self.log.warning("extra_field_values: %r not a dictionary object, ignoring", extra_field_values) + 
+                self.log.warning(
+                    "extra_field_values: %r not a dictionary object, ignoring",
+                    extra_field_values,
+                )
                 extra_field_values = {}
             try:
                 sender = sender_class(
@@ -289,7 +320,11 @@ def inc_line_stats(self, *, journal_lines, journal_bytes):
         self._sent_lines_diff += self.read_lines - self._last_sent_read_lines
         self._sent_bytes_diff += self.read_bytes - self._last_sent_read_bytes
         if now - self.last_stats_print_time > 120:
-            self.log.info("Processed %r journal lines (%r bytes)", self._sent_lines_diff, self._sent_bytes_diff)
+            self.log.info(
+                "Processed %r journal lines (%r bytes)",
+                self._sent_lines_diff,
+                self._sent_bytes_diff,
+            )
             self._sent_lines_diff = 0
             self._sent_bytes_diff = 0
             self.last_stats_print_time = now
@@ -301,7 +336,10 @@ def inc_line_stats(self, *, journal_lines, journal_bytes):
             sender.refresh_stats()

     def read_next(self):
-        jobject = next(self.journald_reader)
+        if not self.journald_reader:
+            return None
+
+        jobject: JournalObject = cast(JournalObject, next(self.journald_reader))
         if jobject.cursor:
             self.cursor = jobject.cursor
             self.last_journal_msg_time = time.monotonic()
@@ -324,12 +362,14 @@ def get_reader(self, seek_to=None, reinit=False):

         journal_flags = reduce(
             lambda a, b: a | b,
-            [getattr(systemd.journal, flag.strip()) for flag in journal_flags],
+            [getattr(journal, flag.strip()) for flag in journal_flags],
         )

         try:
             reader_kwargs = dict(
-                files=self.config.get("journal_files"), flags=journal_flags, path=self.config.get("journal_path")
+                files=self.config.get("journal_files"),
+                flags=journal_flags,
+                path=self.config.get("journal_path"),
             )
             namespace = self.config.get("journal_namespace")
             if namespace:
@@ -343,7 +383,12 @@ def get_reader(self, seek_to=None, reinit=False):
             self.journald_reader = None
             return None
         except FileNotFoundError as ex:
-            self.log.warning("journal for %r not available yet: %s: %s", self.name, ex.__class__.__name__, ex)
+            self.log.warning(
+                "journal for %r not available yet: %s: %s",
+                self.name,
+                ex.__class__.__name__,
+                ex,
+            )
             return None

         if seek_to:
@@ -487,7 +532,7 @@ def __init__(self, jobject, reader, pump):
         self.pump = pump
         self.reader = reader

-    def process(self):
+    def process(self) -> SingleMessageReadResult:
         new_entry = {}
         for key, value in self.jobject.entry.items():
             if isinstance(value, bytes):
@@ -497,24 +542,25 @@ def process(self):

         if self.jobject.cursor is None:
             self.log.debug("No more journal entries to read")
-            return False
+            return SingleMessageReadResult(has_more=False, bytes_read=0)

         if self.reader.searches:
             if not self.reader.perform_searches(self.jobject):
-                return True
+                return SingleMessageReadResult(has_more=True, bytes_read=0)

         if not self.pump.check_match(new_entry):
-            return True
+            return SingleMessageReadResult(has_more=True, bytes_read=0)

         for sender in self.reader.senders.values():
             json_entry = self._get_or_generate_json(sender.field_filter, sender.extra_field_values, new_entry)
             sender.msg_buffer.add_item(item=json_entry, cursor=self.jobject.cursor)

+        max_bytes = 0
         if self.json_objects:
             max_bytes = max(len(v) for v in self.json_objects.values())
             self.reader.inc_line_stats(journal_bytes=max_bytes, journal_lines=1)

-        return True
+        return SingleMessageReadResult(has_more=True, bytes_read=max_bytes)

     def _get_or_generate_json(self, field_filter, extra_field_values, data):
         ff_name = "" if field_filter is None else field_filter.name
@@ -545,10 +591,11 @@ def _truncate_long_message(self, json_entry):
         error = "too large message {} bytes vs maximum {} bytes".format(len(json_entry), MAX_KAFKA_MESSAGE_SIZE)
         if not self.error_reported:
             self.pump.stats.increase(
-                "journal.read_error", tags=self.pump.make_tags({
+                "journal.read_error",
+                tags=self.pump.make_tags({
                     "error": "too_long",
                     "reader": self.reader.name,
-                })
+                }),
             )
             self.log.warning("%s: %s ...", error, json_entry[:1024])
             self.error_reported = True
@@ -560,6 +607,8 @@


 class JournalPump(ServiceDaemon, Tagged):
+    _STALE_FD = object()
+
     def __init__(self, config_path):
         Tagged.__init__(self)
         self.stats = None
@@ -575,6 +624,8 @@ def __init__(self, config_path):
         self.configure_field_filters()
         self.configure_readers()
         self.stale_readers = set()
+        self.reader_by_fd = {}
+        self.poll_interval_ms = 1000

     def configure_field_filters(self):
         filters = self.config.get("field_filters", {})
@@ -594,20 +645,29 @@ def configure_readers(self):
         self.readers = {}
         state = self.load_state()
+
         for reader_name, reader_config in new_config.items():
             reader_state = state.get("readers", {}).get(reader_name, {})
             resume_cursor = None
             for sender_name, sender in reader_state.get("senders", {}).items():
                 sender_cursor = sender["sent"]["cursor"]
                 if sender_cursor is None:
-                    self.log.info("Sender %r for reader %r needs full sync from beginning", sender_name, reader_name)
+                    self.log.info(
+                        "Sender %r for reader %r needs full sync from beginning",
+                        sender_name,
+                        reader_name,
+                    )
                     resume_cursor = None
                     break

                 # TODO: pick the OLDEST cursor
                 resume_cursor = sender_cursor

-            self.log.info("Reader %r resuming from cursor position: %r", reader_name, resume_cursor)
+            self.log.info(
+                "Reader %r resuming from cursor position: %r",
+                reader_name,
+                resume_cursor,
+            )
             initial_position = reader_config.get("initial_position")
             reader = JournalReader(
                 name=reader_name,
@@ -616,6 +676,7 @@ def configure_readers(self):
                 geoip=self.geoip,
                 stats=self.stats,
                 msg_buffer_max_length=self.config.get("msg_buffer_max_length", 50000),
+                msg_buffer_max_bytes=self.config.get("msg_buffer_max_bytes", _5_MB),
                 seek_to=resume_cursor,
                 initial_position=initial_position,
                 tags=self.make_tags(),
@@ -652,7 +713,7 @@ def shutdown(self):
         for reader in self.readers.values():
             reader.request_stop()
-            reader.unregister_from_poll(self.poller)
+            self.unregister_from_poll(reader)
             self.stale_readers.add(reader)

     def sigterm(self, signum, frame):
@@ -677,31 +738,53 @@ def check_match(self, entry):
                 return True
         return False

-    def read_single_message(self, reader):
+    def read_single_message(self, reader) -> SingleMessageReadResult:
         try:
-            jobject = reader.read_next()
+            jobject: Optional[JournalObject] = reader.read_next()
             if jobject is None or jobject.entry is None:
-                return False
+                return SingleMessageReadResult(has_more=False, bytes_read=0)

             return JournalObjectHandler(jobject, reader, self).process()
         except StopIteration:
             self.log.debug("No more journal entries to read")
-            return False
+            return SingleMessageReadResult(has_more=False, bytes_read=0)
         except Exception as ex:  # pylint: disable=broad-except
             self.log.exception("Unexpected exception while handling entry for %s", reader.name)
             self.stats.unexpected_exception(ex=ex, where="mainloop", tags=self.make_tags({"app": "journalpump"}))
             time.sleep(0.5)
-            return False
+            return SingleMessageReadResult(has_more=False, bytes_read=0)

-    def read_all_available_messages(self, reader, hits):
+    def read_messages(self, reader, hits, chunk_size) -> MessagesReadResult:
         lines = 0
-        while self.read_single_message(reader):
+
+        exhausted = False
+
+        limit_bytes = reader.get_write_limit_bytes()
+        limit_count = reader.get_write_limit_message_count()
+
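+        # Stop early once either budget is used up; the event stays buffered in
+        # the caller, so the remaining entries are read on a later iteration.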
+        for _ in range(chunk_size):
+            if limit_bytes <= 0:
+                return MessagesReadResult(exhausted=False, lines_read=lines)
+
+            if limit_count <= 0:
+                return MessagesReadResult(exhausted=False, lines_read=lines)
+
+            has_more, bytes_read = self.read_single_message(reader)
+
+            if bytes_read:
+                limit_bytes -= bytes_read
+                limit_count -= 1
+
+            if not has_more:
+                exhausted = True
+                break
+
             lines += 1

         for search in reader.searches:
             hits[search["name"]] = search.get("hits", 0)
-        return lines
+
+        return MessagesReadResult(exhausted=exhausted, lines_read=lines)

     def get_state_file_path(self):
         return self.config.get("json_state_file_path")
@@ -727,28 +810,92 @@ def _close_stale_readers(self):
         while self.stale_readers:
             reader = self.stale_readers.pop()
             reader.close()
+            self.unregister_from_poll(reader)
+
+    def register_for_poll(self, reader: JournalReader) -> bool:
+        fd = reader.fileno()
+        # fd in this case is an anonymous inotify node
+        if fd is not None and fd not in self.reader_by_fd:
+            self.log.info("Registered reader %r with fd %r", reader.name, fd)
+            self.poller.register(fd)
+            self.reader_by_fd[fd] = reader
+            return True
+
+        return False
+
+    def unregister_from_poll(self, reader: JournalReader) -> None:
+        fd = reader.fileno()
+        if fd is not None and fd in self.reader_by_fd:
+            self.poller.unregister(fd)
+            self.reader_by_fd[fd] = self._STALE_FD
+            self.log.info("Unregistered reader %r with fd %r", reader.name, fd)

     def run(self):
         last_stats_time = 0
+        poll_timeout = 0
+        buffered_events = {}
+
         while self.running:
             self._close_stale_readers()
-            results = self.poller.poll(1000)
+            self.log.debug("Waiting for %dms", poll_timeout)
+            results = self.poller.poll(poll_timeout)
+            iteration_start_time = time.monotonic_ns()
+
             hits = {}
             lines = 0
-            for fd, _event in results:
-                for reader in self.readers.values():
-                    jdr = reader.journald_reader
-                    if not jdr or fd != jdr.fileno():
-                        continue
-                    if jdr.process() == systemd.journal.APPEND:
-                        lines += self.read_all_available_messages(reader, hits)
-                    break
-                else:
+            # We keep valid events in case the reader is busy processing the
+            # previous batch and is not ready to act on a new one
+
+            for fd, _ in results:
+                reader = self.reader_by_fd.get(fd)
+                if reader is self._STALE_FD:
+                    continue
+
+                if reader is None:
                     self.log.error("Could not find reader with fd %r", fd)
+                    continue
+
+                # A call to process() clears the state; consecutive calls won't return the same value
+                process_evt = reader.journald_reader.process()
+                # Can be either 0 - journal.NOP, 1 - journal.APPEND or 2 - journal.INVALIDATE
+                current_evt = buffered_events.get(reader, journal.NOP)
+                buffered_events[reader] = max(current_evt, process_evt)
+
+            # Remove stale readers; this must happen after we have processed the
+            # batch of events received from poll, because it could have
+            # contained stale fds
+            for key, value in list(self.reader_by_fd.items()):
+                if value is self._STALE_FD:
+                    del self.reader_by_fd[key]
+
+            for reader, event in list(buffered_events.items()):
+                if not reader.is_ready:
+                    continue
+
+                # Read messages up to the configured reader limits
+                exhausted = False
+                if event in [journal.APPEND, journal.INVALIDATE]:
+                    exhausted, line_count = self.read_messages(reader, hits, chunk_size=5000)
+                    lines += line_count
+
+                if event == journal.INVALIDATE:
+                    self.unregister_from_poll(reader)
+                    # If the file was deleted we could lose some unread messages here,
+                    # but if we are so far behind that we are still reading the tail
+                    # file, we will probably never catch up, and we want to avoid
+                    # holding descriptors to deleted files
+                    reader.invalidate()
+
+                # Only delete the buffered event when we know that all messages were drained
+                if exhausted:
+                    buffered_events.pop(reader)

             for reader in self.readers.values():
-                reader.update_poll_registration_status(self.poller)
+                reader.update_status()
+                if self.register_for_poll(reader):
+                    # Check if there is something to read in the newly created reader
+                    buffered_events[reader] = journal.APPEND

             if hits and time.monotonic() - last_stats_time > 60.0:
                 self.log.info("search hits stats: %s", hits)
@@ -767,6 +914,7 @@
                 self.log.debug("No new journal lines received")

             self.ping_watchdog()
+            poll_timeout = max(0, self.poll_interval_ms - (time.monotonic_ns() - iteration_start_time) // 1_000_000)

         self._close_stale_readers()
diff --git a/journalpump/senders/base.py b/journalpump/senders/base.py
index 02142d4..bd8d25b 100644
--- a/journalpump/senders/base.py
+++ b/journalpump/senders/base.py
@@ -2,6 +2,7 @@

 import logging
 import random
+import sys
 import time

 KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30
@@ -45,6 +46,7 @@ def __init__(self):
         self.total_size = 0
         self.last_journal_msg_time = time.monotonic()
         self.cursor = None
+        self.buffer_size = 0

     def __len__(self):
         return len(self.messages)
@@ -55,6 +57,7 @@ def get_items(self):
             if self.messages:
                 messages = self.messages
                 self.messages = []
+                self.buffer_size = 0
             return messages

     def add_item(self, *, item, cursor):
@@ -62,6 +65,7 @@ def add_item(self, *, item, cursor):
             self.messages.append((item, cursor))
             self.last_journal_msg_time = time.monotonic()
             self.cursor = cursor
+            self.buffer_size += sys.getsizeof(item)
         self.entry_num += 1
         self.total_size += len(item)
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..534a692
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,3 @@
+[pytest]
+log_format = %(asctime)s.%(msecs)03d %(levelname)s %(message)s
+log_date_format = %Y-%m-%d %H:%M:%S
diff --git a/scripts/generate_logs.py b/scripts/generate_logs.py
new file mode 100644
index 0000000..6168b60
--- /dev/null
+++ b/scripts/generate_logs.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import json
+import multiprocessing
+import os
+import pathlib
+import shutil
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+from typing import Optional, Dict
+
+MESSAGE_DEFAULTS: Dict[str, str] = {"PRIORITY": "6", "SYSLOG_IDENTIFIER": "journald-gen-logs"}
+
+
+def _encode_message(data: Dict[str, str]) -> bytes:
+    """Encode to a journald native protocol message.
+
+    >>> _encode_message({"MESSAGE": "Something happened"})
+    b"PRIORITY=6\nSYSLOG_IDENTIFIER=journald-gen-logs\nMESSAGE=Something happened\n"
+    """
+    message = MESSAGE_DEFAULTS.copy()
+    message.update(data)
+
+    result = []
+    for key, value in message.items():
+        result.append(b"%s=%s" % (key.encode("utf-8"), str(value).encode("utf-8")))
+
+    return b"\n".join(result) + b"\n"
+
+
+def _message_sender(uid: int, message_socket_path: str, queue: multiprocessing.JoinableQueue):
+    """Send messages to journald using the native protocol.
+
+    NB. Messages are sent in a separate process to be able to write to the socket as a non-root user
+    and get user-<uid>.journal entries instead of just syslog.journal ones.
+    """
+    os.setuid(uid)
+
+    s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+    s.connect(message_socket_path)
+
+    while msg := queue.get():
+        s.sendall(_encode_message(msg))
+        queue.task_done()
+
+    # ACK None
+    queue.task_done()
+
+
+class JournalControlProcess:
+    CONTROL_SOCKET_NAME = "io.systemd.journal"
+    MESSAGE_SOCKET_NAME = "socket"
+    JOURNALD_BIN = "/usr/lib/systemd/systemd-journald"
+
+    def __init__(self, *, logs_dir: pathlib.Path, uid: int) -> None:
+        self._logs_dir: pathlib.Path = logs_dir
+        self._runtime_dir: Optional[pathlib.Path] = None
+        self._journald_process: Optional[subprocess.Popen] = None
+        self._sender_process: Optional[multiprocessing.Process] = None
+        self._sender_queue = multiprocessing.JoinableQueue()
+        self._uid = uid
+
+    @property
+    def _control_socket_path(self) -> str:
+        assert self._runtime_dir
+        return str(self._runtime_dir / self.CONTROL_SOCKET_NAME)
+
+    @property
+    def _message_socket_path(self) -> str:
+        assert self._runtime_dir
+        return str(self._runtime_dir / self.MESSAGE_SOCKET_NAME)
+
+    def _start_journald(self) -> subprocess.Popen:
+        assert self._runtime_dir
+
+        environment = {
+            "LOGS_DIRECTORY": self._logs_dir,
+            "RUNTIME_DIRECTORY": self._runtime_dir,
+        }
+        journald_process = subprocess.Popen(
+            [self.JOURNALD_BIN, "test"],
+            env=environment,
+            stdout=sys.stdout,
+            stderr=sys.stdout,
+        )
+
+        cur = time.monotonic()
+        deadline = cur + 3
+
+        while cur < deadline:
+            files = {f.name for f in self._runtime_dir.iterdir()}
+            if self.CONTROL_SOCKET_NAME in files and self.MESSAGE_SOCKET_NAME in files:
+                break
+            time.sleep(0.1)
+            cur = time.monotonic()
+
+        return journald_process
+
+    def rotate(self) -> None:
+        """Ask journald to rotate logs and wait for the result."""
+        assert self._journald_process
+
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        s.connect(self._control_socket_path)
+        s.sendall(b'{"method": "io.systemd.Journal.Synchronize"}\0')
+        s.recv(100)
+        s.sendall(b'{"method": "io.systemd.Journal.Rotate"}\0')
+        s.recv(100)
+
+    def send_message(self, message: Dict[str, str]) -> None:
+        """Send a message to journald."""
+        assert self._sender_process
+
+        self._sender_queue.put(message)
+        self._sender_queue.join()
+
+    def _start_sender_process(self) -> multiprocessing.Process:
+        sender_process = multiprocessing.Process(
+            target=_message_sender, args=(self._uid, self._message_socket_path, self._sender_queue)
+        )
+        sender_process.start()
+        return sender_process
+
+    def __enter__(self) -> JournalControlProcess:
+        self._runtime_dir = pathlib.Path(tempfile.mkdtemp(prefix="journald_runtime_"))
+        os.chown(self._runtime_dir, self._uid, -1)
+
+        self._journald_process = self._start_journald()
+        self._sender_process = self._start_sender_process()
+
+        return self
+
+    def __exit__(self, *args) -> None:
+        assert self._runtime_dir
+        assert self._journald_process
+        assert self._sender_process
+
+        self._sender_queue.put(None)
+        self._sender_process.join(timeout=3)
+
+        self._journald_process.terminate()
+        self._journald_process.wait(timeout=3)
+
+        shutil.rmtree(self._runtime_dir)
+
+
+_PARSER = argparse.ArgumentParser(
+    usage="""Generate journald log files.
+This program reads messages from stdin in the following format:
+
+msg Test 1
+msg {"MESSAGE": "Test 1"}
+rotate
+msg Test 2
+
+The msg command argument can be either a plain message or a JSON object.
+The rotate command invokes journald rotation.
+"""
+)
+_PARSER.add_argument('--uid', type=int, default=1000, help='user id of log sender')
+
+
+def main():
+    args = _PARSER.parse_args()
+
+    if os.geteuid() != 0:
+        raise Exception("Should be run as a root user to be able to rotate")
+
+    logs_dir = pathlib.Path(tempfile.mkdtemp(prefix="journald_logs_"))
+    uid = args.uid
+    os.chown(logs_dir, uid, -1)
+
+    with JournalControlProcess(logs_dir=logs_dir, uid=uid) as journald_process:
+
+        while entry := input():
+            action, *args = entry.strip().split(" ", 1)
+
+            if action == "rotate":
+                journald_process.rotate()
+
+            elif action == "msg":
+                if len(args) != 1:
+                    raise ValueError(f"Not enough args for msg {args}")
+
+                msg = args[0].strip()
+
+                if msg.startswith("{"):
+                    msg = json.loads(msg)
+                else:
+                    msg = {"MESSAGE": msg}
+
+                journald_process.send_message(msg)
+
+    print(f"Logs available in {logs_dir} directory")
+    print("To see the generated logs use the following command:")
+    print(f"journalctl -D {logs_dir}")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/systest/__init__.py b/systest/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/systest/data/rotated_logs/rotated_logs.txt b/systest/data/rotated_logs/rotated_logs.txt
new file mode 100644
index 0000000..2b1bca7
--- /dev/null
+++ b/systest/data/rotated_logs/rotated_logs.txt
@@ -0,0 +1,45 @@
+msg Message 0
+msg Message 1
+msg Message 2
+msg Message 3
+msg Message 4
+msg Message 5
+msg Message 6
+msg Message 7
+msg Message 8
+msg Message 9
+rotate
+msg Message 10
+msg Message 11
+msg Message 12
+msg Message 13
+msg Message 14
+msg Message 15
+msg Message 16
+msg Message 17
+msg Message 18
+msg Message 19
+rotate
+msg Message 20
+msg Message 21
+msg Message 22
+msg Message 23
+msg Message 24
+msg Message 25
+msg Message 26
+msg Message 27
+msg Message 28
+msg Message 29
+rotate
+msg Message 30
+msg Message 31
+msg Message 32
+msg Message 33
+msg Message 34
+msg Message 35
+msg Message 36
+msg Message 37
+msg Message 38
+msg Message 39
+rotate
+
diff --git a/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000003-0005d4206c27cc3b.journal b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000003-0005d4206c27cc3b.journal
new file mode 100644
index 0000000..2c50b08
Binary files /dev/null and b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000003-0005d4206c27cc3b.journal differ
diff --git a/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-000000000000000e-0005d4206c28150a.journal b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-000000000000000e-0005d4206c28150a.journal
new file mode 100644
index 0000000..c367254
Binary files /dev/null and b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-000000000000000e-0005d4206c28150a.journal differ
diff --git a/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000019-0005d4206c285fc5.journal b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000019-0005d4206c285fc5.journal
new file mode 100644
index 0000000..da083b8
Binary files /dev/null and b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000019-0005d4206c285fc5.journal differ
diff --git a/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000024-0005d4206c28a26f.journal b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000024-0005d4206c28a26f.journal
new file mode 100644
index 0000000..7a82228
Binary files /dev/null and b/systest/data/rotated_logs/user-1000@e12db0190c834c7db506c5753bf5e469-0000000000000024-0005d4206c28a26f.journal differ
diff --git a/systest/test_rotation.py b/systest/test_rotation.py
new file mode 100644
index 0000000..8b1526f
--- /dev/null
+++ b/systest/test_rotation.py
@@ -0,0 +1,325 @@
+from .util import journalpump_initialized
+from journalpump.journalpump import JournalPump, JournalReader, PumpReader
+from journalpump.senders.base import MsgBuffer
+from pathlib import Path
+from typing import Callable, Dict, List, Optional
+
+import json
+import logging
+import os
+import pytest
+import shutil
+import subprocess
+import threading
+import time
+
+_LOG = logging.getLogger(__name__)
+
+
+class LogFiles:
+    """Emulates rotation of log files.
+
+    journald rotates files using the following algorithm:
+    * Move user-1000.journal -> user-1000@....journal
+    * Create a new user-1000.journal
+    * Delete the old file
+    """
+
+    def __init__(self, destination: Path) -> None:
+        orig_path = Path(__file__).parent / "data" / "rotated_logs"
+        self._orig_log_files: List[Path] = list(reversed(sorted(orig_path.glob("*.journal"))))
+        self._rotate_to: Optional[Path] = None
+        self._destination: Path = destination
+        self._current_log_files: List[Path] = []
+
+    @property
+    def log_files(self):
+        return self._current_log_files.copy()
+
+    def rotate(self):
+        log_file = self._orig_log_files.pop()
+        tail_file = self._destination / "user-1000.journal"
+
+        if self._rotate_to:
+            shutil.move(tail_file, self._rotate_to)
+            _LOG.info("Moved %s log file to %s", tail_file, self._rotate_to)
+            self._current_log_files.insert(1, self._rotate_to)
+        else:
+            assert not self._current_log_files
+            self._current_log_files.insert(0, self._destination / "user-1000.journal")
+
+        dst_path = self._destination / "user-1000.journal"
+        _LOG.info("Added %s log file", dst_path)
+        shutil.copyfile(log_file, dst_path)
+        self._rotate_to = self._destination / log_file.name
+
+    def remove(self, *, last: int):
+        for _ in range(last):
+            log_file = self._current_log_files.pop()
+            log_file.unlink()
+            _LOG.info("Removed %s log file", log_file)
+        if not self._current_log_files:
+            self._rotate_to = None
+
+
+def test_log_rotator(tmp_path):
+    log_path = tmp_path / "logs"
+    log_path.mkdir()
+    log_file_handler = LogFiles(log_path)
+
+    assert len(list(log_path.glob("*"))) == 0
+
+    log_file_handler.rotate()
+
+    log_files = [p.name for p in log_path.glob("*")]
+    assert len(log_files) == 1
+    assert "user-1000.journal" in log_files
+
+    log_file_handler.rotate()
+
+    log_files = [p.name for p in log_path.glob("*")]
+    assert len(log_files) == 2
+    assert "user-1000.journal" in log_files
+
+    log_file_handler.remove(last=1)
+
+    log_files = [p.name for p in log_path.glob("*")]
+    assert len(log_files) == 1
+    assert "user-1000.journal" in log_files
+
+
+class _MsgBuffer(MsgBuffer):
+    """Wrapper around MsgBuffer which allows waiting for messages."""
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.has_messages = threading.Event()
+        self.wait_threshold: Optional[int] = None
+
+    def add_item(self, *, item, cursor):
+        super().add_item(item=item, cursor=cursor)
+        if self.wait_threshold and len(self.messages) >= self.wait_threshold:
+            self.has_messages.set()
+
+    def get_items(self):
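+        # Drain the buffered messages and reset the wait state so the next
+        # get_messages() call starts with a fresh threshold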
+        res = super().get_items()
+        self.wait_threshold = None
+        self.has_messages.clear()
+        return res
+
+    def set_threshold(self, th: int) -> None:
+        self.wait_threshold = th
+        if len(self.messages) >= self.wait_threshold:
+            self.has_messages.set()
+
+
+class StubSender:
+    field_filter = None
+    extra_field_values = None
+
+    def __init__(self, *args, **kwargs):  # pylint: disable=unused-argument
+        self.msg_buffer = _MsgBuffer()
+
+    def start(self):
+        pass
+
+    def request_stop(self):
+        pass
+
+    def refresh_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
+        pass
+
+    def __call__(self, *args, **kwargs):  # pylint: disable=unused-argument
+        return self
+
+    def get_messages(self, count: int, timeout: int):
+        self.msg_buffer.set_threshold(count)
+        assert self.msg_buffer.has_messages.wait(timeout), f"Timeout. Total messages received: {len(self.msg_buffer)}"
+
+        messages = self.msg_buffer.get_items()
+        return [json.loads(m[0]) for m in messages]
+
+
+@pytest.fixture(name="journal_log_dir")
+def fixture_journal_log_dir(tmp_path):
+    log_path = tmp_path / "logs"
+    log_path.mkdir()
+    return log_path
+
+
+@pytest.fixture(name="journalpump_factory")
+def fixture_journalpump_factory(mocker, tmp_path, journal_log_dir):
+    pump_thread = None
+    pump = None
+
+    def _start_journalpump(sender, *, pump_conf=None):
+        nonlocal pump_thread
+        nonlocal pump
+        pump_conf = pump_conf or {}
+
+        mocker.patch.object(PumpReader, "has_persistent_files", return_value=True)
+        mocker.patch.object(PumpReader, "has_runtime_files", return_value=True)
+        mocker.patch.object(JournalReader, "sender_classes", {"stub_sender": sender})
+
+        config_path = tmp_path / "journalpump.json"
+        with open(config_path, "w") as fp:
+            json.dump(
+                {
+                    "readers": {
+                        "my-reader": {
+                            "journal_path": str(journal_log_dir),
+                            "initial_position": "head",
+                            "senders": {
+                                "test-sender": {
+                                    "output_type": "stub_sender"
+                                },
+                            },
+                            "searches": [{
+                                "fields": {
+                                    "MESSAGE": "Message.*",
+                                },
+                                "name": "test-messages",
+                            }],
+                        },
+                    },
+                    **pump_conf,
+                },
+                fp,
+            )
+
+        pump = JournalPump(str(config_path))
+        pump.poll_interval_ms = 100
+        pump_thread = threading.Thread(target=pump.run)
+        pump_thread.start()
+        assert journalpump_initialized(pump)
+        return pump
+
+    yield _start_journalpump
+
+    if pump_thread and pump:
+        pump.running = False
+        pump_thread.join(timeout=3)
+
+
+def test_journalpump_rotated_files(journalpump_factory, journal_log_dir):
+    stub_sender = StubSender()
+    journalpump_factory(stub_sender)
+    lf = LogFiles(journal_log_dir)
+    lf.rotate()
+    messages = stub_sender.get_messages(10, timeout=3)
+    assert set(m["MESSAGE"] for m in messages) == {f"Message {i}" for i in range(0, 10)}
+
+    lf.rotate()
+    lf.rotate()
+
+    messages = stub_sender.get_messages(20, timeout=3)
+    assert set(m["MESSAGE"] for m in messages) == {f"Message {i}" for i in range(10, 30)}
+
+
+@pytest.mark.parametrize("msg_buffer_max_length", [3, 5, 10])
+def test_journalpump_rotated_files_threshold(journalpump_factory, journal_log_dir, msg_buffer_max_length):
+    stub_sender = StubSender()
+
+    pump = journalpump_factory(stub_sender, pump_conf={"msg_buffer_max_length": msg_buffer_max_length})
+
+    lf = LogFiles(journal_log_dir)
+    lf.rotate()
+
+    reader: JournalReader = next(iter(pump.readers.values()))
+    assert reader.msg_buffer_max_length == msg_buffer_max_length
+
+    lf.rotate()
+    lf.rotate()
+
+    for it in range(30 // msg_buffer_max_length):
+        messages = stub_sender.get_messages(msg_buffer_max_length, timeout=3)
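+        # The reader pauses once a sender buffer reaches msg_buffer_max_length,
+        # so messages arrive in order in batches of exactly that size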
+        expected = {f"Message {i}" for i in range(it * msg_buffer_max_length, msg_buffer_max_length * (it + 1))}
+        assert len(messages) == msg_buffer_max_length
+        assert set(m["MESSAGE"] for m in messages) == expected
+
+
+@pytest.mark.parametrize("size,num_messages", [(50, 1), (1000, 2)])
+def test_journalpump_rotated_files_threshold_bytes(journalpump_factory, journal_log_dir, size, num_messages):
+    stub_sender = StubSender()
+
+    pump = journalpump_factory(stub_sender, pump_conf={"msg_buffer_max_bytes": size})
+
+    lf = LogFiles(journal_log_dir)
+    lf.rotate()
+    lf.rotate()
+    lf.rotate()
+
+    reader: JournalReader = next(iter(pump.readers.values()))
+    assert reader.msg_buffer_max_bytes == size
+
+    for it in range(30 // num_messages):
+        messages = stub_sender.get_messages(num_messages, timeout=3)
+        expected = {f"Message {i}" for i in range(it * num_messages, num_messages * (it + 1))}
+        assert len(messages) == num_messages
+        assert set(m["MESSAGE"] for m in messages) == expected
+
+
+def _lsof_is_file_open(filenames: List[str]) -> Dict[str, bool]:
+    """Check which of the given files are currently open, using lsof."""
+    # psutil doesn't show deleted files, but this is exactly what we want to test
+    output = subprocess.check_output(["lsof", "-p", str(os.getpid()), "-w"]).decode().split("\n")
+    result = {fn: False for fn in filenames}
+    for line in output:
+        for fn in result:
+            if fn not in line:
+                continue
+            result[fn] = True
+    return result
+
+
+def _wait_for(predicate: Callable[[], bool], timeout: int):
+    cur = time.monotonic()
+    deadline = cur + timeout
+
+    while cur < deadline:
+        if predicate():
+            return True
+
+        cur = time.monotonic()
+        time.sleep(0.2)
+
+    return False
+
+
+@pytest.mark.skipif(not shutil.which("lsof"), reason="lsof is not available")
+def test_journalpump_rotated_files_deletion(journalpump_factory, journal_log_dir):
+    stub_sender = StubSender()
+    journalpump_factory(stub_sender, pump_conf={"msg_buffer_max_length": 1})
+
+    def _get_message():
+        messages = stub_sender.get_messages(1, timeout=3)
+        assert len(messages) == 1
+        return messages[0]["MESSAGE"]
+
+    lf = LogFiles(journal_log_dir)
+    lf.rotate()
+
+    assert _get_message() == "Message 0"
+
+    lf.rotate()
+
+    # Loop once to get the new files open
+    assert _get_message() == "Message 1"
+
+    log_files = [str(f) for f in lf.log_files]
+    assert len(log_files) == 2
+
+    assert _wait_for(lambda: all(_lsof_is_file_open(log_files).values()), timeout=3)
+
+    lf.remove(last=1)
+
+    # Took the buffered message
+    # Dropped messages 3-9 as the file was deleted
+    assert _get_message() in ["Message 2", "Message 10"]
+    assert _get_message() in ["Message 10", "Message 11"]
+
+    def _only_head_open():
+        open_files = _lsof_is_file_open(log_files)
+        return open_files[log_files[0]] and not open_files[log_files[-1]]
+
+    assert _wait_for(_only_head_open, timeout=3), f"Expected {log_files[-1]} to not be open"
diff --git a/systest/test_rsyslog.py b/systest/test_rsyslog.py
index 88c03cd..1bfa7a4 100644
--- a/systest/test_rsyslog.py
+++ b/systest/test_rsyslog.py
@@ -3,6 +3,7 @@
 # This file is under the Apache License, Version 2.0.
 # See the file `LICENSE` for details.

+from .util import journalpump_initialized
 from journalpump.journalpump import JournalPump
 from subprocess import Popen
 from time import sleep
@@ -68,21 +69,6 @@ def stop(self):
         self.process = None


-def _journalpump_initialized(journalpump):
-    retry = 0
-    senders = []
-    while retry < 3 and not senders:
-        sleep(1)
-        readers = [reader for _, reader in journalpump.readers.items()]
-        senders = []
-        if readers:
-            for reader in readers:
-                senders.extend([sender for _, sender in reader.senders.items()])
-        retry += 1
-
-    return journalpump.running and senders
-
-
 def _run_pump_test(*, config_path, logfile):
     journalpump = None
     threads = []
@@ -92,7 +78,7 @@ def _run_pump_test(*, config_path, logfile):
         pump.start()
         threads.append(pump)

-        assert _journalpump_initialized(journalpump), "Failed to initialize journalpump"
+        assert journalpump_initialized(journalpump), "Failed to initialize journalpump"

         identifier = "".join(random.sample(string.ascii_uppercase + string.digits, k=8))
         logger = logging.getLogger("rsyslog-tester")
         logger.info("Info message for %s", identifier)
diff --git a/systest/util.py b/systest/util.py
new file mode 100644
index 0000000..dca0846
--- /dev/null
+++ b/systest/util.py
@@ -0,0 +1,16 @@
+from time import sleep
+
+
+def journalpump_initialized(journalpump):
+    retry = 0
+    senders = []
+    while retry < 3 and not senders:
+        sleep(1)
+        readers = [reader for _, reader in journalpump.readers.items()]
+        senders = []
+        if readers:
+            for reader in readers:
+                senders.extend([sender for _, sender in reader.senders.items()])
+        retry += 1
+
+    return journalpump.running and senders
diff --git a/test/test_journalpump.py b/test/test_journalpump.py
index 6ea10cd..586c777 100644
--- a/test/test_journalpump.py
+++ b/test/test_journalpump.py
@@ -379,7 +379,7 @@ def setUp(self):
     def test_filtered_processing(self):
         jobject = JournalObject(entry=OrderedDict(a=1, b=2, c=3, REALTIME_TIMESTAMP=1), cursor=10)
         handler = JournalObjectHandler(jobject, self.reader, self.pump)
-        assert handler.process() is True
+        assert handler.process()[0] is True
         sender_a_msgs = [(json.loads(msg.decode("utf-8")), cursor) for msg, cursor in self.sender_a.msg_buffer.messages]
         assert ({"a": 1}, 10) in sender_a_msgs
         sender_b_msgs = [(json.loads(msg.decode("utf-8")), cursor) for msg, cursor in self.sender_b.msg_buffer.messages]
@@ -397,7 +397,7 @@ def test_too_large_data(self):
         too_large = OrderedDict(a=1, b="x" * MAX_KAFKA_MESSAGE_SIZE)
         jobject = JournalObject(entry=too_large, cursor=10)
         handler = JournalObjectHandler(jobject, self.reader, self.pump)
-        assert handler.process() is True
+        assert handler.process()[0] is True
         sender_a_msgs = [(json.loads(msg.decode("utf-8")), cursor) for msg, cursor in self.sender_a.msg_buffer.messages]
         assert ({"a": 1}, 10) in sender_a_msgs
         assert "too large message" in str(self.sender_b.msg_buffer.messages)