From 76e6e7887dae4617db4cbf3579269f9687140c2a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 24 Nov 2023 10:11:14 -0500 Subject: [PATCH] [eventhubs] Update last commited LSN on metadata after we flush (#709) If the sync flow fails this would make it so that the metadata is more up-to data. Prior to the change the metadata would only be updated once per sync flow, now this is done every 10s, so its more updated. --- flow/connectors/eventhub/eventhub.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 027953fd7..6c0f6b4ac 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -133,6 +133,9 @@ func (c *EventHubConnector) processBatch( ticker := time.NewTicker(eventHubFlushTimeout) defer ticker.Stop() + lastSeenLSN := int64(0) + lastUpdatedOffset := int64(0) + numRecords := 0 for { select { @@ -150,6 +153,12 @@ func (c *EventHubConnector) processBatch( } numRecords++ + + recordLSN := record.GetCheckPointID() + if recordLSN > lastSeenLSN { + lastSeenLSN = recordLSN + } + json, err := record.GetItems().ToJSONWithOpts(toJSONOpts) if err != nil { log.WithFields(log.Fields{ @@ -186,6 +195,15 @@ func (c *EventHubConnector) processBatch( return 0, err } + if lastSeenLSN > lastUpdatedOffset { + err = c.updateLastOffset(flowJobName, lastSeenLSN) + lastUpdatedOffset = lastSeenLSN + log.Infof("[eh] updated last offset for %s to %d", flowJobName, lastSeenLSN) + if err != nil { + return 0, fmt.Errorf("failed to update last offset: %v", err) + } + } + ticker.Stop() ticker = time.NewTicker(eventHubFlushTimeout) }