diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index b10938f40..c18636d1c 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -86,6 +86,23 @@ func (c *PostgresConnector) getRelIDForTable(schemaTable *SchemaTable) (uint32, return relID, nil } +// getReplicaIdentity returns the replica identity for a table. +func (c *PostgresConnector) getReplicaIdentityForTable(schemaTable *SchemaTable) (string, error) { + relID, relIDErr := c.getRelIDForTable(schemaTable) + if relIDErr != nil { + return "", fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, relIDErr) + } + + var replicaIdentity rune + err := c.pool.QueryRow(c.ctx, + `SELECT relreplident FROM pg_class WHERE oid = $1;`, + relID).Scan(&replicaIdentity) + if err != nil { + return "", fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, err) + } + return string(replicaIdentity), nil +} + // getPrimaryKeyColumn for table returns the primary key column for a given table // errors if there is no primary key column or if there is more than one primary key column. func (c *PostgresConnector) getPrimaryKeyColumn(schemaTable *SchemaTable) (string, error) { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 078a912ce..83801ac2f 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -568,7 +568,10 @@ func (c *PostgresConnector) getTableSchemaForTable( pkey, err := c.getPrimaryKeyColumn(schemaTable) if err != nil { - return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err) + replicaIdentity, err := c.getReplicaIdentityForTable(schemaTable) + if err != nil || replicaIdentity != "f" { + return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err) + } } res := &protos.TableSchema{