Skip to content

Commit

Permalink
[auditbeat] Update docs to promote add_session_metadata processor to …
Browse files Browse the repository at this point in the history
…GA (elastic#41295)

The add_session_metadata processor will be promoted to GA in 8.16, this updates the documentation to reflect that.

This also has some other documentation improvements; more godoc comments on functions, and rename SyncDB to Sync to reflect it doesn't sync a DB in all providers.
  • Loading branch information
mjwolf authored Oct 18, 2024
1 parent 9e6a942 commit 7be47da
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"strconv"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/processdb"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/procfs"
Expand Down Expand Up @@ -47,7 +46,6 @@ type addSessionMetadata struct {
}

func New(cfg *cfg.C) (beat.Processor, error) {
cfgwarn.Beta("add_session_metadata processor is a beta feature.")
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
Expand Down Expand Up @@ -129,7 +127,7 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) {
return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error
}

err = p.provider.SyncDB(ev, pid)
err = p.provider.Sync(ev, pid)
if err != nil {
return ev, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
[[add-session-metadata]]
=== Add session metadata (Beta)
=== Add session metadata
++++
<titleabbrev>add_session_metadata</titleabbrev>
++++

beta::[]

The `add_session_metadata` processor enriches process events with additional
information that users can see using the {security-guide}/session-view.html[Session View] tool in the
{elastic-sec} platform.
Expand Down Expand Up @@ -41,9 +39,9 @@ The `add_session_metadata` processor operates using various backend options.

* `auto` is the recommended setting.
It attempts to use `kernel_tracing` first, falling back to `procfs` if necessary, ensuring compatibility even on systems without `kernel_tracing` support.
* `kernel_tracing` collects process information with eBPF or kprobes.
This backend will prefer to use eBPF, if eBPF is not supported kprobes will be used. eBPF requires a system with Linux kernel 5.10.16 or above, kernel support for eBPF enabled, and auditbeat running as superuser.
Kprobe support required Linux kernel 3.10.0 or above, and auditbeat running as a superuser.
* `kernel_tracing` gathers information about processes using either eBPF or kprobes.
It will use eBPF if available, but if not, it will fall back to kprobes. eBPF requires a system with kernel support for eBPF enabled, support for eBPF ring buffer, and auditbeat running as superuser.
Kprobe support requires Linux kernel 3.10.0 or above, and auditbeat running as a superuser.
* `procfs` collects process information with the proc filesystem.
This is compatible with older systems that may not support eBPF.
To gather complete process info, auditbeat requires permissions to read all process data in procfs; for example, run as a superuser or have the `SYS_PTRACE` capability.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,17 @@ var (
pidNsInode uint64
)

// readBootID returns the boot ID of the Linux system from "/proc/sys/kernel/random/boot_id"
func readBootID() (string, error) {
bootID, err := os.ReadFile("/proc/sys/kernel/random/boot_id")
if err != nil {
return "", fmt.Errorf("could not read /proc/sys/kernel/random/boot_id, process entity IDs will not be correct: %w", err)
return "", fmt.Errorf("could not read /proc/sys/kernel/random/boot_id: %w", err)
}

return strings.TrimRight(string(bootID), "\n"), nil
}

// readPIDNsInode returns the PID namespace inode that auditbeat is running in from "/proc/self/ns/pid"
func readPIDNsInode() (uint64, error) {
var ret uint64

Expand All @@ -95,6 +97,7 @@ func readPIDNsInode() (uint64, error) {
return ret, nil
}

// NewProvider returns a new instance of kerneltracingprovider
func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, error) {
attr := quark.DefaultQueueAttr()
attr.Flags = quark.QQ_ALL_BACKENDS | quark.QQ_ENTRY_LEADER | quark.QQ_NO_SNAPSHOT
Expand Down Expand Up @@ -154,42 +157,31 @@ const (
resetDuration = 5 * time.Second // After this amount of time with no backoffs, the combinedWait will be reset
)

func (p *prvdr) SyncDB(_ *beat.Event, pid uint32) error {
// Sync ensures that the specified pid is present in the internal cache, to ensure the processor is capable of enriching the process.
// The function waits up to a maximum limit (maxWaitLimit) for the pid to appear in the cache using an exponential delay strategy.
// If the pid is not found within the time limit, then an error is returned.
//
// The function also maintains a moving window of time for tracking delays, and applies a backoff strategy if the combined wait time
// exceeds a certain limit (combinedWaitLimit). This is done so that in the case where there are multiple delays, the cumulative delay
// does not exceed a reasonable threshold that would delay all other events processed by auditbeat. When in the backoff state, enrichment
// will proceed without waiting for the process data to exist in the cache, likely resulting in missing enrichment data.
func (p *prvdr) Sync(_ *beat.Event, pid uint32) error {
p.qqMtx.Lock()
defer p.qqMtx.Unlock()

// Use qq.Lookup, not lookupLocked, in this function. Mutex is locked for entire function

// If pid is already in qq, return immediately
if _, found := p.qq.Lookup(int(pid)); found {
return nil
}

now := time.Now()
start := time.Now()

p.handleBackoff(start)
if p.inBackoff {
if now.Sub(p.backoffStart) > backoffDuration {
p.logger.Warnw("ended backoff, skipped processes", "backoffSkipped", p.backoffSkipped)
p.inBackoff = false
p.combinedWait = 0 * time.Millisecond
} else {
p.backoffSkipped += 1
return nil
}
} else {
if p.combinedWait > combinedWaitLimit {
p.logger.Warn("starting backoff")
p.inBackoff = true
p.backoffStart = now
p.backoffSkipped = 0
return nil
}
// maintain a moving window of time for the delays we track
if now.Sub(p.since) > resetDuration {
p.since = now
p.combinedWait = 0 * time.Millisecond
}
return nil
}

start := now
// Wait until either the process exists within the cache or the maxWaitLimit is exceeded, with an exponential delay
nextWait := 5 * time.Millisecond
for {
waited := time.Since(start)
Expand All @@ -211,6 +203,38 @@ func (p *prvdr) SyncDB(_ *beat.Event, pid uint32) error {
}
}

// handleBackoff updates the backoff state that `Sync` consults before waiting on a pid.
//
// While in backoff, each call is counted as a skipped process until more than
// backoffDuration has elapsed since backoffStart; at that point the skipped count is
// logged, the backoff flag is cleared and combinedWait is reset to zero.
//
// While not in backoff, a combinedWait above combinedWaitLimit starts a new backoff
// period at `now` and zeroes the skipped counter. Otherwise, once more than
// resetDuration has elapsed since p.since, the tracking window restarts at `now` and
// combinedWait is reset to zero, which keeps a moving window in which delays are tracked.
func (p *prvdr) handleBackoff(now time.Time) {
	switch {
	case p.inBackoff && now.Sub(p.backoffStart) > backoffDuration:
		// Backoff period is over: report what was skipped and start accumulating waits afresh.
		p.logger.Warnw("ended backoff, skipped processes", "backoffSkipped", p.backoffSkipped)
		p.inBackoff = false
		p.combinedWait = 0
	case p.inBackoff:
		// Still backing off: only count the skipped process.
		p.backoffSkipped++
	case p.combinedWait > combinedWaitLimit:
		// Too much cumulative waiting inside the current window: enter backoff.
		p.logger.Warn("starting backoff")
		p.inBackoff = true
		p.backoffStart = now
		p.backoffSkipped = 0
	case now.Sub(p.since) > resetDuration:
		// Slide the tracking window forward.
		p.since = now
		p.combinedWait = 0
	}
}

// GetProcess returns a reference to Process struct that contains all known information for the
// process, and its ancestors (parent, process group leader, session leader, and entry leader).
func (p *prvdr) GetProcess(pid uint32) (*types.Process, error) {
proc, found := p.lookupLocked(pid)
if !found {
Expand Down Expand Up @@ -271,6 +295,7 @@ func (p prvdr) lookupLocked(pid uint32) (quark.Process, bool) {
return p.qq.Lookup(int(pid))
}

// fillParent populates the parent process fields with the attributes of the process with PID `ppid`
func (p prvdr) fillParent(process *types.Process, ppid uint32) {
proc, found := p.lookupLocked(ppid)
if !found {
Expand Down Expand Up @@ -304,6 +329,7 @@ func (p prvdr) fillParent(process *types.Process, ppid uint32) {
process.Parent.EntityID = calculateEntityIDv1(ppid, *process.Start)
}

// fillGroupLeader populates the process group leader fields with the attributes of the process with PID `pgid`
func (p prvdr) fillGroupLeader(process *types.Process, pgid uint32) {
proc, found := p.lookupLocked(pgid)
if !found {
Expand Down Expand Up @@ -338,6 +364,7 @@ func (p prvdr) fillGroupLeader(process *types.Process, pgid uint32) {
process.GroupLeader.EntityID = calculateEntityIDv1(pgid, *process.GroupLeader.Start)
}

// fillSessionLeader populates the session leader fields with the attributes of the process with PID `sid`
func (p prvdr) fillSessionLeader(process *types.Process, sid uint32) {
proc, found := p.lookupLocked(sid)
if !found {
Expand Down Expand Up @@ -372,6 +399,7 @@ func (p prvdr) fillSessionLeader(process *types.Process, sid uint32) {
process.SessionLeader.EntityID = calculateEntityIDv1(sid, *process.SessionLeader.Start)
}

// fillEntryLeader populates the entry leader fields with the attributes of the process with PID `elid`
func (p prvdr) fillEntryLeader(process *types.Process, elid uint32) {
proc, found := p.lookupLocked(elid)
if !found {
Expand Down Expand Up @@ -406,6 +434,7 @@ func (p prvdr) fillEntryLeader(process *types.Process, elid uint32) {
process.EntryLeader.EntryMeta.Type = getEntryTypeName(proc.Proc.EntryLeaderType)
}

// setEntityID sets entityID for the process and its parent, group leader, session leader, entry leader if possible
func setEntityID(process *types.Process) {
if process.PID != 0 && process.Start != nil {
process.EntityID = calculateEntityIDv1(process.PID, *process.Start)
Expand All @@ -428,6 +457,7 @@ func setEntityID(process *types.Process) {
}
}

// setSameAsProcess sets if the process is the same as its group leader, session leader, entry leader
func setSameAsProcess(process *types.Process) {
if process.GroupLeader.PID != 0 && process.GroupLeader.Start != nil {
sameAsProcess := process.PID == process.GroupLeader.PID
Expand All @@ -445,10 +475,12 @@ func setSameAsProcess(process *types.Process) {
}
}

// interactiveFromTTY reports whether the major and minor numbers of tty map to a
// known TTY type, i.e. anything other than TTYUnknown.
func interactiveFromTTY(tty types.TTYDev) bool {
	ttyType := getTTYType(tty.Major, tty.Minor)
	return ttyType != TTYUnknown
}

// getTTYType returns the type of a TTY device based on its major and minor numbers.
func getTTYType(major uint32, minor uint32) TTYType {
if major >= ptsMinMajor && major <= ptsMaxMajor {
return Pts
Expand All @@ -465,6 +497,8 @@ func getTTYType(major uint32, minor uint32) TTYType {
return TTYUnknown
}

// calculateEntityIDv1 calculates the entity ID for a process.
// This is a globally unique identifier for the process.
func calculateEntityIDv1(pid uint32, startTime time.Time) string {
return base64.StdEncoding.EncodeToString(
[]byte(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e
return prvdr{}, fmt.Errorf("build type not supported, cgo required")
}

func (p prvdr) SyncDB(event *beat.Event, pid uint32) error {
func (p prvdr) Sync(event *beat.Event, pid uint32) error {
return fmt.Errorf("build type not supported")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type prvdr struct {
pidField string
}

// NewProvider returns a new instance of procfsprovider.
func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, reader procfs.Reader, pidField string) (provider.Provider, error) {
return prvdr{
ctx: ctx,
Expand All @@ -40,12 +41,15 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, rea
}, nil
}

// GetProcess is not implemented in this provider; it always returns a nil process
// together with a "not implemented" error.
// This provider adds to the processdb, and process information is retrieved from the DB,
// not directly from the provider.
func (p prvdr) GetProcess(pid uint32) (*types.Process, error) {
return nil, fmt.Errorf("not implemented")
}

// SyncDB will update the process DB with process info from procfs or the event itself
func (p prvdr) SyncDB(ev *beat.Event, pid uint32) error {
// Sync updates the process information database based on the syscall event data and by scraping procfs.
// As process information will not be available in procfs after a process has exited, the provider is susceptible to missing information in short-lived events.
func (p prvdr) Sync(ev *beat.Event, pid uint32) error {
syscall, err := ev.GetValue(syscallField)
if err != nil {
return fmt.Errorf("event not supported, no syscall data")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestExecveEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.SyncDB(&event, expected.PIDs.Tgid)
err = provider.Sync(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestExecveatEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.SyncDB(&event, expected.PIDs.Tgid)
err = provider.Sync(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestSetSidEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.SyncDB(&event, expected.PIDs.Tgid)
err = provider.Sync(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestSetSidEventFailed(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.SyncDB(&event, expected.PIDs.Tgid)
err = provider.Sync(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestSetSidSessionLeaderNotScraped(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.SyncDB(&event, expected.PIDs.Tgid)
err = provider.Sync(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down
3 changes: 1 addition & 2 deletions x-pack/auditbeat/processors/sessionmd/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
)

// SyncDB should ensure the DB is in a state to handle the event before returning.
type Provider interface {
SyncDB(event *beat.Event, pid uint32) error
Sync(event *beat.Event, pid uint32) error
GetProcess(pid uint32) (*types.Process, error)
}

0 comments on commit 7be47da

Please sign in to comment.