From e834b938eef9623ea2f00838892227bbd006abc2 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 26 Jan 2024 11:27:42 -0800 Subject: [PATCH 1/5] [exporter/azuremonitor] Fix leaking goroutines --- exporter/azuremonitorexporter/channels.go | 7 ++- exporter/azuremonitorexporter/factory_test.go | 1 + exporter/azuremonitorexporter/go.mod | 1 + exporter/azuremonitorexporter/go.sum | 1 + .../mock_transportChannel.go | 44 ++++++++++++++++++- .../azuremonitorexporter/traceexporter.go | 17 ++++++- .../traceexporter_test.go | 1 + 7 files changed, 69 insertions(+), 3 deletions(-) diff --git a/exporter/azuremonitorexporter/channels.go b/exporter/azuremonitorexporter/channels.go index 9c50d03c48ee..c3a3e304771a 100644 --- a/exporter/azuremonitorexporter/channels.go +++ b/exporter/azuremonitorexporter/channels.go @@ -3,8 +3,13 @@ package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter" -import "github.com/microsoft/ApplicationInsights-Go/appinsights/contracts" +import ( + "time" + + "github.com/microsoft/ApplicationInsights-Go/appinsights/contracts" +) type transportChannel interface { Send(*contracts.Envelope) + Close(retryTimeout ...time.Duration) <-chan struct{} } diff --git a/exporter/azuremonitorexporter/factory_test.go b/exporter/azuremonitorexporter/factory_test.go index 9572b67168f9..02aebeea599f 100644 --- a/exporter/azuremonitorexporter/factory_test.go +++ b/exporter/azuremonitorexporter/factory_test.go @@ -38,6 +38,7 @@ func TestCreateTracesExporterUsingDefaultTransportChannel(t *testing.T) { assert.NotNil(t, exporter) assert.NoError(t, err) assert.NotNil(t, f.tChannel) + assert.NoError(t, exporter.Shutdown(ctx)) } func TestCreateTracesExporterUsingBadConfig(t *testing.T) { diff --git a/exporter/azuremonitorexporter/go.mod b/exporter/azuremonitorexporter/go.mod index 9a1c55ff1cda..2412f9b7c89a 100644 --- a/exporter/azuremonitorexporter/go.mod +++ b/exporter/azuremonitorexporter/go.mod @@ -15,6 
+15,7 @@ require ( go.opentelemetry.io/collector/semconv v0.93.0 go.opentelemetry.io/otel/metric v1.22.0 go.opentelemetry.io/otel/trace v1.22.0 + go.uber.org/goleak v1.3.0 go.uber.org/zap v1.26.0 golang.org/x/net v0.20.0 ) diff --git a/exporter/azuremonitorexporter/go.sum b/exporter/azuremonitorexporter/go.sum index 43b00f15dcf2..e0737f2c3926 100644 --- a/exporter/azuremonitorexporter/go.sum +++ b/exporter/azuremonitorexporter/go.sum @@ -315,6 +315,7 @@ go.opentelemetry.io/otel/sdk/metric v1.22.0/go.mod h1:KjQGeMIDlBNEOo6HvjhxIec1p/ go.opentelemetry.io/otel/trace v1.22.0 h1:Hg6pPujv0XG9QaVbGOBVHunyuLcCC3jN7WEhPx83XD0= go.opentelemetry.io/otel/trace v1.22.0/go.mod h1:RbbHXVqKES9QhzZq/fE5UnOSILqRt40a21sPw2He1xo= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= diff --git a/exporter/azuremonitorexporter/mock_transportChannel.go b/exporter/azuremonitorexporter/mock_transportChannel.go index 269eb4ac65f6..5d98fba84ddf 100644 --- a/exporter/azuremonitorexporter/mock_transportChannel.go +++ b/exporter/azuremonitorexporter/mock_transportChannel.go @@ -1,13 +1,15 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. 
package azuremonitorexporter import ( contracts "github.com/microsoft/ApplicationInsights-Go/appinsights/contracts" mock "github.com/stretchr/testify/mock" + + time "time" ) // mockTransportChannel is an autogenerated mock type for the transportChannel type @@ -15,7 +17,47 @@ type mockTransportChannel struct { mock.Mock } +// Close provides a mock function with given fields: retryTimeout +func (_m *mockTransportChannel) Close(retryTimeout ...time.Duration) <-chan struct{} { + _va := make([]interface{}, len(retryTimeout)) + for _i := range retryTimeout { + _va[_i] = retryTimeout[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func(...time.Duration) <-chan struct{}); ok { + r0 = rf(retryTimeout...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + // Send provides a mock function with given fields: _a0 func (_m *mockTransportChannel) Send(_a0 *contracts.Envelope) { _m.Called(_a0) } + +// newTransportChannel creates a new instance of mockTransportChannel. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func newTransportChannel(t interface { + mock.TestingT + Cleanup(func()) +}) *mockTransportChannel { + mock := &mockTransportChannel{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/exporter/azuremonitorexporter/traceexporter.go b/exporter/azuremonitorexporter/traceexporter.go index f5f5e19c6126..0f78cbc2abc2 100644 --- a/exporter/azuremonitorexporter/traceexporter.go +++ b/exporter/azuremonitorexporter/traceexporter.go @@ -5,6 +5,9 @@ package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry- import ( "context" + "errors" + "fmt" + "time" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" @@ -62,6 +65,17 @@ func (exporter *traceExporter) onTraceData(_ context.Context, traceData ptrace.T return visitor.err } +func (exporter *traceExporter) Shutdown(ctx context.Context) error { + shutdownTimeout := 30 * time.Second + + select { + case <-exporter.transportChannel.Close(): + return nil + case <-time.After(shutdownTimeout): + return errors.New(fmt.Sprintf("Shutting down timed out after %v", shutdownTimeout)) + } +} + // Returns a new instance of the trace exporter func newTracesExporter(config *Config, transportChannel transportChannel, set exporter.CreateSettings) (exporter.Traces, error) { exporter := &traceExporter{ @@ -75,5 +89,6 @@ func newTracesExporter(config *Config, transportChannel transportChannel, set ex set, config, exporter.onTraceData, - exporterhelper.WithQueue(config.QueueSettings)) + exporterhelper.WithQueue(config.QueueSettings), + exporterhelper.WithShutdown(exporter.Shutdown)) } diff --git a/exporter/azuremonitorexporter/traceexporter_test.go b/exporter/azuremonitorexporter/traceexporter_test.go index aa0ae5aedab2..9b654853c4f7 100644 --- a/exporter/azuremonitorexporter/traceexporter_test.go +++ b/exporter/azuremonitorexporter/traceexporter_test.go @@ -118,6 +118,7 @@ func TestExporterTraceDataCallbackSingleSpanNoEnvelope(t 
*testing.T) { func getMockTransportChannel() *mockTransportChannel { transportChannelMock := mockTransportChannel{} transportChannelMock.On("Send", mock.Anything) + transportChannelMock.On("Close", mock.Anything) return &transportChannelMock } From 0125f28bdeff74dd9745be9097cbe75c5f7bca49 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 26 Jan 2024 11:48:02 -0800 Subject: [PATCH 2/5] Add shutdown for metrics and logs, new tests --- exporter/azuremonitorexporter/factory_test.go | 28 +++++++++++++++++++ exporter/azuremonitorexporter/logexporter.go | 17 +++++++++++ .../azuremonitorexporter/metricexporter.go | 19 ++++++++++++- .../azuremonitorexporter/traceexporter.go | 12 ++++---- 4 files changed, 70 insertions(+), 6 deletions(-) diff --git a/exporter/azuremonitorexporter/factory_test.go b/exporter/azuremonitorexporter/factory_test.go index 02aebeea599f..c4c98059703b 100644 --- a/exporter/azuremonitorexporter/factory_test.go +++ b/exporter/azuremonitorexporter/factory_test.go @@ -54,3 +54,31 @@ func TestCreateTracesExporterUsingBadConfig(t *testing.T) { assert.Nil(t, exporter) assert.Error(t, err) } + +func TestCreateLogsExporterUsingDefaultTransportChannel(t *testing.T) { + // We get the default transport channel creation, if we don't specify one during f creation + f := factory{} + assert.Nil(t, f.tChannel) + ctx := context.Background() + config := createDefaultConfig().(*Config) + config.ConnectionString = "InstrumentationKey=test-key;IngestionEndpoint=https://test-endpoint/" + exporter, err := f.createLogsExporter(ctx, exportertest.NewNopCreateSettings(), config) + assert.NotNil(t, exporter) + assert.NoError(t, err) + assert.NotNil(t, f.tChannel) + assert.NoError(t, exporter.Shutdown(ctx)) +} + +func TestCreateMetricsExporterUsingDefaultTransportChannel(t *testing.T) { + // We get the default transport channel creation, if we don't specify one during f creation + f := factory{} + assert.Nil(t, f.tChannel) + ctx := context.Background() + config := 
createDefaultConfig().(*Config) + config.ConnectionString = "InstrumentationKey=test-key;IngestionEndpoint=https://test-endpoint/" + exporter, err := f.createMetricsExporter(ctx, exportertest.NewNopCreateSettings(), config) + assert.NotNil(t, exporter) + assert.NoError(t, err) + assert.NotNil(t, f.tChannel) + assert.NoError(t, exporter.Shutdown(ctx)) +} diff --git a/exporter/azuremonitorexporter/logexporter.go b/exporter/azuremonitorexporter/logexporter.go index 901c09d595f4..b5de35e8c1d4 100644 --- a/exporter/azuremonitorexporter/logexporter.go +++ b/exporter/azuremonitorexporter/logexporter.go @@ -5,6 +5,7 @@ package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry- import ( "context" + "time" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -39,6 +40,21 @@ func (exporter *logExporter) onLogData(_ context.Context, logData plog.Logs) err return nil } +func (exporter *logExporter) Shutdown(_ context.Context) error { + shutdownTimeout := 1 * time.Second + + select { + case <-exporter.transportChannel.Close(): + return nil + case <-time.After(shutdownTimeout): + // Currently, due to a dependency's bug (https://github.com/microsoft/ApplicationInsights-Go/issues/70), + // the timeout will always be hit before Close is complete. This is not an error, but it will leak a goroutine. + // There's nothing that we can do about this for now, so there's no reason to log or return an error + // here. 
+ return nil + } +} + // Returns a new instance of the log exporter func newLogsExporter(config *Config, transportChannel transportChannel, set exporter.CreateSettings) (exporter.Logs, error) { exporter := &logExporter{ @@ -53,5 +69,6 @@ func newLogsExporter(config *Config, transportChannel transportChannel, set expo config, exporter.onLogData, exporterhelper.WithQueue(config.QueueSettings), + exporterhelper.WithShutdown(exporter.Shutdown), ) } diff --git a/exporter/azuremonitorexporter/metricexporter.go b/exporter/azuremonitorexporter/metricexporter.go index 94d9148ceb2c..bb0dba118f42 100644 --- a/exporter/azuremonitorexporter/metricexporter.go +++ b/exporter/azuremonitorexporter/metricexporter.go @@ -5,6 +5,7 @@ package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry- import ( "context" + "time" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -40,6 +41,21 @@ func (exporter *metricExporter) onMetricData(_ context.Context, metricData pmetr return nil } +func (exporter *metricExporter) Shutdown(_ context.Context) error { + shutdownTimeout := 1 * time.Second + + select { + case <-exporter.transportChannel.Close(): + return nil + case <-time.After(shutdownTimeout): + // Currently, due to a dependency's bug (https://github.com/microsoft/ApplicationInsights-Go/issues/70), + // the timeout will always be hit before Close is complete. This is not an error, but it will leak a goroutine. + // There's nothing that we can do about this for now, so there's no reason to log or return an error + // here. 
+ return nil + } +} + // Returns a new instance of the metric exporter func newMetricsExporter(config *Config, transportChannel transportChannel, set exporter.CreateSettings) (exporter.Metrics, error) { exporter := &metricExporter{ @@ -54,5 +70,6 @@ func newMetricsExporter(config *Config, transportChannel transportChannel, set e set, config, exporter.onMetricData, - exporterhelper.WithQueue(config.QueueSettings)) + exporterhelper.WithQueue(config.QueueSettings), + exporterhelper.WithShutdown(exporter.Shutdown)) } diff --git a/exporter/azuremonitorexporter/traceexporter.go b/exporter/azuremonitorexporter/traceexporter.go index 0f78cbc2abc2..4e1c7a8c2aa0 100644 --- a/exporter/azuremonitorexporter/traceexporter.go +++ b/exporter/azuremonitorexporter/traceexporter.go @@ -5,8 +5,6 @@ package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry- import ( "context" - "errors" - "fmt" "time" "go.opentelemetry.io/collector/consumer/consumererror" @@ -65,14 +63,18 @@ func (exporter *traceExporter) onTraceData(_ context.Context, traceData ptrace.T return visitor.err } -func (exporter *traceExporter) Shutdown(ctx context.Context) error { - shutdownTimeout := 30 * time.Second +func (exporter *traceExporter) Shutdown(_ context.Context) error { + shutdownTimeout := 1 * time.Second select { case <-exporter.transportChannel.Close(): return nil case <-time.After(shutdownTimeout): - return errors.New(fmt.Sprintf("Shutting down timed out after %v", shutdownTimeout)) + // Currently, due to a dependency's bug (https://github.com/microsoft/ApplicationInsights-Go/issues/70), + // the timeout will always be hit before Close is complete. This is not an error, but it will leak a goroutine. + // There's nothing that we can do about this for now, so there's no reason to log or return an error + // here. 
+ return nil } } From c14cf8201587e384adffdf86f5ff4408f0a56779 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 26 Jan 2024 11:49:56 -0800 Subject: [PATCH 3/5] Add more tests --- exporter/azuremonitorexporter/factory_test.go | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/exporter/azuremonitorexporter/factory_test.go b/exporter/azuremonitorexporter/factory_test.go index c4c98059703b..fb7f926d7fec 100644 --- a/exporter/azuremonitorexporter/factory_test.go +++ b/exporter/azuremonitorexporter/factory_test.go @@ -55,6 +55,18 @@ func TestCreateTracesExporterUsingBadConfig(t *testing.T) { assert.Error(t, err) } +func TestCreateLogsExporterUsingSpecificTransportChannel(t *testing.T) { + // mock transport channel creation + f := factory{tChannel: &mockTransportChannel{}} + ctx := context.Background() + params := exportertest.NewNopCreateSettings() + config := createDefaultConfig().(*Config) + config.ConnectionString = "InstrumentationKey=test-key;IngestionEndpoint=https://test-endpoint/" + exporter, err := f.createLogsExporter(ctx, params, config) + assert.NotNil(t, exporter) + assert.NoError(t, err) +} + func TestCreateLogsExporterUsingDefaultTransportChannel(t *testing.T) { // We get the default transport channel creation, if we don't specify one during f creation f := factory{} @@ -69,6 +81,32 @@ func TestCreateLogsExporterUsingDefaultTransportChannel(t *testing.T) { assert.NoError(t, exporter.Shutdown(ctx)) } +func TestCreateLogsExporterUsingBadConfig(t *testing.T) { + // We get the default transport channel creation, if we don't specify one during factory creation + f := factory{} + assert.Nil(t, f.tChannel) + ctx := context.Background() + params := exportertest.NewNopCreateSettings() + + badConfig := &badConfig{} + + exporter, err := f.createLogsExporter(ctx, params, badConfig) + assert.Nil(t, exporter) + assert.Error(t, err) +} + +func TestCreateMetricsExporterUsingSpecificTransportChannel(t *testing.T) { + // mock transport channel 
creation + f := factory{tChannel: &mockTransportChannel{}} + ctx := context.Background() + params := exportertest.NewNopCreateSettings() + config := createDefaultConfig().(*Config) + config.ConnectionString = "InstrumentationKey=test-key;IngestionEndpoint=https://test-endpoint/" + exporter, err := f.createMetricsExporter(ctx, params, config) + assert.NotNil(t, exporter) + assert.NoError(t, err) +} + func TestCreateMetricsExporterUsingDefaultTransportChannel(t *testing.T) { // We get the default transport channel creation, if we don't specify one during f creation f := factory{} @@ -82,3 +120,17 @@ func TestCreateMetricsExporterUsingDefaultTransportChannel(t *testing.T) { assert.NotNil(t, f.tChannel) assert.NoError(t, exporter.Shutdown(ctx)) } + +func TestCreateMetricsExporterUsingBadConfig(t *testing.T) { + // We get the default transport channel creation, if we don't specify one during factory creation + f := factory{} + assert.Nil(t, f.tChannel) + ctx := context.Background() + params := exportertest.NewNopCreateSettings() + + badConfig := &badConfig{} + + exporter, err := f.createMetricsExporter(ctx, params, badConfig) + assert.Nil(t, exporter) + assert.Error(t, err) +} From 7e2d00dde5e9d8281208a01371abcd02dd9528f6 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 26 Jan 2024 11:57:52 -0800 Subject: [PATCH 4/5] Add goleak test --- exporter/azuremonitorexporter/package_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 exporter/azuremonitorexporter/package_test.go diff --git a/exporter/azuremonitorexporter/package_test.go b/exporter/azuremonitorexporter/package_test.go new file mode 100644 index 000000000000..8f923f9d936c --- /dev/null +++ b/exporter/azuremonitorexporter/package_test.go @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package azuremonitorexporter + +import ( + "testing" + + "go.uber.org/goleak" +) + +// Context for ignore opencensus leak: 
https://github.com/census-instrumentation/opencensus-go/issues/1191 +// Context for ignore ApplicationInsights-Go leak: https://github.com/microsoft/ApplicationInsights-Go/issues/70 +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("github.com/microsoft/ApplicationInsights-Go/appinsights.(*throttleManager).Stop"), + ) +} From 22c7a710260cfde120cdf4fc3f8c4b4979ed3496 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Fri, 26 Jan 2024 12:01:54 -0800 Subject: [PATCH 5/5] Add changelog --- .chloggen/goleak_azuremonitor.yaml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100755 .chloggen/goleak_azuremonitor.yaml diff --git a/.chloggen/goleak_azuremonitor.yaml b/.chloggen/goleak_azuremonitor.yaml new file mode 100755 index 000000000000..1623db9f008c --- /dev/null +++ b/.chloggen/goleak_azuremonitor.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/azuremonitorexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix goroutine leaks occurring on component shutdown + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30438] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: []