Skip to content

Commit

Permalink
refine check constraints
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 15, 2024
1 parent 14c5b4d commit ffb7d4b
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 25 deletions.
22 changes: 15 additions & 7 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,20 @@ func (c *PostgresConnector) EnsurePullability(
return nil, err
}

tableIdentifierMapping[tableName] = &protos.TableIdentifier{
TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{
PostgresTableIdentifier: &protos.PostgresTableIdentifier{
RelId: relID,
},
},
}

if !req.CheckConstraints {
msg := fmt.Sprintf("[no-constriants] ensured pullability table %s", tableName)

Check failure on line 790 in flow/connectors/postgres/postgres.go

View workflow job for this annotation

GitHub Actions / lint

`constriants` is a misspelling of `constraints` (misspell)
utils.RecordHeartbeatWithRecover(c.ctx, msg)
continue
}

replicaIdentity, replErr := c.getReplicaIdentityType(schemaTable)
if replErr != nil {
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
Expand All @@ -789,17 +803,11 @@ func (c *PostgresConnector) EnsurePullability(
}

// we only allow no primary key if the table has REPLICA IDENTITY FULL
// this is ok for replica identity index as we populate the primary key columns
if len(pKeyCols) == 0 && !(replicaIdentity == ReplicaIdentityFull) {
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
}

tableIdentifierMapping[tableName] = &protos.TableIdentifier{
TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{
PostgresTableIdentifier: &protos.PostgresTableIdentifier{
RelId: relID,
},
},
}
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
}

Expand Down
1 change: 0 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
additionalTablesWorkflowCfg.DoInitialCopy = true
additionalTablesWorkflowCfg.InitialCopyOnly = true
additionalTablesWorkflowCfg.ForcePkeyChecks = true
additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables
additionalTablesWorkflowCfg.FlowJobName = fmt.Sprintf("%s_additional_tables_%s", cfg.FlowJobName,
strings.ToLower(shared.RandomString(8)))
Expand Down
23 changes: 11 additions & 12 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
func (s *SetupFlowExecution) ensurePullability(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
checkConstraints bool,
) error {
s.logger.Info("ensuring pullability for peer flow - ", s.CDCFlowName)

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 15 * time.Minute,
})
tmpMap := make(map[uint32]string)

srcTblIdentifiers := maps.Keys(s.tableNameMapping)
sort.Strings(srcTblIdentifiers)
Expand All @@ -121,6 +121,7 @@ func (s *SetupFlowExecution) ensurePullability(
PeerConnectionConfig: config.Source,
FlowJobName: s.CDCFlowName,
SourceTableIdentifiers: srcTblIdentifiers,
CheckConstraints: checkConstraints,
}

future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput)
Expand All @@ -133,6 +134,7 @@ func (s *SetupFlowExecution) ensurePullability(
sortedTableNames := maps.Keys(ensurePullabilityOutput.TableIdentifierMapping)
sort.Strings(sortedTableNames)

tmpMap := make(map[uint32]string)
for _, tableName := range sortedTableNames {
tableIdentifier := ensurePullabilityOutput.TableIdentifierMapping[tableName]
switch typedTableIdentifier := tableIdentifier.TableIdentifier.(type) {
Expand Down Expand Up @@ -187,11 +189,10 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
sort.Strings(sourceTables)

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.CDCFlowName,
SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialCopyOnly &&
!flowConnectionConfigs.ForcePkeyChecks,
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.CDCFlowName,
SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialCopyOnly,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down Expand Up @@ -272,13 +273,11 @@ func (s *SetupFlowExecution) executeSetupFlow(
return nil, fmt.Errorf("failed to check connections and setup metadata tables: %w", err)
}

// override for dynamic adding of tables
if !config.InitialCopyOnly || config.ForcePkeyChecks {
// then ensure pullability
if err := s.ensurePullability(ctx, config); err != nil {
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}
checkConstraints := !config.InitialCopyOnly
if err := s.ensurePullability(ctx, config, checkConstraints); err != nil {
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

// for initial copy only flows, we don't need to create the raw table
if !config.InitialCopyOnly {
// then create the raw table
Expand Down
5 changes: 1 addition & 4 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ message FlowConnectionConfigs {
bool initial_copy_only = 26;

int64 idle_timeout_seconds = 27;

// make InitialCopyOnly enable EnsurePullability for the use case of dynamic table addition
// TODO: find a better way to do this.
bool force_pkey_checks = 28;
}

message RenameTableOption {
Expand Down Expand Up @@ -156,6 +152,7 @@ message EnsurePullabilityBatchInput {
peerdb_peers.Peer peer_connection_config = 1;
string flow_job_name = 2;
repeated string source_table_identifiers = 3;
bool check_constraints = 4;
}

message PostgresTableIdentifier {
Expand Down
1 change: 0 additions & 1 deletion ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ export const blankCDCSetting: FlowConnectionConfigs = {
syncedAtColName: '',
initialCopyOnly: false,
idleTimeoutSeconds: 60,
forcePkeyChecks: false,
};

export const blankQRepSetting = {
Expand Down

0 comments on commit ffb7d4b

Please sign in to comment.