Skip to content

Commit

Permalink
Fix time encoding
Browse files Browse the repository at this point in the history
- add a test for this as well
  • Loading branch information
iskakaushik committed Dec 29, 2023
1 parent 1619c93 commit 9c6fce4
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *cdcRecordsStore) initPebbleDB() error {
gob.Register(&model.InsertRecord{})
gob.Register(&model.UpdateRecord{})
gob.Register(&model.DeleteRecord{})
gob.Register(&time.Time{})
gob.Register(time.Time{})
gob.Register(&big.Rat{})

var err error
Expand Down
67 changes: 62 additions & 5 deletions flow/connectors/utils/cdc_records/cdc_records_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,42 @@ package cdc_records

import (
"crypto/rand"
"math/big"
"testing"
"time"

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/stretchr/testify/require"
)

func getTimeForTesting(t *testing.T) time.Time {
tv, err := time.Parse(time.RFC3339, "2021-08-01T08:02:00Z")
require.NoError(t, err)

millisToAdd := 716
tv = tv.Add(time.Millisecond * time.Duration(millisToAdd))

microSecondsToAdd := 506
tv = tv.Add(time.Microsecond * time.Duration(microSecondsToAdd))

return tv
}

func getRatForTesting(t *testing.T) *big.Rat {
return big.NewRat(123456789, 987654321)
}

func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) {
t.Helper()

pkeyColVal := make([]byte, 32)
_, err := rand.Read(pkeyColVal)
require.NoError(t, err)

tv := getTimeForTesting(t)
rv := getRatForTesting(t)

key := model.TableWithPkey{
TableName: "test_src_tbl",
PkeyColVal: [32]byte(pkeyColVal),
Expand All @@ -26,11 +48,25 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) {
CheckPointID: 1,
CommitID: 2,
Items: &model.RecordItems{
ColToValIdx: map[string]int{"id": 0},
Values: []qvalue.QValue{{
Kind: qvalue.QValueKindInt64,
Value: 1,
}},
ColToValIdx: map[string]int{
"id": 0,
"ts": 1,
"rv": 2,
},
Values: []qvalue.QValue{
{
Kind: qvalue.QValueKindInt64,
Value: 1,
},
{
Kind: qvalue.QValueKindTime,
Value: tv,
},
{
Kind: qvalue.QValueKindNumeric,
Value: rv,
},
},
},
}
return key, rec
Expand Down Expand Up @@ -85,3 +121,24 @@ func TestRecordsTillSpill(t *testing.T) {

require.NoError(t, cdcRecordsStore.Close())
}

func TestTimeAndRatEncoding(t *testing.T) {
t.Parallel()

cdcRecordsStore := NewCDCRecordsStore("test_time_encoding")
cdcRecordsStore.numRecordsSwitchThreshold = 0

key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
require.NoError(t, err)

retreived, ok, err := cdcRecordsStore.Get(key)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, rec, retreived)

_, err = retreived.GetItems().ToJSON()
require.NoError(t, err)

require.NoError(t, cdcRecordsStore.Close())
}

0 comments on commit 9c6fce4

Please sign in to comment.