From 76e6e7887dae4617db4cbf3579269f9687140c2a Mon Sep 17 00:00:00 2001
From: Kaushik Iska <iska.kaushik@gmail.com>
Date: Fri, 24 Nov 2023 10:11:14 -0500
Subject: [PATCH] [eventhubs] Update last commited LSN on metadata after we
 flush (#709)

If the sync flow fails this would make it so that the metadata is more
up-to data. Prior to the change the metadata would only be updated once
per sync flow, now this is done every 10s, so its more updated.
---
 flow/connectors/eventhub/eventhub.go | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 027953fd79..6c0f6b4ac9 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -133,6 +133,9 @@ func (c *EventHubConnector) processBatch(
 	ticker := time.NewTicker(eventHubFlushTimeout)
 	defer ticker.Stop()
 
+	lastSeenLSN := int64(0)
+	lastUpdatedOffset := int64(0)
+
 	numRecords := 0
 	for {
 		select {
@@ -150,6 +153,12 @@ func (c *EventHubConnector) processBatch(
 			}
 
 			numRecords++
+
+			recordLSN := record.GetCheckPointID()
+			if recordLSN > lastSeenLSN {
+				lastSeenLSN = recordLSN
+			}
+
 			json, err := record.GetItems().ToJSONWithOpts(toJSONOpts)
 			if err != nil {
 				log.WithFields(log.Fields{
@@ -186,6 +195,15 @@ func (c *EventHubConnector) processBatch(
 				return 0, err
 			}
 
+			if lastSeenLSN > lastUpdatedOffset {
+				err = c.updateLastOffset(flowJobName, lastSeenLSN)
+				lastUpdatedOffset = lastSeenLSN
+				log.Infof("[eh] updated last offset for %s to %d", flowJobName, lastSeenLSN)
+				if err != nil {
+					return 0, fmt.Errorf("failed to update last offset: %v", err)
+				}
+			}
+
 			ticker.Stop()
 			ticker = time.NewTicker(eventHubFlushTimeout)
 		}