diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index de4637a9a6..499c190cd4 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -295,13 +295,15 @@ func CDCFlowWorkflowWithConfig( // sync will send normalize changes; // which will be handled concurrently - syncNormChan := workflow.NewChannel(ctx) childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( ctx, NormalizeFlowWorkflow, cfg, - syncNormChan, ) + var normExecution workflow.Execution + if err := childNormalizeFlowFuture.GetChildWorkflowExecution().Get(ctx, &normExecution); err != nil { + return state, fmt.Errorf("normalize workflow failed to start: %w", err) + } for { // check and act on signals before a fresh flow starts. @@ -367,7 +369,8 @@ func CDCFlowWorkflowWithConfig( SyncFlowWorkflow, cfg, syncFlowOptions, - syncNormChan, + normExecution.ID, + normExecution.RunID, ) var childSyncFlowRes *model.SyncResponse @@ -385,7 +388,7 @@ func CDCFlowWorkflowWithConfig( w.logger.Info("Total records synced: ", totalRecordsSynced) } - syncNormChan.Close() + workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "StopLoop", true) var childNormalizeFlowRes *NormalizeFlowResult if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil { w.logger.Error("failed to execute normalize flow: ", err) diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 3d7d2d8b5b..c827dd4d1d 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -32,8 +32,9 @@ func NewNormalizeFlowExecution(ctx workflow.Context) *NormalizeFlowExecution { func NormalizeFlowWorkflow( ctx workflow.Context, cfg *protos.FlowConnectionConfigs, - syncNormChan workflow.Channel, ) (*NormalizeFlowResult, error) { + schemaDeltas := workflow.GetSignalChannel(ctx, "SchemaDelta") + stopLoopChan := workflow.GetSignalChannel(ctx, "StopLoop") w := NewCDCFlowWorkflowExecution(ctx) res := NormalizeFlowResult{} @@ -52,41 +53,52 @@ func NormalizeFlowWorkflow( } ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) + var stopLoop bool for { - var tableSchemaDeltas []*protos.TableSchemaDelta - if !syncNormChan.Receive(ctx, &tableSchemaDeltas) { - break + // Sequence channel checks carefully to avoid race condition; + // must check & process all schema deltas before breaking loop + if !stopLoop { + var stopLoopVal bool + stopLoop = stopLoopChan.ReceiveAsync(&stopLoopVal) && stopLoopVal } - // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. - if tableSchemaDeltas != nil { - modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas)) - modifiedDstTables := make([]string, 0, len(tableSchemaDeltas)) - - for _, tableSchemaDelta := range tableSchemaDeltas { - modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) - modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName) - } + var tableSchemaDeltas []*protos.TableSchemaDelta + received, _ := schemaDeltas.ReceiveWithTimeout(ctx, 5*time.Second, &tableSchemaDeltas) + if received { + if len(tableSchemaDeltas) != 0 { + // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. + modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas)) + modifiedDstTables := make([]string, 0, len(tableSchemaDeltas)) + + for _, tableSchemaDelta := range tableSchemaDeltas { + modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) + modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName) + } - getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - }) - getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, - &protos.GetTableSchemaBatchInput{ - PeerConnectionConfig: cfg.Source, - TableIdentifiers: modifiedSrcTables, + getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, }) - - var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput - if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { - w.logger.Error("failed to execute schema update at source: ", err) - res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err) - } else { - for i := range modifiedSrcTables { - cfg.TableNameSchemaMapping[modifiedDstTables[i]] = - getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] + getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, + &protos.GetTableSchemaBatchInput{ + PeerConnectionConfig: cfg.Source, + TableIdentifiers: modifiedSrcTables, + }) + + var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput + if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { + w.logger.Error("failed to execute schema update at source: ", err) + res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err) + } else { + for i := range modifiedSrcTables { + cfg.TableNameSchemaMapping[modifiedDstTables[i]] = + getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] + } } } + } else if stopLoop { + break + } else { + continue } s := NewNormalizeFlowExecution(ctx) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 2e38c8c352..a969cd0e3a 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -36,7 +36,8 @@ func (s *SyncFlowExecution) executeSyncFlow( config *protos.FlowConnectionConfigs, opts *protos.SyncFlowOptions, relationMessageMapping model.RelationMessageMapping, - syncNormChan workflow.Channel, + normFlowId string, + normFlowRunId string, ) (*model.SyncResponse, error) { s.logger.Info("executing sync flow - ", s.CDCFlowName) @@ -96,7 +97,7 @@ func (s *SyncFlowExecution) executeSyncFlow( return nil, fmt.Errorf("failed to replay schema delta: %w", err) } - syncNormChan.Send(ctx, syncRes.TableSchemaDeltas) + workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", syncRes.TableSchemaDeltas) return syncRes, nil } @@ -107,7 +108,8 @@ func (s *SyncFlowExecution) executeSyncFlow( func SyncFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, - syncNormChan workflow.Channel, + normFlowId string, + normFlowRunId string, ) (*model.SyncResponse, error) { s := NewSyncFlowExecution(ctx, &SyncFlowState{ CDCFlowName: config.FlowJobName, @@ -119,6 +121,7 @@ func SyncFlowWorkflow(ctx workflow.Context, config, options, options.RelationMessageMapping, - syncNormChan, + normFlowId, + normFlowRunId, ) }