From eb308a95c5658cec381e94234ea9d49ba35b5f6c Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Thu, 4 Apr 2024 15:35:56 -0500 Subject: [PATCH] [chore][pkg/stanza] Cleanup namedpipe input operator files (#32075) Contributes to #32058 --- .../namedpipe/{config.go => config_all.go} | 0 .../operator/input/namedpipe/config_linux.go | 52 ++++++++++ .../namedpipe/{namedpipe.go => input.go} | 96 ++++++------------- ...amedpipe_nonlinux.go => input_nonlinux.go} | 0 .../{namedpipe_test.go => input_test.go} | 0 5 files changed, 81 insertions(+), 67 deletions(-) rename pkg/stanza/operator/input/namedpipe/{config.go => config_all.go} (100%) create mode 100644 pkg/stanza/operator/input/namedpipe/config_linux.go rename pkg/stanza/operator/input/namedpipe/{namedpipe.go => input.go} (50%) rename pkg/stanza/operator/input/namedpipe/{namedpipe_nonlinux.go => input_nonlinux.go} (100%) rename pkg/stanza/operator/input/namedpipe/{namedpipe_test.go => input_test.go} (100%) diff --git a/pkg/stanza/operator/input/namedpipe/config.go b/pkg/stanza/operator/input/namedpipe/config_all.go similarity index 100% rename from pkg/stanza/operator/input/namedpipe/config.go rename to pkg/stanza/operator/input/namedpipe/config_all.go diff --git a/pkg/stanza/operator/input/namedpipe/config_linux.go b/pkg/stanza/operator/input/namedpipe/config_linux.go new file mode 100644 index 000000000000..30ad0253cfe2 --- /dev/null +++ b/pkg/stanza/operator/input/namedpipe/config_linux.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build linux + +package namedpipe // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/namedpipe" + +import ( + "fmt" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" +) + +func init() { + operator.Register(operatorType, func() operator.Builder { return NewConfig() }) +} + +// Build will build a namedpipe input operator. +func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { + inputOperator, err := c.InputConfig.Build(logger) + if err != nil { + return nil, err + } + + enc, err := decode.LookupEncoding(c.Encoding) + if err != nil { + return nil, fmt.Errorf("failed to lookup encoding %q: %w", c.Encoding, err) + } + + splitFunc, err := c.SplitConfig.Func(enc, true, DefaultMaxLogSize) + if err != nil { + return nil, fmt.Errorf("failed to create split function: %w", err) + } + + maxLogSize := c.MaxLogSize + if maxLogSize == 0 { + maxLogSize = DefaultMaxLogSize + } + + return &Input{ + InputOperator: inputOperator, + + buffer: make([]byte, maxLogSize), + path: c.Path, + permissions: c.Permissions, + splitFunc: splitFunc, + trimFunc: c.TrimConfig.Func(), + }, nil +} diff --git a/pkg/stanza/operator/input/namedpipe/namedpipe.go b/pkg/stanza/operator/input/namedpipe/input.go similarity index 50% rename from pkg/stanza/operator/input/namedpipe/namedpipe.go rename to pkg/stanza/operator/input/namedpipe/input.go index 533eef7d9c38..adcaeadaad02 100644 --- a/pkg/stanza/operator/input/namedpipe/namedpipe.go +++ b/pkg/stanza/operator/input/namedpipe/input.go @@ -15,49 +15,11 @@ import ( "go.uber.org/zap" "golang.org/x/sys/unix" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) -func init() { - operator.Register(operatorType, func() operator.Builder { return NewConfig() }) -} - -// Build will build a namedpipe input operator. -func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { - inputOperator, err := c.InputConfig.Build(logger) - if err != nil { - return nil, err - } - - enc, err := decode.LookupEncoding(c.Encoding) - if err != nil { - return nil, fmt.Errorf("failed to lookup encoding %q: %w", c.Encoding, err) - } - - splitFunc, err := c.SplitConfig.Func(enc, true, DefaultMaxLogSize) - if err != nil { - return nil, fmt.Errorf("failed to create split function: %w", err) - } - - maxLogSize := c.MaxLogSize - if maxLogSize == 0 { - maxLogSize = DefaultMaxLogSize - } - - return &Input{ - InputOperator: inputOperator, - - buffer: make([]byte, maxLogSize), - path: c.Path, - permissions: c.Permissions, - splitFunc: splitFunc, - trimFunc: c.TrimConfig.Func(), - }, nil -} - type Input struct { helper.InputOperator @@ -71,58 +33,58 @@ type Input struct { wg sync.WaitGroup } -func (n *Input) Start(_ operator.Persister) error { - stat, err := os.Stat(n.path) +func (i *Input) Start(_ operator.Persister) error { + stat, err := os.Stat(i.path) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("failed to stat named pipe: %w", err) } if !os.IsNotExist(err) && stat.Mode()&os.ModeNamedPipe == 0 { - return fmt.Errorf("path %s is not a named pipe", n.path) + return fmt.Errorf("path %s is not a named pipe", i.path) } if os.IsNotExist(err) { - if fifoErr := unix.Mkfifo(n.path, n.permissions); fifoErr != nil { + if fifoErr := unix.Mkfifo(i.path, i.permissions); fifoErr != nil { return fmt.Errorf("failed to create named pipe: %w", fifoErr) } } // chmod the named pipe because mkfifo respects the umask which may result // in a named pipe with incorrect permissions. - if chmodErr := os.Chmod(n.path, os.FileMode(n.permissions)); chmodErr != nil { + if chmodErr := os.Chmod(i.path, os.FileMode(i.permissions)); chmodErr != nil { return fmt.Errorf("failed to chmod named pipe: %w", chmodErr) } - watcher, err := NewWatcher(n.path) + watcher, err := NewWatcher(i.path) if err != nil { return fmt.Errorf("failed to create watcher: %w", err) } - pipe, err := os.OpenFile(n.path, os.O_RDWR, os.ModeNamedPipe) + pipe, err := os.OpenFile(i.path, os.O_RDWR, os.ModeNamedPipe) if err != nil { return fmt.Errorf("failed to open named pipe: %w", err) } - n.pipe = pipe + i.pipe = pipe ctx, cancel := context.WithCancel(context.Background()) - n.cancel = cancel + i.cancel = cancel - n.wg.Add(2) + i.wg.Add(2) go func() { - defer n.wg.Done() + defer i.wg.Done() if err := watcher.Watch(ctx); err != nil { - n.Logger().Errorw("failed to watch named pipe", zap.Error(err)) + i.Logger().Errorw("failed to watch named pipe", zap.Error(err)) } }() go func() { - defer n.wg.Done() + defer i.wg.Done() for { select { case <-watcher.C: - if err := n.process(ctx, pipe); err != nil { - n.Logger().Errorw("failed to process named pipe", zap.Error(err)) + if err := i.process(ctx, pipe); err != nil { + i.Logger().Errorw("failed to process named pipe", zap.Error(err)) } case <-ctx.Done(): return @@ -133,23 +95,23 @@ func (n *Input) Start(_ operator.Persister) error { return nil } -func (n *Input) Stop() error { - if n.pipe != nil { - n.pipe.Close() +func (i *Input) Stop() error { + if i.pipe != nil { + i.pipe.Close() } - if n.cancel != nil { - n.cancel() + if i.cancel != nil { + i.cancel() } - n.wg.Wait() + i.wg.Wait() return nil } -func (n *Input) process(ctx context.Context, pipe *os.File) error { +func (i *Input) process(ctx context.Context, pipe *os.File) error { scan := bufio.NewScanner(pipe) - scan.Split(n.splitFunc) - scan.Buffer(n.buffer, len(n.buffer)) + scan.Split(i.splitFunc) + scan.Buffer(i.buffer, len(i.buffer)) for scan.Scan() { line := scan.Bytes() @@ -157,7 +119,7 @@ func (n *Input) process(ctx context.Context, pipe *os.File) error { continue } - if err := n.sendEntry(ctx, line); err != nil { + if err := i.sendEntry(ctx, line); err != nil { return fmt.Errorf("failed to send entry: %w", err) } } @@ -166,17 +128,17 @@ func (n *Input) process(ctx context.Context, pipe *os.File) error { } // sendEntry sends an entry to the next operator in the pipeline. -func (n *Input) sendEntry(ctx context.Context, bytes []byte) error { - bytes = n.trimFunc(bytes) +func (i *Input) sendEntry(ctx context.Context, bytes []byte) error { + bytes = i.trimFunc(bytes) if len(bytes) == 0 { return nil } - entry, err := n.NewEntry(string(bytes)) + entry, err := i.NewEntry(string(bytes)) if err != nil { return fmt.Errorf("failed to create entry: %w", err) } - n.Write(ctx, entry) + i.Write(ctx, entry) return nil } diff --git a/pkg/stanza/operator/input/namedpipe/namedpipe_nonlinux.go b/pkg/stanza/operator/input/namedpipe/input_nonlinux.go similarity index 100% rename from pkg/stanza/operator/input/namedpipe/namedpipe_nonlinux.go rename to pkg/stanza/operator/input/namedpipe/input_nonlinux.go diff --git a/pkg/stanza/operator/input/namedpipe/namedpipe_test.go b/pkg/stanza/operator/input/namedpipe/input_test.go similarity index 100% rename from pkg/stanza/operator/input/namedpipe/namedpipe_test.go rename to pkg/stanza/operator/input/namedpipe/input_test.go