[service] Fix memory leaks and enable goleak check in tests (#9241)
**Description:**
This change adds `goleak` to check for leaked goroutines (which in turn
leak memory). Originally there were 3 failing tests in the `service`
package, so I'll describe the changes in terms of resolving each test's
failing goleak check.
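
For reference, here is a minimal sketch of what the enabled check looks
like. The collector generates an equivalent file per package (see
`generated_package_test.go` in the diff below), so the hand-written
`TestMain` here is illustrative only.

```go
// Illustrative sketch: the collector's generated_package_test.go is the
// real, generated equivalent of this file.
package service

import (
	"testing"

	"go.uber.org/goleak"
)

// TestMain fails the package's tests if any goroutines started by the
// tests are still running once all tests have finished.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
```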

1. `TestServiceTelemetryRestart`: The simplest fix: close the response
body so no goroutine is leaked when the test reopens a server on the
same port. This was purely a test issue.
2. `TestTelemetryInit.UseOTelWithSDKConfiguration`: The [meter
provider](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/telemetry.go#L57-L58)
was being started during initialization ([metrics
reference](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/internal/proctelemetry/config.go#L135)),
but never shut down. The type originally used (`metric.MeterProvider`)
is the base API interface, which doesn't provide a `Shutdown` method. I
changed this to use the SDK interfaces, which do provide the required
`Shutdown` method. The code that starts the providers was already using
and returning the SDK type, so the underlying type remains the same.
Since `mp` is a private member and the `sdkmetric` implementation
satisfies the original interface, I don't believe changing the type is a
breaking change. (A minimal sketch of the type difference follows this
list.)
3. `TestServiceTelemetryCleanupOnError`: This test starts a server in a
sub-goroutine, cancels it from the main goroutine, then starts and
cancels it again the same way. It exposed a race between the
sub-goroutine running
[`server.ListenAndServe`](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/internal/proctelemetry/config.go#L148)
and the main goroutine [calling
close](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/telemetry.go#L219)
and then starting the server again [right
away](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/service_test.go#L244).
The solution is to add a `sync.WaitGroup` that blocks until all servers
are closed before `shutdown` returns. This ensures it's safe to proceed,
knowing the ports are free and the servers are fully closed. (A
standalone sketch of this pattern also follows the list.)
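
For item 2, a minimal sketch (not the collector's actual wiring) of why
the type matters: the base API interface `metric.MeterProvider` has no
`Shutdown`, while the SDK's `*sdkmetric.MeterProvider` does, so holding
the SDK type lets shutdown stop the readers and exporters that were
started.

```go
package main

import (
	"context"
	"log"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// The SDK constructor already returns *sdkmetric.MeterProvider;
	// storing it as the base API interface (metric.MeterProvider)
	// would hide the Shutdown method.
	mp := sdkmetric.NewMeterProvider()

	// ... hand mp out as a metric.MeterProvider for instrumentation ...

	// Shutdown flushes pending metrics and stops the readers' goroutines;
	// without it, goleak reports those goroutines as leaked.
	if err := mp.Shutdown(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```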
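
And for item 3, a standalone sketch of the `sync.WaitGroup` pattern the
fix uses (the type and method names here are illustrative, not the
collector's): the serving goroutine marks the group done when
`ListenAndServe` returns, and shutdown waits on the group after `Close`,
so the port is known to be free before anything restarts a server on the
same address.

```go
package telemetry

import (
	"errors"
	"net/http"
	"sync"
)

// telemetryServer is a hypothetical wrapper used only to show the pattern.
type telemetryServer struct {
	srv      *http.Server
	serverWG sync.WaitGroup
}

// start runs the server in a goroutine registered with the WaitGroup.
func (t *telemetryServer) start() {
	t.serverWG.Add(1)
	go func() {
		defer t.serverWG.Done()
		if err := t.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			// The real code forwards err on an async error channel,
			// with a timeout so this goroutine can always exit.
			_ = err
		}
	}()
}

// shutdown closes the server and blocks until ListenAndServe has
// returned, guaranteeing the listening port is released.
func (t *telemetryServer) shutdown() error {
	err := t.srv.Close()
	t.serverWG.Wait()
	return err
}
```

The select-with-timeout around the error send in the diff serves the same
goal: the serving goroutine can always exit, so `Wait` cannot block
forever.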

The first fix addresses only a test issue, but fixes 2 and 3 address
real bugs. I realize the PR is a bit harder to read with all of them
together, but I assumed splitting it into dependent PRs would be more
complicated.

**Link to tracking Issue:** #9165

**Testing:** All tests pass, and the goleak check now passes as well.

---------

Co-authored-by: Pablo Baeyens <[email protected]>
crobert-1 and mx-psi authored Aug 13, 2024
1 parent aacddac commit c1c8fcd
Showing 7 changed files with 64 additions and 26 deletions.
25 changes: 25 additions & 0 deletions .chloggen/goleak_service.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leaks during service package shutdown

# One or more tracking issues or pull requests related to the change
issues: [9165]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion service/generated_package_test.go

Some generated files are not rendered by default.

23 changes: 15 additions & 8 deletions service/internal/proctelemetry/config.go
@@ -13,6 +13,7 @@ import (
"net/url"
"os"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -63,9 +64,9 @@ var (
errNoValidMetricExporter = errors.New("no valid metric exporter")
)

func InitMetricReader(ctx context.Context, reader config.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
func InitMetricReader(ctx context.Context, reader config.MetricReader, asyncErrorChannel chan error, serverWG *sync.WaitGroup) (sdkmetric.Reader, *http.Server, error) {
if reader.Pull != nil {
return initPullExporter(reader.Pull.Exporter, asyncErrorChannel)
return initPullExporter(reader.Pull.Exporter, asyncErrorChannel, serverWG)
}
if reader.Periodic != nil {
var opts []sdkmetric.PeriodicReaderOption
@@ -93,17 +94,23 @@ func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disab
), nil
}

func InitPrometheusServer(registry *prometheus.Registry, address string, asyncErrorChannel chan error) *http.Server {
func InitPrometheusServer(registry *prometheus.Registry, address string, asyncErrorChannel chan error, serverWG *sync.WaitGroup) *http.Server {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
server := &http.Server{
Addr: address,
Handler: mux,
ReadHeaderTimeout: defaultReadHeaderTimeout,
}

serverWG.Add(1)
go func() {
defer serverWG.Done()
if serveErr := server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) {
asyncErrorChannel <- serveErr
select {
case asyncErrorChannel <- serveErr:
case <-time.After(1 * time.Second):
}
}
}()
return server
@@ -152,7 +159,7 @@ func cardinalityFilter(kvs ...attribute.KeyValue) attribute.Filter {
}
}

func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChannel chan error, serverWG *sync.WaitGroup) (sdkmetric.Reader, *http.Server, error) {
promRegistry := prometheus.NewRegistry()
if prometheusConfig.Host == nil {
return nil, nil, fmt.Errorf("host must be specified")
@@ -176,12 +183,12 @@
return nil, nil, fmt.Errorf("error creating otel prometheus exporter: %w", err)
}

return exporter, InitPrometheusServer(promRegistry, net.JoinHostPort(*prometheusConfig.Host, fmt.Sprintf("%d", *prometheusConfig.Port)), asyncErrorChannel), nil
return exporter, InitPrometheusServer(promRegistry, net.JoinHostPort(*prometheusConfig.Host, fmt.Sprintf("%d", *prometheusConfig.Port)), asyncErrorChannel, serverWG), nil
}

func initPullExporter(exporter config.MetricExporter, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
func initPullExporter(exporter config.MetricExporter, asyncErrorChannel chan error, serverWG *sync.WaitGroup) (sdkmetric.Reader, *http.Server, error) {
if exporter.Prometheus != nil {
return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel)
return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel, serverWG)
}
return nil, nil, errNoValidMetricExporter
}
3 changes: 2 additions & 1 deletion service/internal/proctelemetry/config_test.go
@@ -8,6 +8,7 @@ import (
"errors"
"net/url"
"reflect"
"sync"
"testing"

"github.com/stretchr/testify/assert"
@@ -529,7 +530,7 @@ func TestMetricReader(t *testing.T) {
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
gotReader, server, err := InitMetricReader(context.Background(), tt.reader, make(chan error))
gotReader, server, err := InitMetricReader(context.Background(), tt.reader, make(chan error), &sync.WaitGroup{})

defer func() {
if gotReader != nil {
6 changes: 0 additions & 6 deletions service/metadata.yaml
@@ -6,12 +6,6 @@ status:
development: [traces, metrics, logs]
distributions: [core, contrib]

tests:
goleak:
ignore:
top:
- "go.opentelemetry.io/collector/service/internal/proctelemetry.InitPrometheusServer.func1"

telemetry:
metrics:
process_uptime:
20 changes: 13 additions & 7 deletions service/service_test.go
@@ -340,6 +340,10 @@ func TestServiceTelemetryRestart(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, resp.Body.Close())
assert.Equal(t, http.StatusOK, resp.StatusCode)
// Response body must be closed now instead of defer as the test
// restarts the server on the same port. Leaving response open
// leaks a goroutine.
resp.Body.Close()

// Shutdown the service
require.NoError(t, srvOne.Shutdown(context.Background()))
@@ -362,6 +366,7 @@
100*time.Millisecond,
"Must get a valid response from the service",
)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)

// Shutdown the new service
@@ -536,13 +541,14 @@ func assertZPages(t *testing.T, zpagesAddr string) {

func newNopSettings() Settings {
return Settings{
BuildInfo: component.NewDefaultBuildInfo(),
CollectorConf: confmap.New(),
Receivers: receivertest.NewNopBuilder(),
Processors: processortest.NewNopBuilder(),
Exporters: exportertest.NewNopBuilder(),
Connectors: connectortest.NewNopBuilder(),
Extensions: extensiontest.NewNopBuilder(),
BuildInfo: component.NewDefaultBuildInfo(),
CollectorConf: confmap.New(),
Receivers: receivertest.NewNopBuilder(),
Processors: processortest.NewNopBuilder(),
Exporters: exportertest.NewNopBuilder(),
Connectors: connectortest.NewNopBuilder(),
Extensions: extensiontest.NewNopBuilder(),
AsyncErrorChannel: make(chan error),
}
}

11 changes: 8 additions & 3 deletions service/telemetry.go
@@ -8,6 +8,7 @@ import (
"net"
"net/http"
"strconv"
"sync"

"go.opentelemetry.io/contrib/config"
"go.opentelemetry.io/otel/metric"
@@ -29,7 +30,8 @@

type meterProvider struct {
*sdkmetric.MeterProvider
servers []*http.Server
servers []*http.Server
serverWG sync.WaitGroup
}

type meterProviderSettings struct {
@@ -71,7 +73,7 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
var opts []sdkmetric.Option
for _, reader := range set.cfg.Readers {
// https://github.com/open-telemetry/opentelemetry-collector/issues/8045
r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel)
r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel, &mp.serverWG)
if err != nil {
return nil, err
}
@@ -110,5 +112,8 @@ func (mp *meterProvider) Shutdown(ctx context.Context) error {
errs = multierr.Append(errs, server.Close())
}
}
return multierr.Append(errs, mp.MeterProvider.Shutdown(ctx))
errs = multierr.Append(errs, mp.MeterProvider.Shutdown(ctx))
mp.serverWG.Wait()

return errs
}
