diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e56dd83410..dc59b61f6d 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -112,16 +112,24 @@ func NewCDCFlowWorkflowExecution(ctx workflow.Context, flowName string) *CDCFlow } } +func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) (T, error) { + sideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return f(ctx) + }) + + var result T + err := sideEffect.Get(&result) + return result, err +} + func GetUUID(ctx workflow.Context) (string, error) { - uuidSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + uuid, err := GetSideEffect(ctx, func(_ workflow.Context) string { return uuid.New().String() }) - - var uuidString string - if err := uuidSideEffect.Get(&uuidString); err != nil { + if err != nil { return "", fmt.Errorf("failed to generate UUID: %w", err) } - return uuidString, nil + return uuid, nil } func GetChildWorkflowID( @@ -427,7 +435,10 @@ func CDCFlowWorkflowWithConfig( ) var normWaitChan model.TypedReceiveChannel[struct{}] - if !peerdbenv.PeerDBEnableParallelSyncNormalize() { + parallel, _ := GetSideEffect(ctx, func(_ workflow.Context) bool { + return peerdbenv.PeerDBEnableParallelSyncNormalize() + }) + if !parallel { normWaitChan = model.NormalizeSyncDoneSignal.GetSignalChannel(ctx) } diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 7b19e63b9f..db1321ce32 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -46,6 +46,11 @@ func NormalizeFlowWorkflow( } tableNameSchemaMapping = s.TableNameSchemaMapping }) + + parallel, _ := GetSideEffect(ctx, func(_ workflow.Context) bool { + return peerdbenv.PeerDBEnableParallelSyncNormalize() + }) + for !stopLoop { selector.Select(ctx) for !canceled && selector.HasPending() { @@ -78,7 +83,7 @@ func NormalizeFlowWorkflow( } } - if !peerdbenv.PeerDBEnableParallelSyncNormalize() { + if !parallel { parent := workflow.GetInfo(ctx).ParentWorkflowExecution model.NormalizeSyncDoneSignal.SignalExternalWorkflow( ctx,