Skip to content

Commit

Permalink
Stop and reset ticker after processing
Browse files Browse the repository at this point in the history
This is required so that batches are made of big size at the time of full sink.
Solves Time part of #172 (comment)
  • Loading branch information
alok87 committed Apr 1, 2021
1 parent b911ca2 commit 1f50d4e
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/redshiftloader/loader_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
)
return fmt.Errorf("session ctx done, err: %v", session.Context().Err())
case message, ok := <-claimMsgChan:
maxWaitTicker.Stop()
if !ok {
klog.V(2).Infof(
"%s: consumeClaim returning. read msg channel closed",
Expand Down Expand Up @@ -220,6 +221,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
return err
}
*lastSchemaId = upstreamJobSchemaId
maxWaitTicker.Reset(time.Duration(*h.maxWaitSeconds) * time.Second)
case <-maxWaitTicker.C:
// Process the batch by time
klog.V(2).Infof(
Expand Down

0 comments on commit 1f50d4e

Please sign in to comment.