Skip to content

Commit

Permalink
Merge branch 'main' into walheartbeat-for-peers
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Nov 24, 2023
2 parents 89bbbe8 + 76e6e78 commit 6656118
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func (c *EventHubConnector) processBatch(
ticker := time.NewTicker(eventHubFlushTimeout)
defer ticker.Stop()

lastSeenLSN := int64(0)
lastUpdatedOffset := int64(0)

numRecords := 0
for {
select {
Expand All @@ -150,6 +153,12 @@ func (c *EventHubConnector) processBatch(
}

numRecords++

recordLSN := record.GetCheckPointID()
if recordLSN > lastSeenLSN {
lastSeenLSN = recordLSN
}

json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -186,6 +195,15 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

if lastSeenLSN > lastUpdatedOffset {
err = c.updateLastOffset(flowJobName, lastSeenLSN)
lastUpdatedOffset = lastSeenLSN
log.Infof("[eh] updated last offset for %s to %d", flowJobName, lastSeenLSN)
if err != nil {
return 0, fmt.Errorf("failed to update last offset: %v", err)
}
}

ticker.Stop()
ticker = time.NewTicker(eventHubFlushTimeout)
}
Expand Down

0 comments on commit 6656118

Please sign in to comment.