From b30da813e9c7a81db043e510cb5f23f685172eae Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Mon, 20 May 2024 09:40:36 +0530 Subject: [PATCH 1/3] fix: close and recreate destination when needed --- flow/activities/flowable_core.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index ad3dca378a..7d48d9834f 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -113,6 +113,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon if err != nil { return nil, err } + connectors.CloseConnector(ctx, dstConn) logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset)) consumedOffset := atomic.Int64{} consumedOffset.Store(lastOffset) @@ -142,6 +143,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon hasRecords := !recordBatch.WaitAndCheckEmpty() logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords)) + dstConn, err = connectors.GetAs[TSync](ctx, config.Destination) + if err != nil { + return nil, fmt.Errorf("failed to recreate destination connector: %w", err) + } + if !hasRecords { // wait for the pull goroutine to finish err = errGroup.Wait() From b404aaaabbc8aa3ef1e212233aa1e2896410e812 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj <amogh@peerdb.io> Date: Fri, 24 May 2024 21:01:24 +0530 Subject: [PATCH 2/3] move the dst connector retreival further down --- flow/activities/flowable_core.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 7d48d9834f..49059ac8e3 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -11,7 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/yuin/gopher-lua" + lua "github.com/yuin/gopher-lua" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -143,11 +143,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon hasRecords := !recordBatch.WaitAndCheckEmpty() logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords)) - dstConn, err = connectors.GetAs[TSync](ctx, config.Destination) - if err != nil { - return nil, fmt.Errorf("failed to recreate destination connector: %w", err) - } - if !hasRecords { // wait for the pull goroutine to finish err = errGroup.Wait() @@ -161,6 +156,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } logger.Info("no records to push") + dstConn, err = connectors.GetAs[TSync](ctx, config.Destination) + if err != nil { + return nil, fmt.Errorf("failed to recreate destination connector: %w", err) + } + err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatch.SchemaDeltas) if err != nil { return nil, fmt.Errorf("failed to sync schema: %w", err) From c23151e00752d1b524fae1354f1bc0c79e9b31f0 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj <amogh@peerdb.io> Date: Sat, 25 May 2024 00:40:19 +0530 Subject: [PATCH 3/3] Revert "move the dst connector retreival further down" This reverts commit b404aaaabbc8aa3ef1e212233aa1e2896410e812. --- flow/activities/flowable_core.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 49059ac8e3..7d48d9834f 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -11,7 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - lua "github.com/yuin/gopher-lua" + "github.com/yuin/gopher-lua" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -143,6 +143,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon hasRecords := !recordBatch.WaitAndCheckEmpty() logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords)) + dstConn, err = connectors.GetAs[TSync](ctx, config.Destination) + if err != nil { + return nil, fmt.Errorf("failed to recreate destination connector: %w", err) + } + if !hasRecords { // wait for the pull goroutine to finish err = errGroup.Wait() @@ -156,11 +161,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } logger.Info("no records to push") - dstConn, err = connectors.GetAs[TSync](ctx, config.Destination) - if err != nil { - return nil, fmt.Errorf("failed to recreate destination connector: %w", err) - } - err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatch.SchemaDeltas) if err != nil { return nil, fmt.Errorf("failed to sync schema: %w", err)