From b30da813e9c7a81db043e510cb5f23f685172eae Mon Sep 17 00:00:00 2001
From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com>
Date: Mon, 20 May 2024 09:40:36 +0530
Subject: [PATCH 1/3] fix: close and recreate destination when needed

---
 flow/activities/flowable_core.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index ad3dca378a..7d48d9834f 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -113,6 +113,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	if err != nil {
 		return nil, err
 	}
+	connectors.CloseConnector(ctx, dstConn)
 	logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
 	consumedOffset := atomic.Int64{}
 	consumedOffset.Store(lastOffset)
@@ -142,6 +143,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	hasRecords := !recordBatch.WaitAndCheckEmpty()
 	logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))
 
+	dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
+	if err != nil {
+		return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
+	}
+
 	if !hasRecords {
 		// wait for the pull goroutine to finish
 		err = errGroup.Wait()

From b404aaaabbc8aa3ef1e212233aa1e2896410e812 Mon Sep 17 00:00:00 2001
From: Amogh-Bharadwaj <amogh@peerdb.io>
Date: Fri, 24 May 2024 21:01:24 +0530
Subject: [PATCH 2/3] move the dst connector retreival further down

---
 flow/activities/flowable_core.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index 7d48d9834f..49059ac8e3 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -11,7 +11,7 @@ import (
 
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgxpool"
-	"github.com/yuin/gopher-lua"
+	lua "github.com/yuin/gopher-lua"
 	"go.temporal.io/sdk/activity"
 	"go.temporal.io/sdk/log"
 	"go.temporal.io/sdk/temporal"
@@ -143,11 +143,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	hasRecords := !recordBatch.WaitAndCheckEmpty()
 	logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))
 
-	dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
-	if err != nil {
-		return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
-	}
-
 	if !hasRecords {
 		// wait for the pull goroutine to finish
 		err = errGroup.Wait()
@@ -161,6 +156,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		}
 		logger.Info("no records to push")
 
+		dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
+		if err != nil {
+			return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
+		}
+
 		err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatch.SchemaDeltas)
 		if err != nil {
 			return nil, fmt.Errorf("failed to sync schema: %w", err)

From c23151e00752d1b524fae1354f1bc0c79e9b31f0 Mon Sep 17 00:00:00 2001
From: Amogh-Bharadwaj <amogh@peerdb.io>
Date: Sat, 25 May 2024 00:40:19 +0530
Subject: [PATCH 3/3] Revert "move the dst connector retreival further down"

This reverts commit b404aaaabbc8aa3ef1e212233aa1e2896410e812.
---
 flow/activities/flowable_core.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index 49059ac8e3..7d48d9834f 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -11,7 +11,7 @@ import (
 
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgxpool"
-	lua "github.com/yuin/gopher-lua"
+	"github.com/yuin/gopher-lua"
 	"go.temporal.io/sdk/activity"
 	"go.temporal.io/sdk/log"
 	"go.temporal.io/sdk/temporal"
@@ -143,6 +143,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	hasRecords := !recordBatch.WaitAndCheckEmpty()
 	logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))
 
+	dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
+	if err != nil {
+		return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
+	}
+
 	if !hasRecords {
 		// wait for the pull goroutine to finish
 		err = errGroup.Wait()
@@ -156,11 +161,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		}
 		logger.Info("no records to push")
 
-		dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
-		if err != nil {
-			return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
-		}
-
 		err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatch.SchemaDeltas)
 		if err != nil {
 			return nil, fmt.Errorf("failed to sync schema: %w", err)