Skip to content

Commit

Permalink
[chore][pkg/stanza] Cleanup tcp input operator files
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Apr 1, 2024
1 parent c5316c6 commit f903367
Show file tree
Hide file tree
Showing 4 changed files with 373 additions and 354 deletions.
149 changes: 149 additions & 0 deletions pkg/stanza/operator/input/tcp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tcp // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"

import (
"bufio"
"fmt"
"net"
"time"

"github.com/jpillora/backoff"
"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const (
operatorType = "tcp_input"

// minMaxLogSize is the minimal size which can be used for buffering
// TCP input
minMaxLogSize = 64 * 1024

// DefaultMaxLogSize is the max buffer sized used
// if MaxLogSize is not set
DefaultMaxLogSize = 1024 * 1024
)

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new TCP input config with default values
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}

// NewConfigWithID creates a new TCP input config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
OneLogPerPacket: false,
Encoding: "utf-8",
},
}
}

// Config is the configuration of a tcp input operator.
type Config struct {
helper.InputConfig `mapstructure:",squash"`
BaseConfig `mapstructure:",squash"`
}

// BaseConfig is the detailed configuration of a tcp input operator.
type BaseConfig struct {
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
ListenAddress string `mapstructure:"listen_address,omitempty"`
TLS *configtls.ServerConfig `mapstructure:"tls,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
SplitFuncBuilder SplitFuncBuilder
}

type SplitFuncBuilder func(enc encoding.Encoding) (bufio.SplitFunc, error)

func (c Config) defaultSplitFuncBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
return c.SplitConfig.Func(enc, true, int(c.MaxLogSize))
}

// Build will build a tcp input operator.
func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

// If MaxLogSize not set, set sane default
if c.MaxLogSize == 0 {
c.MaxLogSize = DefaultMaxLogSize
}

if c.MaxLogSize < minMaxLogSize {
return nil, fmt.Errorf("invalid value for parameter 'max_log_size', must be equal to or greater than %d bytes", minMaxLogSize)
}

if c.ListenAddress == "" {
return nil, fmt.Errorf("missing required parameter 'listen_address'")
}

// validate the input address
if _, err = net.ResolveTCPAddr("tcp", c.ListenAddress); err != nil {
return nil, fmt.Errorf("failed to resolve listen_address: %w", err)
}

enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return nil, err
}

if c.SplitFuncBuilder == nil {
c.SplitFuncBuilder = c.defaultSplitFuncBuilder
}

// Build split func
splitFunc, err := c.SplitFuncBuilder(enc)
if err != nil {
return nil, err
}
splitFunc = trim.WithFunc(splitFunc, c.TrimConfig.Func())

var resolver *helper.IPResolver
if c.AddAttributes {
resolver = helper.NewIPResolver()
}

tcpInput := &Input{
InputOperator: inputOperator,
address: c.ListenAddress,
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
OneLogPerPacket: c.OneLogPerPacket,
encoding: enc,
splitFunc: splitFunc,
backoff: backoff.Backoff{
Max: 3 * time.Second,
},
resolver: resolver,
}

if c.TLS != nil {
tcpInput.tls, err = c.TLS.LoadTLSConfig()
if err != nil {
return nil, err
}
}

return tcpInput, nil
}
224 changes: 224 additions & 0 deletions pkg/stanza/operator/input/tcp/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tcp // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"

import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"fmt"
"io"
"net"
"strconv"
"sync"
"time"

"github.com/jpillora/backoff"
"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// Input is an operator that listens for log entries over tcp.
type Input struct {
helper.InputOperator
address string
MaxLogSize int
addAttributes bool
OneLogPerPacket bool

listener net.Listener
cancel context.CancelFunc
wg sync.WaitGroup
tls *tls.Config
backoff backoff.Backoff

encoding encoding.Encoding
splitFunc bufio.SplitFunc
resolver *helper.IPResolver
}

// Start will start listening for log entries over tcp.
func (i *Input) Start(_ operator.Persister) error {
if err := i.configureListener(); err != nil {
return fmt.Errorf("failed to listen on interface: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
i.cancel = cancel
i.goListen(ctx)
return nil
}

func (i *Input) configureListener() error {
if i.tls == nil {
listener, err := net.Listen("tcp", i.address)
if err != nil {
return fmt.Errorf("failed to configure tcp listener: %w", err)
}
i.listener = listener
return nil
}

i.tls.Time = time.Now
i.tls.Rand = rand.Reader

listener, err := tls.Listen("tcp", i.address, i.tls)
if err != nil {
return fmt.Errorf("failed to configure tls listener: %w", err)
}

i.listener = listener
return nil
}

// goListenn will listen for tcp connections.
func (i *Input) goListen(ctx context.Context) {
i.wg.Add(1)

go func() {
defer i.wg.Done()

for {
conn, err := i.listener.Accept()
if err != nil {
select {
case <-ctx.Done():
return
default:
i.Debugw("Listener accept error", zap.Error(err))
time.Sleep(i.backoff.Duration())
continue
}
}
i.backoff.Reset()

i.Debugf("Received connection: %s", conn.RemoteAddr().String())
subctx, cancel := context.WithCancel(ctx)
i.goHandleClose(subctx, conn)
i.goHandleMessages(subctx, conn, cancel)
}
}()
}

// goHandleClose will wait for the context to finish before closing a connection.
func (i *Input) goHandleClose(ctx context.Context, conn net.Conn) {
i.wg.Add(1)

go func() {
defer i.wg.Done()
<-ctx.Done()
i.Debugf("Closing connection: %s", conn.RemoteAddr().String())
if err := conn.Close(); err != nil {
i.Errorf("Failed to close connection: %s", err)
}
}()
}

// goHandleMessages will handles messages from a tcp connection.
func (i *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel context.CancelFunc) {
i.wg.Add(1)

go func() {
defer i.wg.Done()
defer cancel()

dec := decode.New(i.encoding)
if i.OneLogPerPacket {
var buf bytes.Buffer
_, err := io.Copy(&buf, conn)
if err != nil {
i.Errorw("IO copy net connection buffer error", zap.Error(err))
}
log := truncateMaxLog(buf.Bytes(), i.MaxLogSize)
i.handleMessage(ctx, conn, dec, log)
return
}

buf := make([]byte, 0, i.MaxLogSize)

scanner := bufio.NewScanner(conn)
scanner.Buffer(buf, i.MaxLogSize)

scanner.Split(i.splitFunc)

for scanner.Scan() {
i.handleMessage(ctx, conn, dec, scanner.Bytes())
}

if err := scanner.Err(); err != nil {
i.Errorw("Scanner error", zap.Error(err))
}
}()
}

func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *decode.Decoder, log []byte) {
decoded, err := dec.Decode(log)
if err != nil {
i.Errorw("Failed to decode data", zap.Error(err))
return
}

entry, err := i.NewEntry(string(decoded))
if err != nil {
i.Errorw("Failed to create entry", zap.Error(err))
return
}

if i.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", i.resolver.GetHostFromIP(ip))
}

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", i.resolver.GetHostFromIP(ip))
}
}

i.Write(ctx, entry)
}

func truncateMaxLog(data []byte, maxLogSize int) (token []byte) {
if len(data) >= maxLogSize {
return data[:maxLogSize]
}

if len(data) == 0 {
return nil
}

return data
}

// Stop will stop listening for log entries over TCP.
func (i *Input) Stop() error {
if i.cancel == nil {
return nil
}
i.cancel()

if i.listener != nil {
if err := i.listener.Close(); err != nil {
i.Errorf("failed to close TCP connection: %s", err)
}
}

i.wg.Wait()
if i.resolver != nil {
i.resolver.Stop()
}
return nil
}
File renamed without changes.
Loading

0 comments on commit f903367

Please sign in to comment.