From 4f65d8c3f31ac10a5f3667a0ddc3f2ac083be100 Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Wed, 15 May 2024 16:21:27 -0700 Subject: [PATCH] [add_session_metadata processor] Enrich events with user and group names (#39537) Update the add_session_metadata processor to add user and group names to enriched events, rather than just IDs, as it was doing previously. This also renames the UpdateDB function to SyncDB. Previously this function was confusing, because it didn't always update the DB. With ebpf, the DB update is done separately. By renaming and updating the func comment, it should be more clear that the function should synchronize the DB so it's ready for enriching events, either by waiting until the DB is updated, or doing the synchronization itself, as it does with procfs backend. --- CHANGELOG.next.asciidoc | 1 + .../sessionmd/add_session_metadata.go | 2 +- .../sessionmd/add_session_metadata_test.go | 76 ++++++++++++------- .../processors/sessionmd/processdb/db.go | 40 ++++++++++ .../processors/sessionmd/processdb/names.go | 29 +++++++ .../provider/ebpf_provider/ebpf_provider.go | 8 +- .../procfs_provider/procfs_provider.go | 4 +- .../procfs_provider/procfs_provider_test.go | 10 +-- .../processors/sessionmd/provider/provider.go | 3 +- 9 files changed, 134 insertions(+), 39 deletions(-) create mode 100644 x-pack/auditbeat/processors/sessionmd/processdb/names.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2b870c03f990..b3b966364bbb 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -206,6 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add procfs backend to the `add_session_metadata` processor. {pull}38799[38799] - Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776] - Reduce data size for add_session_metadata processor by removing unneeded fields {pull}39500[39500] +- Enrich process events with user and group names, with add_session_metadata processor {pull}39537[39537] *Auditbeat* diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index 766e9623b9ea..4fa86c25d029 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -113,7 +113,7 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error } - err = p.provider.UpdateDB(ev, pid) + err = p.provider.SyncDB(ev, pid) if err != nil { return ev, err } diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go index 602f80f58367..95892482f80e 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go @@ -43,6 +43,14 @@ var ( Pgid: uint32(100), Sid: uint32(40), }, + Creds: types.CredInfo{ + Ruid: 0, + Euid: 0, + Suid: 0, + Rgid: 0, + Egid: 0, + Sgid: 0, + }, CWD: "/", Filename: "/bin/ls", }, @@ -78,12 +86,24 @@ var ( "pid": uint32(100), "parent": mapstr.M{ "pid": uint32(50), + "user": mapstr.M{ + "id": "0", + "name": "root", + }, }, "session_leader": mapstr.M{ "pid": uint32(40), + "user": mapstr.M{ + "id": "0", + "name": "root", + }, }, "group_leader": mapstr.M{ "pid": uint32(100), + "user": mapstr.M{ + "id": "0", + "name": "root", + }, }, }, }, @@ -318,33 +338,35 @@ var ( func TestEnrich(t *testing.T) { for _, tt := range enrichTests { - reader := procfs.NewMockReader() - db, err := processdb.NewDB(reader, *logger) - require.Nil(t, err) + t.Run(tt.testName, func(t *testing.T) { + reader := procfs.NewMockReader() + db, err := processdb.NewDB(reader, *logger) + require.Nil(t, err) - for _, ev := range tt.mockProcesses { - db.InsertExec(ev) - } - s := addSessionMetadata{ - logger: logger, - db: db, - config: tt.config, - } + for _, ev := range tt.mockProcesses { + db.InsertExec(ev) + } + s := addSessionMetadata{ + logger: logger, + db: db, + config: tt.config, + } - // avoid taking address of loop variable - i := tt.input - actual, err := s.enrich(&i) - if tt.expect_error { - require.Error(t, err, "%s: error unexpectedly nil", tt.testName) - } else { - require.Nil(t, err, "%s: enrich error: %w", tt.testName, err) - require.NotNil(t, actual, "%s: returned nil event", tt.testName) + // avoid taking address of loop variable + i := tt.input + actual, err := s.enrich(&i) + if tt.expect_error { + require.Error(t, err, "%s: error unexpectedly nil", tt.testName) + } else { + require.Nil(t, err, "%s: enrich error: %w", tt.testName, err) + require.NotNil(t, actual, "%s: returned nil event", tt.testName) - //Validate output - if diff := cmp.Diff(tt.expected.Fields, actual.Fields, ignoreMissingFrom(tt.expected.Fields)); diff != "" { - t.Errorf("field mismatch:\n%s", diff) + //Validate output + if diff := cmp.Diff(tt.expected.Fields, actual.Fields, ignoreMissingFrom(tt.expected.Fields)); diff != "" { + t.Errorf("field mismatch:\n%s", diff) + } } - } + }) } } @@ -364,8 +386,10 @@ func ignoreMissingFrom(m mapstr.M) cmp.Option { // Note: This validates test code only func TestFilter(t *testing.T) { for _, tt := range filterTests { - if eq := cmp.Equal(tt.mx, tt.my, ignoreMissingFrom(tt.mx)); eq != tt.expected { - t.Errorf("%s: unexpected comparator result", tt.testName) - } + t.Run(tt.testName, func(t *testing.T) { + if eq := cmp.Equal(tt.mx, tt.my, ignoreMissingFrom(tt.mx)); eq != tt.expected { + t.Errorf("%s: unexpected comparator result", tt.testName) + } + }) } } diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index b8c624abe00a..28c848ddfdbc 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -447,7 +447,15 @@ func fullProcessFromDBProcess(p Process) types.Process { euid := p.Creds.Euid egid := p.Creds.Egid ret.User.ID = strconv.FormatUint(uint64(euid), 10) + username, ok := getUserName(ret.User.ID) + if ok { + ret.User.Name = username + } ret.Group.ID = strconv.FormatUint(uint64(egid), 10) + groupname, ok := getGroupName(ret.Group.ID) + if ok { + ret.Group.Name = groupname + } ret.Thread.Capabilities.Permitted, _ = capabilities.FromUint64(p.Creds.CapPermitted) ret.Thread.Capabilities.Effective, _ = capabilities.FromUint64(p.Creds.CapEffective) ret.TTY.CharDevice.Major = p.CTTY.Major @@ -471,7 +479,15 @@ func fillParent(process *types.Process, parent Process) { process.Parent.WorkingDirectory = parent.Cwd process.Parent.Interactive = &interactive process.Parent.User.ID = strconv.FormatUint(uint64(euid), 10) + username, ok := getUserName(process.Parent.User.ID) + if ok { + process.Parent.User.Name = username + } process.Parent.Group.ID = strconv.FormatUint(uint64(egid), 10) + groupname, ok := getGroupName(process.Parent.Group.ID) + if ok { + process.Parent.Group.Name = groupname + } } func fillGroupLeader(process *types.Process, groupLeader Process) { @@ -488,7 +504,15 @@ func fillGroupLeader(process *types.Process, groupLeader Process) { process.GroupLeader.WorkingDirectory = groupLeader.Cwd process.GroupLeader.Interactive = &interactive process.GroupLeader.User.ID = strconv.FormatUint(uint64(euid), 10) + username, ok := getUserName(process.GroupLeader.User.ID) + if ok { + process.GroupLeader.User.Name = username + } process.GroupLeader.Group.ID = strconv.FormatUint(uint64(egid), 10) + groupname, ok := getGroupName(process.GroupLeader.Group.ID) + if ok { + process.GroupLeader.Group.Name = groupname + } } func fillSessionLeader(process *types.Process, sessionLeader Process) { @@ -505,7 +529,15 @@ func fillSessionLeader(process *types.Process, sessionLeader Process) { process.SessionLeader.WorkingDirectory = sessionLeader.Cwd process.SessionLeader.Interactive = &interactive process.SessionLeader.User.ID = strconv.FormatUint(uint64(euid), 10) + username, ok := getUserName(process.SessionLeader.User.ID) + if ok { + process.SessionLeader.User.Name = username + } process.SessionLeader.Group.ID = strconv.FormatUint(uint64(egid), 10) + groupname, ok := getGroupName(process.SessionLeader.Group.ID) + if ok { + process.SessionLeader.Group.Name = groupname + } } func fillEntryLeader(process *types.Process, entryType EntryType, entryLeader Process) { @@ -522,7 +554,15 @@ func fillEntryLeader(process *types.Process, entryType EntryType, entryLeader Pr process.EntryLeader.WorkingDirectory = entryLeader.Cwd process.EntryLeader.Interactive = &interactive process.EntryLeader.User.ID = strconv.FormatUint(uint64(euid), 10) + username, ok := getUserName(process.EntryLeader.User.ID) + if ok { + process.EntryLeader.User.Name = username + } process.EntryLeader.Group.ID = strconv.FormatUint(uint64(egid), 10) + groupname, ok := getGroupName(process.EntryLeader.Group.ID) + if ok { + process.EntryLeader.Group.Name = groupname + } process.EntryLeader.EntryMeta.Type = string(entryType) } diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/names.go b/x-pack/auditbeat/processors/sessionmd/processdb/names.go new file mode 100644 index 000000000000..6584e6e9986f --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/processdb/names.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package processdb + +import ( + "os/user" +) + +// getUserName will return the name associated with the user ID, if it exists +func getUserName(id string) (string, bool) { + user, err := user.LookupId(id) + if err != nil { + return "", false + } + return user.Username, true +} + +// getGroupName will return the name associated with the group ID, if it exists +func getGroupName(id string) (string, bool) { + group, err := user.LookupGroupId(id) + if err != nil { + return "", false + } + return group.Name, true +} diff --git a/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go index f1b8bae0b671..8c08ae199a73 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go @@ -153,9 +153,9 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (pr } const ( - maxWaitLimit = 200 * time.Millisecond // Maximum time UpdateDB will wait for process - combinedWaitLimit = 2 * time.Second // Multiple UpdateDB calls will wait up to this amount within resetDuration - backoffDuration = 10 * time.Second // UpdateDB will stop waiting for processes for this time + maxWaitLimit = 200 * time.Millisecond // Maximum time SyncDB will wait for process + combinedWaitLimit = 2 * time.Second // Multiple SyncDB calls will wait up to this amount within resetDuration + backoffDuration = 10 * time.Second // SyncDB will stop waiting for processes for this time resetDuration = 5 * time.Second // After this amount of times with no backoffs, the combinedWait will be reset ) @@ -176,7 +176,7 @@ var ( // If for some reason a lot of time has been spent waiting for missing processes, this also has a backoff timer during // which it will continue without waiting for missing events to arrive, so the processor doesn't become overly backed-up // waiting for these processes, at the cost of possibly not enriching some processes. -func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { +func (s prvdr) SyncDB(ev *beat.Event, pid uint32) error { if s.db.HasProcess(pid) { return nil } diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go index 6525b860b6d2..1578e89f915e 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go @@ -40,8 +40,8 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, rea }, nil } -// UpdateDB will update the process DB with process info from procfs or the event itself -func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { +// SyncDB will update the process DB with process info from procfs or the event itself +func (s prvdr) SyncDB(ev *beat.Event, pid uint32) error { syscall, err := ev.GetValue(syscallField) if err != nil { return fmt.Errorf("event not supported, no syscall data") diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go index c438efcfe1ae..455cb3c0433a 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go @@ -124,7 +124,7 @@ func TestExecveEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event, expected.PIDs.Tgid) + err = provider.SyncDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -234,7 +234,7 @@ func TestExecveatEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event, expected.PIDs.Tgid) + err = provider.SyncDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -317,7 +317,7 @@ func TestSetSidEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event, expected.PIDs.Tgid) + err = provider.SyncDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -399,7 +399,7 @@ func TestSetSidEventFailed(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event, expected.PIDs.Tgid) + err = provider.SyncDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -470,7 +470,7 @@ func TestSetSidSessionLeaderNotScraped(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event, expected.PIDs.Tgid) + err = provider.SyncDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) diff --git a/x-pack/auditbeat/processors/sessionmd/provider/provider.go b/x-pack/auditbeat/processors/sessionmd/provider/provider.go index 6452eb9e2bf7..e95da3ec2006 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/provider.go @@ -10,6 +10,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" ) +// SyncDB should ensure the DB is in a state to handle the event before returning. type Provider interface { - UpdateDB(*beat.Event, uint32) error + SyncDB(event *beat.Event, pid uint32) error }