Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][pkg/stanza] Cleanup tcp input operator files #32077

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions pkg/stanza/operator/input/tcp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tcp // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"

import (
"bufio"
"fmt"
"net"
"time"

"github.com/jpillora/backoff"
"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const (
operatorType = "tcp_input"

// minMaxLogSize is the minimal size which can be used for buffering
// TCP input
minMaxLogSize = 64 * 1024

// DefaultMaxLogSize is the max buffer sized used
// if MaxLogSize is not set
DefaultMaxLogSize = 1024 * 1024
)

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new TCP input config with default values
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}

// NewConfigWithID creates a new TCP input config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
OneLogPerPacket: false,
Encoding: "utf-8",
},
}
}

// Config is the configuration of a tcp input operator.
type Config struct {
helper.InputConfig `mapstructure:",squash"`
BaseConfig `mapstructure:",squash"`
}

// BaseConfig is the detailed configuration of a tcp input operator.
type BaseConfig struct {
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
ListenAddress string `mapstructure:"listen_address,omitempty"`
TLS *configtls.ServerConfig `mapstructure:"tls,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
SplitFuncBuilder SplitFuncBuilder
}

type SplitFuncBuilder func(enc encoding.Encoding) (bufio.SplitFunc, error)

func (c Config) defaultSplitFuncBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
return c.SplitConfig.Func(enc, true, int(c.MaxLogSize))
}

// Build will build a tcp input operator.
func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

// If MaxLogSize not set, set sane default
if c.MaxLogSize == 0 {
c.MaxLogSize = DefaultMaxLogSize
}

if c.MaxLogSize < minMaxLogSize {
return nil, fmt.Errorf("invalid value for parameter 'max_log_size', must be equal to or greater than %d bytes", minMaxLogSize)
}

if c.ListenAddress == "" {
return nil, fmt.Errorf("missing required parameter 'listen_address'")
}

// validate the input address
if _, err = net.ResolveTCPAddr("tcp", c.ListenAddress); err != nil {
return nil, fmt.Errorf("failed to resolve listen_address: %w", err)
}

enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return nil, err
}

if c.SplitFuncBuilder == nil {
c.SplitFuncBuilder = c.defaultSplitFuncBuilder
}

// Build split func
splitFunc, err := c.SplitFuncBuilder(enc)
if err != nil {
return nil, err
}
splitFunc = trim.WithFunc(splitFunc, c.TrimConfig.Func())

var resolver *helper.IPResolver
if c.AddAttributes {
resolver = helper.NewIPResolver()
}

tcpInput := &Input{
InputOperator: inputOperator,
address: c.ListenAddress,
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
OneLogPerPacket: c.OneLogPerPacket,
encoding: enc,
splitFunc: splitFunc,
backoff: backoff.Backoff{
Max: 3 * time.Second,
},
resolver: resolver,
}

if c.TLS != nil {
tcpInput.tls, err = c.TLS.LoadTLSConfig()
if err != nil {
return nil, err
}
}

return tcpInput, nil
}
224 changes: 224 additions & 0 deletions pkg/stanza/operator/input/tcp/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tcp // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"

import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"fmt"
"io"
"net"
"strconv"
"sync"
"time"

"github.com/jpillora/backoff"
"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// Input is an operator that listens for log entries over tcp.
type Input struct {
helper.InputOperator
address string
MaxLogSize int
addAttributes bool
OneLogPerPacket bool

listener net.Listener
cancel context.CancelFunc
wg sync.WaitGroup
tls *tls.Config
backoff backoff.Backoff

encoding encoding.Encoding
splitFunc bufio.SplitFunc
resolver *helper.IPResolver
}

// Start will start listening for log entries over tcp.
func (i *Input) Start(_ operator.Persister) error {
if err := i.configureListener(); err != nil {
return fmt.Errorf("failed to listen on interface: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
i.cancel = cancel
i.goListen(ctx)
return nil
}

func (i *Input) configureListener() error {
if i.tls == nil {
listener, err := net.Listen("tcp", i.address)
if err != nil {
return fmt.Errorf("failed to configure tcp listener: %w", err)
}
i.listener = listener
return nil
}

i.tls.Time = time.Now
i.tls.Rand = rand.Reader

listener, err := tls.Listen("tcp", i.address, i.tls)
if err != nil {
return fmt.Errorf("failed to configure tls listener: %w", err)
}

i.listener = listener
return nil
}

// goListenn will listen for tcp connections.
func (i *Input) goListen(ctx context.Context) {
i.wg.Add(1)

go func() {
defer i.wg.Done()

for {
conn, err := i.listener.Accept()
if err != nil {
select {
case <-ctx.Done():
return
default:
i.Debugw("Listener accept error", zap.Error(err))
time.Sleep(i.backoff.Duration())
continue
}
}
i.backoff.Reset()

i.Debugf("Received connection: %s", conn.RemoteAddr().String())
subctx, cancel := context.WithCancel(ctx)
i.goHandleClose(subctx, conn)
i.goHandleMessages(subctx, conn, cancel)
}
}()
}

// goHandleClose will wait for the context to finish before closing a connection.
func (i *Input) goHandleClose(ctx context.Context, conn net.Conn) {
i.wg.Add(1)

go func() {
defer i.wg.Done()
<-ctx.Done()
i.Debugf("Closing connection: %s", conn.RemoteAddr().String())
if err := conn.Close(); err != nil {
i.Errorf("Failed to close connection: %s", err)
}
}()
}

// goHandleMessages will handles messages from a tcp connection.
func (i *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel context.CancelFunc) {
i.wg.Add(1)

go func() {
defer i.wg.Done()
defer cancel()

dec := decode.New(i.encoding)
if i.OneLogPerPacket {
var buf bytes.Buffer
_, err := io.Copy(&buf, conn)
if err != nil {
i.Errorw("IO copy net connection buffer error", zap.Error(err))
}
log := truncateMaxLog(buf.Bytes(), i.MaxLogSize)
i.handleMessage(ctx, conn, dec, log)
return
}

buf := make([]byte, 0, i.MaxLogSize)

scanner := bufio.NewScanner(conn)
scanner.Buffer(buf, i.MaxLogSize)

scanner.Split(i.splitFunc)

for scanner.Scan() {
i.handleMessage(ctx, conn, dec, scanner.Bytes())
}

if err := scanner.Err(); err != nil {
i.Errorw("Scanner error", zap.Error(err))
}
}()
}

func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *decode.Decoder, log []byte) {
decoded, err := dec.Decode(log)
if err != nil {
i.Errorw("Failed to decode data", zap.Error(err))
return
}

entry, err := i.NewEntry(string(decoded))
if err != nil {
i.Errorw("Failed to create entry", zap.Error(err))
return
}

if i.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", i.resolver.GetHostFromIP(ip))
}

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", i.resolver.GetHostFromIP(ip))
}
}

i.Write(ctx, entry)
}

func truncateMaxLog(data []byte, maxLogSize int) (token []byte) {
if len(data) >= maxLogSize {
return data[:maxLogSize]
}

if len(data) == 0 {
return nil
}

return data
}

// Stop will stop listening for log entries over TCP.
func (i *Input) Stop() error {
if i.cancel == nil {
return nil
}
i.cancel()

if i.listener != nil {
if err := i.listener.Close(); err != nil {
i.Errorf("failed to close TCP connection: %s", err)
}
}

i.wg.Wait()
if i.resolver != nil {
i.resolver.Stop()
}
return nil
}
Loading