Skip to content

Commit

Permalink
Enable oneSync always (#1755)
Browse files Browse the repository at this point in the history
This feature has been stabilized and no longer needs a disable flag.
  • Loading branch information
iskakaushik authored May 27, 2024
1 parent 3485158 commit 19ea9ea
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 55 deletions.
27 changes: 7 additions & 20 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/yuin/gopher-lua"
lua "github.com/yuin/gopher-lua"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -78,25 +78,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

var srcConn TPull
if sessionID == "" {
srcConn, err = connectors.GetAs[TPull](ctx, config.Source)
if err != nil {
return nil, err
}
defer connectors.CloseConnector(ctx, srcConn)

if err := srcConn.SetupReplConn(ctx); err != nil {
return nil, err
}
} else {
srcConn, err = waitForCdcCache[TPull](ctx, a, sessionID)
if err != nil {
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}
srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID)
if err != nil {
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
Expand Down
5 changes: 0 additions & 5 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ func PeerDBCDCDiskSpillMemPercentThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
}

// PEERDB_DISABLE_ONE_SYNC
func PeerDBDisableOneSync() bool {
return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false)
}

// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
func PeerDBFlowWorkerMaxMemBytes() uint64 {
return getEnvUint[uint64]("GOMEMLIMIT", 0)
Expand Down
52 changes: 22 additions & 30 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,38 +30,30 @@ func SyncFlowWorkflow(
parent := workflow.GetInfo(ctx).ParentWorkflowExecution
logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

enableOneSync := GetSideEffect(ctx, func(_ workflow.Context) bool {
return !peerdbenv.PeerDBDisableOneSync()
})
var fMaintain workflow.Future
var sessionID string
syncSessionCtx := ctx
if enableOneSync {
sessionOptions := &workflow.SessionOptions{
CreationTimeout: 5 * time.Minute,
ExecutionTimeout: 144 * time.Hour,
HeartbeatTimeout: time.Minute,
}
var err error
syncSessionCtx, err = workflow.CreateSession(ctx, sessionOptions)
if err != nil {
return err
}
defer workflow.CompleteSession(syncSessionCtx)
sessionID = workflow.GetSessionInfo(syncSessionCtx).SessionID
sessionOptions := &workflow.SessionOptions{
CreationTimeout: 5 * time.Minute,
ExecutionTimeout: 14 * 24 * time.Hour,
HeartbeatTimeout: time.Minute,
}

maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
StartToCloseTimeout: 14 * 24 * time.Hour,
HeartbeatTimeout: time.Minute,
WaitForCancellation: true,
})
fMaintain = workflow.ExecuteActivity(
maintainCtx,
flowable.MaintainPull,
config,
sessionID,
)
syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions)
if err != nil {
return err
}
defer workflow.CompleteSession(syncSessionCtx)

sessionID := workflow.GetSessionInfo(syncSessionCtx).SessionID
maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
StartToCloseTimeout: 14 * 24 * time.Hour,
HeartbeatTimeout: time.Minute,
WaitForCancellation: true,
})
fMaintain := workflow.ExecuteActivity(
maintainCtx,
flowable.MaintainPull,
config,
sessionID,
)

var stop, syncErr bool
currentSyncFlowNum := 0
Expand Down

0 comments on commit 19ea9ea

Please sign in to comment.