diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index fd9b1a6201..4fd1b3dd79 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -556,7 +556,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl // TODO (kaushik): consider persistent state for a mirror job // to be stored somewhere in temporal state. We might need to persist // the state of the relation message somewhere - p.logger.Info(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v", + p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v", msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns)) if p.relationMessageMapping[msg.RelationID] == nil { p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index b0629179f9..3b661b5b43 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -135,30 +135,38 @@ func EnvWaitForEqualTables( table string, cols string, ) { - // wait for PeerFlowStatusQuery to finish setup - // sleep for 5 second to allow the workflow to start - time.Sleep(5 * time.Second) - for { - response, err := env.QueryWorkflow( - shared.CDCFlowStateQuery, - connectionGen.FlowJobName, - ) - if err == nil { - var state peerflow.CDCFlowWorkflowState - err = response.Get(&state) - if err != nil { - slog.Error(err.Error()) - } + suite.T().Helper() + EnvWaitForEqualTablesWithNames(env, suite, reason, table, table, cols) +} - if *state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { - break - } - } else { - // log the error for informational purposes - slog.Error(err.Error()) +func EnvWaitForEqualTablesWithNames( + env *testsuite.TestWorkflowEnvironment, + suite e2eshared.RowSource, + reason string, + srcTable string, + dstTable string, + cols string, +) { + t := suite.T() + t.Helper() + + EnvWaitFor(t, env, 3*time.Minute, reason, func() bool { + t.Helper() + + suffix := suite.Suffix() + pool := suite.Pool() + pgRows, err := GetPgRows(pool, suffix, srcTable, cols) + if err != nil { + return false } - time.Sleep(1 * time.Second) - } + + rows, err := suite.GetRows(dstTable, cols) + if err != nil { + return false + } + + return e2eshared.CheckEqualRecordBatches(t, pgRows, rows) + }) } func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironment, @@ -179,7 +187,7 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen err = response.Get(&state) if err != nil { slog.Error(err.Error()) - } else if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { + } else if state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING { return } } else if counter > 15 { diff --git a/flow/shared/constants.go b/flow/shared/constants.go index f549123a98..119514fb76 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -21,8 +21,7 @@ const ( FlowStatusQuery = "q-flow-status" // Updates - FlowStatusUpdate = "u-flow-status" - CDCFlowConfigUpdate = "u-cdc-flow-config-update" + FlowStatusUpdate = "u-flow-status" ) const MirrorNameSearchAttribute = "MirrorName" diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 2aea56edec..614e85eaa8 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -53,6 +53,10 @@ type CDCFlowWorkflowState struct { CurrentFlowStatus protos.FlowStatus // flow config update request, set to nil after processed FlowConfigUpdates []*protos.CDCFlowConfigUpdate + // maintaining a copy of SrcTableIdNameMapping and TableNameSchemaMapping from protos.FlowConnectionConfigs + // ideally it shouldn't even be in the config, since it is set dynamically in SetupFlow and should be only in state + SrcTableIdNameMapping map[uint32]string + TableNameSchemaMapping map[string]*protos.TableSchema } type SignalProps struct { @@ -173,7 +177,7 @@ func additionalTablesHasOverlap(currentTableMappings []*protos.TableMapping, func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, - mirrorNameSearch *map[string]interface{}, + limits *CDCFlowLimits, mirrorNameSearch *map[string]interface{}, ) error { for _, flowConfigUpdate := range state.FlowConfigUpdates { if len(flowConfigUpdate.AdditionalTables) == 0 { @@ -195,102 +199,47 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return err } - additionalTablesSetupFlowID, err := GetChildWorkflowID(ctx, - "additional-tables-setup-flow", cfg.FlowJobName) - if err != nil { - return err - } - additionalTablesSetupFlowOpts := workflow.ChildWorkflowOptions{ - WorkflowID: additionalTablesSetupFlowID, - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - RetryPolicy: &temporal.RetryPolicy{ - MaximumAttempts: 2, - }, - SearchAttributes: *mirrorNameSearch, - } - additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) additionalTablesWorkflowCfg.DoInitialCopy = true + additionalTablesWorkflowCfg.InitialCopyOnly = true + additionalTablesWorkflowCfg.ForcePkeyChecks = true additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables additionalTablesWorkflowCfg.FlowJobName = fmt.Sprintf("%s_additional_tables_%s", cfg.FlowJobName, strings.ToLower(shared.RandomString(8))) - additionalTablesSetupCtx := workflow.WithChildOptions(ctx, - additionalTablesSetupFlowOpts) - additionalTablesSetupFlowFuture := workflow.ExecuteChildWorkflow( - additionalTablesSetupCtx, - SetupFlowWorkflow, - additionalTablesWorkflowCfg, - ) - if err := additionalTablesSetupFlowFuture.Get(additionalTablesSetupCtx, - &additionalTablesWorkflowCfg); err != nil { - w.logger.Error("failed to execute SetupFlow for additional tables: ", err) - return fmt.Errorf("failed to execute SetupFlow for additional tables: %w", err) - } - - // next part of the setup is to snapshot-initial-copy and setup replication slots. - additionalTablesSnapshotFlowID, err := GetChildWorkflowID(ctx, - "additional-tables-snapshot-flow", cfg.FlowJobName) - if err != nil { - return err - } - - taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID) + childAdditionalTablesCDCFlowID, + err := GetChildWorkflowID(ctx, "cdc-flow", additionalTablesWorkflowCfg.FlowJobName) if err != nil { return err } - additionalTablesSnapshotFlowOpts := workflow.ChildWorkflowOptions{ - WorkflowID: additionalTablesSnapshotFlowID, + // execute the sync flow as a child workflow + childAdditionalTablesCDCFlowOpts := workflow.ChildWorkflowOptions{ + WorkflowID: childAdditionalTablesCDCFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - TaskQueue: taskQueue, - SearchAttributes: *mirrorNameSearch, - } - additionalTablesSnapshotFlowCtx := workflow.WithChildOptions(ctx, additionalTablesSnapshotFlowOpts) - additionalTablesSnapshotFlowFuture := workflow.ExecuteChildWorkflow(additionalTablesSnapshotFlowCtx, - SnapshotFlowWorkflow, additionalTablesWorkflowCfg) - if err := additionalTablesSnapshotFlowFuture.Get(additionalTablesSnapshotFlowCtx, nil); err != nil { - return fmt.Errorf("failed to execute child workflow: %w", err) - } - - additionalTablesDropFlowID, err := GetChildWorkflowID(ctx, - "additional-tables-drop-flow", cfg.FlowJobName) - if err != nil { - return err + SearchAttributes: *mirrorNameSearch, + WaitForCancellation: true, } - additionalTablesDropFlowOpts := workflow.ChildWorkflowOptions{ - WorkflowID: additionalTablesDropFlowID, - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - RetryPolicy: &temporal.RetryPolicy{ - MaximumAttempts: 1, - }, - SearchAttributes: *mirrorNameSearch, - } - additionalTablesDropCtx := workflow.WithChildOptions(ctx, - additionalTablesDropFlowOpts) - additionalTablesDropFlowFuture := workflow.ExecuteChildWorkflow( - additionalTablesDropCtx, - DropFlowWorkflow, - &protos.ShutdownRequest{ - WorkflowId: additionalTablesSetupFlowID, - FlowJobName: additionalTablesWorkflowCfg.FlowJobName, - SourcePeer: cfg.Source, - DestinationPeer: cfg.Destination, - RemoveFlowEntry: false, - }, + childAdditionalTablesCDCFlowCtx := workflow.WithChildOptions(ctx, childAdditionalTablesCDCFlowOpts) + childAdditionalTablesCDCFlowFuture := workflow.ExecuteChildWorkflow( + childAdditionalTablesCDCFlowCtx, + CDCFlowWorkflowWithConfig, + additionalTablesWorkflowCfg, + nil, + limits, ) - if err := additionalTablesDropFlowFuture.Get(additionalTablesDropCtx, nil); err != nil { - w.logger.Error("failed to execute DropFlow for additional tables: ", err) - return fmt.Errorf("failed to execute DropFlow for additional tables: %w", err) + var res *CDCFlowWorkflowResult + if err := childAdditionalTablesCDCFlowFuture.Get(childAdditionalTablesCDCFlowCtx, &res); err != nil { + return err } - for tableID, tableName := range additionalTablesWorkflowCfg.SrcTableIdNameMapping { + for tableID, tableName := range res.SrcTableIdNameMapping { cfg.SrcTableIdNameMapping[tableID] = tableName } - for tableName, tableSchema := range additionalTablesWorkflowCfg.TableNameSchemaMapping { + for tableName, tableSchema := range res.TableNameSchemaMapping { cfg.TableNameSchemaMapping[tableName] = tableSchema } cfg.TableMappings = append(cfg.TableMappings, flowConfigUpdate.AdditionalTables...) @@ -334,18 +283,6 @@ func CDCFlowWorkflowWithConfig( if err != nil { return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.FlowStatusUpdate, err) } - err = workflow.SetUpdateHandler(ctx, shared.CDCFlowConfigUpdate, - func(cdcFlowConfigUpdate *protos.CDCFlowConfigUpdate) error { - if state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED { - state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcFlowConfigUpdate) - return nil - } - return fmt.Errorf(`flow config updates can only be sent when workflow is paused, - current status: %v`, state.CurrentFlowStatus) - }) - if err != nil { - return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.CDCFlowConfigUpdate, err) - } mirrorNameSearch := map[string]interface{}{ shared.MirrorNameSearchAttribute: cfg.FlowJobName, } @@ -385,6 +322,8 @@ func CDCFlowWorkflowWithConfig( if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) } + state.SrcTableIdNameMapping = cfg.SrcTableIdNameMapping + state.TableNameSchemaMapping = cfg.TableNameSchemaMapping state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT // next part of the setup is to snapshot-initial-copy and setup replication slots. @@ -453,7 +392,7 @@ func CDCFlowWorkflowWithConfig( // if initial_copy_only is opted for, we end the flow here. if cfg.InitialCopyOnly { - return nil, nil + return state, nil } } @@ -516,7 +455,7 @@ func CDCFlowWorkflowWithConfig( state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) // only process config updates when going from STATUS_PAUSED to STATUS_RUNNING if state.ActiveSignal == shared.NoopSignal { - err = w.processCDCFlowConfigUpdates(ctx, cfg, state, &mirrorNameSearch) + err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, &mirrorNameSearch) if err != nil { return state, err } @@ -660,5 +599,5 @@ func CDCFlowWorkflowWithConfig( } state.TruncateProgress(w.logger) - return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 2df08187e3..2fadcbbe62 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -387,8 +387,8 @@ func setWorkflowQueries(ctx workflow.Context, state *protos.QRepFlowState) error } // Support a Query for the current status of the qrep flow. - err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) { - return &state.CurrentFlowState, nil + err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (protos.FlowStatus, error) { + return state.CurrentFlowStatus, nil }) if err != nil { return fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) @@ -396,7 +396,7 @@ func setWorkflowQueries(ctx workflow.Context, state *protos.QRepFlowState) error // Support an Update for the current status of the qrep flow. err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { - state.CurrentFlowState = *status + state.CurrentFlowStatus = *status return nil }) if err != nil { @@ -427,15 +427,7 @@ func QRepFlowWorkflow( err := setWorkflowQueries(ctx, state) if err != nil { - return fmt.Errorf("failed to set `%s` query handler: %w", shared.QRepFlowStateQuery, err) - } - - // Support a Query for the current status of the arep flow. - err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) { - return &state.CurrentFlowStatus, nil - }) - if err != nil { - return fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) + return err } // Support an Update for the current status of the qrep flow. diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 3fe76f90e9..56278ff241 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -187,10 +187,11 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( sort.Strings(sourceTables) tableSchemaInput := &protos.GetTableSchemaBatchInput{ - PeerConnectionConfig: flowConnectionConfigs.Source, - TableIdentifiers: sourceTables, - FlowName: s.CDCFlowName, - SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialCopyOnly, + PeerConnectionConfig: flowConnectionConfigs.Source, + TableIdentifiers: sourceTables, + FlowName: s.CDCFlowName, + SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialCopyOnly && + !flowConnectionConfigs.ForcePkeyChecks, } future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput) @@ -271,14 +272,15 @@ func (s *SetupFlowExecution) executeSetupFlow( return nil, fmt.Errorf("failed to check connections and setup metadata tables: %w", err) } - // for initial copy only flows, we don't need to ensure pullability or create the raw table - // as we don't need the primary key requirement. - if !config.InitialCopyOnly { + // override for dynamic adding of tables + if !config.InitialCopyOnly || config.ForcePkeyChecks { // then ensure pullability if err := s.ensurePullability(ctx, config); err != nil { return nil, fmt.Errorf("failed to ensure pullability: %w", err) } - + } + // for initial copy only flows, we don't need to create the raw table + if !config.InitialCopyOnly { // then create the raw table if err := s.createRawTable(ctx, config); err != nil { return nil, fmt.Errorf("failed to create raw table: %w", err) diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 8cb6e48db3..be7fdd926a 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -750,18 +750,7 @@ impl NexusBackend { flow_job_name, workflow_details, pt::peerdb_flow::FlowStatus::StatusRunning, - Some(pt::peerdb_flow::FlowConfigUpdate { - update: Some(pt::peerdb_flow::flow_config_update::Update::CdcFlowConfigUpdate( - pt::peerdb_flow::CdcFlowConfigUpdate { - additional_tables: vec![pt::peerdb_flow::TableMapping { - source_table_identifier: "public.oss2".to_string(), - destination_table_identifier: "public.oss2dst" - .to_string(), - partition_key: "".to_string(), - exclude: vec![], - }], - })), - }), + None, ) .await .map_err(|err| { diff --git a/protos/flow.proto b/protos/flow.proto index 02e6b8f460..c948a4b5c1 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -76,9 +76,15 @@ message FlowConnectionConfigs { string soft_delete_col_name = 24; string synced_at_col_name = 25; + // caveat: also disables EnsurePullability, CreateRawTable and also does snapshot without a slot + // not meant as a precursor to CDC, purely automating snapshotting of multiple tables bool initial_copy_only = 26; int64 idle_timeout_seconds = 27; + + // make InitialCopyOnly enable EnsurePullability for the use case of dynamic table addition + // TODO: find a better way to do this. + bool force_pkey_checks = 28; } message RenameTableOption { diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index cf1579fe05..c3bad8f946 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -47,6 +47,7 @@ export const blankCDCSetting: FlowConnectionConfigs = { syncedAtColName: '', initialCopyOnly: false, idleTimeoutSeconds: 60, + forcePkeyChecks: false, }; export const blankQRepSetting = {