diff --git a/.chloggen/goleak_service.yaml b/.chloggen/goleak_service.yaml new file mode 100755 index 00000000000..b16d72fa43f --- /dev/null +++ b/.chloggen/goleak_service.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leaks during service package shutdown + +# One or more tracking issues or pull requests related to the change +issues: [9165] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/service/generated_package_test.go b/service/generated_package_test.go index bc7ce3cd666..a218b3677e2 100644 --- a/service/generated_package_test.go +++ b/service/generated_package_test.go @@ -9,5 +9,5 @@ import ( ) func TestMain(m *testing.M) { - goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opentelemetry.io/collector/service/internal/proctelemetry.InitPrometheusServer.func1")) + goleak.VerifyTestMain(m) } diff --git a/service/internal/proctelemetry/config.go b/service/internal/proctelemetry/config.go index 1bb86bb65b2..f36f9eb15c9 100644 --- a/service/internal/proctelemetry/config.go +++ b/service/internal/proctelemetry/config.go @@ -13,6 +13,7 @@ import ( "net/url" "os" "strings" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -63,9 +64,9 @@ var ( errNoValidMetricExporter = errors.New("no valid metric exporter") ) -func InitMetricReader(ctx context.Context, reader config.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { +func InitMetricReader(ctx context.Context, reader config.MetricReader, asyncErrorChannel chan error, serverWG *sync.WaitGroup) (sdkmetric.Reader, *http.Server, error) { if reader.Pull != nil { - return initPullExporter(reader.Pull.Exporter, asyncErrorChannel) + return initPullExporter(reader.Pull.Exporter, asyncErrorChannel, serverWG) } if reader.Periodic != nil { var opts []sdkmetric.PeriodicReaderOption @@ -93,7 +94,7 @@ func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disab ), nil } -func InitPrometheusServer(registry *prometheus.Registry, address string, asyncErrorChannel chan error) *http.Server { +func InitPrometheusServer(registry *prometheus.Registry, address string, asyncErrorChannel chan error, serverWG *sync.WaitGroup) *http.Server { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) server := &http.Server{ @@ -101,9 +102,15 @@ func InitPrometheusServer(registry *prometheus.Registry, address string, asyncEr Handler: mux, ReadHeaderTimeout: defaultReadHeaderTimeout, } + + serverWG.Add(1) go func() { + defer serverWG.Done() if serveErr := server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) { - asyncErrorChannel <- serveErr + select { + case asyncErrorChannel <- serveErr: + case <-time.After(1 * time.Second): + } } }() return server @@ -152,7 +159,7 @@ func cardinalityFilter(kvs ...attribute.KeyValue) attribute.Filter { } } -func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { +func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChannel chan error, serverWG *sync.WaitGroup) (sdkmetric.Reader, *http.Server, error) { promRegistry := prometheus.NewRegistry() if prometheusConfig.Host == nil { return nil, nil, fmt.Errorf("host must be specified") @@ -176,12 +183,12 @@ func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChann return nil, nil, fmt.Errorf("error creating otel prometheus exporter: %w", err) } - return exporter, InitPrometheusServer(promRegistry, net.JoinHostPort(*prometheusConfig.Host, fmt.Sprintf("%d", *prometheusConfig.Port)), asyncErrorChannel), nil + return exporter, InitPrometheusServer(promRegistry, net.JoinHostPort(*prometheusConfig.Host, fmt.Sprintf("%d", *prometheusConfig.Port)), asyncErrorChannel, serverWG), nil } -func initPullExporter(exporter config.MetricExporter, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { +func initPullExporter(exporter config.MetricExporter, asyncErrorChannel chan error, serverWG *sync.WaitGroup) (sdkmetric.Reader, *http.Server, error) { if exporter.Prometheus != nil { - return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel) + return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel, serverWG) } return nil, nil, errNoValidMetricExporter } diff --git a/service/internal/proctelemetry/config_test.go b/service/internal/proctelemetry/config_test.go index d0560ac9c8c..efb081be0d4 100644 --- a/service/internal/proctelemetry/config_test.go +++ b/service/internal/proctelemetry/config_test.go @@ -8,6 +8,7 @@ import ( "errors" "net/url" "reflect" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -529,7 +530,7 @@ func TestMetricReader(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - gotReader, server, err := InitMetricReader(context.Background(), tt.reader, make(chan error)) + gotReader, server, err := InitMetricReader(context.Background(), tt.reader, make(chan error), &sync.WaitGroup{}) defer func() { if gotReader != nil { diff --git a/service/metadata.yaml b/service/metadata.yaml index 886f71549e0..2119d463699 100644 --- a/service/metadata.yaml +++ b/service/metadata.yaml @@ -6,12 +6,6 @@ status: development: [traces, metrics, logs] distributions: [core, contrib] -tests: - goleak: - ignore: - top: - - "go.opentelemetry.io/collector/service/internal/proctelemetry.InitPrometheusServer.func1" - telemetry: metrics: process_uptime: diff --git a/service/service_test.go b/service/service_test.go index b0a952724be..200b41899be 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -340,6 +340,10 @@ func TestServiceTelemetryRestart(t *testing.T) { assert.NoError(t, err) assert.NoError(t, resp.Body.Close()) assert.Equal(t, http.StatusOK, resp.StatusCode) + // Response body must be closed now instead of defer as the test + // restarts the server on the same port. Leaving response open + // leaks a goroutine. + resp.Body.Close() // Shutdown the service require.NoError(t, srvOne.Shutdown(context.Background())) @@ -362,6 +366,7 @@ func TestServiceTelemetryRestart(t *testing.T) { 100*time.Millisecond, "Must get a valid response from the service", ) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) // Shutdown the new service @@ -536,13 +541,14 @@ func assertZPages(t *testing.T, zpagesAddr string) { func newNopSettings() Settings { return Settings{ - BuildInfo: component.NewDefaultBuildInfo(), - CollectorConf: confmap.New(), - Receivers: receivertest.NewNopBuilder(), - Processors: processortest.NewNopBuilder(), - Exporters: exportertest.NewNopBuilder(), - Connectors: connectortest.NewNopBuilder(), - Extensions: extensiontest.NewNopBuilder(), + BuildInfo: component.NewDefaultBuildInfo(), + CollectorConf: confmap.New(), + Receivers: receivertest.NewNopBuilder(), + Processors: processortest.NewNopBuilder(), + Exporters: exportertest.NewNopBuilder(), + Connectors: connectortest.NewNopBuilder(), + Extensions: extensiontest.NewNopBuilder(), + AsyncErrorChannel: make(chan error), } } diff --git a/service/telemetry.go b/service/telemetry.go index a44aaa4e4e4..7d3427e77bf 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "strconv" + "sync" "go.opentelemetry.io/contrib/config" "go.opentelemetry.io/otel/metric" @@ -29,7 +30,8 @@ const ( type meterProvider struct { *sdkmetric.MeterProvider - servers []*http.Server + servers []*http.Server + serverWG sync.WaitGroup } type meterProviderSettings struct { @@ -71,7 +73,7 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m var opts []sdkmetric.Option for _, reader := range set.cfg.Readers { // https://github.com/open-telemetry/opentelemetry-collector/issues/8045 - r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel) + r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel, &mp.serverWG) if err != nil { return nil, err } @@ -110,5 +112,8 @@ func (mp *meterProvider) Shutdown(ctx context.Context) error { errs = multierr.Append(errs, server.Close()) } } - return multierr.Append(errs, mp.MeterProvider.Shutdown(ctx)) + errs = multierr.Append(errs, mp.MeterProvider.Shutdown(ctx)) + mp.serverWG.Wait() + + return errs }