diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 027953fd79..6c0f6b4ac9 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -133,6 +133,9 @@ func (c *EventHubConnector) processBatch( ticker := time.NewTicker(eventHubFlushTimeout) defer ticker.Stop() + lastSeenLSN := int64(0) + lastUpdatedOffset := int64(0) + numRecords := 0 for { select { @@ -150,6 +153,12 @@ func (c *EventHubConnector) processBatch( } numRecords++ + + recordLSN := record.GetCheckPointID() + if recordLSN > lastSeenLSN { + lastSeenLSN = recordLSN + } + json, err := record.GetItems().ToJSONWithOpts(toJSONOpts) if err != nil { log.WithFields(log.Fields{ @@ -186,6 +195,15 @@ func (c *EventHubConnector) processBatch( return 0, err } + if lastSeenLSN > lastUpdatedOffset { + err = c.updateLastOffset(flowJobName, lastSeenLSN) + lastUpdatedOffset = lastSeenLSN + log.Infof("[eh] updated last offset for %s to %d", flowJobName, lastSeenLSN) + if err != nil { + return 0, fmt.Errorf("failed to update last offset: %v", err) + } + } + ticker.Stop() ticker = time.NewTicker(eventHubFlushTimeout) }