Skip to content

Commit

Permalink
[eventhubs] Update last commited LSN on metadata after we flush
Browse files Browse the repository at this point in the history
If the sync flow fails this would make it so that the metadata is
more up-to data. Prior to the change the metadata would only be updated
once per sync flow, now this is done every 10s, so its more updated.
  • Loading branch information
iskakaushik committed Nov 24, 2023
1 parent 2eb549d commit 1509630
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func (c *EventHubConnector) processBatch(
ticker := time.NewTicker(eventHubFlushTimeout)
defer ticker.Stop()

lastSeenLSN := int64(0)
lastUpdatedOffset := int64(0)

numRecords := 0
for {
select {
Expand All @@ -150,6 +153,12 @@ func (c *EventHubConnector) processBatch(
}

numRecords++

recordLSN := record.GetCheckPointID()
if recordLSN > lastSeenLSN {
lastSeenLSN = recordLSN
}

json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -186,6 +195,15 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

if lastSeenLSN > lastUpdatedOffset {
err = c.updateLastOffset(flowJobName, lastSeenLSN)
lastUpdatedOffset = lastSeenLSN
log.Infof("[eh] updated last offset for %s to %d", flowJobName, lastSeenLSN)
if err != nil {
return 0, fmt.Errorf("failed to update last offset: %v", err)
}
}

ticker.Stop()
ticker = time.NewTicker(eventHubFlushTimeout)
}
Expand Down

0 comments on commit 1509630

Please sign in to comment.