diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 51bd000ef9..c6dece9eb9 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -55,7 +55,7 @@ func (c *cdcRecordsStore) initPebbleDB() error { gob.Register(&model.InsertRecord{}) gob.Register(&model.UpdateRecord{}) gob.Register(&model.DeleteRecord{}) - gob.Register(&time.Time{}) + gob.Register(time.Time{}) gob.Register(&big.Rat{}) var err error diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index 13aff7f00c..9aea00ae18 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -2,13 +2,34 @@ package cdc_records import ( "crypto/rand" + "math/big" "testing" + "time" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/stretchr/testify/require" ) +func getTimeForTesting(t *testing.T) time.Time { + t.Helper() + tv, err := time.Parse(time.RFC3339, "2021-08-01T08:02:00Z") + require.NoError(t, err) + + millisToAdd := 716 + tv = tv.Add(time.Millisecond * time.Duration(millisToAdd)) + + microSecondsToAdd := 506 + tv = tv.Add(time.Microsecond * time.Duration(microSecondsToAdd)) + + return tv +} + +func getRatForTesting(t *testing.T) *big.Rat { + t.Helper() + return big.NewRat(123456789, 987654321) +} + func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { t.Helper() @@ -16,6 +37,9 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { _, err := rand.Read(pkeyColVal) require.NoError(t, err) + tv := getTimeForTesting(t) + rv := getRatForTesting(t) + key := model.TableWithPkey{ TableName: "test_src_tbl", PkeyColVal: [32]byte(pkeyColVal), @@ -26,11 +50,25 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { CheckPointID: 1, CommitID: 2, Items: &model.RecordItems{ - ColToValIdx: map[string]int{"id": 0}, - Values: []qvalue.QValue{{ - Kind: qvalue.QValueKindInt64, - Value: 1, - }}, + ColToValIdx: map[string]int{ + "id": 0, + "ts": 1, + "rv": 2, + }, + Values: []qvalue.QValue{ + { + Kind: qvalue.QValueKindInt64, + Value: 1, + }, + { + Kind: qvalue.QValueKindTime, + Value: tv, + }, + { + Kind: qvalue.QValueKindNumeric, + Value: rv, + }, + }, }, } return key, rec @@ -85,3 +123,24 @@ func TestRecordsTillSpill(t *testing.T) { require.NoError(t, cdcRecordsStore.Close()) } + +func TestTimeAndRatEncoding(t *testing.T) { + t.Parallel() + + cdcRecordsStore := NewCDCRecordsStore("test_time_encoding") + cdcRecordsStore.numRecordsSwitchThreshold = 0 + + key, rec := genKeyAndRec(t) + err := cdcRecordsStore.Set(key, rec) + require.NoError(t, err) + + retreived, ok, err := cdcRecordsStore.Get(key) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, retreived) + + _, err = retreived.GetItems().ToJSON() + require.NoError(t, err) + + require.NoError(t, cdcRecordsStore.Close()) +}