From abf99dd63ce324e445ff0bad6ceff2779f601542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sun, 24 Dec 2023 01:36:28 +0000 Subject: [PATCH] Normalize loop runs concurrently to sync loop --- flow/workflows/cdc_flow.go | 79 ++++++++++++++++++-------------- flow/workflows/normalize_flow.go | 40 +++++++++++----- 2 files changed, 73 insertions(+), 46 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index d81a418320..d971766698 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -37,7 +37,7 @@ type CDCFlowWorkflowState struct { // Accumulates status for sync flows spawned. SyncFlowStatuses []*model.SyncResponse // Accumulates status for sync flows spawned. - NormalizeFlowStatuses []*model.NormalizeResponse + NormalizeFlowStatuses []model.NormalizeResponse // Current signalled state of the peer flow. ActiveSignal shared.CDCFlowSignal // SetupComplete indicates whether the peer flow setup has completed. @@ -156,6 +156,7 @@ func CDCFlowWorkflowWithConfig( return nil, fmt.Errorf("invalid connection configs") } + ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) w := NewCDCFlowWorkflowExecution(ctx) if limits.TotalSyncFlows == 0 { @@ -302,6 +303,34 @@ func CDCFlowWorkflowWithConfig( currentSyncFlowNum := 0 totalRecordsSynced := 0 + normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) + if err != nil { + return state, err + } + + mirrorNameSearch := map[string]interface{}{ + shared.MirrorNameSearchAttribute: cfg.FlowJobName, + } + + childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ + WorkflowID: normalizeFlowID, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 20, + }, + SearchAttributes: mirrorNameSearch, + } + normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) + childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( + normCtx, + NormalizeFlowWorkflow, + cfg, + ) + var normExecution workflow.Execution + if err := childNormalizeFlowFuture.GetChildWorkflowExecution().Get(ctx, &normExecution); err != nil { + return state, fmt.Errorf("normalize workflow failed to start: %w", err) + } + for { // check and act on signals before a fresh flow starts. w.receiveAndHandleSignalAsync(ctx, state) @@ -322,6 +351,7 @@ func CDCFlowWorkflowWithConfig( } // check if the peer flow has been shutdown if state.ActiveSignal == shared.ShutdownSignal { + workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true) w.logger.Info("peer flow has been shutdown") return state, nil } @@ -344,12 +374,10 @@ func CDCFlowWorkflowWithConfig( syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName) if err != nil { + workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true) return state, err } - mirrorNameSearch := map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - } // execute the sync flow as a child workflow childSyncFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: syncFlowID, @@ -359,11 +387,10 @@ func CDCFlowWorkflowWithConfig( }, SearchAttributes: mirrorNameSearch, } - ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts) - ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) + syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping childSyncFlowFuture := workflow.ExecuteChildWorkflow( - ctx, + syncCtx, SyncFlowWorkflow, cfg, syncFlowOptions, @@ -382,21 +409,8 @@ func CDCFlowWorkflowWithConfig( } w.logger.Info("Total records synced: ", totalRecordsSynced) + workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", false) - normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) - if err != nil { - return state, err - } - - childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ - WorkflowID: normalizeFlowID, - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - RetryPolicy: &temporal.RetryPolicy{ - MaximumAttempts: 20, - }, - SearchAttributes: mirrorNameSearch, - } - ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) var tableSchemaDeltas []*protos.TableSchemaDelta = nil if childSyncFlowRes != nil { tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas @@ -415,7 +429,6 @@ func CDCFlowWorkflowWithConfig( getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) - getModifiedSchemaCtx = workflow.WithValue(getModifiedSchemaCtx, "flowName", cfg.FlowJobName) getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, &protos.GetTableSchemaBatchInput{ PeerConnectionConfig: cfg.Source, @@ -432,23 +445,19 @@ func CDCFlowWorkflowWithConfig( } } } - ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) - childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( - ctx, - NormalizeFlowWorkflow, - cfg, - ) - var childNormalizeFlowRes *model.NormalizeResponse - if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil { - w.logger.Error("failed to execute normalize flow: ", err) - state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) - } else { - state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) - } batchSizeSelector.Select(ctx) } + workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true) + var childNormalizeFlowRes []model.NormalizeResponse + if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil { + w.logger.Error("failed to execute normalize flow: ", err) + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) + } else { + state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes...) + } + state.TruncateProgress(w.logger) return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) } diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index af14e11b8f..04503651b4 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -31,7 +31,7 @@ func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) func NormalizeFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, -) (*model.NormalizeResponse, error) { +) ([]model.NormalizeResponse, error) { s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ CDCFlowName: config.FlowJobName, Progress: []string{}, @@ -43,7 +43,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context, func (s *NormalizeFlowExecution) executeNormalizeFlow( ctx workflow.Context, config *protos.FlowConnectionConfigs, -) (*model.NormalizeResponse, error) { +) ([]model.NormalizeResponse, error) { s.logger.Info("executing normalize flow - ", s.CDCFlowName) normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -51,16 +51,34 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( HeartbeatTimeout: 5 * time.Minute, }) - // execute StartFlow on the peers to start the flow - startNormalizeInput := &protos.StartNormalizeInput{ - FlowConnectionConfigs: config, - } - fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) + result := make([]model.NormalizeResponse, 0) + syncChan := workflow.GetSignalChannel(normalizeFlowCtx, "Sync") + + stopLoop := false + for stopLoop { + var stopLoopVal bool + var anyFalse bool + syncChan.Receive(normalizeFlowCtx, &stopLoopVal) + stopLoop = stopLoop || stopLoopVal + anyFalse = anyFalse || !stopLoopVal + for syncChan.ReceiveAsync(&stopLoopVal) { + stopLoop = stopLoop || stopLoopVal + anyFalse = anyFalse || !stopLoopVal + } + + if anyFalse { + startNormalizeInput := &protos.StartNormalizeInput{ + FlowConnectionConfigs: config, + } + fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) - var normalizeResponse *model.NormalizeResponse - if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { - return nil, fmt.Errorf("failed to flow: %w", err) + var normalizeResponse *model.NormalizeResponse + if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { + return result, fmt.Errorf("failed to flow: %w", err) + } + result = append(result, *normalizeResponse) + } } - return normalizeResponse, nil + return result, nil }