Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replica Identity Check #392

Merged
merged 1 commit into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,23 @@ func (c *PostgresConnector) getRelIDForTable(schemaTable *SchemaTable) (uint32,
return relID, nil
}

// getReplicaIdentity returns the replica identity for a table.
func (c *PostgresConnector) getReplicaIdentityForTable(schemaTable *SchemaTable) (string, error) {
relID, relIDErr := c.getRelIDForTable(schemaTable)
if relIDErr != nil {
return "", fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, relIDErr)
}

var replicaIdentity rune
err := c.pool.QueryRow(c.ctx,
`SELECT relreplident FROM pg_class WHERE oid = $1;`,
relID).Scan(&replicaIdentity)
if err != nil {
return "", fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, err)
}
return string(replicaIdentity), nil
}

// getPrimaryKeyColumn for table returns the primary key column for a given table
// errors if there is no primary key column or if there is more than one primary key column.
func (c *PostgresConnector) getPrimaryKeyColumn(schemaTable *SchemaTable) (string, error) {
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,10 @@ func (c *PostgresConnector) getTableSchemaForTable(

pkey, err := c.getPrimaryKeyColumn(schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
replicaIdentity, err := c.getReplicaIdentityForTable(schemaTable)
if err != nil || replicaIdentity != "f" {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
}

res := &protos.TableSchema{
Expand Down