From 0d24790d290ad48c62dc15ccf551c49f68946d13 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 27 Nov 2023 23:10:20 +0530 Subject: [PATCH] fixing minor review comment --- flow/connectors/postgres/cdc.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 8f8c108405..7c4f38941f 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -319,12 +319,12 @@ func (p *PostgresCDCSource) consumeStream( TableName: tableName, PkeyColVal: compositePKeyString, } - _, ok := tablePKeyLastSeen[tablePkeyVal] + recIndex, ok := tablePKeyLastSeen[tablePkeyVal] if !ok { addRecord(rec) tablePKeyLastSeen[tablePkeyVal] = len(localRecords) - 1 } else { - oldRec := localRecords[tablePKeyLastSeen[tablePkeyVal]] + oldRec := localRecords[recIndex] // iterate through unchanged toast cols and set them in new record updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems()) for _, col := range updatedCols { @@ -362,9 +362,9 @@ func (p *PostgresCDCSource) consumeStream( TableName: tableName, PkeyColVal: compositePKeyString, } - _, ok := tablePKeyLastSeen[tablePkeyVal] + recIndex, ok := tablePKeyLastSeen[tablePkeyVal] if ok { - latestRecord := localRecords[tablePKeyLastSeen[tablePkeyVal]] + latestRecord := localRecords[recIndex] deleteRecord := rec.(*model.DeleteRecord) deleteRecord.Items = latestRecord.GetItems() updateRecord, ok := latestRecord.(*model.UpdateRecord)