From f00766fad2d03c6224e341050de00b58b36c74c4 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 19 Mar 2021 08:09:25 +0530 Subject: [PATCH] Max buffer should be equal to the concurrency set Constant buffer size was causing the batcher to die out of memory. We do not need the buffer to be greater than the concurrency. This should greatly solve the scaling issue of batcher and make it perform in most optimum way. https://github.com/practo/tipoca-stream/issues/167 --- pkg/redshiftbatcher/batch_processor.go | 2 +- pkg/redshiftbatcher/batcher_handler.go | 2 +- pkg/serializer/message.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go index fd20dea2e..c1df0121a 100644 --- a/pkg/redshiftbatcher/batch_processor.go +++ b/pkg/redshiftbatcher/batch_processor.go @@ -411,7 +411,7 @@ func (b *batchProcessor) Process( msgBufs := [][]*serializer.Message{} klog.V(2).Infof( - "%s: buffchan:%v msgs", + "%s: processChan:%v", b.topic, len(processChan), ) diff --git a/pkg/redshiftbatcher/batcher_handler.go b/pkg/redshiftbatcher/batcher_handler.go index 600abf3cd..96b8b12ac 100644 --- a/pkg/redshiftbatcher/batcher_handler.go +++ b/pkg/redshiftbatcher/batcher_handler.go @@ -134,7 +134,7 @@ func (h *batcherHandler) ConsumeClaim( ) var lastSchemaId *int - processChan := make(chan []*serializer.Message, 1000) + processChan := make(chan []*serializer.Message, h.maxConcurrency) errChan := make(chan error) processor := newBatchProcessor( h.consumerGroupID, diff --git a/pkg/serializer/message.go b/pkg/serializer/message.go index 4b133e0fb..31ca90acd 100644 --- a/pkg/serializer/message.go +++ b/pkg/serializer/message.go @@ -60,7 +60,7 @@ func (b *MessageAsyncBatch) Flush() { b.processChan <- b.msgBuf b.msgBuf = make([]*Message, 0, b.maxSize) klog.V(4).Infof( - "%s: flushed:%d, buffchan:%v msgs", + "%s: flushed:%d, processChan:%v", b.topic, size, len(b.processChan),