Skip to content

Commit

Permalink
make consumer process messages in parallel (#2139)
Browse files Browse the repository at this point in the history
  • Loading branch information
evq authored Oct 14, 2023
1 parent 5bc6388 commit 4a8f9ae
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions libs/redisconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
"path/filepath"
"strings"
"sync"
"time"

"github.com/brave-intl/bat-go/libs/concurrent"
Expand Down Expand Up @@ -140,7 +141,7 @@ func (redisClient *RedisClient) ReadAndHandle(ctx context.Context, stream, consu
Group: consumerGroup,
Consumer: consumerID,
Streams: []string{stream, id},
Count: 5,
Count: 10,
Block: 100 * time.Millisecond,
NoAck: false,
}).Result()
Expand All @@ -149,6 +150,7 @@ func (redisClient *RedisClient) ReadAndHandle(ctx context.Context, stream, consu
}

if len(entries) > 0 {
var wg sync.WaitGroup
for i := 0; i < len(entries[0].Messages); i++ {
messageID := entries[0].Messages[i].ID
values := entries[0].Messages[i].Values
Expand All @@ -165,15 +167,20 @@ func (redisClient *RedisClient) ReadAndHandle(ctx context.Context, stream, consu
logger = &tmp
ctx = logger.WithContext(ctx)

err := handle(ctx, stream, messageID, []byte(sData))
if err != nil {
if !strings.Contains(err.Error(), "retry-after") {
logger.Warn().Err(err).Msg("message handler returned an error")
wg.Add(1)
go func() {
defer wg.Done()
err := handle(ctx, stream, messageID, []byte(sData))
if err != nil {
if !strings.Contains(err.Error(), "retry-after") {
logger.Warn().Err(err).Msg("message handler returned an error")
}
} else {
redisClient.XAck(ctx, stream, consumerGroup, messageID)

Check failure on line 179 in libs/redisconsumer/consumer.go

View workflow job for this annotation

GitHub Actions / lint

redisClient.XAck undefined (type *RedisClient has no field or method XAck) (typecheck)
}
} else {
redisClient.XAck(ctx, stream, consumerGroup, messageID)
}
}()
}
wg.Wait()
}

return len(entries)
Expand Down

0 comments on commit 4a8f9ae

Please sign in to comment.