Skip to content

Commit

Permalink
some renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jan 23, 2024
1 parent 36f4049 commit dbd9add
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
10 changes: 6 additions & 4 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,11 @@ func (c *PostgresConnector) getReplicaIdentityType(schemaTable *utils.SchemaTabl
return ReplicaIdentityType(replicaIdentity), nil
}

// getPrimaryKeyColumns returns the primary key columns for a given table.
// Errors if there is no primary key column or if there is more than one primary key column.
func (c *PostgresConnector) getPrimaryKeyColumns(
// getUniqueColumns returns the unique columns (used to select in MERGE statement) for a given table.
// For replica identity 'd'/default, these are the primary key columns
// For replica identity 'i'/index, these are the columns in the selected index (indisreplident set)
// For replica identity 'f'/full, if there is a primary key we use that
func (c *PostgresConnector) getUniqueColumns(
replicaIdentity ReplicaIdentityType,
schemaTable *utils.SchemaTable,
) ([]string, error) {
Expand Down Expand Up @@ -162,7 +164,7 @@ func (c *PostgresConnector) getReplicaIdentityIndexColumns(relID uint32, schemaT
// Fetch the OID of the index used as the replica identity
err := c.pool.QueryRow(c.ctx,
`SELECT indexrelid FROM pg_index
WHERE indrelid = $1 AND indisreplident = true`,
WHERE indrelid=$1 AND indisreplident=true`,
relID).Scan(&indexRelID)
if err != nil {
return nil, fmt.Errorf("error finding replica identity index for table %s: %w", schemaTable, err)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func (c *PostgresConnector) getTableSchemaForTable(
if err != nil {
return nil, fmt.Errorf("[getTableSchema] error getting replica identity for table %s: %w", schemaTable, err)
}
pKeyCols, err := c.getPrimaryKeyColumns(replicaIdentityType, schemaTable)
pKeyCols, err := c.getUniqueColumns(replicaIdentityType, schemaTable)
if err != nil {
return nil, fmt.Errorf("[getTableSchema] error getting primary key column for table %s: %w", schemaTable, err)
}
Expand Down Expand Up @@ -790,7 +790,7 @@ func (c *PostgresConnector) EnsurePullability(
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
}

pKeyCols, err := c.getPrimaryKeyColumns(replicaIdentity, schemaTable)
pKeyCols, err := c.getUniqueColumns(replicaIdentity, schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
Expand Down

0 comments on commit dbd9add

Please sign in to comment.