Skip to content

Commit

Permalink
Merge branch 'main' into feat/macos-ul
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-gr authored Dec 10, 2024
2 parents 77c2646 + f86a5f0 commit 9b51965
Show file tree
Hide file tree
Showing 15 changed files with 363 additions and 79 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Ensure Elasticsearch output can always recover from network errors {pull}40794[40794]
- Add `translate_ldap_attribute` processor. {pull}41472[41472]
- Remove unnecessary debug logs during idle connection teardown {issue}40824[40824]
- Remove unnecessary reload for Elastic Agent managed beats when apm tracing config changes from nil to nil {pull}41794[41794]
- Fix incorrect cloud provider identification in add_cloud_metadata processor using provider priority mechanism {pull}41636[41636]
- Prevent panic if libbeat processors are loaded more than once. {issue}41475[41475] {pull}41857[51857]
- Allow network condition to handle field values that are arrays of IP addresses. {pull}41918[41918]
- Fix a bug where log files are rotated on startup when interval is configured and rotateonstartup is disabled {issue}41894[41894] {pull}41895[41895]

*Auditbeat*
Expand Down
14 changes: 11 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
waitEvents := newSignalWait()

// count active events for waiting on shutdown
var reg *monitoring.Registry

if b.Info.Monitoring.Namespace != nil {
reg = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats")
if reg == nil {
reg = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats")
}
}
wgEvents := &eventCounter{
count: monitoring.NewInt(nil, "filebeat.events.active"), // Gauge
added: monitoring.NewUint(nil, "filebeat.events.added"),
done: monitoring.NewUint(nil, "filebeat.events.done"),
count: monitoring.NewInt(reg, "filebeat.events.active"), // Gauge
added: monitoring.NewUint(reg, "filebeat.events.added"),
done: monitoring.NewUint(reg, "filebeat.events.done"),
}
finishedLogger := newFinishedLogger(wgEvents)

Expand Down
9 changes: 3 additions & 6 deletions heartbeat/monitors/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ func makeMockFactory(pluginsReg *plugin.PluginsReg) (factory *RunnerFactory, sch
EphemeralID: eid,
FirstStart: time.Now(),
StartTime: time.Now(),
Monitoring: struct {
DefaultUsername string
}{
DefaultUsername: "test",
},
}
info.Monitoring.DefaultUsername = "test"

sched = scheduler.Create(
1,
Expand Down Expand Up @@ -246,7 +242,8 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {

return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, nil
},
Stats: plugin.NewPluginCountersRecorder("test", reg)},
Stats: plugin.NewPluginCountersRecorder("test", reg),
},
built,
closed
}
Expand Down
7 changes: 5 additions & 2 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"github.com/gofrs/uuid/v5"
"go.opentelemetry.io/collector/consumer"

"github.com/elastic/elastic-agent-libs/monitoring"
)

// Info stores a beats instance meta data.
Expand All @@ -41,9 +43,10 @@ type Info struct {

// Monitoring-related fields
Monitoring struct {
DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring
DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring
Namespace *monitoring.Namespace // a monitor namespace that is unique per beat instance
}
LogConsumer consumer.Logs //otel log consumer
LogConsumer consumer.Logs // otel log consumer

}

Expand Down
36 changes: 25 additions & 11 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
config.OverwriteConfigOpts(configOpts(store))
}

b.Beat.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String())

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version)
if err != nil {
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
Expand Down Expand Up @@ -469,11 +471,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
return nil, fmt.Errorf("error creating processors: %w", err)
}

reg := monitoring.Default.GetRegistry(b.Info.Name)
if reg == nil {
reg = monitoring.Default.NewRegistry(b.Info.Name)
}

// This should be replaced with static config for otel consumer
// but need to figure out if we want the Queue settings from here.
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
Expand All @@ -485,12 +482,14 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
}
}

tel := reg.GetRegistry("state")
uniq_reg := b.Beat.Info.Monitoring.Namespace.GetRegistry()

tel := uniq_reg.GetRegistry("state")
if tel == nil {
tel = reg.NewRegistry("state")
tel = uniq_reg.NewRegistry("state")
}
monitors := pipeline.Monitors{
Metrics: reg,
Metrics: uniq_reg,
Telemetry: tel,
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
Expand All @@ -510,7 +509,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
b.Publisher = publisher

return b, nil

}

// InitWithSettings does initialization of things common to all actions (read confs, flags)
Expand Down Expand Up @@ -831,11 +829,27 @@ func (b *Beat) RegisterHostname(useFQDN bool) {
hostname := b.Info.FQDNAwareHostname(useFQDN)

// info.hostname
infoRegistry := monitoring.GetNamespace("info").GetRegistry()
var infoRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("info")
if infoRegistry == nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("info")
}
} else {
infoRegistry = monitoring.GetNamespace("info").GetRegistry()
}
monitoring.NewString(infoRegistry, "hostname").Set(hostname)

// state.host
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
var stateRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("state")
if stateRegistry == nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("state")
}
} else {
stateRegistry = monitoring.GetNamespace("state").GetRegistry()
}
monitoring.NewFunc(stateRegistry, "host", host.ReportInfo(hostname), monitoring.Report)
}

Expand Down
23 changes: 23 additions & 0 deletions libbeat/conditions/conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,29 @@ var httpResponseTestEvent = &beat.Event{
},
}

var httpResponseEventIPList = &beat.Event{
Timestamp: time.Now(),
Fields: mapstr.M{
"@timestamp": "2024-12-05T09:51:23.642Z",
"ecs": mapstr.M{
"version": "8.11.0",
},
"host": mapstr.M{
"hostname": "testhost",
"os": mapstr.M{
"type": "linux",
"family": "debian",
"version": "11 (bullseye)",
"platform": "debian",
},
"ip": []string{
"10.1.0.55",
"fe80::4001:aff:fe9a:55",
},
},
},
}

func testConfig(t *testing.T, expected bool, event *beat.Event, config *Config) {
t.Helper()
logp.TestingSetup()
Expand Down
61 changes: 36 additions & 25 deletions libbeat/conditions/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package conditions
import (
"fmt"
"net"
"slices"
"strings"

"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -94,31 +95,31 @@ func (m multiNetworkMatcher) String() string {
return strings.Join(names, " OR ")
}

func makeMatcher(network string) (networkMatcher, error) {
m := singleNetworkMatcher{name: network, netContainsFunc: namedNetworks[network]}
if m.netContainsFunc == nil {
subnet, err := parseCIDR(network)
if err != nil {
return nil, err
}
m.netContainsFunc = subnet.Contains
}
return m, nil
}

func invalidTypeError(field string, value interface{}) error {
return fmt.Errorf("network condition attempted to set "+
"'%v' -> '%v' and encountered unexpected type '%T', only "+
"strings or []strings are allowed", field, value, value)
}

// NewNetworkCondition builds a new Network using the given configuration.
func NewNetworkCondition(fields map[string]interface{}) (*Network, error) {
cond := &Network{
fields: map[string]networkMatcher{},
log: logp.NewLogger(logName),
}

makeMatcher := func(network string) (networkMatcher, error) {
m := singleNetworkMatcher{name: network, netContainsFunc: namedNetworks[network]}
if m.netContainsFunc == nil {
subnet, err := parseCIDR(network)
if err != nil {
return nil, err
}
m.netContainsFunc = subnet.Contains
}
return m, nil
}

invalidTypeError := func(field string, value interface{}) error {
return fmt.Errorf("network condition attempted to set "+
"'%v' -> '%v' and encountered unexpected type '%T', only "+
"strings or []strings are allowed", field, value, value)
}

for field, value := range mapstr.M(fields).Flatten() {
switch v := value.(type) {
case string:
Expand Down Expand Up @@ -157,15 +158,17 @@ func (c *Network) Check(event ValuesMap) bool {
return false
}

ip := extractIP(value)
if ip == nil {
ipList := extractIP(value)
if len(ipList) == 0 {
c.log.Debugf("Invalid IP address in field=%v for network condition", field)
return false
}

if !network.Contains(ip) {
// match on an "any" basis when we find multiple IPs in the event;
// if the network matcher returns true for any seen IP, consider it a match
if !slices.ContainsFunc(ipList, network.Contains) {
return false
}

}

return true
Expand Down Expand Up @@ -202,12 +205,20 @@ func parseCIDR(value string) (*net.IPNet, error) {

// extractIP return an IP address if unk is an IP address string or a net.IP.
// Otherwise it returns nil.
func extractIP(unk interface{}) net.IP {
func extractIP(unk interface{}) []net.IP {
switch v := unk.(type) {
case string:
return net.ParseIP(v)
case net.IP:
return []net.IP{net.ParseIP(v)}
case []net.IP:
return v
case net.IP:
return []net.IP{v}
case []string:
parsed := make([]net.IP, len(v))
for i, rawIP := range v {
parsed[i] = net.ParseIP(rawIP)
}
return parsed
default:
return nil
}
Expand Down
52 changes: 52 additions & 0 deletions libbeat/conditions/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@ network:

testYAMLConfig(t, true, evt, yaml)
})

t.Run("IP list", func(t *testing.T) {
const yaml = `
network:
ip:
client: [loopback]
server: [loopback]
host: 10.10.0.0/8
`

evt := &beat.Event{Fields: mapstr.M{
"ip": mapstr.M{
"client": "127.0.0.1",
"server": "127.0.0.1",
"host": []string{"10.10.0.83", "fe80::4001:aff:fe9a:53"},
},
}}

testYAMLConfig(t, true, evt, yaml)
})
}

func TestNetworkCreate(t *testing.T) {
Expand Down Expand Up @@ -166,6 +186,22 @@ func TestNetworkCheck(t *testing.T) {
})
})

t.Run("multiple IPs field single match", func(t *testing.T) {
testConfig(t, true, httpResponseEventIPList, &Config{
Network: map[string]interface{}{
"host.ip": "10.1.0.0/24",
},
})
})

t.Run("multiple IPs field negative match", func(t *testing.T) {
testConfig(t, false, httpResponseEventIPList, &Config{
Network: map[string]interface{}{
"host.ip": "127.0.0.0/24",
},
})
})

// Multiple conditions are treated as an implicit AND.
t.Run("multiple fields negative match", func(t *testing.T) {
testConfig(t, false, httpResponseTestEvent, &Config{
Expand All @@ -191,6 +227,22 @@ func TestNetworkCheck(t *testing.T) {
},
})
})

t.Run("multiple values multiple IPs match", func(t *testing.T) {
testConfig(t, true, httpResponseEventIPList, &Config{
Network: map[string]interface{}{
"host.ip": []interface{}{"10.1.0.0/24", "127.0.0.0/24"},
},
})
})

t.Run("multiple values multiple IPs no match", func(t *testing.T) {
testConfig(t, false, httpResponseEventIPList, &Config{
Network: map[string]interface{}{
"host.ip": []interface{}{"12.1.0.0/24", "127.0.0.0/24"},
},
})
})
}

func TestNetworkPrivate(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions libbeat/docs/processors-using.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,15 @@ range:
[[condition-network]]
===== `network`

The `network` condition checks if the field is in a certain IP network range.
Both IPv4 and IPv6 addresses are supported. The network range may be specified
using CIDR notation, like "192.0.2.0/24" or "2001:db8::/32", or by using one of
these named ranges:
The `network` condition checks whether a field's value falls within a specified
IP network range. If multiple fields are provided, each field value must match
its corresponding network range. You can specify multiple network ranges for a
single field, and a match occurs if any one of the ranges matches. If the field
value is an array of IPs, it will match if any of the IPs fall within any of the
given ranges. Both IPv4 and IPv6 addresses are supported.

The network range may be specified using CIDR notation, like "192.0.2.0/24" or
"2001:db8::/32", or by using one of these named ranges:

- `loopback` - Matches loopback addresses in the range of `127.0.0.0/8` or
`::1/128`.
Expand Down
Loading

0 comments on commit 9b51965

Please sign in to comment.