From 9c447cfdbe9da39b3b50b8f68c4af9ebd24fc9fa Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Wed, 17 Jan 2024 22:45:59 +0530
Subject: [PATCH] [breaking] pruning and reorganizing fields from protos, moving stuff to state (#1087)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### ⚠️ This change can break existing CDC mirrors!

Performs several breaking changes to our protobufs:

1) Removes unused fields from `FlowConnectionConfigs`: `table_schema`, `src_table_id_name_mapping`, `table_name_schema_mapping`, `snapshot_sync_mode`, `cdc_sync_mode`, `push_batch_size`, `push_parallelism`, `metadata_peer`. `src_table_id_name_mapping` and `table_name_schema_mapping` now live in workflow state instead: they are set up by `SetupFlow` and passed along via further proto changes. Field order is shuffled to group similar fields together.
2) Removes the unnecessary generalization of `table_identifier`.
3) Removes the unused messages `EnsurePullabilityInput` and `EnsurePullabilityOutput`.
4) Renames `do_initial_copy` and `initial_copy_only` to `do_initial_snapshot` and `initial_snapshot_only`, to better align with other options.
5) Removes `columns` from `TableSchema` and fixes up the related methods.
6) `batch_size` had 3 different datatypes and was being modified in multiple places; `batch_size` and `idle_timeout_seconds` now have a single source of truth.
---
 flow/activities/flowable.go                 |  20 ++--
 flow/cmd/handler.go                         |   6 +-
 flow/connectors/bigquery/bigquery.go        |   2 +-
 flow/connectors/postgres/postgres.go        |  13 +-
 flow/connectors/utils/columns.go            |  40 ++-----
 flow/model/model.go                         |   4 -
 flow/workflows/cdc_flow.go                  |  53 +++++----
 flow/workflows/normalize_flow.go            |   7 +-
 flow/workflows/setup_flow.go                | 106 ++++++++---------
 flow/workflows/snapshot_flow.go             |  13 +-
 flow/workflows/sync_flow.go                 |   2 +
 nexus/analyzer/src/lib.rs                   |   2 +-
 nexus/flow-rs/src/grpc.rs                   |   8 +-
 nexus/pt/src/flow_model.rs                  |   2 +-
 protos/flow.proto                           | 111 +++++++-----------
 ui/app/mirrors/create/cdc/cdc.tsx           |   2 +-
 ui/app/mirrors/create/handlers.ts           |  14 +--
 ui/app/mirrors/create/helpers/cdc.ts        |   4 +-
 ui/app/mirrors/create/helpers/common.ts     |  12 +-
 .../mirrors/edit/[mirrorId]/configValues.ts |   8 --
 20 files changed, 174 insertions(+), 255 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 34ce7eb70e..e8cdebbcc3 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -231,14 +231,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	errGroup.Go(func() error {
 		return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{
 			FlowJobName:           flowName,
-			SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
+			SrcTableIDNameMapping: input.SrcTableIdNameMapping,
 			TableNameMapping:      tblNameMapping,
 			LastOffset:            input.LastSyncState.Checkpoint,
-			MaxBatchSize:          uint32(input.SyncFlowOptions.BatchSize),
+			MaxBatchSize:          input.SyncFlowOptions.BatchSize,
 			IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
-				int(input.FlowConnectionConfigs.IdleTimeoutSeconds),
+				int(input.SyncFlowOptions.IdleTimeoutSeconds),
 			),
-			TableNameSchemaMapping:      input.FlowConnectionConfigs.TableNameSchemaMapping,
+			TableNameSchemaMapping:      input.TableNameSchemaMapping,
 			OverridePublicationName:     input.FlowConnectionConfigs.PublicationName,
 			OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
 			RelationMessageMapping:      input.RelationMessageMapping,
@@ -293,12 +293,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	syncStartTime := time.Now()
 	res, err := 
dstConn.SyncRecords(&model.SyncRecordsRequest{ - Records: recordBatch, - FlowJobName: input.FlowConnectionConfigs.FlowJobName, - TableMappings: input.FlowConnectionConfigs.TableMappings, - StagingPath: input.FlowConnectionConfigs.CdcStagingPath, - PushBatchSize: input.FlowConnectionConfigs.PushBatchSize, - PushParallelism: input.FlowConnectionConfigs.PushParallelism, + Records: recordBatch, + FlowJobName: input.FlowConnectionConfigs.FlowJobName, + TableMappings: input.FlowConnectionConfigs.TableMappings, + StagingPath: input.FlowConnectionConfigs.CdcStagingPath, }) if err != nil { slog.Warn("failed to push records", slog.Any("error", err)) @@ -404,7 +402,7 @@ func (a *FlowableActivity) StartNormalize( SoftDelete: input.FlowConnectionConfigs.SoftDelete, SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName, SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName, - TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping, + TableNameSchemaMapping: input.TableNameSchemaMapping, }) if err != nil { a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index c6c15f3f84..08be7c724e 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -131,10 +131,9 @@ func (h *FlowRequestHandler) CreateCDCFlow( }, } - maxBatchSize := int(cfg.MaxBatchSize) + maxBatchSize := cfg.MaxBatchSize if maxBatchSize == 0 { maxBatchSize = 1_000_000 - cfg.MaxBatchSize = uint32(maxBatchSize) } limits := &peerflow.CDCFlowLimits{ @@ -172,14 +171,13 @@ func (h *FlowRequestHandler) CreateCDCFlow( return nil, fmt.Errorf("unable to update flow config in catalog: %w", err) } - state := peerflow.NewCDCFlowWorkflowState() _, err = h.temporalClient.ExecuteWorkflow( ctx, // context workflowOptions, // workflow start options peerflow.CDCFlowWorkflowWithConfig, // workflow function cfg, // workflow input limits, // workflow limits - state, // workflow state + nil, // workflow state ) if err != nil { slog.Error("unable to start PeerFlow workflow", slog.Any("error", err)) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 238048134b..7177e4bb42 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -782,7 +782,7 @@ func (c *BigQueryConnector) SetupNormalizedTables( } // convert the column names and types to bigquery types - columns := make([]*bigquery.FieldSchema, 0, len(tableSchema.Columns)+2) + columns := make([]*bigquery.FieldSchema, 0, len(tableSchema.ColumnNames)+2) utils.IterColumns(tableSchema, func(colName, genericColType string) { columns = append(columns, &bigquery.FieldSchema{ Name: colName, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index c111a0d3a3..d0a7f4db52 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -656,7 +656,6 @@ func (c *PostgresConnector) getTableSchemaForTable( return &protos.TableSchema{ TableIdentifier: tableName, - Columns: nil, PrimaryKeyColumns: pKeyCols, IsReplicaIdentityFull: replicaIdentityType == ReplicaIdentityFull, ColumnNames: columnNames, @@ -776,7 +775,7 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string, func (c *PostgresConnector) EnsurePullability( req *protos.EnsurePullabilityBatchInput, ) (*protos.EnsurePullabilityBatchOutput, error) { - tableIdentifierMapping := make(map[string]*protos.TableIdentifier) + tableIdentifierMapping := make(map[string]*protos.PostgresTableIdentifier) 
for _, tableName := range req.SourceTableIdentifiers { schemaTable, err := utils.ParseSchemaTable(tableName) if err != nil { @@ -804,12 +803,8 @@ func (c *PostgresConnector) EnsurePullability( return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable) } - tableIdentifierMapping[tableName] = &protos.TableIdentifier{ - TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{ - PostgresTableIdentifier: &protos.PostgresTableIdentifier{ - RelId: relID, - }, - }, + tableIdentifierMapping[tableName] = &protos.PostgresTableIdentifier{ + RelId: relID, } utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName)) } @@ -852,7 +847,7 @@ func (c *PostgresConnector) SetupReplication(signal SlotSignal, req *protos.Setu } // Create the replication slot and publication err = c.createSlotAndPublication(signal, exists, - slotName, publicationName, tableNameMapping, req.DoInitialCopy) + slotName, publicationName, tableNameMapping, req.DoInitialSnapshot) if err != nil { return fmt.Errorf("error creating replication slot and publication: %w", err) } diff --git a/flow/connectors/utils/columns.go b/flow/connectors/utils/columns.go index 4a10ad444b..f1e0340f03 100644 --- a/flow/connectors/utils/columns.go +++ b/flow/connectors/utils/columns.go @@ -4,51 +4,27 @@ import ( "slices" "github.com/PeerDB-io/peer-flow/generated/protos" - "golang.org/x/exp/maps" ) func TableSchemaColumns(schema *protos.TableSchema) int { - if schema.Columns != nil { - return len(schema.Columns) - } else { - return len(schema.ColumnNames) - } + return len(schema.ColumnNames) } func TableSchemaColumnNames(schema *protos.TableSchema) []string { - if schema.Columns != nil { - return maps.Keys(schema.Columns) - } else { - return slices.Clone(schema.ColumnNames) - } + return slices.Clone(schema.ColumnNames) } func IterColumns(schema *protos.TableSchema, iter func(k, v string)) { - if schema.Columns != nil { - for k, v := range schema.Columns { - iter(k, v) - } - } else { - for i, name := range schema.ColumnNames { - iter(name, schema.ColumnTypes[i]) - } + for i, name := range schema.ColumnNames { + iter(name, schema.ColumnTypes[i]) } } func IterColumnsError(schema *protos.TableSchema, iter func(k, v string) error) error { - if schema.Columns != nil { - for k, v := range schema.Columns { - err := iter(k, v) - if err != nil { - return err - } - } - } else { - for i, name := range schema.ColumnNames { - err := iter(name, schema.ColumnTypes[i]) - if err != nil { - return err - } + for i, name := range schema.ColumnNames { + err := iter(name, schema.ColumnTypes[i]) + if err != nil { + return err } } return nil diff --git a/flow/model/model.go b/flow/model/model.go index 4d819e9fd7..33510e4e12 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -512,10 +512,6 @@ type SyncRecordsRequest struct { TableMappings []*protos.TableMapping // Staging path for AVRO files in CDC StagingPath string - // PushBatchSize is the number of records to push in a batch for EventHub. - PushBatchSize int64 - // PushParallelism is the number of batches in Event Hub to push in parallel. - PushParallelism int64 } type NormalizeRecordsRequest struct { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index f6a5bda189..70ac1723ba 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -26,7 +26,7 @@ type CDCFlowLimits struct { // This is typically non-zero for testing purposes. 
TotalSyncFlows int // Maximum number of rows in a sync flow batch. - MaxBatchSize int + MaxBatchSize uint32 // Rows synced after which we can say a test is done. ExitAfterRecords int } @@ -49,15 +49,18 @@ type CDCFlowWorkflowState struct { RelationMessageMapping model.RelationMessageMapping // current workflow state CurrentFlowState protos.FlowStatus + // moved from config here, set by SetupFlow + SrcTableIdNameMapping map[uint32]string + TableNameSchemaMapping map[string]*protos.TableSchema } type SignalProps struct { - BatchSize int32 - IdleTimeout int64 + BatchSize uint32 + IdleTimeout uint64 } // returns a new empty PeerFlowState -func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { +func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState { return &CDCFlowWorkflowState{ Progress: []string{"started"}, SyncFlowStatuses: nil, @@ -72,7 +75,9 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { RelationName: "protobuf_workaround", }, }, - CurrentFlowState: protos.FlowStatus_STATUS_SETUP, + CurrentFlowState: protos.FlowStatus_STATUS_SETUP, + SrcTableIdNameMapping: make(map[uint32]string, numTables), + TableNameSchemaMapping: make(map[string]*protos.TableSchema, numTables), } } @@ -151,13 +156,12 @@ func CDCFlowWorkflowWithConfig( limits *CDCFlowLimits, state *CDCFlowWorkflowState, ) (*CDCFlowWorkflowResult, error) { - if state == nil { - state = NewCDCFlowWorkflowState() - } - if cfg == nil { return nil, fmt.Errorf("invalid connection configs") } + if state == nil { + state = NewCDCFlowWorkflowState(len(cfg.TableMappings)) + } w := NewCDCFlowWorkflowExecution(ctx) @@ -221,9 +225,12 @@ func CDCFlowWorkflowWithConfig( } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) - if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { + var setupFlowOutput *protos.SetupFlowOutput + if err := setupFlowFuture.Get(setupFlowCtx, &setupFlowOutput); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) } + state.SrcTableIdNameMapping = setupFlowOutput.SrcTableIdNameMapping + state.TableNameSchemaMapping = setupFlowOutput.TableNameSchemaMapping state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT // next part of the setup is to snapshot-initial-copy and setup replication slots. @@ -269,14 +276,14 @@ func CDCFlowWorkflowWithConfig( CurrentName: oldName, NewName: newName, // oldName is what was used for the TableNameSchema mapping - TableSchema: cfg.TableNameSchemaMapping[oldName], + TableSchema: state.TableNameSchemaMapping[oldName], }) mapping.DestinationTableIdentifier = newName // TableNameSchemaMapping is referring to the _resync tables, not the actual names - correctedTableNameSchemaMapping[newName] = cfg.TableNameSchemaMapping[oldName] + correctedTableNameSchemaMapping[newName] = state.TableNameSchemaMapping[oldName] } - cfg.TableNameSchemaMapping = correctedTableNameSchemaMapping + state.TableNameSchemaMapping = correctedTableNameSchemaMapping renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 12 * time.Hour, HeartbeatTimeout: 1 * time.Hour, @@ -291,14 +298,19 @@ func CDCFlowWorkflowWithConfig( state.Progress = append(state.Progress, "executed setup flow and snapshot flow") // if initial_copy_only is opted for, we end the flow here. 
- if cfg.InitialCopyOnly { - return nil, nil + if cfg.InitialSnapshotOnly { + return state, nil } } syncFlowOptions := &protos.SyncFlowOptions{ - BatchSize: int32(limits.MaxBatchSize), - IdleTimeoutSeconds: 0, + BatchSize: limits.MaxBatchSize, + IdleTimeoutSeconds: 0, + SrcTableIdNameMapping: state.SrcTableIdNameMapping, + TableNameSchemaMapping: state.TableNameSchemaMapping, + } + normalizeFlowOptions := &protos.NormalizeFlowOptions{ + TableNameSchemaMapping: state.TableNameSchemaMapping, } // add a signal to change CDC properties @@ -307,14 +319,12 @@ func CDCFlowWorkflowWithConfig( cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) { var cdcSignal SignalProps c.Receive(ctx, &cdcSignal) + // only modify for options since SyncFlow uses it if cdcSignal.BatchSize > 0 { syncFlowOptions.BatchSize = cdcSignal.BatchSize - cfg.MaxBatchSize = uint32(cdcSignal.BatchSize) - limits.MaxBatchSize = int(cdcSignal.BatchSize) } if cdcSignal.IdleTimeout > 0 { syncFlowOptions.IdleTimeoutSeconds = cdcSignal.IdleTimeout - cfg.IdleTimeoutSeconds = cdcSignal.IdleTimeout } slog.Info("CDC Signal received. Parameters on signal reception:", slog.Int("BatchSize", int(cfg.MaxBatchSize)), @@ -451,7 +461,7 @@ func CDCFlowWorkflowWithConfig( state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } else { for i := range modifiedSrcTables { - cfg.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] + state.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] } } } @@ -475,6 +485,7 @@ func CDCFlowWorkflowWithConfig( normCtx, NormalizeFlowWorkflow, cfg, + normalizeFlowOptions, ) var childNormalizeFlowRes *model.NormalizeResponse diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 39256eac1a..ebf23051f7 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -31,18 +31,20 @@ func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) func NormalizeFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, + options *protos.NormalizeFlowOptions, ) (*model.NormalizeResponse, error) { s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ CDCFlowName: config.FlowJobName, Progress: []string{}, }) - return s.executeNormalizeFlow(ctx, config) + return s.executeNormalizeFlow(ctx, config, options) } func (s *NormalizeFlowExecution) executeNormalizeFlow( ctx workflow.Context, config *protos.FlowConnectionConfigs, + options *protos.NormalizeFlowOptions, ) (*model.NormalizeResponse, error) { s.logger.Info("executing normalize flow - ", s.CDCFlowName) @@ -52,7 +54,8 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( }) startNormalizeInput := &protos.StartNormalizeInput{ - FlowConnectionConfigs: config, + FlowConnectionConfigs: config, + TableNameSchemaMapping: options.TableNameSchemaMapping, } fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 3fe76f90e9..b23928f328 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -19,36 +19,31 @@ import ( // setup is done for a sync flow to execute. // // The setup flow is responsible for: -// 1. Global: +// 1. Global: // - ensure that we are able to connect to the source and destination peers -// 2. 
Source Peer: -// - setup the metadata table on the source peer +// 2. Source Peer: +// - setup the metadata table on the source peer // - initialize pullability on the source peer, as an example on postgres: -// - ensuring the required table exists on the source peer -// - creating the slot and publication on the source peer -// 3. Destination Peer: -// - setup the metadata table on the destination peer +// - ensuring the required table exists on the source peer +// - creating the slot and publication on the source peer +// 3. Destination Peer: +// - setup the metadata table on the destination peer // - creating the raw table on the destination peer // - creating the normalized table on the destination peer - -type SetupFlowState struct { - tableNameMapping map[string]string - CDCFlowName string - Progress []string -} - type SetupFlowExecution struct { - SetupFlowState - executionID string - logger log.Logger + tableNameMapping map[string]string + cdcFlowName string + executionID string + logger log.Logger } // NewSetupFlowExecution creates a new instance of SetupFlowExecution. -func NewSetupFlowExecution(ctx workflow.Context, state *SetupFlowState) *SetupFlowExecution { +func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution { return &SetupFlowExecution{ - SetupFlowState: *state, - executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, - logger: workflow.GetLogger(ctx), + tableNameMapping: tableNameMapping, + cdcFlowName: cdcFlowName, + executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + logger: workflow.GetLogger(ctx), } } @@ -58,7 +53,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) error { - s.logger.Info("checking connections for CDC flow - ", s.CDCFlowName) + s.logger.Info("checking connections for CDC flow - ", s.cdcFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 2 * time.Minute, @@ -86,7 +81,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( return fmt.Errorf("failed to check destination peer connection: %w", err) } - s.logger.Info("ensuring metadata table exists - ", s.CDCFlowName) + s.logger.Info("ensuring metadata table exists - ", s.cdcFlowName) // then setup the destination peer metadata tables if destConnStatus.NeedsSetupMetadataTables { @@ -105,13 +100,13 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( func (s *SetupFlowExecution) ensurePullability( ctx workflow.Context, config *protos.FlowConnectionConfigs, -) error { - s.logger.Info("ensuring pullability for peer flow - ", s.CDCFlowName) +) (map[uint32]string, error) { + s.logger.Info("ensuring pullability for peer flow - ", s.cdcFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 15 * time.Minute, }) - tmpMap := make(map[uint32]string) + srcTableIdNameMapping := make(map[uint32]string) srcTblIdentifiers := maps.Keys(s.tableNameMapping) sort.Strings(srcTblIdentifiers) @@ -119,7 +114,7 @@ func (s *SetupFlowExecution) ensurePullability( // create EnsurePullabilityInput for the srcTableName ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{ PeerConnectionConfig: config.Source, - FlowJobName: s.CDCFlowName, + FlowJobName: s.cdcFlowName, SourceTableIdentifiers: srcTblIdentifiers, } @@ -127,7 +122,7 @@ func (s *SetupFlowExecution) ensurePullability( var ensurePullabilityOutput *protos.EnsurePullabilityBatchOutput if 
err := future.Get(ctx, &ensurePullabilityOutput); err != nil { s.logger.Error("failed to ensure pullability for tables: ", err) - return fmt.Errorf("failed to ensure pullability for tables: %w", err) + return nil, fmt.Errorf("failed to ensure pullability for tables: %w", err) } sortedTableNames := maps.Keys(ensurePullabilityOutput.TableIdentifierMapping) @@ -135,14 +130,10 @@ func (s *SetupFlowExecution) ensurePullability( for _, tableName := range sortedTableNames { tableIdentifier := ensurePullabilityOutput.TableIdentifierMapping[tableName] - switch typedTableIdentifier := tableIdentifier.TableIdentifier.(type) { - case *protos.TableIdentifier_PostgresTableIdentifier: - tmpMap[typedTableIdentifier.PostgresTableIdentifier.RelId] = tableName - } + srcTableIdNameMapping[tableIdentifier.RelId] = tableName } - config.SrcTableIdNameMapping = tmpMap - return nil + return srcTableIdNameMapping, nil } // createRawTable creates the raw table on the destination peer. @@ -150,7 +141,7 @@ func (s *SetupFlowExecution) createRawTable( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) error { - s.logger.Info("creating raw table on destination - ", s.CDCFlowName) + s.logger.Info("creating raw table on destination - ", s.cdcFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) @@ -158,9 +149,8 @@ func (s *SetupFlowExecution) createRawTable( // attempt to create the tables. createRawTblInput := &protos.CreateRawTableInput{ PeerConnectionConfig: config.Destination, - FlowJobName: s.CDCFlowName, + FlowJobName: s.cdcFlowName, TableNameMapping: s.tableNameMapping, - CdcSyncMode: config.CdcSyncMode, } rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput) @@ -176,7 +166,7 @@ func (s *SetupFlowExecution) createRawTable( func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs, ) (map[string]*protos.TableSchema, error) { - s.logger.Info("fetching table schema for peer flow - ", s.CDCFlowName) + s.logger.Info("fetching table schema for peer flow - ", s.cdcFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Hour, @@ -189,8 +179,8 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( tableSchemaInput := &protos.GetTableSchemaBatchInput{ PeerConnectionConfig: flowConnectionConfigs.Source, TableIdentifiers: sourceTables, - FlowName: s.CDCFlowName, - SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialCopyOnly, + FlowName: s.cdcFlowName, + SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly, } future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput) @@ -205,7 +195,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( sortedSourceTables := maps.Keys(tableNameSchemaMapping) sort.Strings(sortedSourceTables) - s.logger.Info("setting up normalized tables for peer flow - ", s.CDCFlowName) + s.logger.Info("setting up normalized tables for peer flow - ", s.cdcFlowName) normalizedTableMapping := make(map[string]*protos.TableSchema) for _, srcTableName := range sortedSourceTables { tableSchema := tableNameSchemaMapping[srcTableName] @@ -224,7 +214,6 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( }) tableSchema = &protos.TableSchema{ TableIdentifier: tableSchema.TableIdentifier, - Columns: nil, PrimaryKeyColumns: tableSchema.PrimaryKeyColumns, IsReplicaIdentityFull: 
tableSchema.IsReplicaIdentityFull, ColumnNames: columnNames, @@ -255,7 +244,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( return nil, fmt.Errorf("failed to create normalized tables: %w", err) } - s.logger.Info("finished setting up normalized tables for peer flow - ", s.CDCFlowName) + s.logger.Info("finished setting up normalized tables for peer flow - ", s.cdcFlowName) return normalizedTableMapping, nil } @@ -263,21 +252,24 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( func (s *SetupFlowExecution) executeSetupFlow( ctx workflow.Context, config *protos.FlowConnectionConfigs, -) (map[string]*protos.TableSchema, error) { - s.logger.Info("executing setup flow - ", s.CDCFlowName) +) (*protos.SetupFlowOutput, error) { + s.logger.Info("executing setup flow - ", s.cdcFlowName) // first check the connectionsAndSetupMetadataTables if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil { return nil, fmt.Errorf("failed to check connections and setup metadata tables: %w", err) } + setupFlowOutput := protos.SetupFlowOutput{} // for initial copy only flows, we don't need to ensure pullability or create the raw table // as we don't need the primary key requirement. - if !config.InitialCopyOnly { + if !config.InitialSnapshotOnly { // then ensure pullability - if err := s.ensurePullability(ctx, config); err != nil { + srcTableIdNameMapping, err := s.ensurePullability(ctx, config) + if err != nil { return nil, fmt.Errorf("failed to ensure pullability: %w", err) } + setupFlowOutput.SrcTableIdNameMapping = srcTableIdNameMapping // then create the raw table if err := s.createRawTable(ctx, config); err != nil { @@ -286,38 +278,32 @@ func (s *SetupFlowExecution) executeSetupFlow( } // then fetch the table schema and setup the normalized tables - tableSchema, err := s.fetchTableSchemaAndSetupNormalizedTables(ctx, config) + tableNameSchemaMapping, err := s.fetchTableSchemaAndSetupNormalizedTables(ctx, config) if err != nil { return nil, fmt.Errorf("failed to fetch table schema and setup normalized tables: %w", err) } + setupFlowOutput.TableNameSchemaMapping = tableNameSchemaMapping - return tableSchema, nil + return &setupFlowOutput, nil } // SetupFlowWorkflow is the workflow that sets up the flow. 
func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, -) (*protos.FlowConnectionConfigs, error) { +) (*protos.SetupFlowOutput, error) { tblNameMapping := make(map[string]string, len(config.TableMappings)) for _, v := range config.TableMappings { tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier } - setupFlowState := &SetupFlowState{ - tableNameMapping: tblNameMapping, - CDCFlowName: config.FlowJobName, - Progress: []string{}, - } - // create the setup flow execution - setupFlowExecution := NewSetupFlowExecution(ctx, setupFlowState) + setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName) // execute the setup flow - tableNameSchemaMapping, err := setupFlowExecution.executeSetupFlow(ctx, config) + setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, config) if err != nil { return nil, fmt.Errorf("failed to execute setup flow: %w", err) } - config.TableNameSchemaMapping = tableNameSchemaMapping - return config, nil + return setupFlowOutput, nil } diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 91a5a4dbff..00e8f778e5 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -19,8 +19,9 @@ import ( ) type SnapshotFlowExecution struct { - config *protos.FlowConnectionConfigs - logger log.Logger + config *protos.FlowConnectionConfigs + logger log.Logger + tableNameSchemaMapping map[string]*protos.TableSchema } // ensurePullability ensures that the source peer is pullable. @@ -46,7 +47,7 @@ func (s *SnapshotFlowExecution) setupReplication( PeerConnectionConfig: s.config.Source, FlowJobName: flowName, TableNameMapping: tblNameMapping, - DoInitialCopy: s.config.DoInitialCopy, + DoInitialSnapshot: s.config.DoInitialSnapshot, ExistingPublicationName: s.config.PublicationName, ExistingReplicationSlotName: s.config.ReplicationSlotName, } @@ -138,7 +139,7 @@ func (s *SnapshotFlowExecution) cloneTable( } from := "*" if len(mapping.Exclude) != 0 { - for _, v := range s.config.TableNameSchemaMapping { + for _, v := range s.tableNameSchemaMapping { if v.TableIdentifier == srcName { colNames := utils.TableSchemaColumnNames(v) for i, colName := range colNames { @@ -260,7 +261,7 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon replCtx := ctx - if !config.DoInitialCopy { + if !config.DoInitialSnapshot { _, err := se.setupReplication(replCtx) if err != nil { return fmt.Errorf("failed to setup replication: %w", err) @@ -273,7 +274,7 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon return nil } - if config.InitialCopyOnly { + if config.InitialSnapshotOnly { slotInfo := &protos.SetupReplicationOutput{ SlotName: "peerdb_initial_copy_only", SnapshotName: "", // empty snapshot name indicates that we should not use a snapshot diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 3616a7a99e..8c1028069d 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -75,6 +75,8 @@ func (s *SyncFlowExecution) executeSyncFlow( LastSyncState: dstSyncState, SyncFlowOptions: opts, RelationMessageMapping: relationMessageMapping, + SrcTableIdNameMapping: opts.SrcTableIdNameMapping, + TableNameSchemaMapping: opts.TableNameSchemaMapping, } fStartFlow := workflow.ExecuteActivity(startFlowCtx, flowable.StartFlow, startFlowInput) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 8a9fcf9615..a3dbe1b37d 100644 --- a/nexus/analyzer/src/lib.rs +++ 
b/nexus/analyzer/src/lib.rs
@@ -332,7 +332,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
                     resync,
                     soft_delete_col_name,
                     synced_at_col_name,
-                    initial_copy_only,
+                    initial_snapshot_only: initial_copy_only,
                 };
 
                 if initial_copy_only && !do_initial_copy {
diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs
index b8adf9ea84..deed768143 100644
--- a/nexus/flow-rs/src/grpc.rs
+++ b/nexus/flow-rs/src/grpc.rs
@@ -143,7 +143,7 @@ impl FlowGrpcClient {
             });
         });
 
-        let do_initial_copy = job.do_initial_copy;
+        let do_initial_snapshot = job.do_initial_copy;
         let publication_name = job.publication_name.clone();
         let replication_slot_name = job.replication_slot_name.clone();
         let snapshot_num_rows_per_partition = job.snapshot_num_rows_per_partition;
@@ -155,7 +155,7 @@ impl FlowGrpcClient {
             destination: Some(dst),
             flow_job_name: job.name.clone(),
             table_mappings,
-            do_initial_copy,
+            do_initial_snapshot,
             publication_name: publication_name.unwrap_or_default(),
             snapshot_num_rows_per_partition: snapshot_num_rows_per_partition.unwrap_or(0),
             snapshot_max_parallel_workers: snapshot_max_parallel_workers.unwrap_or(0),
@@ -164,13 +164,11 @@ impl FlowGrpcClient {
             cdc_staging_path: job.cdc_staging_path.clone().unwrap_or_default(),
             soft_delete: job.soft_delete,
             replication_slot_name: replication_slot_name.unwrap_or_default(),
-            push_batch_size: job.push_batch_size.unwrap_or_default(),
-            push_parallelism: job.push_parallelism.unwrap_or_default(),
             max_batch_size: job.max_batch_size.unwrap_or_default(),
             resync: job.resync,
             soft_delete_col_name: job.soft_delete_col_name.clone().unwrap_or_default(),
             synced_at_col_name: job.synced_at_col_name.clone().unwrap_or_default(),
-            initial_copy_only: job.initial_copy_only,
+            initial_snapshot_only: job.initial_snapshot_only,
             ..Default::default()
         };
diff --git a/nexus/pt/src/flow_model.rs b/nexus/pt/src/flow_model.rs
index 9a39485e20..54c4863fad 100644
--- a/nexus/pt/src/flow_model.rs
+++ b/nexus/pt/src/flow_model.rs
@@ -70,7 +70,7 @@ pub struct FlowJob {
     pub resync: bool,
     pub soft_delete_col_name: Option<String>,
     pub synced_at_col_name: Option<String>,
-    pub initial_copy_only: bool,
+    pub initial_snapshot_only: bool,
 }
 
 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
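Since `SyncFlowOptions` is now the single source of truth for `batch_size` and `idle_timeout_seconds`, retuning a live mirror goes through the CDC properties signal handled in `cdc_flow.go` above (note the new unsigned `SignalProps` types). A minimal caller-side sketch, assuming the Temporal Go SDK; the `cdc-dynamic-properties` signal name and the use of the flow job name as the workflow ID are illustrative assumptions, not established by this diff:

```go
package main

import (
	"context"
	"log"

	"go.temporal.io/sdk/client"

	peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

func main() {
	// Assumes a Temporal server reachable on the default local endpoint.
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalln("unable to create Temporal client:", err)
	}
	defer c.Close()

	// Signal name and workflow ID are assumptions for illustration; the
	// workflow's signal handler only applies fields that are non-zero.
	err = c.SignalWorkflow(context.Background(), "my-mirror", "",
		"cdc-dynamic-properties", peerflow.SignalProps{
			BatchSize:   250_000, // uint32 after this change
			IdleTimeout: 120,     // uint64 seconds after this change
		})
	if err != nil {
		log.Fatalln("unable to signal workflow:", err)
	}
}
```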
diff --git a/protos/flow.proto b/protos/flow.proto
index 24c9d9f87c..60d15d9d91 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -35,51 +35,38 @@ message SetupInput {
 }
 
 message FlowConnectionConfigs {
-  peerdb_peers.Peer source = 1;
-  peerdb_peers.Peer destination = 2;
-  string flow_job_name = 3;
-  TableSchema table_schema = 4;
-  repeated TableMapping table_mappings = 5;
-  map<uint32, string> src_table_id_name_mapping = 6;
-  map<string, TableSchema> table_name_schema_mapping = 7;
-
-  // This is an optional peer that will be used to hold metadata in cases where
-  // the destination isn't ideal for holding metadata.
-  peerdb_peers.Peer metadata_peer = 8;
-  uint32 max_batch_size = 9;
-  bool do_initial_copy = 10;
-
-  string publication_name = 11;
-  uint32 snapshot_num_rows_per_partition = 12;
+  string flow_job_name = 1;
+  // source and destination peer
+  peerdb_peers.Peer source = 2;
+  peerdb_peers.Peer destination = 3;
+
+  // config for the CDC flow itself
+  // currently, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
+  repeated TableMapping table_mappings = 4;
+  uint32 max_batch_size = 5;
+  uint64 idle_timeout_seconds = 6;
+  string cdc_staging_path = 7;
+  string publication_name = 8;
+  string replication_slot_name = 9;
+
+  // config for the initial load feature, along with interactions like resync and initial_snapshot_only
+  bool do_initial_snapshot = 10;
+  uint32 snapshot_num_rows_per_partition = 11;
+  string snapshot_staging_path = 12;
   // max parallel workers is per table
   uint32 snapshot_max_parallel_workers = 13;
   uint32 snapshot_num_tables_in_parallel = 14;
-  QRepSyncMode snapshot_sync_mode = 15;
-  QRepSyncMode cdc_sync_mode = 16;
-  string snapshot_staging_path = 17;
-  string cdc_staging_path = 18;
-
-  // currently only works for snowflake
-  bool soft_delete = 19;
-
-  string replication_slot_name = 20;
-
-  // the below two are for eventhub only
-  int64 push_batch_size = 21;
-  int64 push_parallelism = 22;
-
   // if true, then the flow will be resynced
   // create new tables with "_resync" suffix, perform initial load and then swap the new tables with the old ones
-  // to be used after the old mirror is dropped
-  bool resync = 23;
+  // to only be used after the old mirror is dropped
+  bool resync = 15;
+  bool initial_snapshot_only = 16;
 
-  string soft_delete_col_name = 24;
-  string synced_at_col_name = 25;
-
-  bool initial_copy_only = 26;
-
-  int64 idle_timeout_seconds = 27;
+  // configurations for soft delete and synced at columns, affects both initial snapshot and CDC
+  bool soft_delete = 17;
+  string soft_delete_col_name = 18;
+  string synced_at_col_name = 19;
 }
 
 message RenameTableOption {
@@ -111,18 +98,19 @@ message CreateTablesFromExistingOutput {
 }
 
 message SyncFlowOptions {
-  int32 batch_size = 1;
+  uint32 batch_size = 1;
   map<uint32, RelationMessage> relation_message_mapping = 2;
-  int64 idle_timeout_seconds = 3;
+  uint64 idle_timeout_seconds = 3;
+  map<uint32, string> src_table_id_name_mapping = 4;
+  map<string, TableSchema> table_name_schema_mapping = 5;
 }
 
 message NormalizeFlowOptions {
-  int32 batch_size = 1;
+  map<string, TableSchema> table_name_schema_mapping = 1;
 }
 
 message LastSyncState {
   int64 checkpoint = 1;
-  google.protobuf.Timestamp last_synced_at = 2;
 }
 
 message StartFlowInput {
@@ -130,10 +118,13 @@
   LastSyncState last_sync_state = 1;
   FlowConnectionConfigs flow_connection_configs = 2;
   SyncFlowOptions sync_flow_options = 3;
   map<uint32, RelationMessage> relation_message_mapping = 4;
+  map<uint32, string> src_table_id_name_mapping = 5;
+  map<string, TableSchema> table_name_schema_mapping = 6;
 }
 
 message StartNormalizeInput {
   FlowConnectionConfigs flow_connection_configs = 1;
+  map<string, TableSchema> table_name_schema_mapping = 2;
 }
 
 message GetLastSyncedIDInput {
   peerdb_peers.Peer peer_connection_config = 1;
   string flow_job_name = 2;
 }
 
-message EnsurePullabilityInput {
-  peerdb_peers.Peer peer_connection_config = 1;
-  string flow_job_name = 2;
-  string source_table_identifier = 3;
-}
-
 message EnsurePullabilityBatchInput {
   peerdb_peers.Peer peer_connection_config = 1;
   string flow_job_name = 2;
@@ -157,18 +142,8 @@ message PostgresTableIdentifier {
   uint32 rel_id = 1;
 }
 
-message TableIdentifier {
-  oneof table_identifier {
-    PostgresTableIdentifier postgres_table_identifier = 1;
-  }
-}
-
-message EnsurePullabilityOutput {
-  TableIdentifier table_identifier = 1;
-}
-
 message EnsurePullabilityBatchOutput {
-  map<string, TableIdentifier> table_identifier_mapping = 1;
+  map<string, PostgresTableIdentifier> table_identifier_mapping = 1;
 }
 
 message SetupReplicationInput {
@@ -177,7 +152,7 @@
   map<string, string> table_name_mapping = 3;
   // replicate to destination using ctid
   peerdb_peers.Peer destination_peer = 4;
-  bool do_initial_copy = 5;
+  bool do_initial_snapshot = 5;
   string existing_publication_name = 6;
   string existing_replication_slot_name = 7;
 }
@@ -191,19 +166,16 @@ message CreateRawTableInput {
   peerdb_peers.Peer peer_connection_config = 1;
   string flow_job_name = 2;
   map<string, string> table_name_mapping = 3;
-  QRepSyncMode cdc_sync_mode = 4;
 }
 
 message CreateRawTableOutput {
   string table_identifier = 1;
 }
 
 message TableSchema {
   string table_identifier = 1;
-  // DEPRECATED: eliminate when breaking changes are allowed.
-  map<string, string> columns = 2;
-  repeated string primary_key_columns = 3;
-  bool is_replica_identity_full = 4;
-  repeated string column_names = 5;
-  repeated string column_types = 6;
+  repeated string primary_key_columns = 2;
+  bool is_replica_identity_full = 3;
+  repeated string column_names = 4;
+  repeated string column_types = 5;
 }
 
 message GetTableSchemaBatchInput {
@@ -427,3 +399,8 @@ message FlowStateUpdate {
     QRepFlowStateUpdate qrep_flow_state_update = 2;
   }
 }
+
+message SetupFlowOutput {
+  map<uint32, string> src_table_id_name_mapping = 1;
+  map<string, TableSchema> table_name_schema_mapping = 2;
+}
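To make the reshaped `TableSchema` concrete: columns now live in two parallel, order-preserving lists (`column_names`/`column_types`) instead of an unordered map, and consumers zip them the way `IterColumns` in `flow/connectors/utils/columns.go` does. A small sketch with made-up table and column values:

```go
package main

import (
	"fmt"

	"github.com/PeerDB-io/peer-flow/connectors/utils"
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

func main() {
	// Parallel slices keep a stable column order, which the removed
	// map<string, string> columns field could not guarantee.
	schema := &protos.TableSchema{
		TableIdentifier:   "public.users", // hypothetical table
		PrimaryKeyColumns: []string{"id"},
		ColumnNames:       []string{"id", "email"},
		ColumnTypes:       []string{"numeric", "string"},
	}
	utils.IterColumns(schema, func(colName, colType string) {
		fmt.Printf("%s -> %s\n", colName, colType)
	})
}
```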
diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx
index 63155acdf2..6d89fe6c5c 100644
--- a/ui/app/mirrors/create/cdc/cdc.tsx
+++ b/ui/app/mirrors/create/cdc/cdc.tsx
@@ -54,7 +54,7 @@ export default function CDCConfigForm({
   const paramDisplayCondition = (setting: MirrorSetting) => {
     const label = setting.label.toLowerCase();
     if (
-      (label.includes('snapshot') && mirrorConfig.doInitialCopy !== true) ||
+      (label.includes('snapshot') && mirrorConfig.doInitialSnapshot !== true) ||
       (label.includes('staging path') &&
         defaultSyncMode(mirrorConfig.destination?.type) !== 'AVRO')
     ) {
diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts
index 50d9a3903a..935cd34f77 100644
--- a/ui/app/mirrors/create/handlers.ts
+++ b/ui/app/mirrors/create/handlers.ts
@@ -150,16 +150,10 @@ export const handleCreateCDC = async (
   config['tableMappings'] = tableNameMapping as TableMapping[];
   config['flowJobName'] = flowJobName;
 
-  if (config.destination?.type == DBType.POSTGRES) {
-    config.cdcSyncMode = QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
-    config.snapshotSyncMode = QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
-  } else {
-    config.cdcSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
-    config.snapshotSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
-  }
-
-  if (config.doInitialCopy == false && config.initialCopyOnly == true) {
-    notify('Initial Copy Only cannot be true if Initial Copy is false.');
+  if (config.doInitialSnapshot == false && config.initialSnapshotOnly == true) {
+    notify(
+      'Initial Snapshot Only cannot be true if Initial Snapshot is false.'
+ ); return; } diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts index 40ae5405b8..525abd4635 100644 --- a/ui/app/mirrors/create/helpers/cdc.ts +++ b/ui/app/mirrors/create/helpers/cdc.ts @@ -2,7 +2,7 @@ import { CDCConfig } from '../../../dto/MirrorsDTO'; import { MirrorSetting } from './common'; export const cdcSettings: MirrorSetting[] = [ { - label: 'Initial Copy', + label: 'Initial Snapshot', stateHandler: (value, setter) => setter((curr: CDCConfig) => ({ ...curr, @@ -117,7 +117,7 @@ export const cdcSettings: MirrorSetting[] = [ type: 'switch', }, { - label: 'Initial Copy Only', + label: 'Initial Snapshot Only', stateHandler: (value, setter) => setter((curr: CDCConfig) => ({ ...curr, diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 6864c1bfb2..1af51e8d85 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -23,29 +23,21 @@ export const blankCDCSetting: FlowConnectionConfigs = { source: undefined, destination: undefined, flowJobName: '', - tableSchema: undefined, tableMappings: [], - srcTableIdNameMapping: {}, - tableNameSchemaMapping: {}, - metadataPeer: undefined, maxBatchSize: 1000000, - doInitialCopy: true, + doInitialSnapshot: true, publicationName: '', snapshotNumRowsPerPartition: 500000, snapshotMaxParallelWorkers: 1, snapshotNumTablesInParallel: 4, - snapshotSyncMode: 0, - cdcSyncMode: 0, snapshotStagingPath: '', cdcStagingPath: '', softDelete: false, replicationSlotName: '', - pushBatchSize: 0, - pushParallelism: 0, resync: false, softDeleteColName: '', syncedAtColName: '', - initialCopyOnly: false, + initialSnapshotOnly: false, idleTimeoutSeconds: 60, }; diff --git a/ui/app/mirrors/edit/[mirrorId]/configValues.ts b/ui/app/mirrors/edit/[mirrorId]/configValues.ts index 76d4982910..dd5d5bdb97 100644 --- a/ui/app/mirrors/edit/[mirrorId]/configValues.ts +++ b/ui/app/mirrors/edit/[mirrorId]/configValues.ts @@ -28,14 +28,6 @@ const MirrorValues = (mirrorConfig: FlowConnectionConfigs | undefined) => { value: `${mirrorConfig?.snapshotMaxParallelWorkers} worker(s)`, label: 'Snapshot Parallel Tables', }, - { - value: `${syncModeToLabel(mirrorConfig?.cdcSyncMode!)}`, - label: 'CDC Sync Mode', - }, - { - value: `${syncModeToLabel(mirrorConfig?.snapshotSyncMode!)}`, - label: 'Snapshot Sync Mode', - }, { value: `${ mirrorConfig?.cdcStagingPath?.length