diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 90a773ea54..6ec2bc29a3 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -121,6 +121,9 @@ func (c *PostgresConnector) getReplicaIdentityType(schemaTable *utils.SchemaTabl if err != nil { return ReplicaIdentityDefault, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, err) } + if replicaIdentity == rune(ReplicaIdentityNothing) { + return ReplicaIdentityType(replicaIdentity), fmt.Errorf("table %s has replica identity 'n'/NOTHING", schemaTable) + } return ReplicaIdentityType(replicaIdentity), nil } @@ -128,7 +131,7 @@ func (c *PostgresConnector) getReplicaIdentityType(schemaTable *utils.SchemaTabl // getUniqueColumns returns the unique columns (used to select in MERGE statement) for a given table. // For replica identity 'd'/default, these are the primary key columns // For replica identity 'i'/index, these are the columns in the selected index (indisreplident set) -// For replica identity 'f'/full, if there is a primary key we use that +// For replica identity 'f'/full, if there is a primary key we use that, else we return all columns func (c *PostgresConnector) getUniqueColumns( replicaIdentity ReplicaIdentityType, schemaTable *utils.SchemaTable, @@ -142,13 +145,13 @@ func (c *PostgresConnector) getUniqueColumns( return c.getReplicaIdentityIndexColumns(relID, schemaTable) } - // Find the primary key index OID + // Find the primary key index OID, for replica identity 'd'/default or 'f'/full var pkIndexOID oid.Oid err = c.pool.QueryRow(c.ctx, `SELECT indexrelid FROM pg_index WHERE indrelid = $1 AND indisprimary`, relID).Scan(&pkIndexOID) if err != nil { - // don't error out if no pkey columns, this would happen in EnsurePullability or UI. + // don't error out if no pkey index, this would happen in EnsurePullability or UI. if err == pgx.ErrNoRows { return []string{}, nil } @@ -615,8 +618,8 @@ func (c *PostgresConnector) CheckSourceTables(tableNames []string, pubName strin var pubTableCount int err := c.pool.QueryRow(c.ctx, fmt.Sprintf(` with source_table_components (sname, tname) as (values %s) - select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables - INNER JOIN source_table_components stc + select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables + INNER JOIN source_table_components stc ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount) if err != nil { return err diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index c69767abfc..eb55931172 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -640,6 +640,10 @@ func (c *PostgresConnector) getTableSchemaForTable( if err = rows.Err(); err != nil { return nil, fmt.Errorf("error iterating over table schema: %w", err) } + // if we have no pkey, we will use all columns as the pkey for the MERGE statement + if replicaIdentityType == ReplicaIdentityFull && len(pKeyCols) == 0 { + pKeyCols = columnNames + } return &protos.TableSchema{ TableIdentifier: tableName, @@ -797,7 +801,7 @@ func (c *PostgresConnector) EnsurePullability( // we only allow no primary key if the table has REPLICA IDENTITY FULL // this is ok for replica identity index as we populate the primary key columns - if len(pKeyCols) == 0 && !(replicaIdentity == ReplicaIdentityFull) { + if len(pKeyCols) == 0 && replicaIdentity != ReplicaIdentityFull { return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable) }