From 3943b5050d0149c33cecc88d46bc906a2b1d7595 Mon Sep 17 00:00:00 2001 From: tosuke <13393900+tosuke@users.noreply.github.com> Date: Thu, 18 Jul 2024 04:00:37 +0900 Subject: [PATCH] [pkg/stanza] parse Journal JSON export format --- pkg/stanza/operator/input/journald/input.go | 58 +++++++++++++++++++ .../operator/input/journald/input_test.go | 3 +- 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/pkg/stanza/operator/input/journald/input.go b/pkg/stanza/operator/input/journald/input.go index f7af8c3ad6e5..4ffe4dc09723 100644 --- a/pkg/stanza/operator/input/journald/input.go +++ b/pkg/stanza/operator/input/journald/input.go @@ -16,6 +16,7 @@ import ( "strings" "sync" "time" + "unicode/utf8" jsoniter "github.com/json-iterator/go" "go.uber.org/zap" @@ -175,6 +176,9 @@ func (operator *Input) parseJournalEntry(line []byte) (*entry.Entry, string, err if err != nil { return nil, "", err } + if err := parseJournalJSONObject(&body); err != nil { + return nil, "", err + } timestamp, ok := body["__REALTIME_TIMESTAMP"] if !ok { @@ -220,3 +224,57 @@ func (operator *Input) Stop() error { operator.wg.Wait() return nil } + +func parseJournalJSONObject(o *map[string]any) error { + if o == nil { + return nil + } + for k, v := range *o { + if s, ok := v.([]any); ok { + newV, err := parseJournalJSONArray(s) + if err != nil { + return err + } + (*o)[k] = newV + } + } + return nil +} + +func parseJournalJSONArray(s []any) (any, error) { + if str, ok := parseJournalBinaryString(s); ok { + return str, nil + } + res := make([]any, 0, len(s)) + for _, v := range s { + if vs, ok := v.([]any); ok { + newV, err := parseJournalJSONArray(vs) + if err != nil { + return nil, err + } + res = append(res, newV) + } else { + res = append(res, v) + } + } + return res, nil +} + +func parseJournalBinaryString(s []any) (string, bool) { + b := make([]byte, len(s)) + for i, v := range s { + f, ok := v.(float64) + if !ok { + return "", false + } + c := byte(f) + if float64(c) != f { + return "", false + } + b[i] = c + } + if !utf8.Valid(b) { + return "", false + } + return string(b), true +} diff --git a/pkg/stanza/operator/input/journald/input_test.go b/pkg/stanza/operator/input/journald/input_test.go index 86c7319a9381..2d8b1d4f0887 100644 --- a/pkg/stanza/operator/input/journald/input_test.go +++ b/pkg/stanza/operator/input/journald/input_test.go @@ -33,7 +33,7 @@ func (f *fakeJournaldCmd) Start() error { } func (f *fakeJournaldCmd) StdoutPipe() (io.ReadCloser, error) { - response := `{ "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", "_CAP_EFFECTIVE": "0", "_TRANSPORT": "journal", "_UID": "1000", "_EXE": "/usr/lib/systemd/systemd", "_AUDIT_LOGINUID": "1000", "MESSAGE": "run-docker-netns-4f76d707d45f.mount: Succeeded.", "_PID": "13894", "_CMDLINE": "/lib/systemd/systemd --user", "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", "_SELINUX_CONTEXT": "unconfined\n", "CODE_FUNC": "unit_log_success", "SYSLOG_IDENTIFIER": "systemd", "_HOSTNAME": "myhostname", "MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448", "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/user@1000.service/init.scope", "_SOURCE_REALTIME_TIMESTAMP": "1587047866229317", "USER_UNIT": "run-docker-netns-4f76d707d45f.mount", "SYSLOG_FACILITY": "3", "_SYSTEMD_SLICE": "user-1000.slice", "_AUDIT_SESSION": "286", "CODE_FILE": "../src/core/unit.c", "_SYSTEMD_USER_UNIT": "init.scope", "_COMM": "systemd", "USER_INVOCATION_ID": "88f7ca6bbf244dc8828fa901f9fe9be1", "CODE_LINE": "5487", "_SYSTEMD_INVOCATION_ID": "83f7fc7799064520b26eb6de1630429c", "PRIORITY": "6", "_GID": "1000", "__REALTIME_TIMESTAMP": "1587047866229555", "_SYSTEMD_UNIT": "user@1000.service", "_SYSTEMD_USER_SLICE": "-.slice", "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1eed30;b=c4fa36de06824d21835c05ff80c54468;m=9f9d630205;t=5a369604ee333;x=16c2d4fd4fdb7c36", "__MONOTONIC_TIMESTAMP": "685540311557", "_SYSTEMD_OWNER_UID": "1000" } + response := `{ "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", "_CAP_EFFECTIVE": "0", "_TRANSPORT": "journal", "_UID": "1000", "_EXE": "/usr/lib/systemd/systemd", "_AUDIT_LOGINUID": "1000", "MESSAGE": "run-docker-netns-4f76d707d45f.mount: Succeeded.", "_PID": "13894", "_CMDLINE": "/lib/systemd/systemd --user", "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", "_SELINUX_CONTEXT": "unconfined\n", "CODE_FUNC": "unit_log_success", "SYSLOG_IDENTIFIER": "systemd", "_HOSTNAME": "myhostname", "MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448", "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/user@1000.service/init.scope", "_SOURCE_REALTIME_TIMESTAMP": "1587047866229317", "USER_UNIT": "run-docker-netns-4f76d707d45f.mount", "SYSLOG_FACILITY": "3", "_SYSTEMD_SLICE": "user-1000.slice", "_AUDIT_SESSION": "286", "CODE_FILE": "../src/core/unit.c", "_SYSTEMD_USER_UNIT": "init.scope", "_COMM": "systemd", "USER_INVOCATION_ID": "88f7ca6bbf244dc8828fa901f9fe9be1", "CODE_LINE": "5487", "_SYSTEMD_INVOCATION_ID": "83f7fc7799064520b26eb6de1630429c", "PRIORITY": "6", "_GID": "1000", "__REALTIME_TIMESTAMP": "1587047866229555", "_SYSTEMD_UNIT": "user@1000.service", "_SYSTEMD_USER_SLICE": "-.slice", "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1eed30;b=c4fa36de06824d21835c05ff80c54468;m=9f9d630205;t=5a369604ee333;x=16c2d4fd4fdb7c36", "__MONOTONIC_TIMESTAMP": "685540311557", "_SYSTEMD_OWNER_UID": "1000", "BINARY": [116,101,115,116,13,10] } ` reader := bytes.NewReader([]byte(response)) return io.NopCloser(reader), nil @@ -114,6 +114,7 @@ func TestInputJournald(t *testing.T) { "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1eed30;b=c4fa36de06824d21835c05ff80c54468;m=9f9d630205;t=5a369604ee333;x=16c2d4fd4fdb7c36", "__MONOTONIC_TIMESTAMP": "685540311557", "_SYSTEMD_OWNER_UID": "1000", + "BINARY": "test\r\n", } select {