Skip to content

Commit

Permalink
[exporter/signalfx] Fix goroutine leaks (open-telemetry#32781)
Browse files Browse the repository at this point in the history
**Description:** 
This change is a refactor to allow us to enable `goleak` checks in each
of the exporter's sub-directories, as well as the signalfx receiver
(which uses the signalfxexporter in a test). The main idea is to use
`start`/`shutdown` for each package's functionality, rather than relying
on the top-level context cancellation of the exporter. Adding `start` and
`shutdown` methods to each package means we can control the lifecycle of
each more closely as needed, and enable `goleak` for each package.

The leaks are fixed as follows:
1. Call `TTLMap.Shutdown` so the ticker is properly stopped:
`t.prevPts.Shutdown()`.
2. Don't block while handing a request to a worker if the context is cancelled.

**Link to tracking issue:**
open-telemetry#30438

---------

Co-authored-by: Sean Marciniak <[email protected]>
  • Loading branch information
2 people authored and f7o committed Sep 12, 2024
1 parent 393eccf commit 0711cb7
Show file tree
Hide file tree
Showing 18 changed files with 180 additions and 52 deletions.
27 changes: 27 additions & 0 deletions .chloggen/goleak_signalfxexp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: signalfxexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak by re-organizing the exporter's functionality lifecycle

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32781]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
15 changes: 9 additions & 6 deletions exporter/signalfxexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type signalfxExporter struct {
hostMetadataSyncer *hostmetadata.Syncer
converter *translation.MetricsConverter
dimClient *dimensions.DimensionClient
cancelFn func()
}

// newSignalFxExporter returns a new SignalFx exporter.
Expand Down Expand Up @@ -100,6 +99,9 @@ func newSignalFxExporter(
}

func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err error) {
if se.converter != nil {
se.converter.Start()
}
ingestURL, err := se.config.getIngestURL()
if err != nil {
return err
Expand Down Expand Up @@ -129,16 +131,13 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err
if err != nil {
return fmt.Errorf("could not load API TLS config: %w", err)
}
cancellable, cancelFn := context.WithCancel(ctx)
se.cancelFn = cancelFn

apiURL, err := se.config.getAPIURL()
if err != nil {
return err
}

dimClient := dimensions.NewDimensionClient(
cancellable,
dimensions.DimensionClientOptions{
Token: se.config.AccessToken,
APIURL: apiURL,
Expand Down Expand Up @@ -235,8 +234,12 @@ func (se *signalfxExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
}

func (se *signalfxExporter) shutdown(_ context.Context) error {
if se.cancelFn != nil {
se.cancelFn()
if se.dimClient != nil {
se.dimClient.Shutdown()
}

if se.converter != nil {
se.converter.Shutdown()
}
return nil
}
Expand Down
11 changes: 7 additions & 4 deletions exporter/signalfxexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,6 @@ func TestConsumeMetadata(t *testing.T) {
logger := zap.NewNop()

dimClient := dimensions.NewDimensionClient(
context.Background(),
dimensions.DimensionClientOptions{
Token: "foo",
APIURL: serverURL,
Expand Down Expand Up @@ -1203,6 +1202,10 @@ func TestConsumeMetadata(t *testing.T) {
case <-c:
// wait 500ms longer than send delay
case <-time.After(tt.sendDelay + 500*time.Millisecond):
// If no updates are supposed to be sent, the server doesn't update dimensions, and
// doesn't call Done. This is correct behavior, so the test needs to account for it here,
// or a goroutine will be leaked.
defer wg.Done()
require.True(t, tt.shouldNotSendUpdate, "timeout waiting for response")
}

Expand Down Expand Up @@ -1331,6 +1334,7 @@ func TestTLSExporterInit(t *testing.T) {
sfx, err := newSignalFxExporter(tt.config, exportertest.NewNopSettings())
assert.NoError(t, err)
err = sfx.start(context.Background(), componenttest.NewNopHost())
defer func() { require.NoError(t, sfx.shutdown(context.Background())) }()
if tt.wantErr {
require.Error(t, err)
if tt.wantErrMessage != "" {
Expand Down Expand Up @@ -1402,6 +1406,7 @@ func TestTLSIngestConnection(t *testing.T) {
assert.NoError(t, err)
err = sfx.start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
defer func() { assert.NoError(t, sfx.shutdown(context.Background())) }()

_, err = sfx.pushMetricsData(context.Background(), metricsPayload)
if tt.wantErr {
Expand Down Expand Up @@ -1526,10 +1531,7 @@ func TestTLSAPIConnection(t *testing.T) {
require.NoError(t, err)
serverURL, err := url.Parse(tt.config.APIURL)
assert.NoError(t, err)
cancellable, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
dimClient := dimensions.NewDimensionClient(
cancellable,
dimensions.DimensionClientOptions{
Token: "",
APIURL: serverURL,
Expand All @@ -1541,6 +1543,7 @@ func TestTLSAPIConnection(t *testing.T) {
APITLSConfig: apiTLSCfg,
})
dimClient.Start()
defer func() { dimClient.Shutdown() }()

se := &signalfxExporter{
dimClient: dimClient,
Expand Down
6 changes: 3 additions & 3 deletions exporter/signalfxexporter/generated_package_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 23 additions & 13 deletions exporter/signalfxexporter/internal/dimensions/dimclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// updates are currently not done by this port.
type DimensionClient struct {
sync.RWMutex
ctx context.Context
cancel context.CancelFunc
Token configopaque.String
APIURL *url.URL
client *http.Client
Expand Down Expand Up @@ -84,7 +84,7 @@ type DimensionClientOptions struct {
}

// NewDimensionClient returns a new client
func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *DimensionClient {
func NewDimensionClient(options DimensionClientOptions) *DimensionClient {
client := &http.Client{
Timeout: options.Timeout,
Transport: &http.Transport{
Expand All @@ -102,10 +102,9 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di
TLSClientConfig: options.APITLSConfig,
},
}
sender := NewReqSender(ctx, client, 20, map[string]string{"client": "dimension"})
sender := NewReqSender(client, 20, map[string]string{"client": "dimension"})

return &DimensionClient{
ctx: ctx,
Token: options.Token,
APIURL: options.APIURL,
sendDelay: options.SendDelay,
Expand All @@ -123,7 +122,18 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di

// Start the client's processing queue
func (dc *DimensionClient) Start() {
go dc.processQueue()
var ctx context.Context
// The dimension client is started during the exporter's startup functionality.
// The collector spec states that for long-running operations, components should
// use the background context, rather than the passed in context.
ctx, dc.cancel = context.WithCancel(context.Background())
go dc.processQueue(ctx)
}

// Shutdown cancels the context created by Start, which signals the queue
// processing goroutine to return. It is a no-op when no cancel function has
// been set (for example, when Start was never called). It does not wait for
// the goroutine to finish.
func (dc *DimensionClient) Shutdown() {
	stop := dc.cancel
	if stop == nil {
		// Nothing was started, so there is nothing to cancel.
		return
	}
	stop()
}

// acceptDimension to be sent to the API. This will return fairly quickly and
Expand Down Expand Up @@ -185,10 +195,10 @@ func mergeTags(tagSets ...map[string]bool) map[string]bool {
return out
}

func (dc *DimensionClient) processQueue() {
func (dc *DimensionClient) processQueue(ctx context.Context) {
for {
select {
case <-dc.ctx.Done():
case <-ctx.Done():
return
case delayedDimUpdate := <-dc.delayedQueue:
now := dc.now()
Expand All @@ -201,7 +211,7 @@ func (dc *DimensionClient) processQueue() {
delete(dc.delayedSet, delayedDimUpdate.Key())
dc.Unlock()

if err := dc.handleDimensionUpdate(delayedDimUpdate.DimensionUpdate); err != nil {
if err := dc.handleDimensionUpdate(ctx, delayedDimUpdate.DimensionUpdate); err != nil {
dc.logger.Error(
"Could not send dimension update",
zap.Error(err),
Expand All @@ -213,13 +223,13 @@ func (dc *DimensionClient) processQueue() {
}

// handleDimensionUpdate will set custom properties on a specific dimension value.
func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) error {
func (dc *DimensionClient) handleDimensionUpdate(ctx context.Context, dimUpdate *DimensionUpdate) error {
var (
req *http.Request
err error
)

req, err = dc.makePatchRequest(dimUpdate)
req, err = dc.makePatchRequest(ctx, dimUpdate)

if err != nil {
return err
Expand Down Expand Up @@ -276,7 +286,7 @@ func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) err
}
})))

dc.requestSender.Send(req)
dc.requestSender.Send(ctx, req)

return nil
}
Expand All @@ -290,7 +300,7 @@ func (dc *DimensionClient) makeDimURL(key, value string) (*url.URL, error) {
return url, nil
}

func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request, error) {
func (dc *DimensionClient) makePatchRequest(ctx context.Context, dim *DimensionUpdate) (*http.Request, error) {
var (
tagsToAdd []string
tagsToRemove []string
Expand Down Expand Up @@ -319,7 +329,7 @@ func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request
}

req, err := http.NewRequestWithContext(
context.Background(),
ctx,
"PATCH",
strings.TrimRight(url.String(), "/")+"/_/sfxagent",
bytes.NewReader(json))
Expand Down
18 changes: 11 additions & 7 deletions exporter/signalfxexporter/internal/dimensions/dimclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,14 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.Can
server.Close()
}()

client := NewDimensionClient(ctx, DimensionClientOptions{
APIURL: serverURL,
LogUpdates: true,
Logger: zap.NewNop(),
SendDelay: time.Second,
MaxBuffered: 10,
})
client := NewDimensionClient(
DimensionClientOptions{
APIURL: serverURL,
LogUpdates: true,
Logger: zap.NewNop(),
SendDelay: time.Second,
MaxBuffered: 10,
})
client.Start()

return client, dimCh, forcedResp, cancel
Expand All @@ -117,6 +118,7 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.Can
func TestDimensionClient(t *testing.T) {
client, dimCh, forcedResp, cancel := setup(t)
defer cancel()
defer client.Shutdown()

t.Run("send dimension update with properties and tags", func(t *testing.T) {
require.NoError(t, client.acceptDimension(&DimensionUpdate{
Expand Down Expand Up @@ -310,6 +312,7 @@ func TestDimensionClient(t *testing.T) {
func TestFlappyUpdates(t *testing.T) {
client, dimCh, _, cancel := setup(t)
defer cancel()
defer client.Shutdown()

// Do some flappy updates
for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -348,6 +351,7 @@ func TestFlappyUpdates(t *testing.T) {
func TestInvalidUpdatesNotSent(t *testing.T) {
client, dimCh, _, cancel := setup(t)
defer cancel()
defer client.Shutdown()
require.NoError(t, client.acceptDimension(&DimensionUpdate{
Name: "host",
Value: "",
Expand Down
14 changes: 14 additions & 0 deletions exporter/signalfxexporter/internal/dimensions/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dimensions

import (
"testing"

"go.uber.org/goleak"
)

// TestMain is the test entry point for the dimensions package. It delegates
// the whole run to goleak.VerifyTestMain so that goroutine leak checking is
// applied at package level rather than per test.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
22 changes: 13 additions & 9 deletions exporter/signalfxexporter/internal/dimensions/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,53 @@ type ReqSender struct {
client *http.Client
requests chan *http.Request
workerCount uint
ctx context.Context
additionalDimensions map[string]string
runningWorkers *atomic.Int64
}

func NewReqSender(ctx context.Context, client *http.Client,
func NewReqSender(client *http.Client,
workerCount uint, diagnosticDimensions map[string]string) *ReqSender {
return &ReqSender{
client: client,
additionalDimensions: diagnosticDimensions,
// Unbuffered so that it blocks clients
requests: make(chan *http.Request),
workerCount: workerCount,
ctx: ctx,
runningWorkers: &atomic.Int64{},
}
}

// Send sends the request. Not thread-safe.
func (rs *ReqSender) Send(req *http.Request) {
func (rs *ReqSender) Send(ctx context.Context, req *http.Request) {
// Slight optimization to avoid spinning up unnecessary workers if there
// aren't ever that many dim updates. Once workers start, they remain for the
// duration of the agent.
select {
case <-ctx.Done():
return
case rs.requests <- req:
return
default:
if rs.runningWorkers.Load() < int64(rs.workerCount) {
go rs.processRequests()
go rs.processRequests(ctx)
}

// Block until we can get through a request
rs.requests <- req
// Block until we can get through a request, unless context has been cancelled.
select {
case <-ctx.Done():
return
case rs.requests <- req:
}
}
}

func (rs *ReqSender) processRequests() {
func (rs *ReqSender) processRequests(ctx context.Context) {
rs.runningWorkers.Add(1)
defer rs.runningWorkers.Add(-1)

for {
select {
case <-rs.ctx.Done():
case <-ctx.Done():
return
case req := <-rs.requests:
if err := rs.sendRequest(req); err != nil {
Expand Down
Loading

0 comments on commit 0711cb7

Please sign in to comment.