diff --git a/pkg/redshiftloader/loader_handler.go b/pkg/redshiftloader/loader_handler.go index 85a04c5e8..b8206350a 100644 --- a/pkg/redshiftloader/loader_handler.go +++ b/pkg/redshiftloader/loader_handler.go @@ -162,6 +162,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, ) return fmt.Errorf("session ctx done, err: %v", session.Context().Err()) case message, ok := <-claimMsgChan: + maxWaitTicker.Stop() if !ok { klog.V(2).Infof( "%s: consumeClaim returning. read msg channel closed", @@ -220,6 +221,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, return err } *lastSchemaId = upstreamJobSchemaId + maxWaitTicker.Reset(time.Duration(*h.maxWaitSeconds) * time.Second) case <-maxWaitTicker.C: // Process the batch by time klog.V(2).Infof(