diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 1ba00ad453..22ca6e516e 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -228,7 +228,7 @@ func (p *PostgresCDCSource) consumeStream( tablePkeyVal := model.TableWithPkey{ TableName: tableName, - PkeyColVal: pkeyColVal, + PkeyColVal: *pkeyColVal, } _, ok := records.TablePKeyLastSeen[tablePkeyVal] if !ok { @@ -252,7 +252,7 @@ func (p *PostgresCDCSource) consumeStream( } tablePkeyVal := model.TableWithPkey{ TableName: tableName, - PkeyColVal: pkeyColVal, + PkeyColVal: *pkeyColVal, } records.Records = append(records.Records, rec) // all columns will be set in insert record, so add it to the map diff --git a/flow/model/model.go b/flow/model/model.go index 6764ca87bd..3984ad2848 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -145,6 +145,7 @@ func (r *RecordItems) ToJSON() (string, error) { if err != nil { return "", err } + return string(jsonBytes), nil } @@ -233,7 +234,7 @@ func (r *DeleteRecord) GetItems() *RecordItems { type TableWithPkey struct { TableName string - PkeyColVal interface{} + PkeyColVal qvalue.QValue } type RecordBatch struct {