Skip to content

Commit

Permalink
fix ref to val
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Sep 14, 2023
1 parent 386ac93 commit 5668fdc
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (p *PostgresCDCSource) consumeStream(

tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: pkeyColVal,
PkeyColVal: *pkeyColVal,
}
_, ok := records.TablePKeyLastSeen[tablePkeyVal]
if !ok {
Expand All @@ -252,7 +252,7 @@ func (p *PostgresCDCSource) consumeStream(
}
tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: pkeyColVal,
PkeyColVal: *pkeyColVal,
}
records.Records = append(records.Records, rec)
// all columns will be set in insert record, so add it to the map
Expand Down
3 changes: 2 additions & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (r *RecordItems) ToJSON() (string, error) {
if err != nil {
return "", err
}

return string(jsonBytes), nil
}

Expand Down Expand Up @@ -233,7 +234,7 @@ func (r *DeleteRecord) GetItems() *RecordItems {

type TableWithPkey struct {
TableName string
PkeyColVal interface{}
PkeyColVal qvalue.QValue
}

type RecordBatch struct {
Expand Down

0 comments on commit 5668fdc

Please sign in to comment.