From ffb7d4bc1d09f5661ce97d6a6dfb96602dd2ce4a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 15 Jan 2024 16:13:38 -0500 Subject: [PATCH] refine check constraints --- flow/connectors/postgres/postgres.go | 22 +++++++++++++++------- flow/workflows/cdc_flow.go | 1 - flow/workflows/setup_flow.go | 23 +++++++++++------------ protos/flow.proto | 5 +---- ui/app/mirrors/create/helpers/common.ts | 1 - 5 files changed, 27 insertions(+), 25 deletions(-) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 5236fb2add..126a6d1c2d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -778,6 +778,20 @@ func (c *PostgresConnector) EnsurePullability( return nil, err } + tableIdentifierMapping[tableName] = &protos.TableIdentifier{ + TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{ + PostgresTableIdentifier: &protos.PostgresTableIdentifier{ + RelId: relID, + }, + }, + } + + if !req.CheckConstraints { + msg := fmt.Sprintf("[no-constriants] ensured pullability table %s", tableName) + utils.RecordHeartbeatWithRecover(c.ctx, msg) + continue + } + replicaIdentity, replErr := c.getReplicaIdentityType(schemaTable) if replErr != nil { return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr) @@ -789,17 +803,11 @@ func (c *PostgresConnector) EnsurePullability( } // we only allow no primary key if the table has REPLICA IDENTITY FULL + // this is ok for replica identity idex as we populate the primary key columns if len(pKeyCols) == 0 && !(replicaIdentity == ReplicaIdentityFull) { return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable) } - tableIdentifierMapping[tableName] = &protos.TableIdentifier{ - TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{ - PostgresTableIdentifier: &protos.PostgresTableIdentifier{ - RelId: relID, - }, - }, - } utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName)) } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 614e85eaa8..50de3b65dd 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -202,7 +202,6 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) additionalTablesWorkflowCfg.DoInitialCopy = true additionalTablesWorkflowCfg.InitialCopyOnly = true - additionalTablesWorkflowCfg.ForcePkeyChecks = true additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables additionalTablesWorkflowCfg.FlowJobName = fmt.Sprintf("%s_additional_tables_%s", cfg.FlowJobName, strings.ToLower(shared.RandomString(8))) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 56278ff241..6ec312499b 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -105,13 +105,13 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( func (s *SetupFlowExecution) ensurePullability( ctx workflow.Context, config *protos.FlowConnectionConfigs, + checkConstraints bool, ) error { s.logger.Info("ensuring pullability for peer flow - ", s.CDCFlowName) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 15 * time.Minute, }) - tmpMap := make(map[uint32]string) srcTblIdentifiers := maps.Keys(s.tableNameMapping) sort.Strings(srcTblIdentifiers) @@ -121,6 +121,7 @@ func (s *SetupFlowExecution) ensurePullability( PeerConnectionConfig: config.Source, FlowJobName: s.CDCFlowName, SourceTableIdentifiers: srcTblIdentifiers, + CheckConstraints: checkConstraints, } future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput) @@ -133,6 +134,7 @@ func (s *SetupFlowExecution) ensurePullability( sortedTableNames := maps.Keys(ensurePullabilityOutput.TableIdentifierMapping) sort.Strings(sortedTableNames) + tmpMap := make(map[uint32]string) for _, tableName := range sortedTableNames { tableIdentifier := ensurePullabilityOutput.TableIdentifierMapping[tableName] switch typedTableIdentifier := tableIdentifier.TableIdentifier.(type) { @@ -187,11 +189,10 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( sort.Strings(sourceTables) tableSchemaInput := &protos.GetTableSchemaBatchInput{ - PeerConnectionConfig: flowConnectionConfigs.Source, - TableIdentifiers: sourceTables, - FlowName: s.CDCFlowName, - SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialCopyOnly && - !flowConnectionConfigs.ForcePkeyChecks, + PeerConnectionConfig: flowConnectionConfigs.Source, + TableIdentifiers: sourceTables, + FlowName: s.CDCFlowName, + SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialCopyOnly, } future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput) @@ -272,13 +273,11 @@ func (s *SetupFlowExecution) executeSetupFlow( return nil, fmt.Errorf("failed to check connections and setup metadata tables: %w", err) } - // override for dynamic adding of tables - if !config.InitialCopyOnly || config.ForcePkeyChecks { - // then ensure pullability - if err := s.ensurePullability(ctx, config); err != nil { - return nil, fmt.Errorf("failed to ensure pullability: %w", err) - } + checkConstraints := !config.InitialCopyOnly + if err := s.ensurePullability(ctx, config, checkConstraints); err != nil { + return nil, fmt.Errorf("failed to ensure pullability: %w", err) } + // for initial copy only flows, we don't need to create the raw table if !config.InitialCopyOnly { // then create the raw table diff --git a/protos/flow.proto b/protos/flow.proto index c948a4b5c1..a27f853ffc 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -81,10 +81,6 @@ message FlowConnectionConfigs { bool initial_copy_only = 26; int64 idle_timeout_seconds = 27; - - // make InitialCopyOnly enable EnsurePullability for the use case of dynamic table addition - // TODO: find a better way to do this. - bool force_pkey_checks = 28; } message RenameTableOption { @@ -156,6 +152,7 @@ message EnsurePullabilityBatchInput { peerdb_peers.Peer peer_connection_config = 1; string flow_job_name = 2; repeated string source_table_identifiers = 3; + bool check_constraints = 4; } message PostgresTableIdentifier { diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index c3bad8f946..cf1579fe05 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -47,7 +47,6 @@ export const blankCDCSetting: FlowConnectionConfigs = { syncedAtColName: '', initialCopyOnly: false, idleTimeoutSeconds: 60, - forcePkeyChecks: false, }; export const blankQRepSetting = {