Skip to content

Commit

Permalink
workflows: reading environment is a side effect (#1370)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Feb 25, 2024
1 parent db5c4ee commit 876d477
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
23 changes: 17 additions & 6 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,24 @@ func NewCDCFlowWorkflowExecution(ctx workflow.Context, flowName string) *CDCFlow
}
}

func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) (T, error) {
sideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return f(ctx)
})

var result T
err := sideEffect.Get(&result)
return result, err
}

func GetUUID(ctx workflow.Context) (string, error) {
uuidSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
uuid, err := GetSideEffect(ctx, func(_ workflow.Context) string {
return uuid.New().String()
})

var uuidString string
if err := uuidSideEffect.Get(&uuidString); err != nil {
if err != nil {
return "", fmt.Errorf("failed to generate UUID: %w", err)
}
return uuidString, nil
return uuid, nil
}

func GetChildWorkflowID(
Expand Down Expand Up @@ -427,7 +435,10 @@ func CDCFlowWorkflowWithConfig(
)

var normWaitChan model.TypedReceiveChannel[struct{}]
if !peerdbenv.PeerDBEnableParallelSyncNormalize() {
parallel, _ := GetSideEffect(ctx, func(_ workflow.Context) bool {
return peerdbenv.PeerDBEnableParallelSyncNormalize()
})
if !parallel {
normWaitChan = model.NormalizeSyncDoneSignal.GetSignalChannel(ctx)
}

Expand Down
7 changes: 6 additions & 1 deletion flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func NormalizeFlowWorkflow(
}
tableNameSchemaMapping = s.TableNameSchemaMapping
})

parallel, _ := GetSideEffect(ctx, func(_ workflow.Context) bool {
return peerdbenv.PeerDBEnableParallelSyncNormalize()
})

for !stopLoop {
selector.Select(ctx)
for !canceled && selector.HasPending() {
Expand Down Expand Up @@ -78,7 +83,7 @@ func NormalizeFlowWorkflow(
}
}

if !peerdbenv.PeerDBEnableParallelSyncNormalize() {
if !parallel {
parent := workflow.GetInfo(ctx).ParentWorkflowExecution
model.NormalizeSyncDoneSignal.SignalExternalWorkflow(
ctx,
Expand Down

0 comments on commit 876d477

Please sign in to comment.