From 722d8c6338e6881516b3150a54f1798e5a15b4ee Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 18 Sep 2023 22:31:26 +0530 Subject: [PATCH] replica identity check for pull --- flow/connectors/postgres/client.go | 17 +++++++++++++++++ flow/connectors/postgres/postgres.go | 5 ++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index b10938f405..c18636d1c2 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -86,6 +86,23 @@ func (c *PostgresConnector) getRelIDForTable(schemaTable *SchemaTable) (uint32, return relID, nil } +// getReplicaIdentity returns the replica identity for a table. +func (c *PostgresConnector) getReplicaIdentityForTable(schemaTable *SchemaTable) (string, error) { + relID, relIDErr := c.getRelIDForTable(schemaTable) + if relIDErr != nil { + return "", fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, relIDErr) + } + + var replicaIdentity rune + err := c.pool.QueryRow(c.ctx, + `SELECT relreplident FROM pg_class WHERE oid = $1;`, + relID).Scan(&replicaIdentity) + if err != nil { + return "", fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, err) + } + return string(replicaIdentity), nil +} + // getPrimaryKeyColumn for table returns the primary key column for a given table // errors if there is no primary key column or if there is more than one primary key column. func (c *PostgresConnector) getPrimaryKeyColumn(schemaTable *SchemaTable) (string, error) { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 078a912ce5..83801ac2f7 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -568,7 +568,10 @@ func (c *PostgresConnector) getTableSchemaForTable( pkey, err := c.getPrimaryKeyColumn(schemaTable) if err != nil { - return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err) + replicaIdentity, err := c.getReplicaIdentityForTable(schemaTable) + if err != nil || replicaIdentity != "f" { + return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err) + } } res := &protos.TableSchema{