diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 3ebf7044fb..56e4101ce0 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -204,13 +204,15 @@ func (a *FlowableActivity) CreateNormalizedTable( }, nil } -func (a *FlowableActivity) StartFlow(ctx context.Context, - input *protos.StartFlowInput, +func (a *FlowableActivity) SyncFlow( + ctx context.Context, + config *protos.FlowConnectionConfigs, + options *protos.SyncFlowOptions, ) (*model.SyncResponse, error) { - ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName) + flowName := config.FlowJobName + ctx = context.WithValue(ctx, shared.FlowNameKey, flowName) logger := activity.GetLogger(ctx) activity.RecordHeartbeat(ctx, "starting flow...") - config := input.FlowConnectionConfigs dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Destination) if err != nil { return nil, fmt.Errorf("failed to get destination connector: %w", err) @@ -218,8 +220,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, defer connectors.CloseConnector(ctx, dstConn) logger.Info("pulling records...") - tblNameMapping := make(map[string]model.NameAndExclude, len(input.SyncFlowOptions.TableMappings)) - for _, v := range input.SyncFlowOptions.TableMappings { + tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings)) + for _, v := range options.TableMappings { tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } @@ -230,17 +232,16 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, defer connectors.CloseConnector(ctx, srcConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - jobName := input.FlowConnectionConfigs.FlowJobName - return fmt.Sprintf("transferring records for job - %s", jobName) + return fmt.Sprintf("transferring records for job - %s", flowName) }) defer shutdown() - batchSize := input.SyncFlowOptions.BatchSize + batchSize := 
options.BatchSize if batchSize <= 0 { batchSize = 1_000_000 } - lastOffset, err := dstConn.GetLastOffset(ctx, input.FlowConnectionConfigs.FlowJobName) + lastOffset, err := dstConn.GetLastOffset(ctx, config.FlowJobName) if err != nil { return nil, err } @@ -248,27 +249,26 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, // start a goroutine to pull records from the source recordBatch := model.NewCDCRecordStream() startTime := time.Now() - flowName := input.FlowConnectionConfigs.FlowJobName errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { - if input.RelationMessageMapping == nil { - input.RelationMessageMapping = make(map[uint32]*protos.RelationMessage) + if options.RelationMessageMapping == nil { + options.RelationMessageMapping = make(map[uint32]*protos.RelationMessage) } return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{ FlowJobName: flowName, - SrcTableIDNameMapping: input.SrcTableIdNameMapping, + SrcTableIDNameMapping: options.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, LastOffset: lastOffset, MaxBatchSize: batchSize, IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds( - int(input.SyncFlowOptions.IdleTimeoutSeconds), + int(options.IdleTimeoutSeconds), ), - TableNameSchemaMapping: input.TableNameSchemaMapping, - OverridePublicationName: input.FlowConnectionConfigs.PublicationName, - OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, - RelationMessageMapping: input.RelationMessageMapping, + TableNameSchemaMapping: options.TableNameSchemaMapping, + OverridePublicationName: config.PublicationName, + OverrideReplicationSlotName: config.ReplicationSlotName, + RelationMessageMapping: options.RelationMessageMapping, RecordStream: recordBatch, }) }) @@ -293,7 +293,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return &model.SyncResponse{ CurrentSyncBatchID: -1, TableSchemaDeltas: recordBatch.SchemaDeltas, - RelationMessageMapping: 
input.RelationMessageMapping, + RelationMessageMapping: options.RelationMessageMapping, }, nil } @@ -322,16 +322,16 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, res, err = dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{ SyncBatchID: syncBatchID, Records: recordBatch, - FlowJobName: input.FlowConnectionConfigs.FlowJobName, - TableMappings: input.SyncFlowOptions.TableMappings, - StagingPath: input.FlowConnectionConfigs.CdcStagingPath, + FlowJobName: flowName, + TableMappings: options.TableMappings, + StagingPath: config.CdcStagingPath, }) if err != nil { logger.Warn("failed to push records", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return fmt.Errorf("failed to push records: %w", err) } - res.RelationMessageMapping = input.RelationMessageMapping + res.RelationMessageMapping = options.RelationMessageMapping return nil }) @@ -352,7 +352,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch( ctx, a.CatalogPool, - input.FlowConnectionConfigs.FlowJobName, + flowName, res.CurrentSyncBatchID, uint32(numRecords), lastCheckpoint, @@ -362,18 +362,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, err } - err = monitoring.UpdateLatestLSNAtTargetForCDCFlow( - ctx, - a.CatalogPool, - input.FlowConnectionConfigs.FlowJobName, - lastCheckpoint, - ) + err = monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint) if err != nil { a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } if res.TableNameRowsMapping != nil { - err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, + err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, res.TableNameRowsMapping) if err != nil { return nil, err diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 85c350c164..cda1e496de 100644 --- a/flow/cmd/mirror_status.go +++ 
b/flow/cmd/mirror_status.go @@ -95,7 +95,7 @@ func (h *FlowRequestHandler) CDCFlowStatus( if state.SyncFlowOptions != nil { config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds config.MaxBatchSize = state.SyncFlowOptions.BatchSize - config.TableMappings = state.TableMappings + config.TableMappings = state.SyncFlowOptions.TableMappings } var initialCopyStatus *protos.SnapshotStatus diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 6d8f185437..7005e32bf9 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -1245,9 +1245,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { workflowState := getWorkFlowState() assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds) assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize) - assert.Len(s.t, workflowState.TableMappings, 1) - assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1) - assert.Len(s.t, workflowState.TableNameSchemaMapping, 1) + assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1) + assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1) + assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1) // we have limited batch size to 6, so atleast 3 syncs needed assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3) @@ -1291,9 +1291,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { workflowState = getWorkFlowState() assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds) assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize) - assert.Len(s.t, workflowState.TableMappings, 2) - assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2) - assert.Len(s.t, workflowState.TableNameSchemaMapping, 2) + assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2) + assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2) + 
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2) // 3 from first insert of 18 rows in 1 table // 1 from pre-pause // 3 from second insert of 18 rows in 2 tables, batch size updated diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 0ef6caf6ca..1f3fd53abb 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -41,38 +41,33 @@ type CDCFlowWorkflowState struct { // Needed to support schema changes. RelationMessageMapping model.RelationMessageMapping CurrentFlowStatus protos.FlowStatus - // moved from config here, set by SetupFlow - SrcTableIdNameMapping map[uint32]string - TableNameSchemaMapping map[string]*protos.TableSchema // flow config update request, set to nil after processed FlowConfigUpdates []*protos.CDCFlowConfigUpdate // options passed to all SyncFlows SyncFlowOptions *protos.SyncFlowOptions - // initially copied from config, all changes are made here though - TableMappings []*protos.TableMapping } // returns a new empty PeerFlowState -func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWorkflowState { - tableMappings := make([]*protos.TableMapping, 0, len(cfgTableMappings)) - for _, tableMapping := range cfgTableMappings { +func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflowState { + tableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings)) + for _, tableMapping := range cfg.TableMappings { tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping)) } return &CDCFlowWorkflowState{ Progress: []string{"started"}, // 1 more than the limit of 10 - SyncFlowStatuses: make([]*model.SyncResponse, 0, 11), - NormalizeFlowStatuses: nil, - ActiveSignal: shared.NoopSignal, - SyncFlowErrors: nil, - NormalizeFlowErrors: nil, - RelationMessageMapping: nil, - CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, - SrcTableIdNameMapping: nil, - TableNameSchemaMapping: nil, - FlowConfigUpdates: nil, - 
SyncFlowOptions: nil, - TableMappings: tableMappings, + SyncFlowStatuses: make([]*model.SyncResponse, 0, 11), + NormalizeFlowStatuses: nil, + ActiveSignal: shared.NoopSignal, + SyncFlowErrors: nil, + NormalizeFlowErrors: nil, + CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, + FlowConfigUpdates: nil, + SyncFlowOptions: &protos.SyncFlowOptions{ + BatchSize: cfg.MaxBatchSize, + IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, + TableMappings: tableMappings, + }, } } @@ -144,7 +139,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont if len(flowConfigUpdate.AdditionalTables) == 0 { continue } - if shared.AdditionalTablesHasOverlap(state.TableMappings, flowConfigUpdate.AdditionalTables) { + if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) { w.logger.Warn("duplicate source/destination tables found in additionalTables") continue } @@ -197,14 +192,13 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return err } - for tableID, tableName := range res.SrcTableIdNameMapping { - state.SrcTableIdNameMapping[tableID] = tableName + for tableID, tableName := range res.SyncFlowOptions.SrcTableIdNameMapping { + state.SyncFlowOptions.SrcTableIdNameMapping[tableID] = tableName } - for tableName, tableSchema := range res.TableNameSchemaMapping { - state.TableNameSchemaMapping[tableName] = tableSchema + for tableName, tableSchema := range res.SyncFlowOptions.TableNameSchemaMapping { + state.SyncFlowOptions.TableNameSchemaMapping[tableName] = tableSchema } - state.TableMappings = append(state.TableMappings, flowConfigUpdate.AdditionalTables...) - state.SyncFlowOptions.TableMappings = state.TableMappings + state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) 
// finished processing, wipe it state.FlowConfigUpdates = nil } @@ -219,8 +213,9 @@ func CDCFlowWorkflowWithConfig( if cfg == nil { return nil, fmt.Errorf("invalid connection configs") } + if state == nil { - state = NewCDCFlowWorkflowState(cfg.TableMappings) + state = NewCDCFlowWorkflowState(cfg) } w := NewCDCFlowWorkflowExecution(ctx) @@ -258,14 +253,14 @@ func CDCFlowWorkflowWithConfig( // if resync is true, alter the table name schema mapping to temporarily add // a suffix to the table names. if cfg.Resync { - for _, mapping := range state.TableMappings { + for _, mapping := range state.SyncFlowOptions.TableMappings { oldName := mapping.DestinationTableIdentifier newName := oldName + "_resync" mapping.DestinationTableIdentifier = newName } // because we have renamed the tables. - cfg.TableMappings = state.TableMappings + cfg.TableMappings = state.SyncFlowOptions.TableMappings } // start the SetupFlow workflow as a child workflow, and wait for it to complete @@ -286,8 +281,8 @@ func CDCFlowWorkflowWithConfig( if err := setupFlowFuture.Get(setupFlowCtx, &setupFlowOutput); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) } - state.SrcTableIdNameMapping = setupFlowOutput.SrcTableIdNameMapping - state.TableNameSchemaMapping = setupFlowOutput.TableNameSchemaMapping + state.SyncFlowOptions.SrcTableIdNameMapping = setupFlowOutput.SrcTableIdNameMapping + state.SyncFlowOptions.TableNameSchemaMapping = setupFlowOutput.TableNameSchemaMapping state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT // next part of the setup is to snapshot-initial-copy and setup replication slots. 
@@ -323,21 +318,21 @@ func CDCFlowWorkflowWithConfig( } renameOpts.SyncedAtColName = &cfg.SyncedAtColName correctedTableNameSchemaMapping := make(map[string]*protos.TableSchema) - for _, mapping := range state.TableMappings { + for _, mapping := range state.SyncFlowOptions.TableMappings { oldName := mapping.DestinationTableIdentifier newName := strings.TrimSuffix(oldName, "_resync") renameOpts.RenameTableOptions = append(renameOpts.RenameTableOptions, &protos.RenameTableOption{ CurrentName: oldName, NewName: newName, // oldName is what was used for the TableNameSchema mapping - TableSchema: state.TableNameSchemaMapping[oldName], + TableSchema: state.SyncFlowOptions.TableNameSchemaMapping[oldName], }) mapping.DestinationTableIdentifier = newName // TableNameSchemaMapping is referring to the _resync tables, not the actual names - correctedTableNameSchemaMapping[newName] = state.TableNameSchemaMapping[oldName] + correctedTableNameSchemaMapping[newName] = state.SyncFlowOptions.TableNameSchemaMapping[oldName] } - state.TableNameSchemaMapping = correctedTableNameSchemaMapping + state.SyncFlowOptions.TableNameSchemaMapping = correctedTableNameSchemaMapping renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 12 * time.Hour, HeartbeatTimeout: time.Minute, @@ -357,18 +352,6 @@ func CDCFlowWorkflowWithConfig( } } - // when we carry forward state, don't remake the options - if state.SyncFlowOptions == nil { - state.SyncFlowOptions = &protos.SyncFlowOptions{ - BatchSize: cfg.MaxBatchSize, - // this means the env variable assignment path is never hit - IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, - SrcTableIdNameMapping: state.SrcTableIdNameMapping, - TableNameSchemaMapping: state.TableNameSchemaMapping, - TableMappings: state.TableMappings, - } - } - currentSyncFlowNum := 0 totalRecordsSynced := int64(0) @@ -486,27 +469,19 @@ func CDCFlowWorkflowWithConfig( currentSyncFlowNum++ // execute the sync flow - startFlowCtx := 
workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, HeartbeatTimeout: time.Minute, WaitForCancellation: true, }) - state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping - startFlowInput := &protos.StartFlowInput{ - FlowConnectionConfigs: cfg, - SyncFlowOptions: state.SyncFlowOptions, - RelationMessageMapping: state.SyncFlowOptions.RelationMessageMapping, - SrcTableIdNameMapping: state.SyncFlowOptions.SrcTableIdNameMapping, - TableNameSchemaMapping: state.SyncFlowOptions.TableNameSchemaMapping, - } w.logger.Info("executing sync flow", slog.String("flowName", cfg.FlowJobName)) - fStartFlow := workflow.ExecuteActivity(startFlowCtx, flowable.StartFlow, startFlowInput) + syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions) var syncDone bool var normalizeSignalError error normDone := normWaitChan == nil - mainLoopSelector.AddFuture(fStartFlow, func(f workflow.Future) { + mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) { syncDone = true var childSyncFlowRes *model.SyncResponse @@ -515,7 +490,7 @@ func CDCFlowWorkflowWithConfig( state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } else if childSyncFlowRes != nil { state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) - state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping + state.SyncFlowOptions.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping totalRecordsSynced += childSyncFlowRes.NumRecordsSynced w.logger.Info("Total records synced: ", slog.Int64("totalRecordsSynced", totalRecordsSynced)) @@ -550,7 +525,7 @@ func CDCFlowWorkflowWithConfig( } else { for i, srcTable := range modifiedSrcTables { dstTable := modifiedDstTables[i] - state.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable] + 
state.SyncFlowOptions.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable] } } } @@ -558,7 +533,7 @@ signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{ Done: false, SyncBatchID: childSyncFlowRes.CurrentSyncBatchID, - TableNameSchemaMapping: state.TableNameSchemaMapping, + TableNameSchemaMapping: state.SyncFlowOptions.TableNameSchemaMapping, }) normalizeSignalError = signalFuture.Get(ctx, nil) } else { diff --git a/protos/flow.proto b/protos/flow.proto index 2147ee2fc8..f5d804ae71 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -106,21 +106,6 @@ message SyncFlowOptions { repeated TableMapping table_mappings = 6; } -// deprecated, unused -message LastSyncState { - int64 checkpoint = 1; -} - -message StartFlowInput { - // deprecated, unused - LastSyncState last_sync_state = 1; - FlowConnectionConfigs flow_connection_configs = 2; - SyncFlowOptions sync_flow_options = 3; - map<uint32, RelationMessage> relation_message_mapping = 4; - map<uint32, string> src_table_id_name_mapping = 5; - map<string, TableSchema> table_name_schema_mapping = 6; -} - message StartNormalizeInput { FlowConnectionConfigs flow_connection_configs = 1; map<string, TableSchema> table_name_schema_mapping = 2;