Skip to content

Commit

Permalink
[filebeat][netflow]: fix template sharing (elastic#42079)
Browse files Browse the repository at this point in the history
Pass the share_templates configuration option into the NetflowV9Protocol
struct. The parameter was not being set, and therefore was always false so
it was not possible to use this option.

Added a test case to prevent future regressions.

Closes elastic#42080
  • Loading branch information
simioa authored Dec 19, 2024
1 parent 59bfb15 commit 323c69e
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]
- Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019]
- Fix Netflow Template Sharing configuration handling. {pull}42080[42080]

*Heartbeat*

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ func (c *Config) SequenceResetEnabled() bool {
return c.detectReset
}

// ShareTemplatesEnabled returns if template sharing is enabled.
func (c *Config) ShareTemplatesEnabled() bool {
return c.sharedTemplates
}

// Fields returns the configured fields.
func (c *Config) Fields() fields.FieldDict {
if c.fields == nil {
Expand Down
15 changes: 8 additions & 7 deletions x-pack/filebeat/input/netflow/decoder/v9/v9.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ func New(config config.Config) protocol.Protocol {
func NewProtocolWithDecoder(decoder Decoder, config config.Config, logger *log.Logger) *NetflowV9Protocol {
ctx, cancel := context.WithCancel(context.Background())
pd := &NetflowV9Protocol{
ctx: ctx,
cancel: cancel,
decoder: decoder,
logger: logger,
Session: NewSessionMap(logger, config.ActiveSessionsMetric()),
timeout: config.ExpirationTimeout(),
detectReset: config.SequenceResetEnabled(),
ctx: ctx,
cancel: cancel,
decoder: decoder,
logger: logger,
Session: NewSessionMap(logger, config.ActiveSessionsMetric()),
timeout: config.ExpirationTimeout(),
detectReset: config.SequenceResetEnabled(),
shareTemplates: config.ShareTemplatesEnabled(),
}

if config.Cache() {
Expand Down
50 changes: 50 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/v9/v9_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,53 @@ func TestCustomFields(t *testing.T) {
assert.Contains(t, flows[0].Fields, "customField")
assert.Equal(t, flows[0].Fields["customField"], "Hello :)")
}

func TestSharedTemplates(t *testing.T) {
templateAddr := test.MakeAddress(t, "127.0.0.1:12345")
flowsAddr := test.MakeAddress(t, "127.0.0.2:21234")
templatePacket := []uint16{
// Header
// Version, Count, Uptime, Ts, SeqNo, Source
9, 1, 11, 11, 22, 22, 33, 33, 0, 1234,
// Set #1 (template)
0, 20, /*len of set*/
999, 3, /*len*/
1, 4, // Fields
2, 4,
3, 4,
}
flowsPacket := []uint16{
// Header
// Version, Count, Uptime, Ts, SeqNo, Source
9, 1, 11, 11, 22, 22, 33, 34, 0, 1234,
// Set #1 (template)
999, 16, /*len of set*/
1, 1,
2, 2,
3, 3,
}

t.Run("Template sharing enabled", func(t *testing.T) {
cfg := config.Defaults()
cfg.WithSharedTemplates(true)
proto := New(cfg)
flows, err := proto.OnPacket(test.MakePacket(templatePacket), templateAddr)
assert.NoError(t, err)
assert.Empty(t, flows)
flows, err = proto.OnPacket(test.MakePacket(flowsPacket), flowsAddr)
assert.NoError(t, err)
assert.Len(t, flows, 1)
})

t.Run("Template sharing disabled", func(t *testing.T) {
cfg := config.Defaults()
cfg.WithSharedTemplates(false)
proto := New(cfg)
flows, err := proto.OnPacket(test.MakePacket(templatePacket), templateAddr)
assert.NoError(t, err)
assert.Empty(t, flows)
flows, err = proto.OnPacket(test.MakePacket(flowsPacket), flowsAddr)
assert.NoError(t, err)
assert.Empty(t, flows)
})
}

0 comments on commit 323c69e

Please sign in to comment.