From 8e2a2d1ed90f9558a268ad363711c0840192ce07 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 27 May 2024 08:19:06 -0400 Subject: [PATCH] remove session id check --- flow/activities/flowable_core.go | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index ad3dca378a..e5c56ee59e 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -11,7 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/yuin/gopher-lua" + lua "github.com/yuin/gopher-lua" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -78,25 +78,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - var srcConn TPull - if sessionID == "" { - srcConn, err = connectors.GetAs[TPull](ctx, config.Source) - if err != nil { - return nil, err - } - defer connectors.CloseConnector(ctx, srcConn) - - if err := srcConn.SetupReplConn(ctx); err != nil { - return nil, err - } - } else { - srcConn, err = waitForCdcCache[TPull](ctx, a, sessionID) - if err != nil { - return nil, err - } - if err := srcConn.ConnectionActive(ctx); err != nil { - return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) - } + srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID) + if err != nil { + return nil, err + } + if err := srcConn.ConnectionActive(ctx); err != nil { + return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } shutdown := utils.HeartbeatRoutine(ctx, func() string {