diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index ad3dca378a..e5c56ee59e 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -11,7 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/yuin/gopher-lua" + lua "github.com/yuin/gopher-lua" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -78,25 +78,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - var srcConn TPull - if sessionID == "" { - srcConn, err = connectors.GetAs[TPull](ctx, config.Source) - if err != nil { - return nil, err - } - defer connectors.CloseConnector(ctx, srcConn) - - if err := srcConn.SetupReplConn(ctx); err != nil { - return nil, err - } - } else { - srcConn, err = waitForCdcCache[TPull](ctx, a, sessionID) - if err != nil { - return nil, err - } - if err := srcConn.ConnectionActive(ctx); err != nil { - return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) - } + srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID) + if err != nil { + return nil, err + } + if err := srcConn.ConnectionActive(ctx); err != nil { + return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } shutdown := utils.HeartbeatRoutine(ctx, func() string {