Skip to content

Commit

Permalink
refine check constraints
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored and heavycrystal committed Jan 17, 2024
1 parent bf825f4 commit 769870f
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 19 deletions.
14 changes: 11 additions & 3 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,16 @@ func (c *PostgresConnector) EnsurePullability(
return nil, err
}

tableIdentifierMapping[tableName] = &protos.PostgresTableIdentifier{
RelId: relID,
}

if !req.CheckConstraints {
msg := fmt.Sprintf("[no-constriants] ensured pullability table %s", tableName)
utils.RecordHeartbeatWithRecover(c.ctx, msg)
continue
}

replicaIdentity, replErr := c.getReplicaIdentityType(schemaTable)
if replErr != nil {
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
Expand All @@ -799,13 +809,11 @@ func (c *PostgresConnector) EnsurePullability(
}

// we only allow no primary key if the table has REPLICA IDENTITY FULL
// this is ok for replica identity index as we populate the primary key columns
if len(pKeyCols) == 0 && !(replicaIdentity == ReplicaIdentityFull) {
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
}

tableIdentifierMapping[tableName] = &protos.PostgresTableIdentifier{
RelId: relID,
}
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
}

Expand Down
1 change: 0 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
additionalTablesWorkflowCfg.DoInitialCopy = true
additionalTablesWorkflowCfg.InitialCopyOnly = true
additionalTablesWorkflowCfg.ForcePkeyChecks = true
additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables
additionalTablesWorkflowCfg.FlowJobName = fmt.Sprintf("%s_additional_tables_%s", cfg.FlowJobName,
strings.ToLower(shared.RandomString(8)))
Expand Down
25 changes: 11 additions & 14 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
func (s *SetupFlowExecution) ensurePullability(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
checkConstraints bool,
) (map[uint32]string, error) {
s.logger.Info("ensuring pullability for peer flow - ", s.cdcFlowName)

Expand All @@ -116,6 +117,7 @@ func (s *SetupFlowExecution) ensurePullability(
PeerConnectionConfig: config.Source,
FlowJobName: s.cdcFlowName,
SourceTableIdentifiers: srcTblIdentifiers,
CheckConstraints: checkConstraints,
}

future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput)
Expand Down Expand Up @@ -177,11 +179,10 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
sort.Strings(sourceTables)

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.cdcFlowName,
SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly &&
!flowConnectionConfigs.ForcePkeyChecks,
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.cdcFlowName,
SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down Expand Up @@ -262,16 +263,12 @@ func (s *SetupFlowExecution) executeSetupFlow(
}

setupFlowOutput := protos.SetupFlowOutput{}
// for initial copy only flows, we don't need to ensure pullability or create the raw table
// as we don't need the primary key requirement.
if !config.InitialSnapshotOnly {
// then ensure pullability
srcTableIdNameMapping, err := s.ensurePullability(ctx, config)
if err != nil {
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}
setupFlowOutput.SrcTableIdNameMapping = srcTableIdNameMapping
checkConstraints := !config.InitialSnapshotOnly
srcTableIdNameMapping, err := s.ensurePullability(ctx, config, checkConstraints)
setupFlowOutput.SrcTableIdNameMapping = srcTableIdNameMapping

// for initial copy only flows, we don't need to create the raw table
if !config.InitialSnapshotOnly {
// then create the raw table
if err := s.createRawTable(ctx, config); err != nil {
return nil, fmt.Errorf("failed to create raw table: %w", err)
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ message EnsurePullabilityBatchInput {
peerdb_peers.Peer peer_connection_config = 1;
string flow_job_name = 2;
repeated string source_table_identifiers = 3;
bool check_constraints = 4;
}

message PostgresTableIdentifier {
Expand Down
1 change: 0 additions & 1 deletion ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ export const blankCDCSetting: FlowConnectionConfigs = {
syncedAtColName: '',
initialSnapshotOnly: false,
idleTimeoutSeconds: 60,
forcePkeyChecks: false,
};

export const blankQRepSetting = {
Expand Down

0 comments on commit 769870f

Please sign in to comment.