diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index c3d375ee70..de4637a9a6 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -293,6 +293,16 @@ func CDCFlowWorkflowWithConfig( currentSyncFlowNum := 0 totalRecordsSynced := 0 + // sync will send normalize changes; + // which will be handled concurrently + syncNormChan := workflow.NewChannel(ctx) + childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( + ctx, + NormalizeFlowWorkflow, + cfg, + syncNormChan, + ) + for { // check and act on signals before a fresh flow starts. w.receiveAndHandleSignalAsync(ctx, state) @@ -357,6 +367,7 @@ func CDCFlowWorkflowWithConfig( SyncFlowWorkflow, cfg, syncFlowOptions, + syncNormChan, ) var childSyncFlowRes *model.SyncResponse @@ -372,8 +383,16 @@ func CDCFlowWorkflowWithConfig( } w.logger.Info("Total records synced: ", totalRecordsSynced) + } - /* TODO send childSyncFlowRes.TableSchemaDeltas */ + syncNormChan.Close() + var childNormalizeFlowRes *NormalizeFlowResult + if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil { + w.logger.Error("failed to execute normalize flow: ", err) + state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err) + } else { + state.NormalizeFlowStatuses = childNormalizeFlowRes.NormalizeFlowStatuses + state.NormalizeFlowErrors = childNormalizeFlowRes.NormalizeFlowErrors } // cancel the SendWalHeartbeat activity diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index a3ea56fe6a..4278dd2989 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -1,7 +1,6 @@ package peerflow import ( - "fmt" "time" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -13,20 +12,18 @@ import ( "go.temporal.io/sdk/workflow" ) -type NormalizeFlowState struct { - CDCFlowName string - Progress []string -} - type NormalizeFlowExecution struct { - NormalizeFlowState executionID string logger log.Logger } -func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution { +type NormalizeFlowResult struct { + NormalizeFlowStatuses []*model.NormalizeResponse + NormalizeFlowErrors error +} + +func NewNormalizeFlowExecution(ctx workflow.Context) *NormalizeFlowExecution { return &NormalizeFlowExecution{ - NormalizeFlowState: *state, executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, logger: workflow.GetLogger(ctx), } @@ -35,17 +32,15 @@ func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) func NormalizeFlowWorkflow( ctx workflow.Context, cfg *protos.FlowConnectionConfigs, - state *CDCFlowWorkflowState, -) (*CDCFlowWorkflowResult, error) { - if state == nil { - state = NewCDCFlowWorkflowState() - } - + syncNormChan workflow.Channel, +) (*NormalizeFlowResult, error) { w := NewCDCFlowWorkflowExecution(ctx) + res := NormalizeFlowResult {} + normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) if err != nil { - return state, err + return nil, err } childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ @@ -57,85 +52,66 @@ func NormalizeFlowWorkflow( } ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) - /* TODO LISTEN FOR TABLE SCHEMA DELTAS */ - var tableSchemaDeltas []*protos.TableSchemaDelta = nil - /* - if childSyncFlowRes != nil { - tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas + for { + var tableSchemaDeltasRecv interface{} + if !syncNormChan.Receive(ctx, &tableSchemaDeltasRecv) { + break } - */ + var tableSchemaDeltas []*protos.TableSchemaDelta = tableSchemaDeltasRecv.([]*protos.TableSchemaDelta) - // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. - if tableSchemaDeltas != nil { - modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas)) - modifiedDstTables := make([]string, 0, len(tableSchemaDeltas)) + // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. + if tableSchemaDeltas != nil { + modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas)) + modifiedDstTables := make([]string, 0, len(tableSchemaDeltas)) - for _, tableSchemaDelta := range tableSchemaDeltas { - modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) - modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName) - } + for _, tableSchemaDelta := range tableSchemaDeltas { + modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) + modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName) + } - getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - }) - getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, + getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, &protos.GetTableSchemaBatchInput{ PeerConnectionConfig: cfg.Source, TableIdentifiers: modifiedSrcTables, }) - var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput - if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { - w.logger.Error("failed to execute schema update at source: ", err) - state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err) - } else { - for i := range modifiedSrcTables { - cfg.TableNameSchemaMapping[modifiedDstTables[i]] = + var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput + if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { + w.logger.Error("failed to execute schema update at source: ", err) + res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err) + } else { + for i := range modifiedSrcTables { + cfg.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] + } } } - } - - childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( - ctx, - NormalizeFlowWorkflow, - cfg, - ) - - selector := workflow.NewSelector(ctx) - selector.AddFuture(childNormalizeFlowFuture, func(f workflow.Future) { - var childNormalizeFlowRes *model.NormalizeResponse - if err := f.Get(ctx, &childNormalizeFlowRes); err != nil { - w.logger.Error("failed to execute normalize flow: ", err) - state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err) - } else { - state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) - } - }) - selector.Select(ctx) - s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ - CDCFlowName: cfg.FlowJobName, - Progress: []string{}, - }) + s := NewNormalizeFlowExecution(ctx) - s.logger.Info("executing normalize flow - ", s.CDCFlowName) + s.logger.Info("executing normalize flow - ", cfg.FlowJobName) - normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 7 * 24 * time.Hour, - HeartbeatTimeout: 5 * time.Minute, - }) + normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 7 * 24 * time.Hour, + HeartbeatTimeout: 5 * time.Minute, + }) - // execute StartFlow on the peers to start the flow - startNormalizeInput := &protos.StartNormalizeInput{ - FlowConnectionConfigs: cfg, - } - fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) + // execute StartFlow on the peers to start the flow + startNormalizeInput := &protos.StartNormalizeInput{ + FlowConnectionConfigs: cfg, + } + fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) - var normalizeResponse *model.NormalizeResponse - if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { - return nil, fmt.Errorf("failed to flow: %w", err) + var normalizeResponse *model.NormalizeResponse + if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { + res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err) + } else { + res.NormalizeFlowStatuses = append(res.NormalizeFlowStatuses, normalizeResponse) + } } - return state, nil + return &res, nil } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 3ee45aecf9..2e38c8c352 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -36,6 +36,7 @@ func (s *SyncFlowExecution) executeSyncFlow( config *protos.FlowConnectionConfigs, opts *protos.SyncFlowOptions, relationMessageMapping model.RelationMessageMapping, + syncNormChan workflow.Channel, ) (*model.SyncResponse, error) { s.logger.Info("executing sync flow - ", s.CDCFlowName) @@ -95,6 +96,8 @@ func (s *SyncFlowExecution) executeSyncFlow( return nil, fmt.Errorf("failed to replay schema delta: %w", err) } + syncNormChan.Send(ctx, syncRes.TableSchemaDeltas) + return syncRes, nil } @@ -104,11 +107,18 @@ func (s *SyncFlowExecution) executeSyncFlow( func SyncFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, + syncNormChan workflow.Channel, ) (*model.SyncResponse, error) { s := NewSyncFlowExecution(ctx, &SyncFlowState{ CDCFlowName: config.FlowJobName, Progress: []string{}, }) - return s.executeSyncFlow(ctx, config, options, options.RelationMessageMapping) + return s.executeSyncFlow( + ctx, + config, + options, + options.RelationMessageMapping, + syncNormChan, + ) }