Skip to content

Commit

Permalink
replica identity check for pull
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 18, 2023
1 parent ec342ea commit 722d8c6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
17 changes: 17 additions & 0 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,23 @@ func (c *PostgresConnector) getRelIDForTable(schemaTable *SchemaTable) (uint32,
return relID, nil
}

// getReplicaIdentity returns the replica identity for a table.
func (c *PostgresConnector) getReplicaIdentityForTable(schemaTable *SchemaTable) (string, error) {
relID, relIDErr := c.getRelIDForTable(schemaTable)
if relIDErr != nil {
return "", fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, relIDErr)
}

var replicaIdentity rune
err := c.pool.QueryRow(c.ctx,
`SELECT relreplident FROM pg_class WHERE oid = $1;`,
relID).Scan(&replicaIdentity)
if err != nil {
return "", fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, err)
}
return string(replicaIdentity), nil
}

// getPrimaryKeyColumn for table returns the primary key column for a given table
// errors if there is no primary key column or if there is more than one primary key column.
func (c *PostgresConnector) getPrimaryKeyColumn(schemaTable *SchemaTable) (string, error) {
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,10 @@ func (c *PostgresConnector) getTableSchemaForTable(

pkey, err := c.getPrimaryKeyColumn(schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
replicaIdentity, err := c.getReplicaIdentityForTable(schemaTable)
if err != nil || replicaIdentity != "f" {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
}

res := &protos.TableSchema{
Expand Down

0 comments on commit 722d8c6

Please sign in to comment.