Skip to content

Commit

Permalink
normalize_flow: always do one normalize before waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 27, 2023
1 parent 94bb3f5 commit 8c2e616
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
2 changes: 1 addition & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func CDCFlowWorkflowWithConfig(
}
}

batchSizeSelector.Select(ctx)
batchSizeSelector.Select(ctx)
}

workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
Expand Down
34 changes: 20 additions & 14 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,9 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow(
syncChan := workflow.GetSignalChannel(normalizeFlowCtx, "Sync")

stopLoop := false
for !stopLoop {
var stopLoopVal bool
var anyFalse bool
syncChan.Receive(normalizeFlowCtx, &stopLoopVal)
stopLoop = stopLoop || stopLoopVal
anyFalse = anyFalse || !stopLoopVal
for syncChan.ReceiveAsync(&stopLoopVal) {
stopLoop = stopLoop || stopLoopVal
anyFalse = anyFalse || !stopLoopVal
}

if anyFalse {
needSync := true
for {
if needSync {
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
Expand All @@ -78,7 +69,22 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow(
}
result = append(result, *normalizeResponse)
}
}

return result, nil
if !stopLoop {
var stopLoopVal bool
syncChan.Receive(normalizeFlowCtx, &stopLoopVal)
stopLoop = stopLoop || stopLoopVal
needSync = !stopLoopVal
for syncChan.ReceiveAsync(&stopLoopVal) {
stopLoop = stopLoop || stopLoopVal
needSync = needSync || !stopLoopVal
}
} else {
needSync = false
}

if stopLoop && !needSync {
return result, nil
}
}
}

0 comments on commit 8c2e616

Please sign in to comment.