Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Sep 14, 2023
1 parent ca5deee commit 386ac93
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 14 deletions.
11 changes: 9 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,11 @@ func (p *PostgresCDCSource) consumeStream(
// should be ideally sourceTableName as we are in pullRecrods.
// will change in future
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal := rec.GetItems().GetColumnValue(pkeyCol)
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}

tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: pkeyColVal,
Expand All @@ -242,7 +246,10 @@ func (p *PostgresCDCSource) consumeStream(
}
case *model.InsertRecord:
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal := rec.GetItems().GetColumnValue(pkeyCol)
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}
tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: pkeyColVal,
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/postgres/postgres_cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,11 @@ func (suite *PostgresCDCTestSuite) validateMutatedToastRecords(records []model.R
suite.Equal(dstTableName, updateRecord.DestinationTableName)
items := updateRecord.NewItems
suite.Equal(2, items.Len())
v := items.GetColumnValue("id")
v, err := items.GetValueByColName("id")
suite.NoError(err, "Error fetching id")
suite.Equal(int32(1), v.Value.(int32))
v = items.GetColumnValue("n_t")
v, err = items.GetValueByColName("n_t")
suite.NoError(err, "Error fetching n_t")
suite.Equal(qvalue.QValueKindString, v.Kind)
suite.Equal(65536, len(v.Value.(string)))
suite.Equal(3, len(updateRecord.UnchangedToastColumns))
Expand Down
19 changes: 9 additions & 10 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,27 @@ func NewRecordItemWithData(data map[string]*qvalue.QValue) *RecordItems {
}

func (r *RecordItems) AddColumn(col string, val *qvalue.QValue) {
if _, ok := r.colToValIdx[col]; ok {
return
if idx, ok := r.colToValIdx[col]; ok {
r.values[idx] = val
} else {
r.colToValIdx[col] = len(r.values)
r.values = append(r.values, val)
}

r.colToValIdx[col] = len(r.values)
r.values = append(r.values, val)
}

func (r *RecordItems) GetColumnValue(col string) *qvalue.QValue {
idx, ok := r.colToValIdx[col]
if !ok {
return nil
if idx, ok := r.colToValIdx[col]; ok {
return r.values[idx]
}
return r.values[idx]
return nil
}

// UpdateIfNotExists takes in a RecordItems as input and updates the values of the
// current RecordItems with the values from the input RecordItems for the columns
// that are present in the input RecordItems but not in the current RecordItems.
// We return the slice of col names that were updated.
func (r *RecordItems) UpdateIfNotExists(input *RecordItems) []string {
var updatedCols []string
updatedCols := make([]string, 0)
for col, idx := range input.colToValIdx {
if _, ok := r.colToValIdx[col]; !ok {
r.colToValIdx[col] = len(r.values)
Expand Down

0 comments on commit 386ac93

Please sign in to comment.