Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove SkipPkeyAndReplicaCheck #1107

Merged
merged 4 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,11 @@ func (c *PostgresConnector) getReplicaIdentityType(schemaTable *utils.SchemaTabl
return ReplicaIdentityType(replicaIdentity), nil
}

// getPrimaryKeyColumns returns the primary key columns for a given table.
// Errors if there is no primary key column or if there is more than one primary key column.
func (c *PostgresConnector) getPrimaryKeyColumns(
// getUniqueColumns returns the unique columns (used to select rows in the MERGE statement) for a given table.
// For replica identity 'd'/default, these are the primary key columns.
// For replica identity 'i'/index, these are the columns in the selected index (indisreplident set).
// For replica identity 'f'/full, if there is a primary key, we use that.
func (c *PostgresConnector) getUniqueColumns(
replicaIdentity ReplicaIdentityType,
schemaTable *utils.SchemaTable,
) ([]string, error) {
Expand All @@ -146,6 +148,10 @@ func (c *PostgresConnector) getPrimaryKeyColumns(
`SELECT indexrelid FROM pg_index WHERE indrelid = $1 AND indisprimary`,
relID).Scan(&pkIndexOID)
if err != nil {
// Don't error out when the table has no primary key columns; this can happen in EnsurePullability or the UI.
if err == pgx.ErrNoRows {
return []string{}, nil
}
return nil, fmt.Errorf("error finding primary key index for table %s: %w", schemaTable, err)
}

Expand All @@ -158,7 +164,7 @@ func (c *PostgresConnector) getReplicaIdentityIndexColumns(relID uint32, schemaT
// Fetch the OID of the index used as the replica identity
err := c.pool.QueryRow(c.ctx,
`SELECT indexrelid FROM pg_index
WHERE indrelid = $1 AND indisreplident = true`,
WHERE indrelid=$1 AND indisreplident=true`,
relID).Scan(&indexRelID)
if err != nil {
return nil, fmt.Errorf("error finding replica identity index for table %s: %w", schemaTable, err)
Expand Down
26 changes: 9 additions & 17 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func (c *PostgresConnector) GetTableSchema(
) (*protos.GetTableSchemaBatchOutput, error) {
res := make(map[string]*protos.TableSchema)
for _, tableName := range req.TableIdentifiers {
tableSchema, err := c.getTableSchemaForTable(tableName, req.SkipPkeyAndReplicaCheck)
tableSchema, err := c.getTableSchemaForTable(tableName)
if err != nil {
return nil, err
}
Expand All @@ -595,27 +595,19 @@ func (c *PostgresConnector) GetTableSchema(

func (c *PostgresConnector) getTableSchemaForTable(
tableName string,
skipPkeyAndReplicaCheck bool,
) (*protos.TableSchema, error) {
schemaTable, err := utils.ParseSchemaTable(tableName)
if err != nil {
return nil, err
}

var pKeyCols []string
var replicaIdentityType ReplicaIdentityType
if !skipPkeyAndReplicaCheck {
var replErr error
replicaIdentityType, replErr = c.getReplicaIdentityType(schemaTable)
if replErr != nil {
return nil, fmt.Errorf("[getTableSchema]:error getting replica identity for table %s: %w", schemaTable, replErr)
}

var err error
pKeyCols, err = c.getPrimaryKeyColumns(replicaIdentityType, schemaTable)
if err != nil {
return nil, fmt.Errorf("[getTableSchema]:error getting primary key column for table %s: %w", schemaTable, err)
}
replicaIdentityType, err := c.getReplicaIdentityType(schemaTable)
if err != nil {
return nil, fmt.Errorf("[getTableSchema] error getting replica identity for table %s: %w", schemaTable, err)
}
pKeyCols, err := c.getUniqueColumns(replicaIdentityType, schemaTable)
if err != nil {
return nil, fmt.Errorf("[getTableSchema] error getting primary key column for table %s: %w", schemaTable, err)
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
}

// Get the column names and types
Expand Down Expand Up @@ -798,7 +790,7 @@ func (c *PostgresConnector) EnsurePullability(
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
}

pKeyCols, err := c.getPrimaryKeyColumns(replicaIdentity, schemaTable)
pKeyCols, err := c.getUniqueColumns(replicaIdentity, schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
Expand Down
7 changes: 3 additions & 4 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,9 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
})

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
FlowName: q.config.FlowJobName,
SkipPkeyAndReplicaCheck: true,
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
FlowName: q.config.FlowJobName,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down
7 changes: 3 additions & 4 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,9 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
sort.Strings(sourceTables)

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.cdcFlowName,
SkipPkeyAndReplicaCheck: flowConnectionConfigs.InitialSnapshotOnly,
PeerConnectionConfig: flowConnectionConfigs.Source,
TableIdentifiers: sourceTables,
FlowName: s.cdcFlowName,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down
1 change: 0 additions & 1 deletion protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ message GetTableSchemaBatchInput {
peerdb_peers.Peer peer_connection_config = 1;
repeated string table_identifiers = 2;
string flow_name = 3;
bool skip_pkey_and_replica_check = 4;
}

message GetTableSchemaBatchOutput {
Expand Down
Loading