diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 8f8c108405..7c4f38941f 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -319,12 +319,12 @@ func (p *PostgresCDCSource) consumeStream( TableName: tableName, PkeyColVal: compositePKeyString, } - _, ok := tablePKeyLastSeen[tablePkeyVal] + recIndex, ok := tablePKeyLastSeen[tablePkeyVal] if !ok { addRecord(rec) tablePKeyLastSeen[tablePkeyVal] = len(localRecords) - 1 } else { - oldRec := localRecords[tablePKeyLastSeen[tablePkeyVal]] + oldRec := localRecords[recIndex] // iterate through unchanged toast cols and set them in new record updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems()) for _, col := range updatedCols { @@ -362,9 +362,9 @@ func (p *PostgresCDCSource) consumeStream( TableName: tableName, PkeyColVal: compositePKeyString, } - _, ok := tablePKeyLastSeen[tablePkeyVal] + recIndex, ok := tablePKeyLastSeen[tablePkeyVal] if ok { - latestRecord := localRecords[tablePKeyLastSeen[tablePkeyVal]] + latestRecord := localRecords[recIndex] deleteRecord := rec.(*model.DeleteRecord) deleteRecord.Items = latestRecord.GetItems() updateRecord, ok := latestRecord.(*model.UpdateRecord)