From 9c6fce438ee8665f74af3b182f1594aea90c1812 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 29 Dec 2023 11:54:30 -0500 Subject: [PATCH 1/2] Fix time encoding - add a test for this as well --- .../utils/cdc_records/cdc_records_storage.go | 2 +- .../cdc_records/cdc_records_storage_test.go | 67 +++++++++++++++++-- 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 51bd000ef9..c6dece9eb9 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -55,7 +55,7 @@ func (c *cdcRecordsStore) initPebbleDB() error { gob.Register(&model.InsertRecord{}) gob.Register(&model.UpdateRecord{}) gob.Register(&model.DeleteRecord{}) - gob.Register(&time.Time{}) + gob.Register(time.Time{}) gob.Register(&big.Rat{}) var err error diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index 13aff7f00c..50dc42dc0e 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -2,13 +2,32 @@ package cdc_records import ( "crypto/rand" + "math/big" "testing" + "time" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/stretchr/testify/require" ) +func getTimeForTesting(t *testing.T) time.Time { + tv, err := time.Parse(time.RFC3339, "2021-08-01T08:02:00Z") + require.NoError(t, err) + + millisToAdd := 716 + tv = tv.Add(time.Millisecond * time.Duration(millisToAdd)) + + microSecondsToAdd := 506 + tv = tv.Add(time.Microsecond * time.Duration(microSecondsToAdd)) + + return tv +} + +func getRatForTesting(t *testing.T) *big.Rat { + return big.NewRat(123456789, 987654321) +} + func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { t.Helper() @@ -16,6 +35,9 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { _, err := rand.Read(pkeyColVal) require.NoError(t, err) + tv := getTimeForTesting(t) + rv := getRatForTesting(t) + key := model.TableWithPkey{ TableName: "test_src_tbl", PkeyColVal: [32]byte(pkeyColVal), @@ -26,11 +48,25 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { CheckPointID: 1, CommitID: 2, Items: &model.RecordItems{ - ColToValIdx: map[string]int{"id": 0}, - Values: []qvalue.QValue{{ - Kind: qvalue.QValueKindInt64, - Value: 1, - }}, + ColToValIdx: map[string]int{ + "id": 0, + "ts": 1, + "rv": 2, + }, + Values: []qvalue.QValue{ + { + Kind: qvalue.QValueKindInt64, + Value: 1, + }, + { + Kind: qvalue.QValueKindTime, + Value: tv, + }, + { + Kind: qvalue.QValueKindNumeric, + Value: rv, + }, + }, }, } return key, rec @@ -85,3 +121,24 @@ func TestRecordsTillSpill(t *testing.T) { require.NoError(t, cdcRecordsStore.Close()) } + +func TestTimeAndRatEncoding(t *testing.T) { + t.Parallel() + + cdcRecordsStore := NewCDCRecordsStore("test_time_encoding") + cdcRecordsStore.numRecordsSwitchThreshold = 0 + + key, rec := genKeyAndRec(t) + err := cdcRecordsStore.Set(key, rec) + require.NoError(t, err) + + retreived, ok, err := cdcRecordsStore.Get(key) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, retreived) + + _, err = retreived.GetItems().ToJSON() + require.NoError(t, err) + + require.NoError(t, cdcRecordsStore.Close()) +} From 7544c2f2ceded1d83cd4ffa4ac801f85e8ab114e Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 29 Dec 2023 23:10:16 +0530 Subject: [PATCH 2/2] fixing lints pt.1 --- flow/connectors/utils/cdc_records/cdc_records_storage_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index 50dc42dc0e..9aea00ae18 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -12,6 +12,7 @@ import ( ) func getTimeForTesting(t *testing.T) time.Time { + t.Helper() tv, err := time.Parse(time.RFC3339, "2021-08-01T08:02:00Z") require.NoError(t, err) @@ -25,6 +26,7 @@ func getTimeForTesting(t *testing.T) time.Time { } func getRatForTesting(t *testing.T) *big.Rat { + t.Helper() return big.NewRat(123456789, 987654321) }