Skip to content

Commit

Permalink
Add environment variable to disable one-sync (#1488)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Mar 15, 2024
1 parent fe4fdef commit bc4495a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 36 deletions.
21 changes: 15 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,21 @@ func (a *FlowableActivity) SyncFlow(
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

srcConn, err := a.waitForCdcCache(ctx, sessionID)
if err != nil {
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
var srcConn connectors.CDCPullConnector
if sessionID == "" {
srcConn, err = connectors.GetCDCPullConnector(ctx, config.Source)
if err != nil {
return nil, err
}
defer connectors.CloseConnector(ctx, srcConn)
} else {
srcConn, err = a.waitForCdcCache(ctx, sessionID)
if err != nil {
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
Expand Down
5 changes: 5 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func PeerDBCDCDiskSpillMemPercentThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
}

// PEERDB_DISABLE_ONE_SYNC
func PeerDBDisableOneSync() bool {
return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false)
}

// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
func PeerDBFlowWorkerMaxMemBytes() uint64 {
return getEnvUint[uint64]("GOMEMLIMIT", 0)
Expand Down
66 changes: 36 additions & 30 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,43 +28,49 @@ func SyncFlowWorkflow(
parent := workflow.GetInfo(ctx).ParentWorkflowExecution
logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

sessionOptions := &workflow.SessionOptions{
CreationTimeout: 5 * time.Minute,
ExecutionTimeout: 144 * time.Hour,
HeartbeatTimeout: time.Minute,
}
syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions)
if err != nil {
return err
}
defer workflow.CompleteSession(syncSessionCtx)
sessionInfo := workflow.GetSessionInfo(syncSessionCtx)
var fMaintain workflow.Future
var sessionID string
if !peerdbenv.PeerDBDisableOneSync() {
sessionOptions := &workflow.SessionOptions{
CreationTimeout: 5 * time.Minute,
ExecutionTimeout: 144 * time.Hour,
HeartbeatTimeout: time.Minute,
}
syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions)
if err != nil {
return err
}
defer workflow.CompleteSession(syncSessionCtx)
sessionID = workflow.GetSessionInfo(syncSessionCtx).SessionID

maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
StartToCloseTimeout: 14 * 24 * time.Hour,
HeartbeatTimeout: time.Minute,
WaitForCancellation: true,
})
fMaintain := workflow.ExecuteActivity(
maintainCtx,
flowable.MaintainPull,
config,
sessionInfo.SessionID,
)
maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
StartToCloseTimeout: 14 * 24 * time.Hour,
HeartbeatTimeout: time.Minute,
WaitForCancellation: true,
})
fMaintain = workflow.ExecuteActivity(
maintainCtx,
flowable.MaintainPull,
config,
sessionID,
)
}

var stop, syncErr bool
currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

selector := workflow.NewNamedSelector(ctx, "SyncLoop")
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
selector.AddFuture(fMaintain, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
logger.Error("MaintainPull failed", slog.Any("error", err))
syncErr = true
}
})
if fMaintain != nil {
selector.AddFuture(fMaintain, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
logger.Error("MaintainPull failed", slog.Any("error", err))
syncErr = true
}
})
}

stopChan := model.SyncStopSignal.GetSignalChannel(ctx)
stopChan.AddToSelector(selector, func(_ struct{}, _ bool) {
Expand Down Expand Up @@ -100,7 +106,7 @@ func SyncFlowWorkflow(
WaitForCancellation: true,
})

syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, config, options, sessionInfo.SessionID)
syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, config, options, sessionID)
selector.AddFuture(syncFlowFuture, func(f workflow.Future) {
syncDone = true

Expand Down

0 comments on commit bc4495a

Please sign in to comment.