From 769870ff15e92e7f31c2ec0223722df9536096a1 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 15 Jan 2024 16:13:38 -0500 Subject: [PATCH] refine check constraints --- flow/connectors/postgres/postgres.go | 14 +++++++++++--- flow/workflows/cdc_flow.go | 1 - flow/workflows/setup_flow.go | 25 +++++++++++-------------- protos/flow.proto | 1 + ui/app/mirrors/create/helpers/common.ts | 1 - 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 92f734428f..40c1249378 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -788,6 +788,16 @@ func (c *PostgresConnector) EnsurePullability( return nil, err } + tableIdentifierMapping[tableName] = &protos.PostgresTableIdentifier{ + RelId: relID, + } + + if !req.CheckConstraints { + msg := fmt.Sprintf("[no-constriants] ensured pullability table %s", tableName) + utils.RecordHeartbeatWithRecover(c.ctx, msg) + continue + } + replicaIdentity, replErr := c.getReplicaIdentityType(schemaTable) if replErr != nil { return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr) @@ -799,13 +809,11 @@ func (c *PostgresConnector) EnsurePullability( } // we only allow no primary key if the table has REPLICA IDENTITY FULL + // this is ok for replica identity index as we populate the primary key columns if len(pKeyCols) == 0 && !(replicaIdentity == ReplicaIdentityFull) { return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable) } - tableIdentifierMapping[tableName] = &protos.PostgresTableIdentifier{ - RelId: relID, - } utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName)) } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index a705491164..7e2b6f1175 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -206,7 +206,6 @@ func (w 
*CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) additionalTablesWorkflowCfg.DoInitialCopy = true additionalTablesWorkflowCfg.InitialCopyOnly = true - additionalTablesWorkflowCfg.ForcePkeyChecks = true additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables additionalTablesWorkflowCfg.FlowJobName = fmt.Sprintf("%s_additional_tables_%s", cfg.FlowJobName, strings.ToLower(shared.RandomString(8))) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index aa53114c87..92637880eb 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -100,6 +100,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( func (s *SetupFlowExecution) ensurePullability( ctx workflow.Context, config *protos.FlowConnectionConfigs, + checkConstraints bool, ) (map[uint32]string, error) { s.logger.Info("ensuring pullability for peer flow - ", s.cdcFlowName) @@ -116,6 +117,7 @@ func (s *SetupFlowExecution) ensurePullability( PeerConnectionConfig: config.Source, FlowJobName: s.cdcFlowName, SourceTableIdentifiers: srcTblIdentifiers, + CheckConstraints: checkConstraints, } future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput) @@ -177,11 +179,10 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( sort.Strings(sourceTables) tableSchemaInput := &protos.GetTableSchemaBatchInput{ - PeerConnectionConfig: flowConnectionConfigs.Source, - TableIdentifiers: sourceTables, - FlowName: s.cdcFlowName, - SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly && - !flowConnectionConfigs.ForcePkeyChecks, + PeerConnectionConfig: flowConnectionConfigs.Source, + TableIdentifiers: sourceTables, + FlowName: s.cdcFlowName, + SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly, } future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, 
tableSchemaInput) @@ -262,16 +263,12 @@ func (s *SetupFlowExecution) executeSetupFlow( } setupFlowOutput := protos.SetupFlowOutput{} - // for initial copy only flows, we don't need to ensure pullability or create the raw table - // as we don't need the primary key requirement. - if !config.InitialSnapshotOnly { - // then ensure pullability - srcTableIdNameMapping, err := s.ensurePullability(ctx, config) - if err != nil { - return nil, fmt.Errorf("failed to ensure pullability: %w", err) - } - setupFlowOutput.SrcTableIdNameMapping = srcTableIdNameMapping + checkConstraints := !config.InitialSnapshotOnly + srcTableIdNameMapping, err := s.ensurePullability(ctx, config, checkConstraints) + setupFlowOutput.SrcTableIdNameMapping = srcTableIdNameMapping + // for initial copy only flows, we don't need to create the raw table + if !config.InitialSnapshotOnly { // then create the raw table if err := s.createRawTable(ctx, config); err != nil { return nil, fmt.Errorf("failed to create raw table: %w", err) diff --git a/protos/flow.proto b/protos/flow.proto index b49d6c9bb7..6e1218b3e3 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -140,6 +140,7 @@ message EnsurePullabilityBatchInput { peerdb_peers.Peer peer_connection_config = 1; string flow_job_name = 2; repeated string source_table_identifiers = 3; + bool check_constraints = 4; } message PostgresTableIdentifier { diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index bc8f8e823b..1af51e8d85 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -39,7 +39,6 @@ export const blankCDCSetting: FlowConnectionConfigs = { syncedAtColName: '', initialSnapshotOnly: false, idleTimeoutSeconds: 60, - forcePkeyChecks: false, }; export const blankQRepSetting = {