From 8ef5c03362f93369b665f1bfe552a63990418316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 15 Mar 2024 17:39:48 +0000 Subject: [PATCH] Add environment variable to disable one-sync --- flow/activities/flowable.go | 21 ++++++++---- flow/peerdbenv/config.go | 5 +++ flow/workflows/sync_flow.go | 66 ++++++++++++++++++++----------------- 3 files changed, 56 insertions(+), 36 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index afd7b3704e..bf7f9fd678 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -292,12 +292,21 @@ func (a *FlowableActivity) SyncFlow( tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - srcConn, err := a.waitForCdcCache(ctx, sessionID) - if err != nil { - return nil, err - } - if err := srcConn.ConnectionActive(ctx); err != nil { - return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) + var srcConn connectors.CDCPullConnector + if sessionID == "" { + srcConn, err = connectors.GetCDCPullConnector(ctx, config.Source) + if err != nil { + return nil, err + } + defer connectors.CloseConnector(ctx, srcConn) + } else { + srcConn, err = a.waitForCdcCache(ctx, sessionID) + if err != nil { + return nil, err + } + if err := srcConn.ConnectionActive(ctx); err != nil { + return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) + } } shutdown := utils.HeartbeatRoutine(ctx, func() string { diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 19f391051a..1d0eacc01a 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -51,6 +51,11 @@ func PeerDBCDCDiskSpillMemPercentThreshold() int { return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) } +// PEERDB_DISABLE_ONE_SYNC +func PeerDBDisableOneSync() bool { + return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false) +} + // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum func PeerDBFlowWorkerMaxMemBytes() uint64 { return getEnvUint[uint64]("GOMEMLIMIT", 0) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index ce6bf8c579..3295957049 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -28,29 +28,33 @@ func SyncFlowWorkflow( parent := workflow.GetInfo(ctx).ParentWorkflowExecution logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) - sessionOptions := &workflow.SessionOptions{ - CreationTimeout: 5 * time.Minute, - ExecutionTimeout: 144 * time.Hour, - HeartbeatTimeout: time.Minute, - } - syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions) - if err != nil { - return err - } - defer workflow.CompleteSession(syncSessionCtx) - sessionInfo := workflow.GetSessionInfo(syncSessionCtx) + var fMaintain workflow.Future + var sessionID string + if !peerdbenv.PeerDBDisableOneSync() { + sessionOptions := &workflow.SessionOptions{ + CreationTimeout: 5 * time.Minute, + ExecutionTimeout: 144 * time.Hour, + HeartbeatTimeout: time.Minute, + } + syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions) + if err != nil { + return err + } + defer workflow.CompleteSession(syncSessionCtx) + sessionID = workflow.GetSessionInfo(syncSessionCtx).SessionID - maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ - StartToCloseTimeout: 14 * 24 * time.Hour, - HeartbeatTimeout: time.Minute, - WaitForCancellation: true, - }) - fMaintain := workflow.ExecuteActivity( - maintainCtx, - flowable.MaintainPull, - config, - sessionInfo.SessionID, - ) + maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 14 * 24 * time.Hour, + HeartbeatTimeout: time.Minute, + WaitForCancellation: true, + }) + fMaintain = workflow.ExecuteActivity( + maintainCtx, + flowable.MaintainPull, + config, + sessionID, + ) + } var stop, syncErr bool currentSyncFlowNum := 0 @@ -58,13 +62,15 @@ func SyncFlowWorkflow( selector := workflow.NewNamedSelector(ctx, "SyncLoop") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) - selector.AddFuture(fMaintain, func(f workflow.Future) { - err := f.Get(ctx, nil) - if err != nil { - logger.Error("MaintainPull failed", slog.Any("error", err)) - syncErr = true - } - }) + if fMaintain != nil { + selector.AddFuture(fMaintain, func(f workflow.Future) { + err := f.Get(ctx, nil) + if err != nil { + logger.Error("MaintainPull failed", slog.Any("error", err)) + syncErr = true + } + }) + } stopChan := model.SyncStopSignal.GetSignalChannel(ctx) stopChan.AddToSelector(selector, func(_ struct{}, _ bool) { @@ -100,7 +106,7 @@ func SyncFlowWorkflow( WaitForCancellation: true, }) - syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, config, options, sessionInfo.SessionID) + syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, config, options, sessionID) selector.AddFuture(syncFlowFuture, func(f workflow.Future) { syncDone = true