Skip to content

Commit

Permalink
implement specialized geometry/hstore comparison since using connecto…
Browse files Browse the repository at this point in the history
…r we have proper parsing of pg types now in tests
  • Loading branch information
serprex committed Feb 14, 2024
1 parent 5a538b1 commit 4f067d7
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 37 deletions.
16 changes: 8 additions & 8 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG {
}

func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) {
err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, tableName)
err := e2e.CreateTableForQRep(s.Conn(), s.suffix, tableName)
require.NoError(s.t, err)
err = e2e.PopulateSourceTable(s.conn.Conn(), s.suffix, tableName, rowCount)
err = e2e.PopulateSourceTable(s.Conn(), s.suffix, tableName, rowCount)
require.NoError(s.t, err)
}

Expand All @@ -98,7 +98,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie
"SELECT 1 FROM %s dst "+
"WHERE src.my_mood::text = dst.my_mood::text)) LIMIT 1;", srcSchemaQualified,
dstSchemaQualified)
err := s.conn.Conn().QueryRow(context.Background(), query).Scan(&exists)
err := s.Conn().QueryRow(context.Background(), query).Scan(&exists)
if err != nil {
return err
}
Expand All @@ -112,7 +112,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie
func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error {
query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified,
selector, dstSchemaQualified)
rows, err := s.conn.Conn().Query(context.Background(), query, pgx.QueryExecModeExec)
rows, err := s.Conn().Query(context.Background(), query, pgx.QueryExecModeExec)
if err != nil {
return err
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualif
func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error {
query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified)

rows, _ := s.conn.Conn().Query(context.Background(), query)
rows, _ := s.Conn().Query(context.Background(), query)

defer rows.Close()
for rows.Next() {
Expand All @@ -166,12 +166,12 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error {

func (s PeerFlowE2ETestSuitePG) RunInt64Query(query string) (int64, error) {
var count pgtype.Int8
err := s.conn.Conn().QueryRow(context.Background(), query).Scan(&count)
err := s.Conn().QueryRow(context.Background(), query).Scan(&count)
return count.Int64, err
}

func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() {
setupTx, err := s.conn.Conn().Begin(context.Background())
setupTx, err := s.Conn().Begin(context.Background())
require.NoError(s.t, err)
// setup 3 tables in pgpeer_repl_test schema
// test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5
Expand Down Expand Up @@ -220,7 +220,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {

dstTable := "test_qrep_flow_avro_pg_2"

err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, dstTable)
err := e2e.CreateTableForQRep(s.Conn(), s.suffix, dstTable)
require.NoError(s.t, err)

srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
Expand Down
15 changes: 0 additions & 15 deletions flow/geo/geo.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,3 @@ func GeoToWKB(wkt string) ([]byte, error) {

return geometryObject.ToWKB(), nil
}

// compares WKTs
func GeoCompare(wkt1, wkt2 string) bool {
geom1, geoErr := geom.NewGeomFromWKT(wkt1)
if geoErr != nil {
return false
}

geom2, geoErr := geom.NewGeomFromWKT(wkt2)
if geoErr != nil {
return false
}

return geom1.Equals(geom2)
}
46 changes: 32 additions & 14 deletions flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package qvalue

import (
"bytes"
"encoding/json"
"fmt"
"math"
"math/big"
Expand All @@ -11,9 +12,8 @@ import (

"cloud.google.com/go/civil"
"github.com/google/uuid"

"github.com/PeerDB-io/peer-flow/geo"
hstore_util "github.com/PeerDB-io/peer-flow/hstore"
"github.com/jackc/pgx/v5/pgtype"
geom "github.com/twpayne/go-geos"
)

// if new types are added, register them in gob - cdc_records_storage.go
Expand Down Expand Up @@ -66,6 +66,10 @@ func (q QValue) Equals(other QValue) bool {
return compareJSON(q.Value, other.Value)
case QValueKindBit:
return compareBit(q.Value, other.Value)
case QValueKindGeometry:
return compareGeometry(q.Value, other.Value)
case QValueKindHStore:
return compareHstore(q.Value, other.Value)
case QValueKindArrayFloat32:
return compareNumericArrays(q.Value, other.Value)
case QValueKindArrayFloat64:
Expand All @@ -82,9 +86,9 @@ func (q QValue) Equals(other QValue) bool {
return compareBoolArrays(q.Value, other.Value)
case QValueKindArrayString:
return compareArrayString(q.Value, other.Value)
default:
return false
}

return false
}

func (q QValue) GoTimeConvert() (string, error) {
Expand Down Expand Up @@ -250,20 +254,34 @@ func compareString(value1, value2 interface{}) bool {
if !ok1 || !ok2 {
return false
}
if str1 == str2 {
return true
return str1 == str2
}

func compareHstore(value1, value2 interface{}) bool {
bytes, err := json.Marshal(value1.(pgtype.Hstore))
if err != nil {
panic(err)
}
return string(bytes) == value2.(string)
}

// Catch matching HStore
parsedHstore1, err := hstore_util.ParseHstore(str1)
if err == nil && parsedHstore1 == str2 {
return true
func compareGeometry(value1, value2 interface{}) bool {
geo1, ok := value1.(*geom.Geom)
if !ok {
panic("value1 not geometry")
}

// Catch matching WKB(in Postgres)-WKT(in destination) geo values
geoConvertedWKT, err := geo.GeoValidate(str1)
str2, ok := value2.(string)
if !ok {
panic("value2 not string")
}

geo2, err := geom.NewGeomFromWKT(str2)
if err != nil {
panic(err)
}

return err == nil && geo.GeoCompare(geoConvertedWKT, str2)
return geo1.Equals(geo2)
}

func compareStruct(value1, value2 interface{}) bool {
Expand Down

0 comments on commit 4f067d7

Please sign in to comment.