Skip to content

Commit

Permalink
Merge branch 'main' into tlscheckreceiver
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-burt authored Oct 30, 2024
2 parents 8aac763 + fc8132c commit 533c8a7
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 210 deletions.
11 changes: 8 additions & 3 deletions receiver/filelogreceiver/filelog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package filelogreceiver

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -190,9 +191,13 @@ func (rt *rotationTest) Run(t *testing.T) {
if rt.copyTruncate {
// Recreate the backup file
// if backupFileName exists
if _, err = os.Stat(backupFileName); err == nil {
require.NoError(t, os.Remove(backupFileName))
}
require.Eventually(t, func() bool {
// On Windows you can't remove a file if it still has some handle opened to it. So remove the file
// in a loop until any async operation on it is done.
removeErr := os.Remove(backupFileName)
return errors.Is(removeErr, os.ErrNotExist)
}, 5*time.Second, 100*time.Millisecond)

backupFile, openErr := os.OpenFile(backupFileName, os.O_CREATE|os.O_RDWR, 0600)
require.NoError(t, openErr)

Expand Down
4 changes: 2 additions & 2 deletions receiver/splunkhecreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func createMetricsReceiver(
var recv receiver.Metrics
rCfg := cfg.(*Config)
r := receivers.GetOrAdd(cfg, func() component.Component {
recv, err = newMetricsReceiver(params, *rCfg, consumer)
recv, err = newReceiver(params, *rCfg)
return recv
})
if err != nil {
Expand All @@ -89,7 +89,7 @@ func createLogsReceiver(
var recv receiver.Logs
rCfg := cfg.(*Config)
r := receivers.GetOrAdd(cfg, func() component.Component {
recv, err = newLogsReceiver(params, *rCfg, consumer)
recv, err = newReceiver(params, *rCfg)
return recv
})
if err != nil {
Expand Down
18 changes: 0 additions & 18 deletions receiver/splunkhecreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,6 @@ func TestFactoryType(t *testing.T) {
assert.Equal(t, metadata.Type, NewFactory().Type())
}

func TestCreateNilNextConsumerMetrics(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:1"

mReceiver, err := createMetricsReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil)
assert.EqualError(t, err, "nil metricsConsumer")
assert.Nil(t, mReceiver, "receiver creation failed")
}

func TestCreateNilNextConsumerLogs(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:1"

mReceiver, err := createLogsReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil)
assert.EqualError(t, err, "nil logsConsumer")
assert.Nil(t, mReceiver, "receiver creation failed")
}

func TestMultipleLogsReceivers(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:1"
Expand Down
73 changes: 9 additions & 64 deletions receiver/splunkhecreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,10 @@ const (
)

var (
errNilNextMetricsConsumer = errors.New("nil metricsConsumer")
errNilNextLogsConsumer = errors.New("nil logsConsumer")
errEmptyEndpoint = errors.New("empty endpoint")
errInvalidMethod = errors.New("invalid http method")
errInvalidEncoding = errors.New("invalid encoding")
errExtensionMissing = errors.New("ack extension not found")
errEmptyEndpoint = errors.New("empty endpoint")
errInvalidMethod = errors.New("invalid http method")
errInvalidEncoding = errors.New("invalid encoding")
errExtensionMissing = errors.New("ack extension not found")

okRespBody = []byte(responseOK)
eventRequiredRespBody = []byte(responseErrEventRequired)
Expand Down Expand Up @@ -97,17 +95,10 @@ type splunkReceiver struct {
}

var _ receiver.Metrics = (*splunkReceiver)(nil)
var _ receiver.Logs = (*splunkReceiver)(nil)

// newMetricsReceiver creates the Splunk HEC receiver with the given configuration.
func newMetricsReceiver(
settings receiver.Settings,
config Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
if nextConsumer == nil {
return nil, errNilNextMetricsConsumer
}

// newReceiver creates the Splunk HEC receiver with the given configuration.
func newReceiver(settings receiver.Settings, config Config) (*splunkReceiver, error) {
if config.Endpoint == "" {
return nil, errEmptyEndpoint
}
Expand All @@ -126,9 +117,8 @@ func newMetricsReceiver(
return nil, err
}
r := &splunkReceiver{
settings: settings,
config: &config,
metricsConsumer: nextConsumer,
settings: settings,
config: &config,
server: &http.Server{
Addr: config.Endpoint,
// TODO: Evaluate what properties should be configurable, for now
Expand All @@ -143,51 +133,6 @@ func newMetricsReceiver(
return r, nil
}

// newLogsReceiver creates the Splunk HEC receiver with the given configuration.
func newLogsReceiver(
settings receiver.Settings,
config Config,
nextConsumer consumer.Logs,
) (receiver.Logs, error) {
if nextConsumer == nil {
return nil, errNilNextLogsConsumer
}

if config.Endpoint == "" {
return nil, errEmptyEndpoint
}
transport := "http"
if config.TLSSetting != nil {
transport = "https"
}

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: settings.ID,
Transport: transport,
ReceiverCreateSettings: settings,
})
if err != nil {
return nil, err
}

r := &splunkReceiver{
settings: settings,
config: &config,
logsConsumer: nextConsumer,
server: &http.Server{
Addr: config.Endpoint,
// TODO: Evaluate what properties should be configurable, for now
// set some hard-coded values.
ReadHeaderTimeout: defaultServerTimeout,
WriteTimeout: defaultServerTimeout,
},
gzipReaderPool: &sync.Pool{New: func() any { return new(gzip.Reader) }},
obsrecv: obsrecv,
}

return r, nil
}

// Start tells the receiver to start its processing.
// By convention the consumer of the received data is set when the receiver
// instance is created.
Expand Down
Loading

0 comments on commit 533c8a7

Please sign in to comment.