diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index ef1b153d39ec..80931bbe87ee 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -5,6 +5,7 @@ package filelogreceiver import ( "context" + "errors" "fmt" "io" "os" @@ -190,9 +191,13 @@ func (rt *rotationTest) Run(t *testing.T) { if rt.copyTruncate { // Recreate the backup file // if backupFileName exists - if _, err = os.Stat(backupFileName); err == nil { - require.NoError(t, os.Remove(backupFileName)) - } + require.Eventually(t, func() bool { + // On Windows you can't remove a file if it still has some handle opened to it. So remove the file + // in a loop until any async operation on it is done. + removeErr := os.Remove(backupFileName) + return errors.Is(removeErr, os.ErrNotExist) + }, 5*time.Second, 100*time.Millisecond) + backupFile, openErr := os.OpenFile(backupFileName, os.O_CREATE|os.O_RDWR, 0600) require.NoError(t, openErr) diff --git a/receiver/splunkhecreceiver/factory.go b/receiver/splunkhecreceiver/factory.go index c6e47f3788cb..ffbd2ad9f534 100644 --- a/receiver/splunkhecreceiver/factory.go +++ b/receiver/splunkhecreceiver/factory.go @@ -68,7 +68,7 @@ func createMetricsReceiver( var recv receiver.Metrics rCfg := cfg.(*Config) r := receivers.GetOrAdd(cfg, func() component.Component { - recv, err = newMetricsReceiver(params, *rCfg, consumer) + recv, err = newReceiver(params, *rCfg) return recv }) if err != nil { @@ -89,7 +89,7 @@ func createLogsReceiver( var recv receiver.Logs rCfg := cfg.(*Config) r := receivers.GetOrAdd(cfg, func() component.Component { - recv, err = newLogsReceiver(params, *rCfg, consumer) + recv, err = newReceiver(params, *rCfg) return recv }) if err != nil { diff --git a/receiver/splunkhecreceiver/factory_test.go b/receiver/splunkhecreceiver/factory_test.go index c4237e2d8d42..10f5a797f155 100644 --- a/receiver/splunkhecreceiver/factory_test.go +++ b/receiver/splunkhecreceiver/factory_test.go @@ -41,24 +41,6 @@ func TestFactoryType(t *testing.T) { assert.Equal(t, metadata.Type, NewFactory().Type()) } -func TestCreateNilNextConsumerMetrics(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "localhost:1" - - mReceiver, err := createMetricsReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil) - assert.EqualError(t, err, "nil metricsConsumer") - assert.Nil(t, mReceiver, "receiver creation failed") -} - -func TestCreateNilNextConsumerLogs(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "localhost:1" - - mReceiver, err := createLogsReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil) - assert.EqualError(t, err, "nil logsConsumer") - assert.Nil(t, mReceiver, "receiver creation failed") -} - func TestMultipleLogsReceivers(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Endpoint = "localhost:1" diff --git a/receiver/splunkhecreceiver/receiver.go b/receiver/splunkhecreceiver/receiver.go index 3ed0769e4f29..1d091bf3c55a 100644 --- a/receiver/splunkhecreceiver/receiver.go +++ b/receiver/splunkhecreceiver/receiver.go @@ -61,12 +61,10 @@ const ( ) var ( - errNilNextMetricsConsumer = errors.New("nil metricsConsumer") - errNilNextLogsConsumer = errors.New("nil logsConsumer") - errEmptyEndpoint = errors.New("empty endpoint") - errInvalidMethod = errors.New("invalid http method") - errInvalidEncoding = errors.New("invalid encoding") - errExtensionMissing = errors.New("ack extension not found") + errEmptyEndpoint = errors.New("empty endpoint") + errInvalidMethod = errors.New("invalid http method") + errInvalidEncoding = errors.New("invalid encoding") + errExtensionMissing = errors.New("ack extension not found") okRespBody = []byte(responseOK) eventRequiredRespBody = []byte(responseErrEventRequired) @@ -97,17 +95,10 @@ type splunkReceiver struct { } var _ receiver.Metrics = (*splunkReceiver)(nil) +var _ receiver.Logs = (*splunkReceiver)(nil) -// newMetricsReceiver creates the Splunk HEC receiver with the given configuration. -func newMetricsReceiver( - settings receiver.Settings, - config Config, - nextConsumer consumer.Metrics, -) (receiver.Metrics, error) { - if nextConsumer == nil { - return nil, errNilNextMetricsConsumer - } - +// newReceiver creates the Splunk HEC receiver with the given configuration. +func newReceiver(settings receiver.Settings, config Config) (*splunkReceiver, error) { if config.Endpoint == "" { return nil, errEmptyEndpoint } @@ -126,9 +117,8 @@ func newMetricsReceiver( return nil, err } r := &splunkReceiver{ - settings: settings, - config: &config, - metricsConsumer: nextConsumer, + settings: settings, + config: &config, server: &http.Server{ Addr: config.Endpoint, // TODO: Evaluate what properties should be configurable, for now @@ -143,51 +133,6 @@ func newMetricsReceiver( return r, nil } -// newLogsReceiver creates the Splunk HEC receiver with the given configuration. -func newLogsReceiver( - settings receiver.Settings, - config Config, - nextConsumer consumer.Logs, -) (receiver.Logs, error) { - if nextConsumer == nil { - return nil, errNilNextLogsConsumer - } - - if config.Endpoint == "" { - return nil, errEmptyEndpoint - } - transport := "http" - if config.TLSSetting != nil { - transport = "https" - } - - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: settings.ID, - Transport: transport, - ReceiverCreateSettings: settings, - }) - if err != nil { - return nil, err - } - - r := &splunkReceiver{ - settings: settings, - config: &config, - logsConsumer: nextConsumer, - server: &http.Server{ - Addr: config.Endpoint, - // TODO: Evaluate what properties should be configurable, for now - // set some hard-coded values. - ReadHeaderTimeout: defaultServerTimeout, - WriteTimeout: defaultServerTimeout, - }, - gzipReaderPool: &sync.Pool{New: func() any { return new(gzip.Reader) }}, - obsrecv: obsrecv, - } - - return r, nil -} - // Start tells the receiver to start its processing. // By convention the consumer of the received data is set when the receiver // instance is created. diff --git a/receiver/splunkhecreceiver/receiver_test.go b/receiver/splunkhecreceiver/receiver_test.go index 5039cbd8e3ca..4668fd0bb959 100644 --- a/receiver/splunkhecreceiver/receiver_test.go +++ b/receiver/splunkhecreceiver/receiver_test.go @@ -52,7 +52,7 @@ func assertHecSuccessResponseWithAckID(t *testing.T, resp *http.Response, body a assert.Equal(t, map[string]any{"code": float64(0), "text": "Success", "ackId": float64(ackID)}, body) } -func Test_splunkhecreceiver_NewLogsReceiver(t *testing.T) { +func Test_splunkhecreceiver_NewReceiver(t *testing.T) { defaultConfig := createDefaultConfig().(*Config) emptyEndpointConfig := createDefaultConfig().(*Config) emptyEndpointConfig.Endpoint = "" @@ -94,60 +94,7 @@ func Test_splunkhecreceiver_NewLogsReceiver(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := newLogsReceiver(receivertest.NewNopSettings(), tt.args.config, tt.args.logsConsumer) - assert.Equal(t, tt.wantErr, err) - if err == nil { - assert.NotNil(t, got) - } else { - assert.Nil(t, got) - } - }) - } -} - -func Test_splunkhecreceiver_NewMetricsReceiver(t *testing.T) { - defaultConfig := createDefaultConfig().(*Config) - emptyEndpointConfig := createDefaultConfig().(*Config) - emptyEndpointConfig.Endpoint = "" - type args struct { - config Config - metricsConsumer consumer.Metrics - } - tests := []struct { - name string - args args - wantErr error - }{ - { - name: "empty_endpoint", - args: args{ - config: *emptyEndpointConfig, - metricsConsumer: new(consumertest.MetricsSink), - }, - wantErr: errEmptyEndpoint, - }, - { - name: "default_endpoint", - args: args{ - config: *defaultConfig, - metricsConsumer: consumertest.NewNop(), - }, - }, - { - name: "happy_path", - args: args{ - config: Config{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: "localhost:1234", - }, - }, - metricsConsumer: consumertest.NewNop(), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := newMetricsReceiver(receivertest.NewNopSettings(), tt.args.config, tt.args.metricsConsumer) + got, err := newReceiver(receivertest.NewNopSettings(), tt.args.config) assert.Equal(t, tt.wantErr, err) if err == nil { assert.NotNil(t, got) @@ -396,15 +343,15 @@ func Test_consumer_err(t *testing.T) { splunkMsg := buildSplunkHecMsg(currentTime, 5) config := createDefaultConfig().(*Config) config.Endpoint = "localhost:0" // Actually not creating the endpoint - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, consumertest.NewErr(errors.New("bad consumer"))) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.logsConsumer = consumertest.NewErr(errors.New("bad consumer")) - r := rcv.(*splunkReceiver) w := httptest.NewRecorder() msgBytes, err := json.Marshal(splunkMsg) require.NoError(t, err) req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) - r.handleReq(w, req) + rcv.handleReq(w, req) resp := w.Result() defer resp.Body.Close() @@ -424,15 +371,15 @@ func Test_consumer_err_metrics(t *testing.T) { assert.True(t, splunkMsg.IsMetric()) config := createDefaultConfig().(*Config) config.Endpoint = "localhost:0" // Actually not creating the endpoint\ - rcv, err := newMetricsReceiver(receivertest.NewNopSettings(), *config, consumertest.NewErr(errors.New("bad consumer"))) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.metricsConsumer = consumertest.NewErr(errors.New("bad consumer")) - r := rcv.(*splunkReceiver) w := httptest.NewRecorder() msgBytes, err := json.Marshal(splunkMsg) require.NoError(t, err) req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) - r.handleReq(w, req) + rcv.handleReq(w, req) resp := w.Result() defer resp.Body.Close() @@ -458,8 +405,9 @@ func Test_splunkhecReceiver_TLS(t *testing.T) { } sink := new(consumertest.LogsSink) set := receivertest.NewNopSettings() - r, err := newLogsReceiver(set, *cfg, sink) + r, err := newReceiver(set, *cfg) require.NoError(t, err) + r.logsConsumer = sink defer func() { require.NoError(t, r.Shutdown(context.Background())) }() @@ -647,11 +595,11 @@ func Test_splunkhecReceiver_AccessTokenPassthrough(t *testing.T) { require.NoError(t, exporter.Shutdown(context.Background())) }() assert.NoError(t, err) - rcv, err := newMetricsReceiver(receivertest.NewNopSettings(), *config, exporter) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) - r := rcv.(*splunkReceiver) + rcv.metricsConsumer = exporter w := httptest.NewRecorder() - r.handleReq(w, req) + rcv.handleReq(w, req) resp := w.Result() defer resp.Body.Close() _, err = io.ReadAll(resp.Body) @@ -663,11 +611,11 @@ func Test_splunkhecReceiver_AccessTokenPassthrough(t *testing.T) { require.NoError(t, exporter.Shutdown(context.Background())) }() assert.NoError(t, err) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, exporter) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) - r := rcv.(*splunkReceiver) + rcv.logsConsumer = exporter w := httptest.NewRecorder() - r.handleReq(w, req) + rcv.handleReq(w, req) resp := w.Result() defer resp.Body.Close() _, err = io.ReadAll(resp.Body) @@ -732,8 +680,9 @@ func Test_Logs_splunkhecReceiver_IndexSourceTypePassthrough(t *testing.T) { defer func() { require.NoError(t, exporter.Shutdown(context.Background())) }() - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *cfg, exporter) + rcv, err := newReceiver(receivertest.NewNopSettings(), *cfg) assert.NoError(t, err) + rcv.logsConsumer = exporter currentTime := float64(time.Now().UnixNano()) / 1e6 splunkhecMsg := buildSplunkHecMsg(currentTime, 3) @@ -761,9 +710,8 @@ func Test_Logs_splunkhecReceiver_IndexSourceTypePassthrough(t *testing.T) { done <- true }() - r := rcv.(*splunkReceiver) w := httptest.NewRecorder() - r.handleReq(w, req) + rcv.handleReq(w, req) resp := w.Result() respBytes, err := io.ReadAll(resp.Body) assert.NoError(t, err) @@ -833,8 +781,9 @@ func Test_Metrics_splunkhecReceiver_IndexSourceTypePassthrough(t *testing.T) { require.NoError(t, exporter.Shutdown(context.Background())) }() assert.NoError(t, err) - rcv, err := newMetricsReceiver(receivertest.NewNopSettings(), *cfg, exporter) + rcv, err := newReceiver(receivertest.NewNopSettings(), *cfg) assert.NoError(t, err) + rcv.metricsConsumer = exporter currentTime := float64(time.Now().UnixNano()) / 1e6 splunkhecMsg := buildSplunkHecMetricsMsg(currentTime, 42, 3) @@ -862,9 +811,8 @@ func Test_Metrics_splunkhecReceiver_IndexSourceTypePassthrough(t *testing.T) { done <- true }() - r := rcv.(*splunkReceiver) w := httptest.NewRecorder() - r.handleReq(w, req) + rcv.handleReq(w, req) resp := w.Result() respBytes, err := io.ReadAll(resp.Body) defer resp.Body.Close() @@ -1099,17 +1047,16 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.logsConsumer = sink - r := rcv.(*splunkReceiver) - - assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, rcv.Start(context.Background(), componenttest.NewNopHost())) defer func() { - assert.NoError(t, r.Shutdown(context.Background())) + assert.NoError(t, rcv.Shutdown(context.Background())) }() w := httptest.NewRecorder() - r.handleRawReq(w, tt.req) + rcv.handleRawReq(w, tt.req) resp := w.Result() respBytes, err := io.ReadAll(resp.Body) @@ -1152,16 +1099,16 @@ func Test_splunkhecReceiver_Start(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *tt.getConfig(), sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *tt.getConfig()) assert.NoError(t, err) + rcv.logsConsumer = sink - r := rcv.(*splunkReceiver) if tt.errorExpected { - assert.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) + assert.Error(t, rcv.Start(context.Background(), componenttest.NewNopHost())) } else { - assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, rcv.Start(context.Background(), componenttest.NewNopHost())) } - assert.NoError(t, r.Shutdown(context.Background())) + assert.NoError(t, rcv.Shutdown(context.Background())) }) } } @@ -1366,20 +1313,20 @@ func Test_splunkhecReceiver_handleAck(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.logsConsumer = sink mockHost := mockHost{extensions: map[component.ID]component.Component{ id: tt.setupMockAckExtension(), }} - r := rcv.(*splunkReceiver) - assert.NoError(t, r.Start(context.Background(), mockHost)) + assert.NoError(t, rcv.Start(context.Background(), mockHost)) defer func() { - assert.NoError(t, r.Shutdown(context.Background())) + assert.NoError(t, rcv.Shutdown(context.Background())) }() w := httptest.NewRecorder() - r.handleAck(w, tt.req) + rcv.handleAck(w, tt.req) resp := w.Result() respBytes, err := io.ReadAll(resp.Body) @@ -1533,20 +1480,20 @@ func Test_splunkhecReceiver_handleRawReq_WithAck(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.logsConsumer = sink mh := mockHost{extensions: map[component.ID]component.Component{ id: tt.setupMockAckExtension(), }} - r := rcv.(*splunkReceiver) - assert.NoError(t, r.Start(context.Background(), mh)) + assert.NoError(t, rcv.Start(context.Background(), mh)) defer func() { - assert.NoError(t, r.Shutdown(context.Background())) + assert.NoError(t, rcv.Shutdown(context.Background())) }() w := httptest.NewRecorder() - r.handleRawReq(w, tt.req) + rcv.handleRawReq(w, tt.req) resp := w.Result() respBytes, err := io.ReadAll(resp.Body) @@ -1717,21 +1664,21 @@ func Test_splunkhecReceiver_handleReq_WithAck(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.logsConsumer = sink - r := rcv.(*splunkReceiver) w := httptest.NewRecorder() mh := mockHost{extensions: map[component.ID]component.Component{ id: tt.setupMockAckExtension(), }} - assert.NoError(t, r.Start(context.Background(), mh)) + assert.NoError(t, rcv.Start(context.Background(), mh)) defer func() { - assert.NoError(t, r.Shutdown(context.Background())) + assert.NoError(t, rcv.Shutdown(context.Background())) }() - r.handleReq(w, tt.req) + rcv.handleReq(w, tt.req) resp := w.Result() defer resp.Body.Close() @@ -1753,16 +1700,16 @@ func Test_splunkhecReceiver_handleReq_WithAck(t *testing.T) { func Test_splunkhecreceiver_handleHealthPath(t *testing.T) { config := createDefaultConfig().(*Config) sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.logsConsumer = sink - r := rcv.(*splunkReceiver) - assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, rcv.Start(context.Background(), componenttest.NewNopHost())) defer func() { - assert.NoError(t, r.Shutdown(context.Background())) + assert.NoError(t, rcv.Shutdown(context.Background())) }() w := httptest.NewRecorder() - r.handleHealthReq(w, httptest.NewRequest("GET", "http://localhost/services/collector/health", nil)) + rcv.handleHealthReq(w, httptest.NewRequest("GET", "http://localhost/services/collector/health", nil)) resp := w.Result() respBytes, err := io.ReadAll(resp.Body) @@ -1814,13 +1761,13 @@ func Test_splunkhecreceiver_handle_nested_fields(t *testing.T) { t.Run(tt.name, func(t *testing.T) { config := createDefaultConfig().(*Config) sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.logsConsumer = sink - r := rcv.(*splunkReceiver) - assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, rcv.Start(context.Background(), componenttest.NewNopHost())) defer func() { - assert.NoError(t, r.Shutdown(context.Background())) + assert.NoError(t, rcv.Shutdown(context.Background())) }() currentTime := float64(time.Now().UnixNano()) / 1e6 event := buildSplunkHecMsg(currentTime, 3) @@ -1830,7 +1777,7 @@ func Test_splunkhecreceiver_handle_nested_fields(t *testing.T) { req := httptest.NewRequest("POST", "http://localhost/services/collector", bytes.NewReader(msgBytes)) w := httptest.NewRecorder() - r.handleReq(w, req) + rcv.handleReq(w, req) if tt.success { assert.Equal(t, http.StatusOK, w.Code) @@ -1941,16 +1888,16 @@ func Test_splunkhecReceiver_rawReqHasmetadataInResource(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.logsConsumer = sink - r := rcv.(*splunkReceiver) - assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, rcv.Start(context.Background(), componenttest.NewNopHost())) defer func() { - assert.NoError(t, r.Shutdown(context.Background())) + assert.NoError(t, rcv.Shutdown(context.Background())) }() w := httptest.NewRecorder() - r.handleRawReq(w, tt.req) + rcv.handleRawReq(w, tt.req) resp := w.Result() assert.NoError(t, err) @@ -1966,10 +1913,10 @@ func BenchmarkHandleReq(b *testing.B) { config := createDefaultConfig().(*Config) config.Endpoint = "localhost:0" sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(b, err) + rcv.logsConsumer = sink - r := rcv.(*splunkReceiver) w := httptest.NewRecorder() currentTime := float64(time.Now().UnixNano()) / 1e6 splunkMsg := buildSplunkHecMsg(currentTime, 2) @@ -1985,7 +1932,7 @@ func BenchmarkHandleReq(b *testing.B) { for n := 0; n < b.N; n++ { req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(totalMessage)) - r.handleReq(w, req) + rcv.handleReq(w, req) resp := w.Result() _, err = io.ReadAll(resp.Body) @@ -2042,17 +1989,17 @@ func Test_splunkhecReceiver_healthCheck_success(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(receivertest.NewNopSettings(), *config, sink) + rcv, err := newReceiver(receivertest.NewNopSettings(), *config) assert.NoError(t, err) + rcv.logsConsumer = sink - r := rcv.(*splunkReceiver) - assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, rcv.Start(context.Background(), componenttest.NewNopHost())) defer func() { - assert.NoError(t, r.Shutdown(context.Background())) + assert.NoError(t, rcv.Shutdown(context.Background())) }() w := httptest.NewRecorder() - r.server.Handler.ServeHTTP(w, tt.req) + rcv.server.Handler.ServeHTTP(w, tt.req) resp := w.Result() defer resp.Body.Close() respBytes, err := io.ReadAll(resp.Body)