Skip to content

Commit

Permalink
RecordItems: only track values in map[string]qvalue.QValue (#1562)
Browse files Browse the repository at this point in the history
We never make use of slice ordering
  • Loading branch information
serprex authored Apr 2, 2024
1 parent 360e19e commit c26ed67
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 79 deletions.
8 changes: 4 additions & 4 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (p *PostgresCDCSource) convertTupleToMap(
tuple *pglogrepl.TupleData,
rel *pglogrepl.RelationMessage,
exclude map[string]struct{},
) (*model.RecordItems, map[string]struct{}, error) {
) (model.RecordItems, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
if tuple == nil {
return model.NewRecordItems(0), make(map[string]struct{}), nil
Expand All @@ -613,19 +613,19 @@ func (p *PostgresCDCSource) convertTupleToMap(
/* bytea also appears here as a hex */
data, err := p.decodeColumnData(col.Data, rel.Columns[idx].DataType, pgtype.TextFormatCode)
if err != nil {
return nil, nil, fmt.Errorf("error decoding text column data: %w", err)
return model.RecordItems{}, nil, fmt.Errorf("error decoding text column data: %w", err)
}
items.AddColumn(colName, data)
case 'b': // binary
data, err := p.decodeColumnData(col.Data, rel.Columns[idx].DataType, pgtype.BinaryFormatCode)
if err != nil {
return nil, nil, fmt.Errorf("error decoding binary column data: %w", err)
return model.RecordItems{}, nil, fmt.Errorf("error decoding binary column data: %w", err)
}
items.AddColumn(colName, data)
case 'u': // unchanged toast
unchangedToastColumns[colName] = struct{}{}
default:
return nil, nil, fmt.Errorf("unknown column data type: %s", string(col.DataType))
return model.RecordItems{}, nil, fmt.Errorf("unknown column data type: %s", string(col.DataType))
}
}
return items, unchangedToastColumns, nil
Expand Down
15 changes: 5 additions & 10 deletions flow/connectors/utils/cdc_records/cdc_records_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,11 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) {
SourceTableName: "test_src_tbl",
DestinationTableName: "test_dst_tbl",
CommitID: 2,
Items: &model.RecordItems{
ColToValIdx: map[string]int{
"id": 0,
"ts": 1,
"rv": 2,
},
Values: []qvalue.QValue{
qvalue.QValueInt64{Val: 1},
qvalue.QValueTime{Val: tv},
qvalue.QValueNumeric{Val: rv},
Items: model.RecordItems{
ColToVal: map[string]qvalue.QValue{
"id": qvalue.QValueInt64{Val: 1},
"ts": qvalue.QValueTime{Val: tv},
"rv": qvalue.QValueNumeric{Val: rv},
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *InsertRecord) GetSourceTableName() string {
return r.SourceTableName
}

// GetItems returns the Items of this insert record.
// Diff view: the first signature (returning *RecordItems) is the removed line,
// the second (returning RecordItems by value) is the added line.
func (r *InsertRecord) GetItems() *RecordItems {
func (r *InsertRecord) GetItems() RecordItems {
return r.Items
}

Expand Down
18 changes: 9 additions & 9 deletions flow/model/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Record interface {
GetDestinationTableName() string
GetSourceTableName() string
// get columns and values for the record
GetItems() *RecordItems
GetItems() RecordItems
PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)
}

Expand All @@ -33,7 +33,7 @@ func (r *BaseRecord) GetCommitTime() time.Time {

type InsertRecord struct {
// Items is a map of column name to value.
Items *RecordItems
Items RecordItems
// Name of the source table
SourceTableName string
// Name of the destination table
Expand All @@ -49,9 +49,9 @@ func (r *InsertRecord) GetDestinationTableName() string {

type UpdateRecord struct {
// OldItems is a map of column name to value.
OldItems *RecordItems
OldItems RecordItems
// NewItems is a map of column name to value.
NewItems *RecordItems
NewItems RecordItems
// unchanged toast columns
UnchangedToastColumns map[string]struct{}
// Name of the source table
Expand All @@ -69,7 +69,7 @@ func (r *UpdateRecord) GetSourceTableName() string {
return r.SourceTableName
}

// GetItems returns the NewItems of this update record; OldItems is not
// reachable through this method.
// Diff view: the first signature (returning *RecordItems) is the removed line,
// the second (returning RecordItems by value) is the added line.
func (r *UpdateRecord) GetItems() *RecordItems {
func (r *UpdateRecord) GetItems() RecordItems {
return r.NewItems
}

Expand All @@ -82,7 +82,7 @@ func (r *UpdateRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts

type DeleteRecord struct {
// Items is a map of column name to value.
Items *RecordItems
Items RecordItems
// unchanged toast columns, filled from latest UpdateRecord
UnchangedToastColumns map[string]struct{}
// Name of the source table
Expand All @@ -100,7 +100,7 @@ func (r *DeleteRecord) GetSourceTableName() string {
return r.SourceTableName
}

// GetItems returns the Items of this delete record.
// Diff view: the first signature (returning *RecordItems) is the removed line,
// the second (returning RecordItems by value) is the added line.
func (r *DeleteRecord) GetItems() *RecordItems {
func (r *DeleteRecord) GetItems() RecordItems {
return r.Items
}

Expand All @@ -125,8 +125,8 @@ func (r *RelationRecord) GetSourceTableName() string {
return r.TableSchemaDelta.SrcTableName
}

// GetItems reports that a relation record has no items.
// Diff view: the removed version returned a nil *RecordItems; the added version
// returns a RecordItems value whose ColToVal map is nil. Since the value can no
// longer be compared to nil, callers detect "no items" by checking
// items.ColToVal != nil (as LuaRecordIndex does in this commit).
func (r *RelationRecord) GetItems() *RecordItems {
return nil
func (r *RelationRecord) GetItems() RecordItems {
// Explicit nil map: this is the "no items" sentinel for the value type.
return RecordItems{ColToVal: nil}
}

func (r *RelationRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
Expand Down
70 changes: 26 additions & 44 deletions flow/model/record_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package model

import (
"encoding/json"
"errors"
"fmt"
"math"

Expand All @@ -14,78 +13,61 @@ import (

// RecordItems holds the column values of a record, keyed by column name.
// The fields are exported because encoding/gob cannot encode unexported fields.
//
// Diff view: ColToValIdx and Values are the removed layout (column name ->
// index into the Values slice); ColToVal is the added layout (column name ->
// value directly, no ordering retained).
type RecordItems struct {
ColToValIdx map[string]int
Values []qvalue.QValue
ColToVal map[string]qvalue.QValue
}

// NewRecordItems returns an empty RecordItems whose storage is pre-sized for
// capacity columns.
// Diff view: the removed version returned a pointer and allocated both the
// index map and the Values slice; the added version returns a value holding a
// single non-nil ColToVal map.
func NewRecordItems(capacity int) *RecordItems {
return &RecordItems{
ColToValIdx: make(map[string]int, capacity),
Values: make([]qvalue.QValue, 0, capacity),
func NewRecordItems(capacity int) RecordItems {
return RecordItems{
ColToVal: make(map[string]qvalue.QValue, capacity),
}
}

// NewRecordItemWithData builds a RecordItems from two parallel slices: cols[i]
// is stored with val[i]. val is indexed with every index of cols, so it must be
// at least as long as cols or the loop panics with an index-out-of-range.
// Diff view: the first signature and the ColToValIdx/Values lines are the
// removed version; the second signature and the ColToVal line are the added one.
func NewRecordItemWithData(cols []string, val []qvalue.QValue) *RecordItems {
func NewRecordItemWithData(cols []string, val []qvalue.QValue) RecordItems {
recordItem := NewRecordItems(len(cols))
for i, col := range cols {
recordItem.ColToValIdx[col] = len(recordItem.Values)
recordItem.Values = append(recordItem.Values, val[i])
// Added version: a repeated column name simply overwrites the earlier value.
recordItem.ColToVal[col] = val[i]
}
return recordItem
}

// AddColumn stores val under col, replacing any value already stored for col.
// Diff view: the pointer-receiver body (index lookup, then overwrite or append)
// is the removed version; the value-receiver one-liner is the added version.
func (r *RecordItems) AddColumn(col string, val qvalue.QValue) {
if idx, ok := r.ColToValIdx[col]; ok {
r.Values[idx] = val
} else {
r.ColToValIdx[col] = len(r.Values)
r.Values = append(r.Values, val)
}
// Value receiver is sufficient here: the copy of r shares the same underlying
// map, so the write is visible to the caller. The map must be non-nil (e.g.
// created by NewRecordItems); assigning into a nil map panics.
func (r RecordItems) AddColumn(col string, val qvalue.QValue) {
r.ColToVal[col] = val
}

// GetColumnValue returns the value stored for col, or nil when col is absent.
// A column explicitly stored as nil is indistinguishable from a missing one
// here; GetValueByColName reports absence as an error instead.
// Diff view: the pointer-receiver body is the removed version; the
// value-receiver body is the added version.
func (r *RecordItems) GetColumnValue(col string) qvalue.QValue {
if idx, ok := r.ColToValIdx[col]; ok {
return r.Values[idx]
}
return nil
func (r RecordItems) GetColumnValue(col string) qvalue.QValue {
// A lookup of a missing key yields the zero value, i.e. nil, matching the
// explicit "return nil" of the removed version.
return r.ColToVal[col]
}

// UpdateIfNotExists copies into r every column of input that r does not
// already contain. Columns that r already has are left untouched, even if
// input holds a different value for them.
// It returns the names of the columns that were added; the slice is empty
// (non-nil) when nothing was added, and its order follows map iteration and is
// therefore unspecified.
// Diff view: the pointer-receiver signature and the ColToValIdx/Values lines
// are the removed version; the value-receiver signature and the ColToVal lines
// are the added version.
func (r *RecordItems) UpdateIfNotExists(input *RecordItems) []string {
func (r RecordItems) UpdateIfNotExists(input RecordItems) []string {
updatedCols := make([]string, 0)
for col, idx := range input.ColToValIdx {
if _, ok := r.ColToValIdx[col]; !ok {
r.ColToValIdx[col] = len(r.Values)
r.Values = append(r.Values, input.Values[idx])
for col, val := range input.ColToVal {
if _, ok := r.ColToVal[col]; !ok {
r.ColToVal[col] = val
updatedCols = append(updatedCols, col)
}
}
return updatedCols
}

// GetValueByColName returns the value stored for colName.
// If the column is absent it returns a nil value and an error of the form
// "column name <colName> not found".
// Diff view: the pointer-receiver signature with the idx lookup and
// "return r.Values[idx], nil" is the removed version; the value-receiver
// signature with the val lookup and "return val, nil" is the added version.
func (r *RecordItems) GetValueByColName(colName string) (qvalue.QValue, error) {
idx, ok := r.ColToValIdx[colName]
func (r RecordItems) GetValueByColName(colName string) (qvalue.QValue, error) {
val, ok := r.ColToVal[colName]
if !ok {
return nil, fmt.Errorf("column name %s not found", colName)
}
return r.Values[idx], nil
return val, nil
}

// Len returns the number of columns held.
// Diff view: the removed version counted the Values slice; the added version
// counts the entries of the ColToVal map (0 for a nil map).
func (r *RecordItems) Len() int {
return len(r.Values)
func (r RecordItems) Len() int {
return len(r.ColToVal)
}

func (r *RecordItems) toMap(hstoreAsJSON bool, opts ToJSONOptions) (map[string]interface{}, error) {
if r.ColToValIdx == nil {
return nil, errors.New("colToValIdx is nil")
}

jsonStruct := make(map[string]interface{}, len(r.ColToValIdx))
for col, idx := range r.ColToValIdx {
qv := r.Values[idx]
func (r RecordItems) toMap(hstoreAsJSON bool, opts ToJSONOptions) (map[string]interface{}, error) {
jsonStruct := make(map[string]interface{}, len(r.ColToVal))
for col, qv := range r.ColToVal {
if qv == nil {
jsonStruct[col] = nil
continue
Expand Down Expand Up @@ -224,20 +206,20 @@ func (r *RecordItems) toMap(hstoreAsJSON bool, opts ToJSONOptions) (map[string]i
return jsonStruct, nil
}

// ToJSONWithOptions is the string-returning form of MarshalJSONWithOptions: it
// converts the returned bytes to a string and passes the error through
// unchanged, without inspecting it.
// Diff view: only the receiver changes (pointer removed, value added).
func (r *RecordItems) ToJSONWithOptions(options ToJSONOptions) (string, error) {
func (r RecordItems) ToJSONWithOptions(options ToJSONOptions) (string, error) {
bytes, err := r.MarshalJSONWithOptions(options)
return string(bytes), err
}

// ToJSON returns the items as a JSON string using the options produced by
// NewToJSONOptions(nil, true), the same options MarshalJSON uses.
// Diff view: only the receiver changes (pointer removed, value added).
func (r *RecordItems) ToJSON() (string, error) {
func (r RecordItems) ToJSON() (string, error) {
return r.ToJSONWithOptions(NewToJSONOptions(nil, true))
}

// MarshalJSON encodes the items with the options produced by
// NewToJSONOptions(nil, true). Its signature matches encoding/json's Marshaler
// interface.
// Diff view: only the receiver changes (pointer removed, value added), so after
// the change the method is in the method set of RecordItems values as well as
// pointers.
func (r *RecordItems) MarshalJSON() ([]byte, error) {
func (r RecordItems) MarshalJSON() ([]byte, error) {
return r.MarshalJSONWithOptions(NewToJSONOptions(nil, true))
}

func (r *RecordItems) MarshalJSONWithOptions(opts ToJSONOptions) ([]byte, error) {
func (r RecordItems) MarshalJSONWithOptions(opts ToJSONOptions) ([]byte, error) {
jsonStruct, err := r.toMap(opts.HStoreAsJSON, opts)
if err != nil {
return nil, err
Expand Down
24 changes: 13 additions & 11 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

var (
LuaRecord = glua64.UserDataType[model.Record]{Name: "peerdb_record"}
LuaRow = glua64.UserDataType[*model.RecordItems]{Name: "peerdb_row"}
LuaRow = glua64.UserDataType[model.RecordItems]{Name: "peerdb_row"}
LuaTime = glua64.UserDataType[time.Time]{Name: "peerdb_time"}
LuaUuid = glua64.UserDataType[uuid.UUID]{Name: "peerdb_uuid"}
LuaBigInt = glua64.UserDataType[*big.Int]{Name: "peerdb_bigint"}
Expand Down Expand Up @@ -120,7 +120,7 @@ func LoadPeerdbScript(ls *lua.LState) int {
return 1
}

func GetRowQ(ls *lua.LState, row *model.RecordItems, col string) qvalue.QValue {
func GetRowQ(ls *lua.LState, row model.RecordItems, col string) qvalue.QValue {
qv, err := row.GetValueByColName(col)
if err != nil {
ls.RaiseError(err.Error())
Expand All @@ -137,15 +137,17 @@ func LuaRowIndex(ls *lua.LState) int {

// LuaRowLen pushes the number of columns in the row (obtained through
// LuaRow.StartMethod) onto the Lua stack as a number and returns 1, matching
// the single value pushed.
// Diff view: the removed line counted row.Values; the added line counts the
// entries of row.ColToVal.
func LuaRowLen(ls *lua.LState) int {
row := LuaRow.StartMethod(ls)
ls.Push(lua.LNumber(len(row.Values)))
ls.Push(lua.LNumber(len(row.ColToVal)))
return 1
}

// LuaRowColumns pushes a Lua table containing the row's column names at
// integer keys 1..n and returns 1, matching the single value pushed.
// Diff view: the removed lines placed each name at its stored slice index + 1;
// the added lines number the names with a running counter while ranging over
// the ColToVal map.
// NOTE(review): Go does not define map iteration order, so with the added lines
// the position of each column name in the table can differ from call to call,
// whereas the removed lines derived it from the stored index. Confirm that no
// script depends on the order of this table.
func LuaRowColumns(ls *lua.LState) int {
row := LuaRow.StartMethod(ls)
tbl := ls.CreateTable(len(row.ColToValIdx), 0)
for col, idx := range row.ColToValIdx {
tbl.RawSetInt(idx+1, lua.LString(col))
tbl := ls.CreateTable(len(row.ColToVal), 0)
// Lua tables are populated starting at key 1, so the counter is incremented before use.
idx := 0
for col := range row.ColToVal {
idx += 1
tbl.RawSetInt(idx, lua.LString(col))
}
ls.Push(tbl)
return 1
Expand Down Expand Up @@ -173,33 +175,33 @@ func LuaRecordIndex(ls *lua.LState) int {
}
case "row":
items := record.GetItems()
if items != nil {
if items.ColToVal != nil {
ls.Push(LuaRow.New(ls, items))
} else {
ls.Push(lua.LNil)
}
case "old":
var items *model.RecordItems
var items model.RecordItems
switch rec := record.(type) {
case *model.UpdateRecord:
items = rec.OldItems
case *model.DeleteRecord:
items = rec.Items
}
if items != nil {
if items.ColToVal != nil {
ls.Push(LuaRow.New(ls, items))
} else {
ls.Push(lua.LNil)
}
case "new":
var items *model.RecordItems
var items model.RecordItems
switch rec := record.(type) {
case *model.InsertRecord:
items = rec.Items
case *model.UpdateRecord:
items = rec.NewItems
}
if items != nil {
if items.ColToVal != nil {
ls.Push(LuaRow.New(ls, items))
} else {
ls.Push(lua.LNil)
Expand Down

0 comments on commit c26ed67

Please sign in to comment.