Skip to content

Commit

Permalink
[eventhubs] Update last commited LSN on metadata after we flush (#709)
Browse files Browse the repository at this point in the history
If the sync flow fails this would make it so that the metadata is more
up-to data. Prior to the change the metadata would only be updated once
per sync flow, now this is done every 10s, so its more updated.
  • Loading branch information
iskakaushik authored Nov 24, 2023
1 parent 0c5cbaa commit 76e6e78
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func (c *EventHubConnector) processBatch(
ticker := time.NewTicker(eventHubFlushTimeout)
defer ticker.Stop()

lastSeenLSN := int64(0)
lastUpdatedOffset := int64(0)

numRecords := 0
for {
select {
Expand All @@ -150,6 +153,12 @@ func (c *EventHubConnector) processBatch(
}

numRecords++

recordLSN := record.GetCheckPointID()
if recordLSN > lastSeenLSN {
lastSeenLSN = recordLSN
}

json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
if err != nil {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -186,6 +195,15 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

if lastSeenLSN > lastUpdatedOffset {
err = c.updateLastOffset(flowJobName, lastSeenLSN)
lastUpdatedOffset = lastSeenLSN
log.Infof("[eh] updated last offset for %s to %d", flowJobName, lastSeenLSN)
if err != nil {
return 0, fmt.Errorf("failed to update last offset: %v", err)
}
}

ticker.Stop()
ticker = time.NewTicker(eventHubFlushTimeout)
}
Expand Down

0 comments on commit 76e6e78

Please sign in to comment.