Skip to content

Commit

Permalink
Fix replica identity for pull records (#394)
Browse files Browse the repository at this point in the history
- Includes a field in TableSchema proto denoting whether the table has
replica identity full or not. This information is used in Postgres pull
flow.
  • Loading branch information
Amogh-Bharadwaj authored Sep 18, 2023
1 parent 5372df9 commit d2bb6ae
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 293 deletions.
70 changes: 40 additions & 30 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,45 +218,55 @@ func (p *PostgresCDCSource) consumeStream(
switch r := rec.(type) {
case *model.UpdateRecord:
// tableName here is destination tableName.
// should be ideally sourceTableName as we are in pullRecrods.
// should be ideally sourceTableName as we are in PullRecords.
// will change in future
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
records.Records = append(records.Records, rec)
} else {
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}

tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: *pkeyColVal,
tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: *pkeyColVal,
}
_, ok := records.TablePKeyLastSeen[tablePkeyVal]
if !ok {
records.Records = append(records.Records, rec)
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
} else {
oldRec := records.Records[records.TablePKeyLastSeen[tablePkeyVal]]
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
for _, col := range updatedCols {
delete(r.UnchangedToastColumns, col)
}
records.Records = append(records.Records, rec)
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
}
}
_, ok := records.TablePKeyLastSeen[tablePkeyVal]
if !ok {
case *model.InsertRecord:
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
records.Records = append(records.Records, rec)
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
} else {
oldRec := records.Records[records.TablePKeyLastSeen[tablePkeyVal]]
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
for _, col := range updatedCols {
delete(r.UnchangedToastColumns, col)
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}
tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: *pkeyColVal,
}
records.Records = append(records.Records, rec)
// all columns will be set in insert record, so add it to the map
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
}
case *model.InsertRecord:
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}
tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: *pkeyColVal,
}
records.Records = append(records.Records, rec)
// all columns will be set in insert record, so add it to the map
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
case *model.DeleteRecord:
records.Records = append(records.Records, rec)
case *model.RelationRecord:
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ func (c *PostgresConnector) getRelIDForTable(schemaTable *SchemaTable) (uint32,
}

// getReplicaIdentity returns the replica identity for a table.
func (c *PostgresConnector) getReplicaIdentityForTable(schemaTable *SchemaTable) (string, error) {
func (c *PostgresConnector) isTableFullReplica(schemaTable *SchemaTable) (bool, error) {
relID, relIDErr := c.getRelIDForTable(schemaTable)
if relIDErr != nil {
return "", fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, relIDErr)
return false, fmt.Errorf("failed to get relation id for table %s: %w", schemaTable, relIDErr)
}

var replicaIdentity rune
err := c.pool.QueryRow(c.ctx,
`SELECT relreplident FROM pg_class WHERE oid = $1;`,
relID).Scan(&replicaIdentity)
if err != nil {
return "", fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, err)
return false, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, err)
}
return string(replicaIdentity), nil
return string(replicaIdentity) == "f", nil
}

// getPrimaryKeyColumn for table returns the primary key column for a given table
Expand Down
15 changes: 10 additions & 5 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,18 +566,23 @@ func (c *PostgresConnector) getTableSchemaForTable(
}
defer rows.Close()

isFullReplica, replErr := c.isTableFullReplica(schemaTable)
if replErr != nil {
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
}

pkey, err := c.getPrimaryKeyColumn(schemaTable)
if err != nil {
replicaIdentity, err := c.getReplicaIdentityForTable(schemaTable)
if err != nil || replicaIdentity != "f" {
if !isFullReplica {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
}

res := &protos.TableSchema{
TableIdentifier: tableName,
Columns: make(map[string]string),
PrimaryKeyColumn: pkey,
TableIdentifier: tableName,
Columns: make(map[string]string),
PrimaryKeyColumn: pkey,
IsReplicaIdentityFull: isFullReplica,
}

for _, fieldDescription := range rows.FieldDescriptions() {
Expand Down
Loading

0 comments on commit d2bb6ae

Please sign in to comment.