Skip to content

Commit

Permalink
[chore][pkg/stanza] Cleanup namedpipe input operator files (#32075)
Browse files Browse the repository at this point in the history
Contributes to #32058
  • Loading branch information
djaglowski authored Apr 4, 2024
1 parent 12fe8e2 commit eb308a9
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 67 deletions.
52 changes: 52 additions & 0 deletions pkg/stanza/operator/input/namedpipe/config_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build linux

package namedpipe // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/namedpipe"

import (
"fmt"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// Build will build a namedpipe input operator.
func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to lookup encoding %q: %w", c.Encoding, err)
}

splitFunc, err := c.SplitConfig.Func(enc, true, DefaultMaxLogSize)
if err != nil {
return nil, fmt.Errorf("failed to create split function: %w", err)
}

maxLogSize := c.MaxLogSize
if maxLogSize == 0 {
maxLogSize = DefaultMaxLogSize
}

return &Input{
InputOperator: inputOperator,

buffer: make([]byte, maxLogSize),
path: c.Path,
permissions: c.Permissions,
splitFunc: splitFunc,
trimFunc: c.TrimConfig.Func(),
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,11 @@ import (
"go.uber.org/zap"
"golang.org/x/sys/unix"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// Build will build a namedpipe input operator.
func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to lookup encoding %q: %w", c.Encoding, err)
}

splitFunc, err := c.SplitConfig.Func(enc, true, DefaultMaxLogSize)
if err != nil {
return nil, fmt.Errorf("failed to create split function: %w", err)
}

maxLogSize := c.MaxLogSize
if maxLogSize == 0 {
maxLogSize = DefaultMaxLogSize
}

return &Input{
InputOperator: inputOperator,

buffer: make([]byte, maxLogSize),
path: c.Path,
permissions: c.Permissions,
splitFunc: splitFunc,
trimFunc: c.TrimConfig.Func(),
}, nil
}

type Input struct {
helper.InputOperator

Expand All @@ -71,58 +33,58 @@ type Input struct {
wg sync.WaitGroup
}

func (n *Input) Start(_ operator.Persister) error {
stat, err := os.Stat(n.path)
func (i *Input) Start(_ operator.Persister) error {
stat, err := os.Stat(i.path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to stat named pipe: %w", err)
}

if !os.IsNotExist(err) && stat.Mode()&os.ModeNamedPipe == 0 {
return fmt.Errorf("path %s is not a named pipe", n.path)
return fmt.Errorf("path %s is not a named pipe", i.path)
}

if os.IsNotExist(err) {
if fifoErr := unix.Mkfifo(n.path, n.permissions); fifoErr != nil {
if fifoErr := unix.Mkfifo(i.path, i.permissions); fifoErr != nil {
return fmt.Errorf("failed to create named pipe: %w", fifoErr)
}
}

// chmod the named pipe because mkfifo respects the umask which may result
// in a named pipe with incorrect permissions.
if chmodErr := os.Chmod(n.path, os.FileMode(n.permissions)); chmodErr != nil {
if chmodErr := os.Chmod(i.path, os.FileMode(i.permissions)); chmodErr != nil {
return fmt.Errorf("failed to chmod named pipe: %w", chmodErr)
}

watcher, err := NewWatcher(n.path)
watcher, err := NewWatcher(i.path)
if err != nil {
return fmt.Errorf("failed to create watcher: %w", err)
}

pipe, err := os.OpenFile(n.path, os.O_RDWR, os.ModeNamedPipe)
pipe, err := os.OpenFile(i.path, os.O_RDWR, os.ModeNamedPipe)
if err != nil {
return fmt.Errorf("failed to open named pipe: %w", err)
}

n.pipe = pipe
i.pipe = pipe

ctx, cancel := context.WithCancel(context.Background())
n.cancel = cancel
i.cancel = cancel

n.wg.Add(2)
i.wg.Add(2)
go func() {
defer n.wg.Done()
defer i.wg.Done()
if err := watcher.Watch(ctx); err != nil {
n.Logger().Errorw("failed to watch named pipe", zap.Error(err))
i.Logger().Errorw("failed to watch named pipe", zap.Error(err))
}
}()

go func() {
defer n.wg.Done()
defer i.wg.Done()
for {
select {
case <-watcher.C:
if err := n.process(ctx, pipe); err != nil {
n.Logger().Errorw("failed to process named pipe", zap.Error(err))
if err := i.process(ctx, pipe); err != nil {
i.Logger().Errorw("failed to process named pipe", zap.Error(err))
}
case <-ctx.Done():
return
Expand All @@ -133,31 +95,31 @@ func (n *Input) Start(_ operator.Persister) error {
return nil
}

func (n *Input) Stop() error {
if n.pipe != nil {
n.pipe.Close()
func (i *Input) Stop() error {
if i.pipe != nil {
i.pipe.Close()
}

if n.cancel != nil {
n.cancel()
if i.cancel != nil {
i.cancel()
}

n.wg.Wait()
i.wg.Wait()
return nil
}

func (n *Input) process(ctx context.Context, pipe *os.File) error {
func (i *Input) process(ctx context.Context, pipe *os.File) error {
scan := bufio.NewScanner(pipe)
scan.Split(n.splitFunc)
scan.Buffer(n.buffer, len(n.buffer))
scan.Split(i.splitFunc)
scan.Buffer(i.buffer, len(i.buffer))

for scan.Scan() {
line := scan.Bytes()
if len(line) == 0 {
continue
}

if err := n.sendEntry(ctx, line); err != nil {
if err := i.sendEntry(ctx, line); err != nil {
return fmt.Errorf("failed to send entry: %w", err)
}
}
Expand All @@ -166,17 +128,17 @@ func (n *Input) process(ctx context.Context, pipe *os.File) error {
}

// sendEntry sends an entry to the next operator in the pipeline.
func (n *Input) sendEntry(ctx context.Context, bytes []byte) error {
bytes = n.trimFunc(bytes)
func (i *Input) sendEntry(ctx context.Context, bytes []byte) error {
bytes = i.trimFunc(bytes)
if len(bytes) == 0 {
return nil
}

entry, err := n.NewEntry(string(bytes))
entry, err := i.NewEntry(string(bytes))
if err != nil {
return fmt.Errorf("failed to create entry: %w", err)
}

n.Write(ctx, entry)
i.Write(ctx, entry)
return nil
}

0 comments on commit eb308a9

Please sign in to comment.