From ad7f371f1e632f166cb1e7acbad3b31fc6bff12d Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 18:55:44 +0100 Subject: [PATCH 1/2] [processor/k8sattributes]: log error encountered during kube client initialisation (#36385) #### Description This PR adds more log output to the k8s attributes processor to log any errors that are encountered during the kube client initialisation, to make troubleshooting and identifying this issue easier. #### Link to tracking issue Fixes #35879 --------- Signed-off-by: Florian Bacher --- .../k8sattributes-k8s-client-init-log.yaml | 29 +++++++++++++++++++ processor/k8sattributesprocessor/processor.go | 2 ++ 2 files changed, 31 insertions(+) create mode 100644 .chloggen/k8sattributes-k8s-client-init-log.yaml diff --git a/.chloggen/k8sattributes-k8s-client-init-log.yaml b/.chloggen/k8sattributes-k8s-client-init-log.yaml new file mode 100644 index 000000000000..f1c67ff28319 --- /dev/null +++ b/.chloggen/k8sattributes-k8s-client-init-log.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sattributesprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Log any errors encountered during kube client initialisation + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35879] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: | + This addresses an issue where the collector, due to an error encountered during the kubernetes client initialisation, + was reporting an 'unavailable' status via the health check extension without any further information to be found in the logs. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 98499f5e5473..9fa753f95fe7 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -62,6 +62,7 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err for _, opt := range allOptions { if err := opt(kp); err != nil { + kp.logger.Error("Could not apply option", zap.Error(err)) componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) return err } @@ -71,6 +72,7 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err if kp.kc == nil { err := kp.initKubeClient(kp.telemetrySettings, kubeClientProvider) if err != nil { + kp.logger.Error("Could not initialize kube client", zap.Error(err)) componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) return err } From 396c63d0ad4f696d694ff09e40b0abc4cceb8f15 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 19:20:40 +0100 Subject: [PATCH 2/2] [chore] remove converter type from stanza (#36288) #### Description This PR removes the `Converter` type that was previously used mainly by the stanza receiver adapter (see 
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35669#issuecomment-2443455752 for more details). Two other receivers were still using the converter to generate test data within the unit tests, so those have been adapted as well with this PR #### Link to tracking issue Follow up to https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35453 #### Testing Adapted unit tests that were still using the converter --------- Signed-off-by: Florian Bacher --- pkg/stanza/adapter/converter.go | 184 ---------------- pkg/stanza/adapter/converter_test.go | 219 ++----------------- receiver/filelogreceiver/filelog_test.go | 77 +++---- receiver/filelogreceiver/go.mod | 2 + receiver/namedpipereceiver/namedpipe_test.go | 4 - 5 files changed, 46 insertions(+), 440 deletions(-) diff --git a/pkg/stanza/adapter/converter.go b/pkg/stanza/adapter/converter.go index 3ab508745bc3..a81fd8f00a42 100644 --- a/pkg/stanza/adapter/converter.go +++ b/pkg/stanza/adapter/converter.go @@ -4,162 +4,19 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" import ( - "context" "encoding/binary" "encoding/json" - "errors" "fmt" - "math" - "runtime" "sort" "sync" "github.com/cespare/xxhash/v2" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) -// Converter converts a batch of entry.Entry into plog.Logs aggregating translated -// entries into logs coming from the same Resource. 
-// -// The diagram below illustrates the internal communication inside the Converter: -// -// ┌─────────────────────────────────┐ -// │ Batch() │ -// ┌─────────┤ Ingests batches of log entries │ -// │ │ and sends them onto workerChan │ -// │ └─────────────────────────────────┘ -// │ -// │ ┌───────────────────────────────────────────────────┐ -// ├─► workerLoop() │ -// │ │ ┌─────────────────────────────────────────────────┴─┐ -// ├─┼─► workerLoop() │ -// │ │ │ ┌─────────────────────────────────────────────────┴─┐ -// └─┼─┼─► workerLoop() │ -// └─┤ │ consumes sent log entries from workerChan, │ -// │ │ translates received entries to plog.LogRecords, │ -// └─┤ and sends them on flushChan │ -// └─────────────────────────┬─────────────────────────┘ -// │ -// ▼ -// ┌─────────────────────────────────────────────────────┐ -// │ flushLoop() │ -// │ receives log records from flushChan and sends │ -// │ them onto pLogsChan which is consumed by │ -// │ downstream consumers via OutChannel() │ -// └─────────────────────────────────────────────────────┘ -type Converter struct { - set component.TelemetrySettings - - // pLogsChan is a channel on which aggregated logs will be sent to. - pLogsChan chan plog.Logs - - stopOnce sync.Once - - // converterChan is an internal communication channel signaling stop was called - // prevents sending to closed channels - converterChan chan struct{} - - // workerChan is an internal communication channel that gets the log - // entries from Batch() calls and it receives the data in workerLoop(). - workerChan chan []*entry.Entry - // workerCount configures the amount of workers started. - workerCount int - - // flushChan is an internal channel used for transporting batched plog.Logs. - flushChan chan plog.Logs - - // wg is a WaitGroup that makes sure that we wait for spun up goroutines exit - // when Stop() is called. - wg sync.WaitGroup - - // flushWg is a WaitGroup that makes sure that we wait for flush loop to exit - // when Stop() is called. 
- flushWg sync.WaitGroup -} - -type converterOption interface { - apply(*Converter) -} - -func withWorkerCount(workerCount int) converterOption { - return workerCountOption{workerCount} -} - -type workerCountOption struct { - workerCount int -} - -func (o workerCountOption) apply(c *Converter) { - c.workerCount = o.workerCount -} - -func NewConverter(set component.TelemetrySettings, opts ...converterOption) *Converter { - set.Logger = set.Logger.With(zap.String("component", "converter")) - c := &Converter{ - set: set, - workerChan: make(chan []*entry.Entry), - workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))), - pLogsChan: make(chan plog.Logs), - converterChan: make(chan struct{}), - flushChan: make(chan plog.Logs), - } - for _, opt := range opts { - opt.apply(c) - } - return c -} - -func (c *Converter) Start() { - c.set.Logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount)) - - c.wg.Add(c.workerCount) - for i := 0; i < c.workerCount; i++ { - go c.workerLoop() - } - - c.flushWg.Add(1) - go c.flushLoop() -} - -func (c *Converter) Stop() { - c.stopOnce.Do(func() { - close(c.converterChan) - - // close workerChan and wait for entries to be processed - close(c.workerChan) - c.wg.Wait() - - // close flushChan and wait for flush loop to finish - close(c.flushChan) - c.flushWg.Wait() - - // close pLogsChan so callers can stop processing - close(c.pLogsChan) - }) -} - -// OutChannel returns the channel on which converted entries will be sent to. -func (c *Converter) OutChannel() <-chan plog.Logs { - return c.pLogsChan -} - -// workerLoop is responsible for obtaining log entries from Batch() calls, -// converting them to plog.LogRecords batched by Resource, and sending them -// on flushChan. 
-func (c *Converter) workerLoop() { - defer c.wg.Done() - - for entries := range c.workerChan { - // Send plogs directly to flushChan - c.flushChan <- ConvertEntries(entries) - } -} - func ConvertEntries(entries []*entry.Entry) plog.Logs { resourceHashToIdx := make(map[uint64]int) scopeIdxByResource := make(map[uint64]map[string]int) @@ -197,47 +54,6 @@ func ConvertEntries(entries []*entry.Entry) plog.Logs { return pLogs } -func (c *Converter) flushLoop() { - defer c.flushWg.Done() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - for pLogs := range c.flushChan { - if err := c.flush(ctx, pLogs); err != nil { - c.set.Logger.Debug("Problem sending log entries", - zap.Error(err), - ) - } - } -} - -// flush flushes provided plog.Logs entries onto a channel. -func (c *Converter) flush(ctx context.Context, pLogs plog.Logs) error { - doneChan := ctx.Done() - - select { - case <-doneChan: - return fmt.Errorf("flushing log entries interrupted, err: %w", ctx.Err()) - - case c.pLogsChan <- pLogs: - } - - return nil -} - -// Batch takes in an entry.Entry and sends it to an available worker for processing. -func (c *Converter) Batch(e []*entry.Entry) error { - // in case Stop was called do not process batch - select { - case <-c.converterChan: - return errors.New("logs converter has been stopped") - default: - } - - c.workerChan <- e - return nil -} - // convert converts one entry.Entry into plog.LogRecord allocating it. 
func convert(ent *entry.Entry) plog.LogRecord { dest := plog.NewLogRecord() diff --git a/pkg/stanza/adapter/converter_test.go b/pkg/stanza/adapter/converter_test.go index a56a21f94eb2..a527e3bddb84 100644 --- a/pkg/stanza/adapter/converter_test.go +++ b/pkg/stanza/adapter/converter_test.go @@ -4,20 +4,15 @@ package adapter import ( - "context" "fmt" "sort" "strconv" - "sync" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap/zaptest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) @@ -42,10 +37,6 @@ func BenchmarkConvertComplex(b *testing.B) { } } -func complexEntries(count int) []*entry.Entry { - return complexEntriesForNDifferentHosts(count, 1) -} - func complexEntriesForNDifferentHosts(count int, n int) []*entry.Entry { ret := make([]*entry.Entry, count) for i := 0; i < count; i++ { @@ -392,163 +383,25 @@ func TestAllConvertedEntriesScopeGrouping(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) { t.Parallel() - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - - go func() { - entries := complexEntriesForNDifferentHostsMDifferentScopes(100, 1, tc.numberOFScopes) - assert.NoError(t, converter.Batch(entries)) - }() - - var ( - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - select { - case pLogs, ok := <-ch: - if !ok { - break - } + entries := complexEntriesForNDifferentHostsMDifferentScopes(100, 1, tc.numberOFScopes) - rLogs := pLogs.ResourceLogs() - rLog := rLogs.At(0) + pLogs := ConvertEntries(entries) - ills := rLog.ScopeLogs() - require.Equal(t, ills.Len(), tc.numberOFScopes) + rLogs := pLogs.ResourceLogs() + rLog := rLogs.At(0) - for i := 
0; i < tc.numberOFScopes; i++ { - sl := ills.At(i) - require.Equal(t, sl.Scope().Name(), fmt.Sprintf("scope-%d", i%tc.numberOFScopes)) - require.Equal(t, sl.LogRecords().Len(), tc.logsPerScope) - } + ills := rLog.ScopeLogs() + require.Equal(t, ills.Len(), tc.numberOFScopes) - case <-timeoutTimer.C: - break + for i := 0; i < tc.numberOFScopes; i++ { + sl := ills.At(i) + require.Equal(t, sl.Scope().Name(), fmt.Sprintf("scope-%d", i%tc.numberOFScopes)) + require.Equal(t, sl.LogRecords().Len(), tc.logsPerScope) } }) } } -func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) { - t.Parallel() - - testcases := []struct { - entries int - maxFlushCount uint - }{ - { - entries: 10, - maxFlushCount: 10, - }, - { - entries: 10, - maxFlushCount: 3, - }, - { - entries: 100, - maxFlushCount: 20, - }, - } - - for i, tc := range testcases { - tc := tc - - t.Run(strconv.Itoa(i), func(t *testing.T) { - t.Parallel() - - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - - go func() { - entries := complexEntries(tc.entries) - for from := 0; from < tc.entries; from += int(tc.maxFlushCount) { - to := from + int(tc.maxFlushCount) - if to > tc.entries { - to = tc.entries - } - assert.NoError(t, converter.Batch(entries[from:to])) - } - }() - - var ( - actualCount int - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - forLoop: - for { - if tc.entries == actualCount { - break - } - - select { - case pLogs, ok := <-ch: - if !ok { - break forLoop - } - - rLogs := pLogs.ResourceLogs() - require.Equal(t, 1, rLogs.Len()) - - rLog := rLogs.At(0) - ills := rLog.ScopeLogs() - require.Equal(t, 1, ills.Len()) - - sl := ills.At(0) - - actualCount += sl.LogRecords().Len() - - assert.LessOrEqual(t, uint(sl.LogRecords().Len()), tc.maxFlushCount, - "Received more log records in one flush than configured by maxFlushCount", - ) - 
- case <-timeoutTimer.C: - break forLoop - } - } - - assert.Equal(t, tc.entries, actualCount, - "didn't receive expected number of entries after conversion", - ) - }) - } -} - -func TestConverterCancelledContextCancellsTheFlush(t *testing.T) { - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - var wg sync.WaitGroup - wg.Add(1) - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - go func() { - defer wg.Done() - pLogs := plog.NewLogs() - ills := pLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty() - - lr := convert(complexEntry()) - lr.CopyTo(ills.LogRecords().AppendEmpty()) - - assert.Error(t, converter.flush(ctx, pLogs)) - }() - wg.Wait() -} - func TestConvertMetadata(t *testing.T) { now := time.Now() @@ -946,55 +799,17 @@ func BenchmarkConverter(b *testing.B) { for _, wc := range workerCounts { b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) { for i := 0; i < b.N; i++ { - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(b) - converter := NewConverter(set, withWorkerCount(wc)) - converter.Start() - defer converter.Stop() - b.ReportAllocs() - go func() { - for from := 0; from < entryCount; from += int(batchSize) { - to := from + int(batchSize) - if to > entryCount { - to = entryCount - } - assert.NoError(b, converter.Batch(entries[from:to])) - } - }() - - var ( - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - var n int - forLoop: - for { - if n == entryCount { - break - } - - select { - case pLogs, ok := <-ch: - if !ok { - break forLoop - } - - rLogs := pLogs.ResourceLogs() - require.Equal(b, hostsCount, rLogs.Len()) - n += pLogs.LogRecordCount() - - case <-timeoutTimer.C: - break forLoop + for from := 0; from < entryCount; from += int(batchSize) { + to := from + int(batchSize) + if to > entryCount { + to = 
entryCount } + pLogs := ConvertEntries(entries[from:to]) + rLogs := pLogs.ResourceLogs() + require.Equal(b, hostsCount, rLogs.Len()) } - - assert.Equal(b, entryCount, n, - "didn't receive expected number of entries after conversion", - ) } }) } diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 6d9cb8f7f577..a49a7d780286 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -12,7 +12,6 @@ import ( "path/filepath" "runtime" "strconv" - "sync" "sync/atomic" "testing" "time" @@ -28,6 +27,7 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" @@ -83,44 +83,50 @@ func TestReadStaticFile(t *testing.T) { sink := new(consumertest.LogsSink) cfg := testdataConfigYaml() - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - defer converter.Stop() - - var wg sync.WaitGroup - wg.Add(1) - go consumeNLogsFromConverter(converter.OutChannel(), 3, &wg) - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) + expectedLogs := []plog.Logs{} // Build the expected set by using adapter.Converter to translate entries // to pdata Logs. 
- queueEntry := func(t *testing.T, c *adapter.Converter, msg string, severity entry.Severity) { + entries := []*entry.Entry{} + queueEntry := func(msg string, severity entry.Severity) { e := entry.New() e.Timestamp = expectedTimestamp - require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) + e.Body = fmt.Sprintf("2020-08-25 %s %s", severity.String(), msg) e.Severity = severity - e.AddAttribute("file_name", "simple.log") - require.NoError(t, c.Batch([]*entry.Entry{e})) + e.AddAttribute("log.file.name", "simple.log") + e.AddAttribute("time", "2020-08-25") + e.AddAttribute("sev", severity.String()) + e.AddAttribute("msg", msg) + entries = append(entries, e) } - queueEntry(t, converter, "Something routine", entry.Info) - queueEntry(t, converter, "Something bad happened!", entry.Error) - queueEntry(t, converter, "Some details...", entry.Debug) + queueEntry("Something routine", entry.Info) + queueEntry("Something bad happened!", entry.Error) + queueEntry("Some details...", entry.Debug) + + expectedLogs = append(expectedLogs, adapter.ConvertEntries(entries)) dir, err := os.Getwd() require.NoError(t, err) t.Logf("Working Directory: %s", dir) - wg.Wait() - require.Eventually(t, expectNLogs(sink, 3), 2*time.Second, 5*time.Millisecond, "expected %d but got %d logs", 3, sink.LogRecordCount(), ) - // TODO: Figure out a nice way to assert each logs entry content. 
- // require.Equal(t, expectedLogs, sink.AllLogs()) + + for i, expectedLog := range expectedLogs { + require.NoError(t, + plogtest.CompareLogs( + expectedLog, + sink.AllLogs()[i], + plogtest.IgnoreObservedTimestamp(), + plogtest.IgnoreTimestamp(), + ), + ) + } require.NoError(t, rcvr.Shutdown(context.Background())) } @@ -168,15 +174,6 @@ func (rt *rotationTest) Run(t *testing.T) { fileName := filepath.Join(tempDir, "test.log") backupFileName := filepath.Join(tempDir, "test-backup.log") - // Build expected outputs - expectedTimestamp, _ := time.ParseInLocation("2006-01-02", "2020-08-25", time.Local) - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - - var wg sync.WaitGroup - wg.Add(1) - go consumeNLogsFromConverter(converter.OutChannel(), numLogs, &wg) - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) @@ -224,40 +221,20 @@ func (rt *rotationTest) Run(t *testing.T) { msg := fmt.Sprintf("This is a simple log line with the number %3d", i) - // Build the expected set by converting entries to pdata Logs... - e := entry.New() - e.Timestamp = expectedTimestamp - require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) - require.NoError(t, converter.Batch([]*entry.Entry{e})) - // ... and write the logs lines to the actual file consumed by receiver. _, err := file.WriteString(fmt.Sprintf("2020-08-25 %s\n", msg)) require.NoError(t, err) time.Sleep(time.Millisecond) } - wg.Wait() require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, 10*time.Millisecond, "expected %d but got %d logs", numLogs, sink.LogRecordCount(), ) + // TODO: Figure out a nice way to assert each logs entry content. 
// require.Equal(t, expectedLogs, sink.AllLogs()) require.NoError(t, rcvr.Shutdown(context.Background())) - converter.Stop() -} - -func consumeNLogsFromConverter(ch <-chan plog.Logs, count int, wg *sync.WaitGroup) { - defer wg.Done() - - n := 0 - for pLog := range ch { - n += pLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() - - if n == count { - return - } - } } func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool { diff --git a/receiver/filelogreceiver/go.mod b/receiver/filelogreceiver/go.mod index 0b83f6d48fb5..43e705cd1e03 100644 --- a/receiver/filelogreceiver/go.mod +++ b/receiver/filelogreceiver/go.mod @@ -19,6 +19,7 @@ require ( ) require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.115.0 go.opentelemetry.io/collector/component/componenttest v0.115.0 go.opentelemetry.io/collector/consumer/consumertest v0.115.0 go.opentelemetry.io/collector/pipeline v0.115.0 @@ -51,6 +52,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.115.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/valyala/fastjson v1.6.4 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect diff --git a/receiver/namedpipereceiver/namedpipe_test.go b/receiver/namedpipereceiver/namedpipe_test.go index c7a4f25bea80..a13bc83e28c6 100644 --- a/receiver/namedpipereceiver/namedpipe_test.go +++ b/receiver/namedpipereceiver/namedpipe_test.go @@ -55,10 +55,6 @@ func TestReadPipe(t *testing.T) { sink := new(consumertest.LogsSink) cfg := testdataConfigYaml() - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - defer converter.Stop() - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) 
require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))