Skip to content

Commit

Permalink
Fix memory leak (#53)
Browse files Browse the repository at this point in the history
* add missed return

* refactor subscriber to prevent locking on err channel

Refactor pull method and subscriber to prevent falltrough to second
iteration after pull send err into channel.

* Make error channel buffered for one element

To prevent deadlock for routine if error never recived from consumer add
one element buffer to unblock routine and allow it return from main loop
  • Loading branch information
Aliaksei Burau authored Oct 14, 2019
1 parent e4c5e05 commit 6e0b1b3
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
49 changes: 29 additions & 20 deletions aws/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"log"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -47,8 +46,6 @@ type awsSubscriber struct {
queueArn *string
topicArn *string
subscriptionArn *string

wg sync.WaitGroup
}

func (s *awsSubscriber) Start(ctx context.Context, opts ...pubsub.Option) (<-chan pubsub.Message, <-chan error) {
Expand All @@ -60,31 +57,42 @@ func (s *awsSubscriber) Start(ctx context.Context, opts ...pubsub.Option) (<-cha
for _, opt := range opts {
opt(subscriberOptions)
}
channel := make(chan pubsub.Message)
errChannel := make(chan error)
msgChannel := make(chan pubsub.Message)
errChannel := make(chan error, 1)
go func() {
defer close(channel)
defer close(msgChannel)
defer close(errChannel)

if fErr := s.ensureFilterPolicy(subscriberOptions.Filter); fErr != nil {
errChannel <- fErr
return
}

// Set Retention Period and visibility timeout if needed
if err := ensureQueueAttributes(s.queueURL, subscriberOptions.RetentionPeriod, subscriberOptions.VisibilityTimeout, s.sqs); err != nil {
errChannel <- err
return
}

for {
select {
case <-ctx.Done():
s.wg.Wait()
return
default:
s.pull(ctx, channel, errChannel)
messages, err := s.pull(ctx)
if err != nil {
errChannel <- err
return
}

for _, msg := range messages {
msgChannel <- msg
}
}
}
}()

return channel, errChannel
return msgChannel, errChannel
}

func (s *awsSubscriber) AckMessage(ctx context.Context, messageID string) error {
Expand Down Expand Up @@ -278,38 +286,39 @@ func (s *awsSubscriber) ensureFilterPolicy(filter map[string]string) error {
}

// pull returns the message and error channel for the subscriber
func (s *awsSubscriber) pull(ctx context.Context, channel chan pubsub.Message, errChannel chan error) {
s.wg.Add(1)
defer s.wg.Done()
func (s *awsSubscriber) pull(ctx context.Context) ([]*awsMessage, error) {
resp, err := s.sqs.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
QueueUrl: s.queueURL,
WaitTimeSeconds: aws.Int64(20),
MaxNumberOfMessages: aws.Int64(1),
MessageAttributeNames: []*string{aws.String("All")},
})
if err != nil {
errChannel <- err
return
log.Printf("AWS: error while fetching messages %v", err)
return nil, err
}

messages := make([]*awsMessage, 0, len(resp.Messages))
for _, msg := range resp.Messages {
message, err := decodeFromSQSMessage(msg.Body)
if err != nil {
log.Printf("AWS: error parsing SQS message body: %v", err)
errChannel <- err
continue
return nil, err
}

attributes, err := decodeMessageAttributes(msg.Body)
if err != nil {
log.Printf("AWS: error parsing SQS message attributes: %v", err)
errChannel <- err
continue
return nil, err
}
channel <- &awsMessage{
messages = append(messages, &awsMessage{
ctx: ctx,
subscriber: s,
messageID: *msg.ReceiptHandle,
message: message,
metadata: attributes,
}
})
}

return messages, nil
}
1 change: 1 addition & 0 deletions grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (w *grpcClientWrapper) Start(ctx context.Context, opts ...pubsub.Option) (<
select {
case <-w.cancel:
errC <- fmt.Errorf("PUBSUB client: stream closed for subscription %s, topic %s", w.subscriptionID, w.topic)
return
case <-stream.Context().Done():
errC <- fmt.Errorf("PUBSUB client: stream closed for subscription %s, topic %s", w.subscriptionID, w.topic)
return
Expand Down

0 comments on commit 6e0b1b3

Please sign in to comment.