Skip to content

Commit

Permalink
[chore][exporter/googlecloudpubsub] Fix goroutines leak (#36591)
Browse files Browse the repository at this point in the history
#### Description

Fixes goroutines leak by properly closing the underlying gRPC client
which is only when we're using an insecure custom endpoint.
Enables goleak in tests.

#### Link to tracking issue

Related to
#30438
  • Loading branch information
kevinnoel-be authored Dec 4, 2024
1 parent 4752d81 commit 1260faf
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 62 deletions.
8 changes: 8 additions & 0 deletions .chloggen/fix-goleak-gcppubsubexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
change_type: bug_fix
component: googlecloudpubsubexporter
note: Fix a goroutine leak during shutdown.
issues: [30438]
subtext: |
A goroutine leak was found in the googlecloudpubsubexporter.
The goroutine leak was caused by the exporter not closing the underlying created gRPC client when using an insecure custom endpoint.
change_logs: []
38 changes: 8 additions & 30 deletions exporter/googlecloudpubsubexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,20 @@ import (
"fmt"
"time"

pubsub "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const name = "googlecloudpubsub"

type pubsubExporter struct {
logger *zap.Logger
client *pubsub.PublisherClient
client publisherClient
cancel context.CancelFunc
userAgent string
ceSource string
Expand Down Expand Up @@ -71,8 +67,7 @@ func (ex *pubsubExporter) start(ctx context.Context, _ component.Host) error {
ctx, ex.cancel = context.WithCancel(ctx)

if ex.client == nil {
copts := ex.generateClientOptions()
client, err := pubsub.NewPublisherClient(ctx, copts...)
client, err := newPublisherClient(ctx, ex.config, ex.userAgent)
if err != nil {
return fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err)
}
Expand All @@ -82,31 +77,14 @@ func (ex *pubsubExporter) start(ctx context.Context, _ component.Host) error {
return nil
}

func (ex *pubsubExporter) shutdown(context.Context) error {
if ex.client != nil {
ex.client.Close()
ex.client = nil
func (ex *pubsubExporter) shutdown(_ context.Context) error {
if ex.client == nil {
return nil
}
return nil
}

func (ex *pubsubExporter) generateClientOptions() (copts []option.ClientOption) {
if ex.userAgent != "" {
copts = append(copts, option.WithUserAgent(ex.userAgent))
}
if ex.config.Endpoint != "" {
if ex.config.Insecure {
var dialOpts []grpc.DialOption
if ex.userAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(ex.userAgent))
}
conn, _ := grpc.NewClient(ex.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
copts = append(copts, option.WithGRPCConn(conn))
} else {
copts = append(copts, option.WithEndpoint(ex.config.Endpoint))
}
}
return copts
client := ex.client
ex.client = nil
return client.Close()
}

func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time) error {
Expand Down
27 changes: 0 additions & 27 deletions exporter/googlecloudpubsubexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,13 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"google.golang.org/api/option"
)

func TestName(t *testing.T) {
exporter := &pubsubExporter{}
assert.Equal(t, "googlecloudpubsub", exporter.Name())
}

func TestGenerateClientOptions(t *testing.T) {
// Start a fake server running locally.
srv := pstest.NewServer()
defer srv.Close()
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
exporterConfig := cfg.(*Config)
exporterConfig.Endpoint = srv.Addr
exporterConfig.UserAgent = "test-user-agent"
exporterConfig.Insecure = true
exporterConfig.ProjectID = "my-project"
exporterConfig.Topic = "projects/my-project/topics/otlp"
exporterConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
Timeout: 12 * time.Second,
}
exporter := ensureExporter(exportertest.NewNopSettings(), exporterConfig)

options := exporter.generateClientOptions()
assert.Equal(t, option.WithUserAgent("test-user-agent"), options[0])

exporter.config.Insecure = false
options = exporter.generateClientOptions()
assert.Equal(t, option.WithUserAgent("test-user-agent"), options[0])
assert.Equal(t, option.WithEndpoint(srv.Addr), options[1])
}

func TestExporterDefaultSettings(t *testing.T) {
ctx := context.Background()
// Start a fake server running locally.
Expand Down
6 changes: 3 additions & 3 deletions exporter/googlecloudpubsubexporter/generated_package_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion exporter/googlecloudpubsubexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.0
require (
cloud.google.com/go/pubsub v1.45.1
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.13.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.114.1-0.20241202231142-b9ff1bc54c99
go.opentelemetry.io/collector/component/componenttest v0.114.1-0.20241202231142-b9ff1bc54c99
Expand All @@ -14,6 +15,7 @@ require (
go.opentelemetry.io/collector/exporter v0.114.1-0.20241202231142-b9ff1bc54c99
go.opentelemetry.io/collector/exporter/exportertest v0.114.1-0.20241202231142-b9ff1bc54c99
go.opentelemetry.io/collector/pdata v1.20.1-0.20241202231142-b9ff1bc54c99
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
google.golang.org/api v0.205.0
google.golang.org/grpc v1.67.1
Expand All @@ -36,7 +38,6 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
Expand Down
5 changes: 4 additions & 1 deletion exporter/googlecloudpubsubexporter/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ status:
tests:
skip_lifecycle: true
goleak:
skip: true
ignore:
top:
# See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
- "go.opencensus.io/stats/view.(*worker).start"
82 changes: 82 additions & 0 deletions exporter/googlecloudpubsubexporter/publisher_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package googlecloudpubsubexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudpubsubexporter"

import (
"context"
"fmt"

pubsub "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// publisherClient subset of `pubsub.PublisherClient`
type publisherClient interface {
Close() error
Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error)
}

// wrappedPublisherClient allows to override the close function
type wrappedPublisherClient struct {
publisherClient
closeFn func() error
}

func (c *wrappedPublisherClient) Close() error {
if c.closeFn != nil {
return c.closeFn()
}
return c.publisherClient.Close()
}

func newPublisherClient(ctx context.Context, config *Config, userAgent string) (publisherClient, error) {
clientOptions, closeFn, err := generateClientOptions(config, userAgent)
if err != nil {
return nil, fmt.Errorf("failed preparing the gRPC client options to PubSub: %w", err)
}

client, err := pubsub.NewPublisherClient(ctx, clientOptions...)
if err != nil {
return nil, fmt.Errorf("failed creating the gRPC client to PubSub: %w", err)
}

if closeFn == nil {
return client, nil
}

return &wrappedPublisherClient{
publisherClient: client,
closeFn: closeFn,
}, nil
}

func generateClientOptions(config *Config, userAgent string) ([]option.ClientOption, func() error, error) {
var copts []option.ClientOption
var closeFn func() error

if userAgent != "" {
copts = append(copts, option.WithUserAgent(userAgent))
}
if config.Endpoint != "" {
if config.Insecure {
var dialOpts []grpc.DialOption
if userAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(userAgent))
}
client, err := grpc.NewClient(config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
if err != nil {
return nil, nil, err
}
copts = append(copts, option.WithGRPCConn(client))
closeFn = client.Close // we need to be able to properly close the grpc client otherwise it'll leak goroutines
} else {
copts = append(copts, option.WithEndpoint(config.Endpoint))
}
}
return copts, closeFn, nil
}
126 changes: 126 additions & 0 deletions exporter/googlecloudpubsubexporter/publisher_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package googlecloudpubsubexporter

import (
"context"
"testing"

pubsub "cloud.google.com/go/pubsub/apiv1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/api/option"
)

func TestGenerateClientOptions(t *testing.T) {
factory := NewFactory()

t.Run("defaults", func(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ProjectID = "my-project"
cfg.Topic = "projects/my-project/topics/otlp"

require.NoError(t, cfg.Validate())

gotOptions, closeConnFn, err := generateClientOptions(cfg, "test-user-agent 6789")
assert.NoError(t, err)
assert.Empty(t, closeConnFn)

expectedOptions := []option.ClientOption{
option.WithUserAgent("test-user-agent 6789"),
}
assert.ElementsMatch(t, expectedOptions, gotOptions)
})

t.Run("secure custom endpoint", func(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ProjectID = "my-project"
cfg.Topic = "projects/my-project/topics/otlp"
cfg.Endpoint = "defg"

require.NoError(t, cfg.Validate())

gotOptions, closeConnFn, err := generateClientOptions(cfg, "test-user-agent 4321")
assert.NoError(t, err)
assert.Empty(t, closeConnFn)

expectedOptions := []option.ClientOption{
option.WithUserAgent("test-user-agent 4321"),
option.WithEndpoint("defg"),
}
assert.ElementsMatch(t, expectedOptions, gotOptions)
})

t.Run("insecure endpoint", func(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ProjectID = "my-project"
cfg.Topic = "projects/my-project/topics/otlp"
cfg.Endpoint = "abcd"
cfg.Insecure = true

require.NoError(t, cfg.Validate())

gotOptions, closeConnFn, err := generateClientOptions(cfg, "test-user-agent 1234")
assert.NoError(t, err)
assert.NotEmpty(t, closeConnFn)
assert.NoError(t, closeConnFn())

require.Len(t, gotOptions, 2)
assert.Equal(t, option.WithUserAgent("test-user-agent 1234"), gotOptions[0])
assert.IsType(t, option.WithGRPCConn(nil), gotOptions[1])
})
}

func TestNewPublisherClient(t *testing.T) {
// The publisher client checks for credentials during init
t.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "testdata/gcp-fake-creds.json")

ctx := context.Background()
factory := NewFactory()

t.Run("defaults", func(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ProjectID = "my-project"
cfg.Topic = "projects/my-project/topics/otlp"

require.NoError(t, cfg.Validate())

client, err := newPublisherClient(ctx, cfg, "test-user-agent 6789")
assert.NoError(t, err)
require.NotEmpty(t, client)
assert.IsType(t, &pubsub.PublisherClient{}, client)
assert.NoError(t, client.Close())
})

t.Run("secure custom endpoint", func(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ProjectID = "my-project"
cfg.Topic = "projects/my-project/topics/otlp"
cfg.Endpoint = "xyz"

require.NoError(t, cfg.Validate())

client, err := newPublisherClient(ctx, cfg, "test-user-agent 6789")
assert.NoError(t, err)
require.NotEmpty(t, client)
assert.IsType(t, &pubsub.PublisherClient{}, client)
assert.NoError(t, client.Close())
})

t.Run("insecure endpoint", func(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ProjectID = "my-project"
cfg.Topic = "projects/my-project/topics/otlp"
cfg.Endpoint = "abc"
cfg.Insecure = true

require.NoError(t, cfg.Validate())

client, err := newPublisherClient(ctx, cfg, "test-user-agent 6789")
assert.NoError(t, err)
require.NotEmpty(t, client)
assert.IsType(t, &wrappedPublisherClient{}, client)
assert.NoError(t, client.Close())
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"type": "service_account",
"private_key_id": "abc",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDY3E8o1NEFcjMM\nHW/5ZfFJw29/8NEqpViNjQIx95Xx5KDtJ+nWn9+OW0uqsSqKlKGhAdAo+Q6bjx2c\nuXVsXTu7XrZUY5Kltvj94DvUa1wjNXs606r/RxWTJ58bfdC+gLLxBfGnB6CwK0YQ\nxnfpjNbkUfVVzO0MQD7UP0Hl5ZcY0Puvxd/yHuONQn/rIAieTHH1pqgW+zrH/y3c\n59IGThC9PPtugI9ea8RSnVj3PWz1bX2UkCDpy9IRh9LzJLaYYX9RUd7++dULUlat\nAaXBh1U6emUDzhrIsgApjDVtimOPbmQWmX1S60mqQikRpVYZ8u+NDD+LNw+/Eovn\nxCj2Y3z1AgMBAAECggEAWDBzoqO1IvVXjBA2lqId10T6hXmN3j1ifyH+aAqK+FVl\nGjyWjDj0xWQcJ9ync7bQ6fSeTeNGzP0M6kzDU1+w6FgyZqwdmXWI2VmEizRjwk+/\n/uLQUcL7I55Dxn7KUoZs/rZPmQDxmGLoue60Gg6z3yLzVcKiDc7cnhzhdBgDc8vd\nQorNAlqGPRnm3EqKQ6VQp6fyQmCAxrr45kspRXNLddat3AMsuqImDkqGKBmF3Q1y\nxWGe81LphUiRqvqbyUlh6cdSZ8pLBpc9m0c3qWPKs9paqBIvgUPlvOZMqec6x4S6\nChbdkkTRLnbsRr0Yg/nDeEPlkhRBhasXpxpMUBgPywKBgQDs2axNkFjbU94uXvd5\nznUhDVxPFBuxyUHtsJNqW4p/ujLNimGet5E/YthCnQeC2P3Ym7c3fiz68amM6hiA\nOnW7HYPZ+jKFnefpAtjyOOs46AkftEg07T9XjwWNPt8+8l0DYawPoJgbM5iE0L2O\nx8TU1Vs4mXc+ql9F90GzI0x3VwKBgQDqZOOqWw3hTnNT07Ixqnmd3dugV9S7eW6o\nU9OoUgJB4rYTpG+yFqNqbRT8bkx37iKBMEReppqonOqGm4wtuRR6LSLlgcIU9Iwx\nyfH12UWqVmFSHsgZFqM/cK3wGev38h1WBIOx3/djKn7BdlKVh8kWyx6uC8bmV+E6\nOoK0vJD6kwKBgHAySOnROBZlqzkiKW8c+uU2VATtzJSydrWm0J4wUPJifNBa/hVW\ndcqmAzXC9xznt5AVa3wxHBOfyKaE+ig8CSsjNyNZ3vbmr0X04FoV1m91k2TeXNod\njMTobkPThaNm4eLJMN2SQJuaHGTGERWC0l3T18t+/zrDMDCPiSLX1NAvAoGBAN1T\nVLJYdjvIMxf1bm59VYcepbK7HLHFkRq6xMJMZbtG0ryraZjUzYvB4q4VjHk2UDiC\nlhx13tXWDZH7MJtABzjyg+AI7XWSEQs2cBXACos0M4Myc6lU+eL+iA+OuoUOhmrh\nqmT8YYGu76/IBWUSqWuvcpHPpwl7871i4Ga/I3qnAoGBANNkKAcMoeAbJQK7a/Rn\nwPEJB+dPgNDIaboAsh1nZhVhN5cvdvCWuEYgOGCPQLYQF0zmTLcM+sVxOYgfy8mV\nfbNgPgsP5xmu6dw2COBKdtozw0HrWSRjACd1N4yGu75+wPCcX/gQarcjRcXXZeEa\nNtBLSfcqPULqD+h7br9lEJio\n-----END PRIVATE KEY-----\n",
"client_email": "[email protected]",
"client_id": "123-abc.apps.googleusercontent.com",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "http://localhost:8080/token"
}

0 comments on commit 1260faf

Please sign in to comment.