Skip to content

Commit

Permalink
check limit after normalise
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 13, 2024
1 parent 3f4bec5 commit 7b5ee53
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,11 @@ func CDCFlowWorkflow(
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignalProperties, _ bool) {
cdcFlowData := model.FlowSignalHandler(state.ActiveSignal, val.Signal, logger)
slog.Info("value of signal", slog.Any("signal", cdcFlowData.Signal))
slog.Info("cdc signal val", slog.Any("val", val))
state.ActiveSignal = cdcFlowData.Signal
syncCountLimit = val.CustomNumberOfSyncs
if syncCountLimit <= 0 {
syncCountLimit = MaxSyncsPerCdcFlow
}
slog.Info("sync limit reception inside pause", slog.Int("limit", syncCountLimit))
})
addCdcPropertiesSignalListener(ctx, logger, selector, state)

Expand Down Expand Up @@ -451,12 +448,6 @@ func CDCFlowWorkflow(
}

logger.Info("normalize finished, finishing")
if syncCount == int(syncCountLimit) {
logger.Info("sync count limit reached, pausing",
slog.Int("limit", syncCountLimit),
slog.Int("count", syncCount))
state.ActiveSignal = model.PauseSignal
}
normFlowFuture = nil
restart = true
finished = true
Expand Down Expand Up @@ -487,6 +478,12 @@ func CDCFlowWorkflow(
normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
normDoneChan.Drain()
normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) {
if syncCount == int(syncCountLimit) {

Check failure on line 481 in flow/workflows/cdc_flow.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary conversion (unconvert)
logger.Info("sync count limit reached, pausing",
slog.Int("limit", syncCountLimit),
slog.Int("count", syncCount))
state.ActiveSignal = model.PauseSignal
}
if syncFlowFuture != nil {
_ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x).Get(ctx, nil)
}
Expand Down

0 comments on commit 7b5ee53

Please sign in to comment.