From 56a17e0eaa8c6c9ca9f00b844f25d9731839bb69 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Tue, 27 Aug 2024 14:49:15 -0400 Subject: [PATCH] [filebeat] Add a configuration option for TCP/UDP network type (#40623) * Add a configuration option for TCP/UDP network type --- CHANGELOG-developer.next.asciidoc | 1 + .../inputs/input-common-tcp-options.asciidoc | 6 ++ .../inputs/input-common-udp-options.asciidoc | 6 ++ filebeat/inputsource/tcp/config.go | 20 ++++- filebeat/inputsource/tcp/config_test.go | 74 +++++++++++++++++++ filebeat/inputsource/tcp/server.go | 12 ++- filebeat/inputsource/tcp/server_test.go | 13 +++- filebeat/inputsource/udp/config.go | 21 ++++++ filebeat/inputsource/udp/config_test.go | 70 ++++++++++++++++++ filebeat/inputsource/udp/server.go | 12 ++- filebeat/inputsource/udp/server_test.go | 12 ++- 11 files changed, 239 insertions(+), 8 deletions(-) create mode 100644 filebeat/inputsource/tcp/config_test.go create mode 100644 filebeat/inputsource/udp/config_test.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index c26e94b76b96..7c53703c73fd 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -201,6 +201,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268] - Bump version of elastic/toutoumomoma to remove internal forks of stdlib debug packages. {pull}40325[40325] - Refactor x-pack/filebeat/input/websocket for generalisation. {pull}40308[40308] +- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623] ==== Deprecated diff --git a/filebeat/docs/inputs/input-common-tcp-options.asciidoc b/filebeat/docs/inputs/input-common-tcp-options.asciidoc index ad57d4893d50..f0d8262f8174 100644 --- a/filebeat/docs/inputs/input-common-tcp-options.asciidoc +++ b/filebeat/docs/inputs/input-common-tcp-options.asciidoc @@ -16,6 +16,12 @@ The maximum size of the message received over TCP. The default is `20MiB`. The host and TCP port to listen on for event streams. +[float] +[id="{beatname_lc}-input-{type}-tcp-network"] +==== `network` + +The network type. Acceptable values are: "tcp" (default), "tcp4", "tcp6" + [float] [id="{beatname_lc}-input-{type}-tcp-framing"] ==== `framing` diff --git a/filebeat/docs/inputs/input-common-udp-options.asciidoc b/filebeat/docs/inputs/input-common-udp-options.asciidoc index e4b2cae25e45..109db7c3e5c8 100644 --- a/filebeat/docs/inputs/input-common-udp-options.asciidoc +++ b/filebeat/docs/inputs/input-common-udp-options.asciidoc @@ -16,6 +16,12 @@ The maximum size of the message received over UDP. The default is `10KiB`. The host and UDP port to listen on for event streams. +[float] +[id="{beatname_lc}-input-{type}-udp-network"] +==== `network` + +The network type. Acceptable values are: "udp" (default), "udp4", "udp6" + [float] [id="{beatname_lc}-input-{type}-udp-read-buffer"] ==== `read_buffer` diff --git a/filebeat/inputsource/tcp/config.go b/filebeat/inputsource/tcp/config.go index 1e402fadf68d..db24b9dfb7f8 100644 --- a/filebeat/inputsource/tcp/config.go +++ b/filebeat/inputsource/tcp/config.go @@ -18,6 +18,7 @@ package tcp import ( + "errors" "fmt" "time" @@ -35,12 +36,29 @@ type Config struct { MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"nonzero,positive"` MaxConnections int `config:"max_connections"` TLS *tlscommon.ServerConfig `config:"ssl"` + Network string `config:"network"` } +const ( + networkTCP = "tcp" + networkTCP4 = "tcp4" + networkTCP6 = "tcp6" +) + +var ( + ErrInvalidNetwork = errors.New("invalid network value") + ErrMissingHostPort = errors.New("need to specify the host using the `host:port` syntax") +) + // Validate validates the Config option for the tcp input. func (c *Config) Validate() error { if len(c.Host) == 0 { - return fmt.Errorf("need to specify the host using the `host:port` syntax") + return ErrMissingHostPort + } + switch c.Network { + case "", networkTCP, networkTCP4, networkTCP6: + default: + return fmt.Errorf("%w: %s, expected: %v or %v or %v ", ErrInvalidNetwork, c.Network, networkTCP, networkTCP4, networkTCP6) } return nil } diff --git a/filebeat/inputsource/tcp/config_test.go b/filebeat/inputsource/tcp/config_test.go new file mode 100644 index 000000000000..e409cd71f44d --- /dev/null +++ b/filebeat/inputsource/tcp/config_test.go @@ -0,0 +1,74 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tcp + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +func TestValidate(t *testing.T) { + type testCfg struct { + name string + cfg Config + wantErr error + } + + tests := []testCfg{ + { + name: "ok", + cfg: Config{ + Host: "localhost:9000", + }, + }, + { + name: "nohost", + wantErr: ErrMissingHostPort, + }, + { + name: "invalidnetwork", + cfg: Config{ + Host: "localhost:9000", + Network: "foo", + }, + wantErr: ErrInvalidNetwork, + }, + } + + for _, network := range []string{networkTCP, networkTCP4, networkTCP6} { + tests = append(tests, testCfg{ + name: "network_" + network, + cfg: Config{ + Host: "localhost:9000", + Network: network, + }, + }) + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()) + if diff != "" { + t.Fatal(diff) + } + }) + } +} diff --git a/filebeat/inputsource/tcp/server.go b/filebeat/inputsource/tcp/server.go index 9e35088ebb14..9543c4aee2a4 100644 --- a/filebeat/inputsource/tcp/server.go +++ b/filebeat/inputsource/tcp/server.go @@ -67,14 +67,15 @@ func New( func (s *Server) createServer() (net.Listener, error) { var l net.Listener var err error + network := s.network() if s.tlsConfig != nil { t := s.tlsConfig.BuildServerConfig(s.config.Host) - l, err = tls.Listen("tcp", s.config.Host, t) + l, err = tls.Listen(network, s.config.Host, t) if err != nil { return nil, err } } else { - l, err = net.Listen("tcp", s.config.Host) + l, err = net.Listen(network, s.config.Host) if err != nil { return nil, err } @@ -85,3 +86,10 @@ func (s *Server) createServer() (net.Listener, error) { } return l, nil } + +func (s *Server) network() string { + if s.config.Network != "" { + return s.config.Network + } + return networkTCP +} diff --git a/filebeat/inputsource/tcp/server_test.go b/filebeat/inputsource/tcp/server_test.go index 3e12673507da..445a6dc414bb 100644 --- a/filebeat/inputsource/tcp/server_test.go +++ b/filebeat/inputsource/tcp/server_test.go @@ -54,6 +54,13 @@ func TestErrorOnEmptyLineDelimiter(t *testing.T) { } func TestReceiveEventsAndMetadata(t *testing.T) { + // Excluding tcp6 for now, since it fails in our CI + for _, network := range []string{networkTCP, networkTCP4} { + testReceiveEventsAndMetadata(t, network) + } +} + +func testReceiveEventsAndMetadata(t *testing.T, network string) { expectedMessages := generateMessages(5, 100) largeMessages := generateMessages(10, 4096) extraLargeMessages := generateMessages(2, 65*1024) @@ -220,6 +227,7 @@ func TestReceiveEventsAndMetadata(t *testing.T) { if !assert.NoError(t, err) { return } + config.Network = network splitFunc, err := streaming.SplitFunc(test.framing, test.delimiter) if !assert.NoError(t, err) { @@ -237,7 +245,8 @@ func TestReceiveEventsAndMetadata(t *testing.T) { } defer server.Stop() - conn, err := net.Dial("tcp", server.Listener.Listener.Addr().String()) + addr := server.Listener.Listener.Addr().String() + conn, err := net.Dial(network, addr) require.NoError(t, err) fmt.Fprint(conn, test.messageSent) conn.Close() @@ -294,8 +303,8 @@ func TestReceiveNewEventsConcurrently(t *testing.T) { for w := 0; w < workers; w++ { go func() { conn, err := net.Dial("tcp", server.Listener.Listener.Addr().String()) - defer conn.Close() assert.NoError(t, err) + defer conn.Close() for _, sample := range samples { fmt.Fprintln(conn, sample) } diff --git a/filebeat/inputsource/udp/config.go b/filebeat/inputsource/udp/config.go index 6c8646384555..c0a31760bb15 100644 --- a/filebeat/inputsource/udp/config.go +++ b/filebeat/inputsource/udp/config.go @@ -18,6 +18,8 @@ package udp import ( + "errors" + "fmt" "time" "github.com/elastic/beats/v7/libbeat/common/cfgtype" @@ -29,4 +31,23 @@ type Config struct { MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"positive,nonzero"` Timeout time.Duration `config:"timeout"` ReadBuffer cfgtype.ByteSize `config:"read_buffer" validate:"positive"` + Network string `config:"network"` +} + +const ( + networkUDP = "udp" + networkUDP4 = "udp4" + networkUDP6 = "udp6" +) + +var ErrInvalidNetwork = errors.New("invalid network value") + +// Validate validates the Config option for the udp input. +func (c *Config) Validate() error { + switch c.Network { + case "", networkUDP, networkUDP4, networkUDP6: + default: + return fmt.Errorf("%w: %s, expected: %v or %v or %v", ErrInvalidNetwork, c.Network, networkUDP, networkUDP4, networkUDP6) + } + return nil } diff --git a/filebeat/inputsource/udp/config_test.go b/filebeat/inputsource/udp/config_test.go new file mode 100644 index 000000000000..28dbb310cf08 --- /dev/null +++ b/filebeat/inputsource/udp/config_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package udp + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +func TestValidate(t *testing.T) { + type testCfg struct { + name string + cfg Config + wantErr error + } + + tests := []testCfg{ + { + name: "ok", + cfg: Config{ + Host: "localhost:8080", + }, + }, + { + name: "invalidnetwork", + cfg: Config{ + Host: "localhost:8080", + Network: "foo", + }, + wantErr: ErrInvalidNetwork, + }, + } + + for _, network := range []string{networkUDP, networkUDP4, networkUDP6} { + tests = append(tests, testCfg{ + name: "network_" + network, + cfg: Config{ + Host: "localhost:8080", + Network: network, + }, + }) + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()) + if diff != "" { + t.Fatal(diff) + } + }) + } +} diff --git a/filebeat/inputsource/udp/server.go b/filebeat/inputsource/udp/server.go index 9f9a16256704..75c74a1f3d01 100644 --- a/filebeat/inputsource/udp/server.go +++ b/filebeat/inputsource/udp/server.go @@ -53,11 +53,12 @@ func New(config *Config, callback inputsource.NetworkFunc) *Server { func (u *Server) createConn() (net.PacketConn, error) { var err error - udpAdddr, err := net.ResolveUDPAddr("udp", u.config.Host) + network := u.network() + udpAdddr, err := net.ResolveUDPAddr(network, u.config.Host) if err != nil { return nil, err } - listener, err := net.ListenUDP("udp", udpAdddr) + listener, err := net.ListenUDP(network, udpAdddr) if err != nil { return nil, err } @@ -71,3 +72,10 @@ func (u *Server) createConn() (net.PacketConn, error) { return listener, err } + +func (u *Server) network() string { + if u.config.Network != "" { + return u.config.Network + } + return networkUDP +} diff --git a/filebeat/inputsource/udp/server_test.go b/filebeat/inputsource/udp/server_test.go index efad27ea0b5a..879ee99daf8d 100644 --- a/filebeat/inputsource/udp/server_test.go +++ b/filebeat/inputsource/udp/server_test.go @@ -40,6 +40,15 @@ type info struct { } func TestReceiveEventFromUDP(t *testing.T) { + // Excluding udp6 for now, since it fails in our CI + for _, network := range []string{networkUDP, networkUDP4} { + t.Run(network, func(t *testing.T) { + testReceiveEventFromUDPWithNetwork(t, network) + }) + } +} + +func testReceiveEventFromUDPWithNetwork(t *testing.T, network string) { tests := []struct { name string message []byte @@ -64,6 +73,7 @@ func TestReceiveEventFromUDP(t *testing.T) { MaxMessageSize: maxMessageSize, Timeout: timeout, ReadBuffer: maxSocketSize, + Network: network, } fn := func(message []byte, metadata inputsource.NetworkMetadata) { ch <- info{message: message, mt: metadata} @@ -77,7 +87,7 @@ func TestReceiveEventFromUDP(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - conn, err := net.Dial("udp", s.localaddress) + conn, err := net.Dial(s.network(), s.localaddress) if !assert.NoError(t, err) { return }