Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid pausing/resuming consumers #363

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions internal/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,24 +296,27 @@ func notifyPullMessageError(ctx context.Context, s Implementation, err error, re
}

func receiveMessages(ctx context.Context, s Implementation, consumer IConsumer, req *PullRequest) ([]messagebroker.ReceivedMessage, error) {
// wrapping this code block in an anonymous function so that defer on time-taken metric can be scoped
if s.CanConsumeMore() == false {
logger.Ctx(ctx).Infow("subscriber: cannot consume more messages before acking", "logFields", getLogFields(s))
// check if consumer is paused once maxOutstanding messages limit is hit
if consumer.IsPaused(ctx) == false {
consumer.PauseConsumer(ctx)
}
} else {
// resume consumer if paused and is allowed to consume more messages
if consumer.IsPaused(ctx) {
consumer.ResumeConsumer(ctx)
var msgs []messagebroker.ReceivedMessage
if s.CanConsumeMore() {
// logger.Ctx(ctx).Infow("subscriber: cannot consume more messages before acking", "logFields", getLogFields(s))
// // check if consumer is paused once maxOutstanding messages limit is hit
// if consumer.IsPaused(ctx) == false {
// consumer.PauseConsumer(ctx)
// }
// } else {
// // resume consumer if paused and is allowed to consume more messages
// if consumer.IsPaused(ctx) {
// consumer.ResumeConsumer(ctx)
// }
// }
resp, err := consumer.ReceiveMessages(ctx, req.MaxNumOfMessages)
if err != nil {
return nil, err
}
return resp.Messages, nil
}
resp, err := consumer.ReceiveMessages(ctx, req.MaxNumOfMessages)
if err != nil {
return nil, err
}
return resp.Messages, nil
return msgs, nil

}

func getLogFields(s Implementation) map[string]interface{} {
Expand Down