Skip to content

Commit

Permalink
Recreate reader if journal file is deleted
Browse files Browse the repository at this point in the history
To allow this change we always keep the journal reader registered with
the poller. For every event received we keep a lookup table of said events
that should be processed at a later stage.

There are 2 relevant types of events:
* journal.APPEND - when lines are added to the end of the file
* journal.INVALIDATE - log file rotated or removed

For an APPEND event we need to read all the data in the log until it's
exhausted.

For an INVALIDATE event, data should also be read until the end, and the
reader should be recreated to release the fds of deleted files.

This fixes a resource leak that happened if a log file was first
renamed and then removed, which is usually the case when rotation
happens.
  • Loading branch information
Maksim Novikov committed Jan 4, 2022
1 parent 189f5d9 commit f075283
Show file tree
Hide file tree
Showing 14 changed files with 814 additions and 83 deletions.
278 changes: 213 additions & 65 deletions journalpump/journalpump.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions journalpump/senders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import random
import sys
import time

KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30
Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(self):
self.total_size = 0
self.last_journal_msg_time = time.monotonic()
self.cursor = None
self.buffer_size = 0

def __len__(self):
return len(self.messages)
Expand All @@ -55,13 +57,15 @@ def get_items(self):
if self.messages:
messages = self.messages
self.messages = []
self.buffer_size = 0
return messages

def add_item(self, *, item, cursor):
with self.lock: # pylint: disable=not-context-manager
self.messages.append((item, cursor))
self.last_journal_msg_time = time.monotonic()
self.cursor = cursor
self.buffer_size += sys.getsizeof(item)

self.entry_num += 1
self.total_size += len(item)
Expand Down
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[pytest]
log_format = %(asctime)s.%(msecs)03d %(levelname)s %(message)s
log_date_format = %Y-%m-%d %H:%M:%S
204 changes: 204 additions & 0 deletions scripts/generate_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import multiprocessing
import os
import pathlib
import shutil
import socket
import subprocess
import sys
import tempfile
import time
from typing import Optional, Dict

MESSAGE_DEFAULTS: Dict[str, str] = {"PRIORITY": "6", "SYSLOG_IDENTIFIER": "journald-gen-logs"}


def _encode_message(data: Dict[str, str]) -> bytes:
    r"""Encode fields as one journald native protocol datagram.

    ``MESSAGE_DEFAULTS`` are applied first and may be overridden by *data*.
    Every value is passed through ``str()`` before being UTF-8 encoded, so
    non-string values (for example numbers decoded from JSON) are accepted.

    A field whose value has no newline is written as ``KEY=value\n``. A value
    that contains a newline cannot use that form, because the newline would
    terminate the field early; it is written in the binary-safe form instead:
    ``KEY\n`` + 64-bit little-endian length + raw value + ``\n``.

    >>> _encode_message({"MESSAGE": "Something happened"})
    b'PRIORITY=6\nSYSLOG_IDENTIFIER=journald-gen-logs\nMESSAGE=Something happened\n'
    """
    # Later keys win, and dict ordering keeps the defaults in front.
    message = {**MESSAGE_DEFAULTS, **data}

    chunks = []
    for key, value in message.items():
        name = key.encode("utf-8")
        payload = str(value).encode("utf-8")
        if b"\n" in payload:
            # Length-prefixed form for multi-line values.
            chunks.append(name + b"\n" + len(payload).to_bytes(8, "little") + payload + b"\n")
        else:
            chunks.append(name + b"=" + payload + b"\n")

    return b"".join(chunks)


def _message_sender(uid: int, message_socket_path: str, queue: multiprocessing.JoinableQueue):
    """Send queued messages to journald using the native protocol.

    NB. Messages are sent from a separate process so that the socket can be
    written to as a non-root user, producing user-<uid>.journal entries instead
    of just syslog.journal ones.

    Args:
        uid: user id this process switches to before sending anything.
        message_socket_path: path of journald's datagram socket.
        queue: source of message dicts; ``None`` is the shutdown sentinel.
            ``task_done()`` is called once per item, including the sentinel,
            so a producer can ``join()`` the queue.
    """
    os.setuid(uid)

    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        sock.connect(message_socket_path)

        # Compare with the sentinel explicitly: a falsy-but-valid message such
        # as an empty dict must be sent, not mistaken for the shutdown request.
        while (msg := queue.get()) is not None:
            sock.sendall(_encode_message(msg))
            queue.task_done()

    # ACK None
    queue.task_done()


class JournalControlProcess:
    """Context manager that runs a private journald and a message sender.

    On entry a temporary runtime directory is created, ``JOURNALD_BIN`` is
    started with that directory and ``logs_dir`` exported in its environment,
    and a helper process (running as ``uid``) is started to deliver messages.
    On exit both processes are stopped and the runtime directory is removed.
    """

    CONTROL_SOCKET_NAME = "io.systemd.journal"
    MESSAGE_SOCKET_NAME = "socket"
    JOURNALD_BIN = "/usr/lib/systemd/systemd-journald"

    def __init__(self, *, logs_dir: pathlib.Path, uid: int) -> None:
        """Store configuration; nothing is started until ``__enter__``."""
        self._logs_dir: pathlib.Path = logs_dir
        self._runtime_dir: Optional[pathlib.Path] = None
        self._journald_process: Optional[subprocess.Popen] = None
        self._sender_process: Optional[multiprocessing.Process] = None
        self._sender_queue = multiprocessing.JoinableQueue()
        self._uid = uid

    @property
    def _control_socket_path(self) -> str:
        """Path of the control socket inside the runtime directory."""
        assert self._runtime_dir
        return str(self._runtime_dir / self.CONTROL_SOCKET_NAME)

    @property
    def _message_socket_path(self) -> str:
        """Path of the message socket inside the runtime directory."""
        assert self._runtime_dir
        return str(self._runtime_dir / self.MESSAGE_SOCKET_NAME)

    def _start_journald(self) -> subprocess.Popen:
        """Spawn journald and wait up to 3 seconds for both sockets to appear.

        The process is returned even if the sockets did not show up in time;
        in that case later connection attempts will fail.
        """
        assert self._runtime_dir

        environment = {
            "LOGS_DIRECTORY": self._logs_dir,
            "RUNTIME_DIRECTORY": self._runtime_dir,
        }
        # NOTE(review): "test" is presumably a journald namespace/instance
        # name - confirm against the systemd-journald documentation.
        journald_process = subprocess.Popen(
            [self.JOURNALD_BIN, "test"],
            env=environment,
            stdout=sys.stdout,
            stderr=sys.stdout,
        )

        cur = time.monotonic()
        deadline = cur + 3

        # Poll the runtime directory until both socket files exist.
        while cur < deadline:
            files = {f.name for f in self._runtime_dir.iterdir()}
            if self.CONTROL_SOCKET_NAME in files and self.MESSAGE_SOCKET_NAME in files:
                break
            time.sleep(0.1)
            cur = time.monotonic()

        return journald_process

    def rotate(self) -> None:
        """Ask journald to rotate logs and wait for the result."""
        assert self._journald_process

        # Close the control connection deterministically instead of leaving
        # the descriptor to be reclaimed by garbage collection.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as control:
            control.connect(self._control_socket_path)
            # Each request is NUL-terminated JSON; the reply is read and
            # discarded, it only serves as a completion signal.
            control.sendall(b'{"method": "io.systemd.Journal.Synchronize"}\0')
            control.recv(100)
            control.sendall(b'{"method": "io.systemd.Journal.Rotate"}\0')
            control.recv(100)

    def send_message(self, message: Dict[str, str]) -> None:
        """Send message to journald and block until the sender has handled it."""
        assert self._sender_process

        self._sender_queue.put(message)
        self._sender_queue.join()

    def _start_sender_processs(self) -> multiprocessing.Process:
        """Start the helper process that writes queued messages to journald."""
        sender_process = multiprocessing.Process(
            target=_message_sender, args=(self._uid, self._message_socket_path, self._sender_queue)
        )
        sender_process.start()
        return sender_process

    def __enter__(self) -> "JournalControlProcess":
        """Create the runtime directory and start journald and the sender."""
        self._runtime_dir = pathlib.Path(tempfile.mkdtemp(prefix="journald_runtime_"))
        # Hand the directory to ``uid``; the group (-1) is left unchanged.
        os.chown(self._runtime_dir, self._uid, -1)

        self._journald_process = self._start_journald()
        self._sender_process = self._start_sender_processs()

        return self

    def __exit__(self, *args) -> None:
        """Stop the sender and journald, then remove the runtime directory.

        Each teardown step runs even if an earlier one raises (for example
        ``subprocess.TimeoutExpired`` from waiting on journald); the exception
        still propagates after cleanup.
        """
        assert self._runtime_dir
        assert self._journald_process
        assert self._sender_process

        try:
            try:
                # ``None`` is the shutdown sentinel understood by the sender.
                self._sender_queue.put(None)
                self._sender_process.join(timeout=3)
            finally:
                self._journald_process.terminate()
                self._journald_process.wait(timeout=3)
        finally:
            shutil.rmtree(self._runtime_dir)


# The long text is a description, not a usage line: passing it as ``usage=``
# would replace argparse's generated usage and hide the ``--uid`` option.
# RawDescriptionHelpFormatter keeps the example layout intact in ``--help``.
_PARSER = argparse.ArgumentParser(
    description="""Generate journald log files.

This program reads messages from stdin in following format:

    msg Test 1
    msg {"MESSAGE": "Test 1"}
    rotate
    msg Test 2

msg command argument can be either plain message or json object
rotate command invokes journald rotation
""",
    formatter_class=argparse.RawDescriptionHelpFormatter,
)
_PARSER.add_argument('--uid', type=int, default=1000, help='user id of log sender')


def main():
    """Read commands from stdin and feed them to a private journald instance.

    Supported input lines:
        ``msg <text>``   - send ``<text>`` as the MESSAGE field
        ``msg {json}``   - send the decoded JSON object as the message fields
        ``rotate``       - ask journald to rotate its log files

    Reading stops at the first empty line or at end of input. Lines with any
    other action are ignored.

    Returns:
        0 on success (used as the process exit status).

    Raises:
        Exception: when not running as root.
        ValueError: when a ``msg`` line has no argument.
    """
    options = _PARSER.parse_args()

    if os.geteuid() != 0:
        raise Exception("Should be run as a root user to be able to rotate")

    logs_dir = pathlib.Path(tempfile.mkdtemp(prefix="journald_logs_"))
    uid = options.uid
    # Let the unprivileged sender's uid own the log directory.
    os.chown(logs_dir, uid, -1)

    with JournalControlProcess(logs_dir=logs_dir, uid=uid) as journald_process:
        while True:
            try:
                entry = input()
            except EOFError:
                # stdin ended without a terminating blank line (e.g. a piped
                # file); treat it like the normal end of input.
                break
            if not entry:
                break

            # ``params`` is empty for a bare action and has one element
            # otherwise, because the split happens at most once.
            action, *params = entry.strip().split(" ", 1)

            if action == "rotate":
                journald_process.rotate()

            elif action == "msg":
                if len(params) != 1:
                    raise ValueError(f"Not enough args for msg {params}")

                msg = params[0].strip()

                if msg.startswith("{"):
                    msg = json.loads(msg)
                else:
                    msg = {"MESSAGE": msg}

                journald_process.send_message(msg)

    print(f"Logs available in {logs_dir} directory")
    print("To see generated logs use following command:")
    print(f"journalctl -D {logs_dir}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
Empty file added systest/__init__.py
Empty file.
45 changes: 45 additions & 0 deletions systest/data/rotated_logs/rotated_logs.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
msg Message 0
msg Message 1
msg Message 2
msg Message 3
msg Message 4
msg Message 5
msg Message 6
msg Message 7
msg Message 8
msg Message 9
rotate
msg Message 10
msg Message 11
msg Message 12
msg Message 13
msg Message 14
msg Message 15
msg Message 16
msg Message 17
msg Message 18
msg Message 19
rotate
msg Message 20
msg Message 21
msg Message 22
msg Message 23
msg Message 24
msg Message 25
msg Message 26
msg Message 27
msg Message 28
msg Message 29
rotate
msg Message 30
msg Message 31
msg Message 32
msg Message 33
msg Message 34
msg Message 35
msg Message 36
msg Message 37
msg Message 38
msg Message 39
rotate

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit f075283

Please sign in to comment.