From 15096309bfa91910fe32426d84b0a4b2dd1b28a9 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 24 Nov 2023 09:29:36 -0500 Subject: [PATCH] [eventhubs] Update last commited LSN on metadata after we flush If the sync flow fails this would make it so that the metadata is more up-to data. Prior to the change the metadata would only be updated once per sync flow, now this is done every 10s, so its more updated. --- flow/connectors/eventhub/eventhub.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 3dc8598583..73c8ed528c 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -133,6 +133,9 @@ func (c *EventHubConnector) processBatch( ticker := time.NewTicker(eventHubFlushTimeout) defer ticker.Stop() + lastSeenLSN := int64(0) + lastUpdatedOffset := int64(0) + numRecords := 0 for { select { @@ -150,6 +153,12 @@ func (c *EventHubConnector) processBatch( } numRecords++ + + recordLSN := record.GetCheckPointID() + if recordLSN > lastSeenLSN { + lastSeenLSN = recordLSN + } + json, err := record.GetItems().ToJSONWithOpts(toJSONOpts) if err != nil { log.WithFields(log.Fields{ @@ -186,6 +195,15 @@ func (c *EventHubConnector) processBatch( return 0, err } + if lastSeenLSN > lastUpdatedOffset { + err = c.updateLastOffset(flowJobName, lastSeenLSN) + lastUpdatedOffset = lastSeenLSN + log.Infof("[eh] updated last offset for %s to %d", flowJobName, lastSeenLSN) + if err != nil { + return 0, fmt.Errorf("failed to update last offset: %v", err) + } + } + ticker.Stop() ticker = time.NewTicker(eventHubFlushTimeout) }