From 4b70a4154672912109318a13bb371f3ecbe19c33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 28 Feb 2024 15:27:30 +0000 Subject: [PATCH] cdc: don't send normalize-done after sync flow finished Avoids a benign error in temporal logs when pausing mirror --- flow/workflows/cdc_flow.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 3404bbfa25..0fa1110a85 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -214,7 +214,6 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( selector workflow.Selector, state *CDCFlowWorkflowState, ) { - // add a signal to change CDC properties cdcPropertiesSignalChan := model.CDCDynamicPropertiesSignal.GetSignalChannel(ctx) cdcPropertiesSignalChan.AddToSelector(selector, func(cdcConfigUpdate *protos.CDCFlowConfigUpdate, more bool) { // only modify for options since SyncFlow uses it @@ -240,21 +239,11 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( } func (w *CDCFlowWorkflowExecution) startSyncFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions) { - w.syncFlowFuture = workflow.ExecuteChildWorkflow( - ctx, - SyncFlowWorkflow, - config, - options, - ) + w.syncFlowFuture = workflow.ExecuteChildWorkflow(ctx, SyncFlowWorkflow, config, options) } func (w *CDCFlowWorkflowExecution) startNormFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs) { - w.normFlowFuture = workflow.ExecuteChildWorkflow( - ctx, - NormalizeFlowWorkflow, - config, - nil, - ) + w.normFlowFuture = workflow.ExecuteChildWorkflow(ctx, NormalizeFlowWorkflow, config, nil) } func CDCFlowWorkflow( @@ -499,6 +488,7 @@ func CDCFlowWorkflow( if restart { w.logger.Info("sync finished, finishing normalize") + w.syncFlowFuture = nil _ = model.NormalizeSignal.SignalChildWorkflow(ctx, w.normFlowFuture, model.NormalizePayload{ Done: true, SyncBatchID: -1, @@ -519,6 +509,7 @@ func CDCFlowWorkflow( if restart { w.logger.Info("normalize finished") + w.normFlowFuture = nil finished = true } else { w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err)) @@ -576,7 +567,9 @@ func CDCFlowWorkflow( if !parallel { normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) { - _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, x).Get(ctx, nil) + if w.syncFlowFuture != nil { + _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, x).Get(ctx, nil) + } }) }