From dbd9add4fb4503937a195aa4f2ffeceffdd67709 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Wed, 24 Jan 2024 01:26:44 +0530 Subject: [PATCH] some renaming --- flow/connectors/postgres/client.go | 10 ++++++---- flow/connectors/postgres/postgres.go | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 408ae491e3..2813eb7e59 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -125,9 +125,11 @@ func (c *PostgresConnector) getReplicaIdentityType(schemaTable *utils.SchemaTabl return ReplicaIdentityType(replicaIdentity), nil } -// getPrimaryKeyColumns returns the primary key columns for a given table. -// Errors if there is no primary key column or if there is more than one primary key column. -func (c *PostgresConnector) getPrimaryKeyColumns( +// getUniqueColumns returns the unique columns (used to select in MERGE statement) for a given table. +// For replica identity 'd'/default, these are the primary key columns +// For replica identity 'i'/index, these are the columns in the selected index (indisreplident set) +// For replica identity 'f'/full, if there is a primary key we use that +func (c *PostgresConnector) getUniqueColumns( replicaIdentity ReplicaIdentityType, schemaTable *utils.SchemaTable, ) ([]string, error) { @@ -162,7 +164,7 @@ func (c *PostgresConnector) getReplicaIdentityIndexColumns(relID uint32, schemaT // Fetch the OID of the index used as the replica identity err := c.pool.QueryRow(c.ctx, `SELECT indexrelid FROM pg_index - WHERE indrelid = $1 AND indisreplident = true`, + WHERE indrelid=$1 AND indisreplident=true`, relID).Scan(&indexRelID) if err != nil { return nil, fmt.Errorf("error finding replica identity index for table %s: %w", schemaTable, err) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index af092152f3..b7811c8208 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -605,7 +605,7 @@ func (c *PostgresConnector) getTableSchemaForTable( if err != nil { return nil, fmt.Errorf("[getTableSchema] error getting replica identity for table %s: %w", schemaTable, err) } - pKeyCols, err := c.getPrimaryKeyColumns(replicaIdentityType, schemaTable) + pKeyCols, err := c.getUniqueColumns(replicaIdentityType, schemaTable) if err != nil { return nil, fmt.Errorf("[getTableSchema] error getting primary key column for table %s: %w", schemaTable, err) } @@ -790,7 +790,7 @@ func (c *PostgresConnector) EnsurePullability( return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr) } - pKeyCols, err := c.getPrimaryKeyColumns(replicaIdentity, schemaTable) + pKeyCols, err := c.getUniqueColumns(replicaIdentity, schemaTable) if err != nil { return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err) }