From 1f50d4e2a6bd09483b6a0dd8c4b1ae8a491cbdf5 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Thu, 1 Apr 2021 14:04:20 +0530 Subject: [PATCH] Stop and reset ticker after processing This is required so that batches are made of big size at the time of full sink. Solves Time part of https://github.com/practo/tipoca-stream/pull/172#issuecomment-811055840 --- pkg/redshiftloader/loader_handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/redshiftloader/loader_handler.go b/pkg/redshiftloader/loader_handler.go index 85a04c5e8..b8206350a 100644 --- a/pkg/redshiftloader/loader_handler.go +++ b/pkg/redshiftloader/loader_handler.go @@ -162,6 +162,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, ) return fmt.Errorf("session ctx done, err: %v", session.Context().Err()) case message, ok := <-claimMsgChan: + maxWaitTicker.Stop() if !ok { klog.V(2).Infof( "%s: consumeClaim returning. read msg channel closed", @@ -220,6 +221,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, return err } *lastSchemaId = upstreamJobSchemaId + maxWaitTicker.Reset(time.Duration(*h.maxWaitSeconds) * time.Second) case <-maxWaitTicker.C: // Process the batch by time klog.V(2).Infof(