diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index ad3dca378a..e5c56ee59e 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -11,7 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/yuin/gopher-lua" + lua "github.com/yuin/gopher-lua" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -78,25 +78,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - var srcConn TPull - if sessionID == "" { - srcConn, err = connectors.GetAs[TPull](ctx, config.Source) - if err != nil { - return nil, err - } - defer connectors.CloseConnector(ctx, srcConn) - - if err := srcConn.SetupReplConn(ctx); err != nil { - return nil, err - } - } else { - srcConn, err = waitForCdcCache[TPull](ctx, a, sessionID) - if err != nil { - return nil, err - } - if err := srcConn.ConnectionActive(ctx); err != nil { - return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) - } + srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID) + if err != nil { + return nil, err + } + if err := srcConn.ConnectionActive(ctx); err != nil { + return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } shutdown := utils.HeartbeatRoutine(ctx, func() string { diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 895b9376a6..bcf1ac050e 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -66,11 +66,6 @@ func PeerDBCDCDiskSpillMemPercentThreshold() int { return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) } -// PEERDB_DISABLE_ONE_SYNC -func PeerDBDisableOneSync() bool { - return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false) -} - // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum func PeerDBFlowWorkerMaxMemBytes() uint64 { return getEnvUint[uint64]("GOMEMLIMIT", 0) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index e421759909..711928ef5c 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -30,38 +30,30 @@ func SyncFlowWorkflow( parent := workflow.GetInfo(ctx).ParentWorkflowExecution logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) - enableOneSync := GetSideEffect(ctx, func(_ workflow.Context) bool { - return !peerdbenv.PeerDBDisableOneSync() - }) - var fMaintain workflow.Future - var sessionID string - syncSessionCtx := ctx - if enableOneSync { - sessionOptions := &workflow.SessionOptions{ - CreationTimeout: 5 * time.Minute, - ExecutionTimeout: 144 * time.Hour, - HeartbeatTimeout: time.Minute, - } - var err error - syncSessionCtx, err = workflow.CreateSession(ctx, sessionOptions) - if err != nil { - return err - } - defer workflow.CompleteSession(syncSessionCtx) - sessionID = workflow.GetSessionInfo(syncSessionCtx).SessionID + sessionOptions := &workflow.SessionOptions{ + CreationTimeout: 5 * time.Minute, + ExecutionTimeout: 14 * 24 * time.Hour, + HeartbeatTimeout: time.Minute, + } - maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ - StartToCloseTimeout: 14 * 24 * time.Hour, - HeartbeatTimeout: time.Minute, - WaitForCancellation: true, - }) - fMaintain = workflow.ExecuteActivity( - maintainCtx, - flowable.MaintainPull, - config, - sessionID, - ) + syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions) + if err != nil { + return err } + defer workflow.CompleteSession(syncSessionCtx) + + sessionID := workflow.GetSessionInfo(syncSessionCtx).SessionID + maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 14 * 24 * time.Hour, + HeartbeatTimeout: time.Minute, + WaitForCancellation: true, + }) + fMaintain := workflow.ExecuteActivity( + maintainCtx, + flowable.MaintainPull, + config, + sessionID, + ) var stop, syncErr bool currentSyncFlowNum := 0