Enhancing xkafka.Consumer for Ordered Async Message Processing #30

Open · ajatprabha opened this issue Dec 19, 2023 · 4 comments
Labels: enhancement (New feature or request)

@ajatprabha (Contributor)

Proposal Overview

This proposal aims to modify the behavior of xkafka.Consumer in asynchronous consumption contexts to allow better control over the order of message consumption.


Problem Statement

The current implementation of xkafka.Concurrency provides no way to control when message processing starts. This limitation becomes significant when messages must be processed in a specific order for correct application behavior. For instance, when messages share the same Key, processing of the next message should wait until the message currently in flight finishes, ensuring in-order execution. The Async mode offers no such synchronization.

Proposed Solution

Introduce a semaphore-like mechanism that the asynchronous consumer can use to determine whether it should start processing a message or wait.

Proposed API Changes

  1. Define a new structure MessageMetadata and an interface ConsumerSemaphore (a sketch implementation follows after the diff below):

    type MessageMetadata struct {
    	Key       []byte
    	Partition int32
    	Offset    int64
    }

    type ConsumerSemaphore interface {
    	AcquireLock(*MessageMetadata)
    	ReleaseLock(*MessageMetadata)
    }
  2. Integrate the semaphore into the Consumer structure and modify the message processing flow to utilize this semaphore. Key changes include:

    • Adding a semaphore field to the Consumer struct.
    • Adjusting the async message processing routine to acquire and release locks based on message metadata.

    diff --git a/xkafka/consumer.go b/xkafka/consumer.go
    index d744033..72e99ba 100644
    --- a/xkafka/consumer.go
    +++ b/xkafka/consumer.go
    @@ -18,6 +18,7 @@ type Consumer struct {
     	handler     Handler
     	middlewares []middleware
     	config      options
    +	sema        ConsumerSemaphore
     }
    
    // NewConsumer creates a new Consumer instance.
    @@ -48,6 +49,7 @@ func NewConsumer(name string, handler Handler, opts ...Option) (*Consumer, error
     		config:  cfg,
     		kafka:   consumer,
     		handler: handler,
    +		sema:    cfg.sema,
     	}, nil
    }
    
    @@ -168,20 +170,20 @@ func (c *Consumer) runAsync(ctx context.Context) error {
    
    			msg := newMessage(c.name, km)
    
    -			st.Go(func() stream.Callback {
    -				err := c.handler.Handle(ctx, msg)
    -				if ferr := c.config.errorHandler(err); ferr != nil {
    -					cancel(ferr)
    -
    -					return func() {}
    +			if c.sema != nil {
    +				mm := &MessageMetadata{
    +					Key:       msg.Key,
    +					Partition: msg.Partition,
    +					Offset:    msg.Offset,
    				}
    
    -				return func() {
    -					if err := c.storeMessage(msg); err != nil {
    -						cancel(err)
    -					}
    -				}
    -			})
    +				c.sema.AcquireLock(mm)
    +				st.Go(c.taskWithLock(ctx, cancel, msg, mm))
    +
    +				continue
    +			}
    +
    +			st.Go(c.task(ctx, cancel, msg))
     		}
     	}
     }
    @@ -239,3 +241,41 @@ func (c *Consumer) Close() {
     
     	_ = c.kafka.Close()
     }
    +
    +func (c *Consumer) taskWithLock(ctx context.Context, cancel context.CancelCauseFunc, msg *Message, mm *MessageMetadata) stream.Task {
    +	return func() stream.Callback {
    +		err := c.handler.Handle(ctx, msg)
    +		if ferr := c.config.errorHandler(err); ferr != nil {
    +			cancel(ferr)
    +
    +			return func() {
    +				c.sema.ReleaseLock(mm)
    +			}
    +		}
    +
    +		return func() {
    +			if err := c.storeMessage(msg); err != nil {
    +				cancel(err)
    +			}
    +
    +			c.sema.ReleaseLock(mm)
    +		}
    +	}
    +}
    +
    +func (c *Consumer) task(ctx context.Context, cancel context.CancelCauseFunc, msg *Message) stream.Task {
    +	return func() stream.Callback {
    +		err := c.handler.Handle(ctx, msg)
    +		if ferr := c.config.errorHandler(err); ferr != nil {
    +			cancel(ferr)
    +
    +			return func() {}
    +		}
    +
    +		return func() {
    +			if err := c.storeMessage(msg); err != nil {
    +				cancel(err)
    +			}
    +		}
    +	}
    +}
    diff --git a/xkafka/options.go b/xkafka/options.go
    index f7e147d..a5f10e8 100644
    --- a/xkafka/options.go
    +++ b/xkafka/options.go
    @@ -84,6 +84,7 @@ type options struct {
     	pollTimeout     time.Duration
     	concurrency     int
     	manualCommit    bool
    +	sema            ConsumerSemaphore
     
     	// producer options
     	producerFn producerFunc
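
For illustration, a minimal ConsumerSemaphore implementation could serialize messages per partition. The keyLocker type below is a hypothetical sketch, not part of the proposed API; other policies (by key, by topic) would only change the map key:

type keyLocker struct {
	mu    sync.Mutex
	locks map[int32]*sync.Mutex
}

func newKeyLocker() *keyLocker {
	return &keyLocker{locks: make(map[int32]*sync.Mutex)}
}

func (k *keyLocker) AcquireLock(mm *MessageMetadata) {
	k.mu.Lock()
	l, ok := k.locks[mm.Partition]
	if !ok {
		l = &sync.Mutex{}
		k.locks[mm.Partition] = l
	}
	k.mu.Unlock()

	// Blocks while an earlier message from this partition is in flight.
	l.Lock()
}

func (k *keyLocker) ReleaseLock(mm *MessageMetadata) {
	// Assumes AcquireLock ran first for this partition, which the
	// consumer flow in the diff above guarantees.
	k.mu.Lock()
	l := k.locks[mm.Partition]
	k.mu.Unlock()

	l.Unlock()
}

In this sketch the consumer would receive the semaphore through a hypothetical WithSemaphore(newKeyLocker()) option that sets cfg.sema, mirroring how cfg.sema is read in NewConsumer above.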

Expected Benefits

Implementing this solution would provide the needed control over message processing order in asynchronous scenarios, enhancing the reliability and correctness of applications using xkafka.Consumer.

@sonnes (Collaborator) commented Dec 19, 2023

To summarize, the async mode requires some form of ordering guarantees. Given that a consumer can process messages from multiple partitions & topics, xkafka.Consumer should provide a mechanism to control the order of message processing. The ordering could be based on:

  • Topic
  • Partition
  • Key (if present)
  • Value or field in the value
  • Metadata

This can be achieved by introducing a middleware, let's call it OrderBy, that can be used to enforce ordering guarantees using semaphores.

The implementation of the middleware would be similar to the following:

func OrderBy(lockerFn func(*Message) string) middleware {
	// one mutex map per OrderBy middleware instance
	m := &mutexes{}

	return func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, msg *Message) error {
			key := lockerFn(msg)

			// acquire lock for this ordering key
			m.Acquire(key)

			// release lock after processing
			defer m.Release(key)

			return next.Handle(ctx, msg)
		})
	}
}

type mutexes struct {
	lock sync.Map
}

func (m *mutexes) Acquire(key string) {
	lock, _ := m.lock.LoadOrStore(key, &sync.Mutex{})

	lock.(*sync.Mutex).Lock()
}

func (m *mutexes) Release(key string) {
	lock, _ := m.lock.Load(key)

	lock.(*sync.Mutex).Unlock()
}

The middleware can be used as:

orderByPartition := func(msg *Message) string {
	return fmt.Sprintf("%d", msg.Partition)
}

orderByTopicAndPartition := func(msg *Message) string {
	return fmt.Sprintf("%s-%d", msg.Topic, msg.Partition)
}

orderByKeyAndPartition := func(msg *Message) string {
	return fmt.Sprintf("%s-%d", string(msg.Key), msg.Partition)
}

// by partition
consumer.Use(OrderBy(orderByPartition))

// or by topic & partition
consumer.Use(OrderBy(orderByTopicAndPartition))

// or by key & partition
consumer.Use(OrderBy(orderByKeyAndPartition))

With this, the middleware holds the lock and makes sure that subsequent messages are processed only after the current message is processed.

@ajatprabha (Contributor, Author) commented Dec 19, 2023

This won't guarantee ordering: the issue is at the stream.Go layer, which doesn't guarantee that tasks start executing in the order they were read from Kafka.

Say two ordered messages are correctly read from Kafka. Because stream.Go is called almost instantly for both, the second message may acquire the lock first, never giving the first message a chance.

If stream.Go gives no guarantee about execution start order, this should be handled separately.

Correct me if this understanding is incorrect; I have seen this happen in examples/xkafka, though I have not tried a lock-based approach yet.
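
To illustrate the race, here is a minimal, self-contained sketch (not xkafka code): goroutines spawned back-to-back are not guaranteed to start in spawn order, so the second can win the lock.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 1; i <= 2; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Whichever goroutine the scheduler runs first wins the lock;
			// "processing message 2" can print before "processing message 1".
			mu.Lock()
			defer mu.Unlock()

			fmt.Println("processing message", i)
		}()
	}

	wg.Wait()
}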

@sonnes (Collaborator) commented Dec 19, 2023

I see your point.

By locking at the stream.Go layer, the consumer will be blocked until a lock is released. This effectively makes the async mode behave like synchronous mode momentarily.

But on the other hand, a fully async implementation would require an in-memory queue.
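
For what that could look like, here is a hypothetical sketch of the in-memory-queue approach: each ordering key gets its own FIFO channel and worker goroutine, so messages for the same key stay ordered while different keys run concurrently.

type perKeyQueue struct {
	mu     sync.Mutex
	queues map[string]chan *Message
	handle func(*Message)
}

func newPerKeyQueue(handle func(*Message)) *perKeyQueue {
	return &perKeyQueue{
		queues: make(map[string]chan *Message),
		handle: handle,
	}
}

func (p *perKeyQueue) Enqueue(key string, msg *Message) {
	p.mu.Lock()
	q, ok := p.queues[key]
	if !ok {
		q = make(chan *Message, 64) // buffer size is arbitrary in this sketch
		p.queues[key] = q

		// One worker per key: strict FIFO within a key,
		// full concurrency across keys.
		go func() {
			for m := range q {
				p.handle(m)
			}
		}()
	}
	p.mu.Unlock()

	q <- msg
}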

@sonnes (Collaborator) commented Dec 21, 2023

Let's try adding this to the example and see how it behaves.

I think the API can be simplified to just accepting a key function.

type OrderBy func(*Message) string

But there's a potential issue of memory bloat if the number of keys is large.
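
One way to limit that bloat, sketched here as an assumption rather than a committed design, is to reference-count each key's mutex and delete it once no goroutine holds or waits on it:

type entry struct {
	m    sync.Mutex
	refs int
}

type refCountedMutexes struct {
	mu    sync.Mutex
	locks map[string]*entry
}

func newRefCountedMutexes() *refCountedMutexes {
	return &refCountedMutexes{locks: make(map[string]*entry)}
}

func (r *refCountedMutexes) Acquire(key string) {
	r.mu.Lock()
	e, ok := r.locks[key]
	if !ok {
		e = &entry{}
		r.locks[key] = e
	}
	e.refs++
	r.mu.Unlock()

	e.m.Lock()
}

func (r *refCountedMutexes) Release(key string) {
	r.mu.Lock()
	e := r.locks[key]
	e.refs--
	if e.refs == 0 {
		delete(r.locks, key) // reclaim memory for idle keys
	}
	r.mu.Unlock()

	e.m.Unlock()
}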
