Enhancing xkafka.Consumer for Ordered Async Message Processing #30
Comments
To summarize, the async mode requires some form of ordering guarantees. Given that a consumer can process messages from multiple partitions & topics, xkafka.Consumer should provide a mechanism to control the order of message processing. The ordering could be based on:
This can be achieved by introducing a middleware, let's call it OrderBy, that can be used to enforce ordering guarantees using semaphores. The implementation of the middleware would be similar to the following:

```go
import (
	"context"
	"sync"
)

func OrderBy(lockerFn func(*Message) string) middleware {
	// one set of locks per middleware instance
	locks := &mutexes{}

	return func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, msg *Message) error {
			key := lockerFn(msg)

			// acquire lock
			locks.Acquire(key)
			// release lock after processing
			defer locks.Release(key)

			return next.Handle(ctx, msg)
		})
	}
}

// mutexes holds one mutex per ordering key.
type mutexes struct {
	lock sync.Map
}

func (m *mutexes) Acquire(key string) {
	lock, _ := m.lock.LoadOrStore(key, &sync.Mutex{})
	lock.(*sync.Mutex).Lock()
}

func (m *mutexes) Release(key string) {
	// ignore keys that were never acquired instead of panicking
	if lock, ok := m.lock.Load(key); ok {
		lock.(*sync.Mutex).Unlock()
	}
}
```

The middleware can be used as:

```go
// key functions (require the "fmt" import)
orderByPartition := func(msg *Message) string {
	return fmt.Sprintf("%d", msg.Partition)
}

orderByTopicAndPartition := func(msg *Message) string {
	return fmt.Sprintf("%s-%d", msg.Topic, msg.Partition)
}

orderByKeyAndPartition := func(msg *Message) string {
	return fmt.Sprintf("%s-%d", string(msg.Key), msg.Partition)
}

// by partition
consumer.Use(OrderBy(orderByPartition))

// or by topic & partition
consumer.Use(OrderBy(orderByTopicAndPartition))

// or by key & partition
consumer.Use(OrderBy(orderByKeyAndPartition))
```

With this, the middleware holds the lock for a key and makes sure that subsequent messages with the same key are processed only after the current message is processed.
This won't guarantee ordering, as the issue happens at [...]. Say two ordered elements are correctly read from Kafka; however, because [...]. If there is no guarantee of execution start from [...]. Correct me if this understanding is incorrect, as I have seen this happening in [...].
I see your point. By locking at the stream.Go layer, the consumer will be blocked until the lock is released. This makes the async mode behave like synchronous mode momentarily. On the other hand, a full async implementation would require an in-memory queue.
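To make the in-memory queue idea concrete, here is a minimal sketch, assuming one buffered channel and one worker goroutine per ordering key. The names `keyedDispatcher`, `Dispatch`, `Close` and `processInOrder`, the trimmed-down `Message` type, and the buffer size of 16 are all hypothetical and are not part of xkafka. The read loop only blocks when a key's buffer is full, and messages with the same key are handled in the order they were dispatched.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a stand-in for the consumer's message type (assumption).
type Message struct {
	Key   string
	Value string
}

// keyedDispatcher (hypothetical) routes messages to one worker per key.
type keyedDispatcher struct {
	mu     sync.Mutex
	queues map[string]chan Message
	wg     sync.WaitGroup
	handle func(Message)
}

func newKeyedDispatcher(handle func(Message)) *keyedDispatcher {
	return &keyedDispatcher{queues: make(map[string]chan Message), handle: handle}
}

// Dispatch enqueues msg on its key's queue, starting a worker on first use.
// It is meant to be called from a single read loop.
func (d *keyedDispatcher) Dispatch(msg Message) {
	d.mu.Lock()
	q, ok := d.queues[msg.Key]
	if !ok {
		q = make(chan Message, 16) // buffer size is an arbitrary choice
		d.queues[msg.Key] = q
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for m := range q { // FIFO per key
				d.handle(m)
			}
		}()
	}
	d.mu.Unlock()
	q <- msg
}

// Close closes every queue and waits for the workers to drain them.
func (d *keyedDispatcher) Close() {
	d.mu.Lock()
	for _, q := range d.queues {
		close(q)
	}
	d.mu.Unlock()
	d.wg.Wait()
}

// processInOrder dispatches msgs and records the values seen for each key.
func processInOrder(msgs []Message) map[string][]string {
	var mu sync.Mutex
	seen := make(map[string][]string)
	d := newKeyedDispatcher(func(m Message) {
		mu.Lock()
		seen[m.Key] = append(seen[m.Key], m.Value)
		mu.Unlock()
	})
	for _, m := range msgs {
		d.Dispatch(m)
	}
	d.Close()
	return seen
}

func main() {
	seen := processInOrder([]Message{{"a", "1"}, {"b", "1"}, {"a", "2"}, {"a", "3"}, {"b", "2"}})
	fmt.Println(seen["a"], seen["b"]) // prints: [1 2 3] [1 2]
}
```

This sketch keeps a goroutine and a channel alive for every key until `Close`, so it has the same growth problem as a per-key lock map when the key space is large.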
Let's try adding this to the example and see how it behaves. I think the API can be simplified to just accepting a key function:

```go
type OrderBy func(*Message) string
```

But there's a potential issue of memory bloat if the number of keys is large.
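One possible way to limit the memory growth is to reference-count each per-key lock and delete it once nobody holds or waits for it. The following is a sketch under that assumption; `keyedMutex`, `entry`, `Len` and `demo` are hypothetical names, not existing xkafka types.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is one per-key lock plus the number of goroutines holding or waiting for it.
type entry struct {
	mu   sync.Mutex
	refs int
}

// keyedMutex (hypothetical) keeps a lock only while some goroutine needs it.
type keyedMutex struct {
	mu      sync.Mutex
	entries map[string]*entry
}

func newKeyedMutex() *keyedMutex {
	return &keyedMutex{entries: make(map[string]*entry)}
}

// Acquire registers interest in key, then blocks until the key's lock is free.
func (k *keyedMutex) Acquire(key string) {
	k.mu.Lock()
	e, ok := k.entries[key]
	if !ok {
		e = &entry{}
		k.entries[key] = e
	}
	e.refs++
	k.mu.Unlock()
	e.mu.Lock()
}

// Release unlocks key and drops its entry when no one else references it.
// It must only be called after a matching Acquire.
func (k *keyedMutex) Release(key string) {
	k.mu.Lock()
	e := k.entries[key]
	e.refs--
	if e.refs == 0 {
		delete(k.entries, key)
	}
	k.mu.Unlock()
	e.mu.Unlock()
}

// Len reports how many keys currently have a lock entry.
func (k *keyedMutex) Len() int {
	k.mu.Lock()
	defer k.mu.Unlock()
	return len(k.entries)
}

// demo runs 400 goroutines over 4 keys and returns the total number of
// increments and the number of lock entries left afterwards.
func demo() (total int, remaining int) {
	km := newKeyedMutex()
	var counts [4]int
	var wg sync.WaitGroup
	for i := 0; i < 400; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := fmt.Sprintf("key-%d", i%4)
			km.Acquire(key)
			counts[i%4]++ // guarded by the lock for this key
			km.Release(key)
		}(i)
	}
	wg.Wait()
	for _, c := range counts {
		total += c
	}
	return total, km.Len()
}

func main() {
	total, remaining := demo()
	fmt.Println(total, remaining) // prints: 400 0
}
```

The map stays bounded by the number of keys that are in flight at any moment, at the cost of one extra global lock on every `Acquire` and `Release`.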
Proposal Overview
This proposal aims to modify the behavior of xkafka.Consumer in asynchronous consumption contexts to allow better control over the order of message consumption.
Problem Statement
The current implementation of xkafka.Concurrency lacks the capability to control when message processing starts. This limitation becomes significant in cases where messages must be processed in a specific order for correct application behavior. For instance, messages with the same Key require that while one message is being processed, the subsequent message waits, ensuring in-order execution. This kind of synchronization is absent in the Async mode.
Proposed Solution
Introduce a semaphore-like mechanism that the asynchronous consumer can use to determine if it should start processing a message or wait.
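As an illustration of the idea only, the sketch below acquires a per-key slot in the read loop, before the processing goroutine is started, so the start order for a given key follows the read order. It does not reproduce the MessageMetadata or ConsumerSemaphore definitions from this proposal; `keyGate`, `consume`, `collect` and the reduced `Message` type are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a stand-in for the consumer's message type (assumption).
type Message struct{ Key, Value string }

// keyGate (hypothetical) holds a one-slot channel per key, used as a binary semaphore.
type keyGate struct {
	mu    sync.Mutex
	slots map[string]chan struct{}
}

func newKeyGate() *keyGate {
	return &keyGate{slots: make(map[string]chan struct{})}
}

// slot returns the channel for key, creating it on first use.
func (g *keyGate) slot(key string) chan struct{} {
	g.mu.Lock()
	defer g.mu.Unlock()
	s, ok := g.slots[key]
	if !ok {
		s = make(chan struct{}, 1)
		g.slots[key] = s
	}
	return s
}

// Acquire blocks while another message with the same key is in flight.
func (g *keyGate) Acquire(key string) { g.slot(key) <- struct{}{} }

// Release frees the slot taken by a matching Acquire.
func (g *keyGate) Release(key string) { <-g.slot(key) }

// consume processes msgs concurrently across keys and sequentially within a key.
func consume(msgs []Message, handle func(Message)) {
	gate := newKeyGate()
	var wg sync.WaitGroup
	for _, m := range msgs {
		// Acquire in the read loop, not inside the goroutine, so the
		// scheduler cannot reorder two messages that share a key.
		gate.Acquire(m.Key)
		wg.Add(1)
		go func(m Message) {
			defer wg.Done()
			defer gate.Release(m.Key)
			handle(m)
		}(m)
	}
	wg.Wait()
}

// collect runs consume and records the values handled for each key.
func collect(msgs []Message) map[string][]string {
	var mu sync.Mutex
	seen := make(map[string][]string)
	consume(msgs, func(m Message) {
		mu.Lock()
		seen[m.Key] = append(seen[m.Key], m.Value)
		mu.Unlock()
	})
	return seen
}

func main() {
	seen := collect([]Message{{"a", "1"}, {"b", "1"}, {"a", "2"}, {"a", "3"}, {"b", "2"}})
	fmt.Println(seen["a"], seen["b"]) // prints: [1 2 3] [1 2]
}
```

The trade-off is head-of-line blocking: while the read loop waits on a busy key, messages for other keys are not dispatched either.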
Proposed API Changes
Define a new structure MessageMetadata and an interface ConsumerSemaphore: [...]
Integrate the semaphore into the Consumer structure and modify the message processing flow to utilize this semaphore. Key changes include:
[...] Consumer struct.
Expected Benefits
Implementing this solution would provide the needed control over message processing order in asynchronous scenarios, enhancing the reliability and correctness of the application using xkafka.Consumer.