Add Batch Consumer
sonnes committed Jul 15, 2024
1 parent 466ab0c commit d9dc28c
Showing 7 changed files with 451 additions and 10 deletions.
106 changes: 106 additions & 0 deletions xkafka/batch.go
@@ -0,0 +1,106 @@
package xkafka

import (
    "sync"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    "github.com/rs/xid"
)

// Batch is a group of messages that are processed together.
type Batch struct {
    ID       string
    Messages []*Message
    Status   Status

    err  error
    lock sync.Mutex
}

// NewBatch creates a new Batch.
func NewBatch() *Batch {
    return &Batch{
        ID:       xid.New().String(),
        Messages: make([]*Message, 0),
    }
}

// AckSuccess marks the batch as successfully processed.
func (b *Batch) AckSuccess() {
    b.lock.Lock()
    defer b.lock.Unlock()

    b.Status = Success
}

// AckFail marks the batch as failed to process.
func (b *Batch) AckFail(err error) {
    b.lock.Lock()
    defer b.lock.Unlock()

    b.Status = Fail
    b.err = err
}

// AckSkip marks the batch as skipped.
func (b *Batch) AckSkip() {
    b.lock.Lock()
    defer b.lock.Unlock()

    b.Status = Skip
}

// Err returns the error that caused the batch to fail.
func (b *Batch) Err() error {
    b.lock.Lock()
    defer b.lock.Unlock()

    return b.err
}

// MaxOffset returns the maximum offset among the
// messages in the batch.
func (b *Batch) MaxOffset() int64 {
    b.lock.Lock()
    defer b.lock.Unlock()

    var max int64
    for _, m := range b.Messages {
        if m.Offset > max {
            max = m.Offset
        }
    }

    return max
}

// GroupMaxOffset returns the maximum offset for each
// topic-partition in the batch.
func (b *Batch) GroupMaxOffset() []kafka.TopicPartition {
    b.lock.Lock()
    defer b.lock.Unlock()

    offsets := make(map[string]map[int32]int64)
    for _, m := range b.Messages {
        if _, ok := offsets[m.Topic]; !ok {
            offsets[m.Topic] = make(map[int32]int64)
        }

        if m.Offset > offsets[m.Topic][m.Partition] {
            offsets[m.Topic][m.Partition] = m.Offset
        }
    }

    var tps []kafka.TopicPartition
    for topic, partitions := range offsets {
        // Copy the loop variable before taking its address. Before
        // Go 1.22, `topic` is reused across iterations, so every
        // TopicPartition would otherwise alias the last topic.
        topic := topic

        for partition, offset := range partitions {
            tps = append(tps, kafka.TopicPartition{
                Topic:     &topic,
                Partition: partition,
                // A committed offset points at the next message to
                // consume, hence max processed offset + 1.
                Offset: kafka.Offset(offset + 1),
            })
        }
    }

    return tps
}
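
To make the commit semantics concrete, here is a minimal sketch of how GroupMaxOffset collapses a batch into per-partition commit positions. It assumes only what this diff shows (Message carries exported Topic, Partition, and Offset fields); the module import path is an assumption.

package main

import (
    "fmt"

    "github.com/gojekfarm/xkafka" // import path assumed
)

func main() {
    batch := xkafka.NewBatch()
    batch.Messages = append(batch.Messages,
        &xkafka.Message{Topic: "orders", Partition: 0, Offset: 41},
        &xkafka.Message{Topic: "orders", Partition: 0, Offset: 42},
        &xkafka.Message{Topic: "orders", Partition: 1, Offset: 7},
    )

    // Prints orders/0 -> 43 and orders/1 -> 8: the next offset to
    // consume for each partition, ready to pass to StoreOffsets.
    for _, tp := range batch.GroupMaxOffset() {
        fmt.Printf("%s/%d -> %d\n", *tp.Topic, tp.Partition, tp.Offset)
    }
}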
290 changes: 290 additions & 0 deletions xkafka/batch_consumer.go
@@ -0,0 +1,290 @@
package xkafka

import (
    "context"
    "errors"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    "github.com/sourcegraph/conc/pool"
    "github.com/sourcegraph/conc/stream"
)

// BatchConsumer manages consumption & processing of messages
// from kafka topics in batches.
type BatchConsumer struct {
    name        string
    kafka       consumerClient
    handler     BatchHandler
    middlewares []BatchMiddlewarer
    config      *consumerConfig
    batch       *BatchManager
    cancelCtx   atomic.Pointer[context.CancelFunc]
}

// NewBatchConsumer creates a new BatchConsumer instance.
func NewBatchConsumer(name string, handler BatchHandler, opts ...ConsumerOption) (*BatchConsumer, error) {
    cfg, err := newConsumerConfig(opts...)
    if err != nil {
        return nil, err
    }

    // Override kafka configs: offsets are stored explicitly via
    // StoreOffsets after a batch is processed, so automatic offset
    // storage must stay disabled.
    _ = cfg.configMap.SetKey("enable.auto.offset.store", false)
    _ = cfg.configMap.SetKey("bootstrap.servers", strings.Join(cfg.brokers, ","))
    _ = cfg.configMap.SetKey("group.id", name)

    if cfg.manualCommit {
        _ = cfg.configMap.SetKey("enable.auto.commit", false)
    }

    consumer, err := cfg.consumerFn(&cfg.configMap)
    if err != nil {
        return nil, err
    }

    return &BatchConsumer{
        name:    name,
        config:  cfg,
        kafka:   consumer,
        handler: handler,
        batch:   NewBatchManager(cfg.batchSize, cfg.batchTimeout),
    }, nil
}
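
For context, a hedged sketch of wiring a handler to the new constructor. indexHandler and runConsumer are hypothetical names; the BatchHandler signature is inferred from the HandleBatch calls later in this file, and the concrete ConsumerOption constructors live elsewhere in the package, so none are named here. Assumed imports: context plus this module's import path.

// indexHandler is a hypothetical handler that bulk-writes each
// batch and acks it.
type indexHandler struct{}

func (indexHandler) HandleBatch(ctx context.Context, b *xkafka.Batch) error {
    // ... bulk-insert b.Messages somewhere ...
    b.AckSuccess() // or b.AckFail(err) to leave offsets unstored

    return nil
}

func runConsumer(ctx context.Context, opts ...xkafka.ConsumerOption) error {
    consumer, err := xkafka.NewBatchConsumer("order-indexer", indexHandler{}, opts...)
    if err != nil {
        return err
    }

    // Blocks until ctx is cancelled or a handler error escalates.
    return consumer.Run(ctx)
}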

// GetMetadata returns the metadata for the consumer.
func (c *BatchConsumer) GetMetadata() (*Metadata, error) {
    return c.kafka.GetMetadata(nil, false, int(c.config.metadataTimeout.Milliseconds()))
}

// Use appends a BatchMiddlewarer to the middleware chain.
// Middlewares run in the order they are added, outermost first.
func (c *BatchConsumer) Use(mws ...BatchMiddlewarer) {
    c.middlewares = append(c.middlewares, mws...)
}
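
The diff contains no concrete middleware, so here is a minimal logging sketch, assuming only the BatchMiddlewarer shape implied by concatMiddlewares further down (wrap a BatchHandler, return a BatchHandler). The batchHandlerFunc adapter is local to the sketch; the package may or may not ship an equivalent. Assumed imports: context, log/slog, time.

// loggingMiddleware records the size and duration of each batch.
type loggingMiddleware struct{ log *slog.Logger }

func (l loggingMiddleware) BatchMiddleware(next xkafka.BatchHandler) xkafka.BatchHandler {
    return batchHandlerFunc(func(ctx context.Context, b *xkafka.Batch) error {
        start := time.Now()
        err := next.HandleBatch(ctx, b)

        l.log.Info("batch handled",
            "size", len(b.Messages),
            "took", time.Since(start),
            "err", err,
        )

        return err
    })
}

// batchHandlerFunc adapts a function to the BatchHandler interface,
// mirroring the http.HandlerFunc pattern.
type batchHandlerFunc func(ctx context.Context, b *xkafka.Batch) error

func (f batchHandlerFunc) HandleBatch(ctx context.Context, b *xkafka.Batch) error {
    return f(ctx, b)
}

// Usage: consumer.Use(loggingMiddleware{log: slog.Default()})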

// Run starts the consumer and blocks until the context is cancelled
// or processing fails.
func (c *BatchConsumer) Run(ctx context.Context) error {
    return c.start(ctx)
}

func (c *BatchConsumer) start(ctx context.Context) error {
    c.handler = c.concatMiddlewares(c.handler)

    // Two goroutines run side by side: one polls kafka and feeds the
    // BatchManager, the other drains completed batches and runs the
    // handler. An error in either cancels both.
    pool := pool.New().
        WithContext(ctx).
        WithMaxGoroutines(2).
        WithCancelOnError()

    pool.Go(func(ctx context.Context) error {
        if c.config.concurrency > 1 {
            return c.processAsync(ctx)
        }

        return c.process(ctx)
    })

    pool.Go(func(ctx context.Context) error {
        return c.consume(ctx)
    })

    return pool.Wait()
}

func (c *BatchConsumer) process(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return nil
        case batch := <-c.batch.Receive():
            err := c.handler.HandleBatch(ctx, batch)
            if ferr := c.config.errorHandler(err); ferr != nil {
                return ferr
            }

            if err := c.saveOffset(batch); err != nil {
                return err
            }
        }
    }
}

func (c *BatchConsumer) processAsync(ctx context.Context) error {
    // stream runs callbacks sequentially, in the order tasks were
    // submitted, even though the handlers themselves run concurrently.
    // Saving offsets inside the callback therefore preserves commit
    // order across batches.
    st := stream.New().WithMaxGoroutines(c.config.concurrency)
    ctx, cancel := context.WithCancelCause(ctx)
    defer cancel(nil) // release the derived context on exit

    for {
        select {
        case <-ctx.Done():
            st.Wait()

            return context.Cause(ctx)
        case batch := <-c.batch.Receive():
            st.Go(func() stream.Callback {
                err := c.handler.HandleBatch(ctx, batch)
                if ferr := c.config.errorHandler(err); ferr != nil {
                    cancel(ferr)

                    return func() {}
                }

                return func() {
                    if err := c.saveOffset(batch); err != nil {
                        cancel(err)
                    }
                }
            })
        }
    }
}

func (c *BatchConsumer) consume(ctx context.Context) (err error) {
    if err := c.subscribe(); err != nil {
        return err
    }

    defer func() {
        if uerr := c.unsubscribe(); uerr != nil {
            err = errors.Join(err, uerr)
        }
    }()

    for {
        select {
        case <-ctx.Done():
            return err
        default:
            km, err := c.kafka.ReadMessage(c.config.pollTimeout)
            if err != nil {
                var kerr kafka.Error
                if errors.As(err, &kerr) && kerr.Code() == kafka.ErrTimedOut {
                    // A poll timeout is expected when no messages are
                    // available; keep polling.
                    continue
                }

                if ferr := c.config.errorHandler(err); ferr != nil {
                    return ferr
                }

                continue
            }

            c.batch.Add(newMessage(c.name, km))
        }
    }
}

func (c *BatchConsumer) subscribe() error {
    return c.kafka.SubscribeTopics(c.config.topics, nil)
}

func (c *BatchConsumer) unsubscribe() error {
    return c.kafka.Unsubscribe()
}

// concatMiddlewares wraps the handler right-to-left, so the first
// middleware added via Use ends up outermost.
func (c *BatchConsumer) concatMiddlewares(handler BatchHandler) BatchHandler {
    for i := len(c.middlewares) - 1; i >= 0; i-- {
        handler = c.middlewares[i].BatchMiddleware(handler)
    }

    return handler
}

func (c *BatchConsumer) saveOffset(batch *Batch) error {
    // Failed batches don't advance offsets, so their messages are
    // redelivered after a restart or rebalance.
    if batch.Status == Fail {
        return nil
    }

    offsets := batch.GroupMaxOffset()

    _, err := c.kafka.StoreOffsets(offsets)
    if err != nil {
        return err
    }

    if c.config.manualCommit {
        if _, err := c.kafka.Commit(); err != nil {
            return err
        }
    }

    return nil
}

// BatchManager manages aggregation and flushing of Message batches.
type BatchManager struct {
    size      int
    timeout   time.Duration
    batch     *Batch
    mutex     *sync.RWMutex
    flushChan chan *Batch
}

// NewBatchManager creates a new BatchManager that flushes when a
// batch reaches `size` messages or `timeout` elapses, whichever
// comes first.
func NewBatchManager(size int, timeout time.Duration) *BatchManager {
    b := &BatchManager{
        size:    size,
        timeout: timeout,
        mutex:   &sync.RWMutex{},
        batch: &Batch{
            Messages: make([]*Message, 0, size),
        },
        flushChan: make(chan *Batch),
    }

    go b.runFlushByTime()

    return b
}

// Add adds a message to the batch and flushes when the batch size is
// reached. The flush channel is unbuffered, so Add blocks until the
// consumer side receives the batch, providing backpressure.
func (b *BatchManager) Add(m *Message) {
    b.mutex.Lock()
    defer b.mutex.Unlock()

    b.batch.Messages = append(b.batch.Messages, m)

    if len(b.batch.Messages) >= b.size {
        b.flush()
    }
}

// Receive returns a channel to read batched Messages.
func (b *BatchManager) Receive() <-chan *Batch {
    return b.flushChan
}

func (b *BatchManager) runFlushByTime() {
    t := time.NewTicker(b.timeout)

    for range t.C {
        b.mutex.Lock()
        b.flush()
        b.mutex.Unlock()
    }
}

// flush sends the batch to the flush channel and resets the batch.
// DESIGN: flush does NOT acquire a mutex lock. Locks should be
// managed by the caller.
// nolint:gosimple
func (b *BatchManager) flush() {
    if len(b.batch.Messages) == 0 {
        return
    }

    b.flushChan <- b.batch

    b.batch = &Batch{
        Messages: make([]*Message, 0, b.size),
    }
}
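
To see the flush-by-size and flush-by-time paths together, a small sketch using only the API above. The import path and the Message literal fields are assumptions based on this diff, and the exact batch sizes observed depend on goroutine scheduling.

package main

import (
    "fmt"
    "time"

    "github.com/gojekfarm/xkafka" // import path assumed
)

func main() {
    // Flush after 3 messages or 100ms, whichever comes first.
    bm := xkafka.NewBatchManager(3, 100*time.Millisecond)

    go func() {
        for i := int64(1); i <= 5; i++ {
            bm.Add(&xkafka.Message{Topic: "orders", Partition: 0, Offset: i})
        }
    }()

    // Typically the first batch fills to size 3 and is flushed
    // immediately; the remaining 2 messages are flushed by the ticker.
    for i := 0; i < 2; i++ {
        batch := <-bm.Receive()
        fmt.Printf("batch of %d message(s)\n", len(batch.Messages))
    }
}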