Skip to content

Commit

Permalink
Always defer stop
Browse files Browse the repository at this point in the history
  • Loading branch information
alok87 committed May 5, 2021
1 parent fc2b90c commit 264fb00
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pkg/redshiftloader/load_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,8 @@ func (b *loadProcessor) processBatch(
func (b *loadProcessor) Process(session sarama.ConsumerGroupSession, msgBuf []*serializer.Message) error {
start := time.Now()
b.metric.setStartRunning()
defer b.metric.setStopRunning()

b.setBatchId()
ctx := session.Context()

Expand All @@ -796,7 +798,6 @@ func (b *loadProcessor) Process(session sarama.ConsumerGroupSession, msgBuf []*s
return err
}
b.markOffset(session, msgBuf)
b.metric.setStopRunning()

var timeTaken string
secondsTaken := time.Since(start).Seconds()
Expand Down

0 comments on commit 264fb00

Please sign in to comment.