diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 46aaab5d4e..047f42f6ee 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -590,7 +590,7 @@ func (p *PostgresCDCSource) convertTupleToMap( tuple *pglogrepl.TupleData, rel *pglogrepl.RelationMessage, exclude map[string]struct{}, -) (*model.RecordItems, map[string]struct{}, error) { +) (model.RecordItems, map[string]struct{}, error) { // if the tuple is nil, return an empty map if tuple == nil { return model.NewRecordItems(0), make(map[string]struct{}), nil @@ -613,19 +613,19 @@ func (p *PostgresCDCSource) convertTupleToMap( /* bytea also appears here as a hex */ data, err := p.decodeColumnData(col.Data, rel.Columns[idx].DataType, pgtype.TextFormatCode) if err != nil { - return nil, nil, fmt.Errorf("error decoding text column data: %w", err) + return model.RecordItems{}, nil, fmt.Errorf("error decoding text column data: %w", err) } items.AddColumn(colName, data) case 'b': // binary data, err := p.decodeColumnData(col.Data, rel.Columns[idx].DataType, pgtype.BinaryFormatCode) if err != nil { - return nil, nil, fmt.Errorf("error decoding binary column data: %w", err) + return model.RecordItems{}, nil, fmt.Errorf("error decoding binary column data: %w", err) } items.AddColumn(colName, data) case 'u': // unchanged toast unchangedToastColumns[colName] = struct{}{} default: - return nil, nil, fmt.Errorf("unknown column data type: %s", string(col.DataType)) + return model.RecordItems{}, nil, fmt.Errorf("unknown column data type: %s", string(col.DataType)) } } return items, unchangedToastColumns, nil diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index 0d0b6da9e2..3d4756067b 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -54,16 +54,11 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { SourceTableName: "test_src_tbl", DestinationTableName: "test_dst_tbl", CommitID: 2, - Items: &model.RecordItems{ - ColToValIdx: map[string]int{ - "id": 0, - "ts": 1, - "rv": 2, - }, - Values: []qvalue.QValue{ - qvalue.QValueInt64{Val: 1}, - qvalue.QValueTime{Val: tv}, - qvalue.QValueNumeric{Val: rv}, + Items: model.RecordItems{ + ColToVal: map[string]qvalue.QValue{ + "id": qvalue.QValueInt64{Val: 1}, + "ts": qvalue.QValueTime{Val: tv}, + "rv": qvalue.QValueNumeric{Val: rv}, }, }, } diff --git a/flow/model/model.go b/flow/model/model.go index 6e29554132..53f4f28652 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -70,7 +70,7 @@ func (r *InsertRecord) GetSourceTableName() string { return r.SourceTableName } -func (r *InsertRecord) GetItems() *RecordItems { +func (r *InsertRecord) GetItems() RecordItems { return r.Items } diff --git a/flow/model/record.go b/flow/model/record.go index b3977943fe..8d7f20e484 100644 --- a/flow/model/record.go +++ b/flow/model/record.go @@ -12,7 +12,7 @@ type Record interface { GetDestinationTableName() string GetSourceTableName() string // get columns and values for the record - GetItems() *RecordItems + GetItems() RecordItems PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) } @@ -33,7 +33,7 @@ func (r *BaseRecord) GetCommitTime() time.Time { type InsertRecord struct { // Items is a map of column name to value. - Items *RecordItems + Items RecordItems // Name of the source table SourceTableName string // Name of the destination table @@ -49,9 +49,9 @@ func (r *InsertRecord) GetDestinationTableName() string { type UpdateRecord struct { // OldItems is a map of column name to value. - OldItems *RecordItems + OldItems RecordItems // NewItems is a map of column name to value. - NewItems *RecordItems + NewItems RecordItems // unchanged toast columns UnchangedToastColumns map[string]struct{} // Name of the source table @@ -69,7 +69,7 @@ func (r *UpdateRecord) GetSourceTableName() string { return r.SourceTableName } -func (r *UpdateRecord) GetItems() *RecordItems { +func (r *UpdateRecord) GetItems() RecordItems { return r.NewItems } @@ -82,7 +82,7 @@ func (r *UpdateRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts type DeleteRecord struct { // Items is a map of column name to value. - Items *RecordItems + Items RecordItems // unchanged toast columns, filled from latest UpdateRecord UnchangedToastColumns map[string]struct{} // Name of the source table @@ -100,7 +100,7 @@ func (r *DeleteRecord) GetSourceTableName() string { return r.SourceTableName } -func (r *DeleteRecord) GetItems() *RecordItems { +func (r *DeleteRecord) GetItems() RecordItems { return r.Items } @@ -125,8 +125,8 @@ func (r *RelationRecord) GetSourceTableName() string { return r.TableSchemaDelta.SrcTableName } -func (r *RelationRecord) GetItems() *RecordItems { - return nil +func (r *RelationRecord) GetItems() RecordItems { + return RecordItems{ColToVal: nil} } func (r *RelationRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) { diff --git a/flow/model/record_items.go b/flow/model/record_items.go index 1feb2598c8..b7d18a86fa 100644 --- a/flow/model/record_items.go +++ b/flow/model/record_items.go @@ -2,7 +2,6 @@ package model import ( "encoding/json" - "errors" "fmt" "math" @@ -14,78 +13,61 @@ import ( // encoding/gob cannot encode unexported fields type RecordItems struct { - ColToValIdx map[string]int - Values []qvalue.QValue + ColToVal map[string]qvalue.QValue } -func NewRecordItems(capacity int) *RecordItems { - return &RecordItems{ - ColToValIdx: make(map[string]int, capacity), - Values: make([]qvalue.QValue, 0, capacity), +func NewRecordItems(capacity int) RecordItems { + return RecordItems{ + ColToVal: make(map[string]qvalue.QValue, capacity), } } -func NewRecordItemWithData(cols []string, val []qvalue.QValue) *RecordItems { +func NewRecordItemWithData(cols []string, val []qvalue.QValue) RecordItems { recordItem := NewRecordItems(len(cols)) for i, col := range cols { - recordItem.ColToValIdx[col] = len(recordItem.Values) - recordItem.Values = append(recordItem.Values, val[i]) + recordItem.ColToVal[col] = val[i] } return recordItem } -func (r *RecordItems) AddColumn(col string, val qvalue.QValue) { - if idx, ok := r.ColToValIdx[col]; ok { - r.Values[idx] = val - } else { - r.ColToValIdx[col] = len(r.Values) - r.Values = append(r.Values, val) - } +func (r RecordItems) AddColumn(col string, val qvalue.QValue) { + r.ColToVal[col] = val } -func (r *RecordItems) GetColumnValue(col string) qvalue.QValue { - if idx, ok := r.ColToValIdx[col]; ok { - return r.Values[idx] - } - return nil +func (r RecordItems) GetColumnValue(col string) qvalue.QValue { + return r.ColToVal[col] } // UpdateIfNotExists takes in a RecordItems as input and updates the values of the // current RecordItems with the values from the input RecordItems for the columns // that are present in the input RecordItems but not in the current RecordItems. // We return the slice of col names that were updated. -func (r *RecordItems) UpdateIfNotExists(input *RecordItems) []string { +func (r RecordItems) UpdateIfNotExists(input RecordItems) []string { updatedCols := make([]string, 0) - for col, idx := range input.ColToValIdx { - if _, ok := r.ColToValIdx[col]; !ok { - r.ColToValIdx[col] = len(r.Values) - r.Values = append(r.Values, input.Values[idx]) + for col, val := range input.ColToVal { + if _, ok := r.ColToVal[col]; !ok { + r.ColToVal[col] = val updatedCols = append(updatedCols, col) } } return updatedCols } -func (r *RecordItems) GetValueByColName(colName string) (qvalue.QValue, error) { - idx, ok := r.ColToValIdx[colName] +func (r RecordItems) GetValueByColName(colName string) (qvalue.QValue, error) { + val, ok := r.ColToVal[colName] if !ok { return nil, fmt.Errorf("column name %s not found", colName) } - return r.Values[idx], nil + return val, nil } -func (r *RecordItems) Len() int { - return len(r.Values) +func (r RecordItems) Len() int { + return len(r.ColToVal) } -func (r *RecordItems) toMap(hstoreAsJSON bool, opts ToJSONOptions) (map[string]interface{}, error) { - if r.ColToValIdx == nil { - return nil, errors.New("colToValIdx is nil") - } - - jsonStruct := make(map[string]interface{}, len(r.ColToValIdx)) - for col, idx := range r.ColToValIdx { - qv := r.Values[idx] +func (r RecordItems) toMap(hstoreAsJSON bool, opts ToJSONOptions) (map[string]interface{}, error) { + jsonStruct := make(map[string]interface{}, len(r.ColToVal)) + for col, qv := range r.ColToVal { if qv == nil { jsonStruct[col] = nil continue @@ -224,20 +206,20 @@ func (r *RecordItems) toMap(hstoreAsJSON bool, opts ToJSONOptions) (map[string]i return jsonStruct, nil } -func (r *RecordItems) ToJSONWithOptions(options ToJSONOptions) (string, error) { +func (r RecordItems) ToJSONWithOptions(options ToJSONOptions) (string, error) { bytes, err := r.MarshalJSONWithOptions(options) return string(bytes), err } -func (r *RecordItems) ToJSON() (string, error) { +func (r RecordItems) ToJSON() (string, error) { return r.ToJSONWithOptions(NewToJSONOptions(nil, true)) } -func (r *RecordItems) MarshalJSON() ([]byte, error) { +func (r RecordItems) MarshalJSON() ([]byte, error) { return r.MarshalJSONWithOptions(NewToJSONOptions(nil, true)) } -func (r *RecordItems) MarshalJSONWithOptions(opts ToJSONOptions) ([]byte, error) { +func (r RecordItems) MarshalJSONWithOptions(opts ToJSONOptions) ([]byte, error) { jsonStruct, err := r.toMap(opts.HStoreAsJSON, opts) if err != nil { return nil, err diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index 801c5c6523..ccbd06aed6 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -23,7 +23,7 @@ import ( var ( LuaRecord = glua64.UserDataType[model.Record]{Name: "peerdb_record"} - LuaRow = glua64.UserDataType[*model.RecordItems]{Name: "peerdb_row"} + LuaRow = glua64.UserDataType[model.RecordItems]{Name: "peerdb_row"} LuaTime = glua64.UserDataType[time.Time]{Name: "peerdb_time"} LuaUuid = glua64.UserDataType[uuid.UUID]{Name: "peerdb_uuid"} LuaBigInt = glua64.UserDataType[*big.Int]{Name: "peerdb_bigint"} @@ -120,7 +120,7 @@ func LoadPeerdbScript(ls *lua.LState) int { return 1 } -func GetRowQ(ls *lua.LState, row *model.RecordItems, col string) qvalue.QValue { +func GetRowQ(ls *lua.LState, row model.RecordItems, col string) qvalue.QValue { qv, err := row.GetValueByColName(col) if err != nil { ls.RaiseError(err.Error()) @@ -137,15 +137,17 @@ func LuaRowIndex(ls *lua.LState) int { func LuaRowLen(ls *lua.LState) int { row := LuaRow.StartMethod(ls) - ls.Push(lua.LNumber(len(row.Values))) + ls.Push(lua.LNumber(len(row.ColToVal))) return 1 } func LuaRowColumns(ls *lua.LState) int { row := LuaRow.StartMethod(ls) - tbl := ls.CreateTable(len(row.ColToValIdx), 0) - for col, idx := range row.ColToValIdx { - tbl.RawSetInt(idx+1, lua.LString(col)) + tbl := ls.CreateTable(len(row.ColToVal), 0) + idx := 0 + for col := range row.ColToVal { + idx += 1 + tbl.RawSetInt(idx, lua.LString(col)) } ls.Push(tbl) return 1 @@ -173,33 +175,33 @@ func LuaRecordIndex(ls *lua.LState) int { } case "row": items := record.GetItems() - if items != nil { + if items.ColToVal != nil { ls.Push(LuaRow.New(ls, items)) } else { ls.Push(lua.LNil) } case "old": - var items *model.RecordItems + var items model.RecordItems switch rec := record.(type) { case *model.UpdateRecord: items = rec.OldItems case *model.DeleteRecord: items = rec.Items } - if items != nil { + if items.ColToVal != nil { ls.Push(LuaRow.New(ls, items)) } else { ls.Push(lua.LNil) } case "new": - var items *model.RecordItems + var items model.RecordItems switch rec := record.(type) { case *model.InsertRecord: items = rec.Items case *model.UpdateRecord: items = rec.NewItems } - if items != nil { + if items.ColToVal != nil { ls.Push(LuaRow.New(ls, items)) } else { ls.Push(lua.LNil)