diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 0f5cf9c33a..65a043df78 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -72,9 +72,9 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { } func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, tableName) + err := e2e.CreateTableForQRep(s.Conn(), s.suffix, tableName) require.NoError(s.t, err) - err = e2e.PopulateSourceTable(s.conn.Conn(), s.suffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.Conn(), s.suffix, tableName, rowCount) require.NoError(s.t, err) } @@ -98,7 +98,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie "SELECT 1 FROM %s dst "+ "WHERE src.my_mood::text = dst.my_mood::text)) LIMIT 1;", srcSchemaQualified, dstSchemaQualified) - err := s.conn.Conn().QueryRow(context.Background(), query).Scan(&exists) + err := s.Conn().QueryRow(context.Background(), query).Scan(&exists) if err != nil { return err } @@ -112,7 +112,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, selector, dstSchemaQualified) - rows, err := s.conn.Conn().Query(context.Background(), query, pgx.QueryExecModeExec) + rows, err := s.Conn().Query(context.Background(), query, pgx.QueryExecModeExec) if err != nil { return err } @@ -146,7 +146,7 @@ func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualif func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified) - rows, _ := s.conn.Conn().Query(context.Background(), query) + rows, _ := s.Conn().Query(context.Background(), query) defer rows.Close() for rows.Next() { @@ -166,12 +166,12 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { func (s PeerFlowE2ETestSuitePG) RunInt64Query(query string) (int64, error) { var count pgtype.Int8 - err := s.conn.Conn().QueryRow(context.Background(), query).Scan(&count) + err := s.Conn().QueryRow(context.Background(), query).Scan(&count) return count.Int64, err } func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { - setupTx, err := s.conn.Conn().Begin(context.Background()) + setupTx, err := s.Conn().Begin(context.Background()) require.NoError(s.t, err) // setup 3 tables in pgpeer_repl_test schema // test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5 @@ -220,7 +220,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { dstTable := "test_qrep_flow_avro_pg_2" - err := e2e.CreateTableForQRep(s.conn.Conn(), s.suffix, dstTable) + err := e2e.CreateTableForQRep(s.Conn(), s.suffix, dstTable) require.NoError(s.t, err) srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) diff --git a/flow/geo/geo.go b/flow/geo/geo.go index 882ed97dba..9640173973 100644 --- a/flow/geo/geo.go +++ b/flow/geo/geo.go @@ -43,18 +43,3 @@ func GeoToWKB(wkt string) ([]byte, error) { return geometryObject.ToWKB(), nil } - -// compares WKTs -func GeoCompare(wkt1, wkt2 string) bool { - geom1, geoErr := geom.NewGeomFromWKT(wkt1) - if geoErr != nil { - return false - } - - geom2, geoErr := geom.NewGeomFromWKT(wkt2) - if geoErr != nil { - return false - } - - return geom1.Equals(geom2) -} diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index b0b556b3d7..588dc1e4bb 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -2,6 +2,7 @@ package qvalue import ( "bytes" + "encoding/json" "fmt" "math" "math/big" @@ -11,9 +12,8 @@ import ( "cloud.google.com/go/civil" "github.com/google/uuid" - - "github.com/PeerDB-io/peer-flow/geo" - hstore_util "github.com/PeerDB-io/peer-flow/hstore" + "github.com/jackc/pgx/v5/pgtype" + geom "github.com/twpayne/go-geos" ) // if new types are added, register them in gob - cdc_records_storage.go @@ -66,6 +66,10 @@ func (q QValue) Equals(other QValue) bool { return compareJSON(q.Value, other.Value) case QValueKindBit: return compareBit(q.Value, other.Value) + case QValueKindGeometry: + return compareGeometry(q.Value, other.Value) + case QValueKindHStore: + return compareHstore(q.Value, other.Value) case QValueKindArrayFloat32: return compareNumericArrays(q.Value, other.Value) case QValueKindArrayFloat64: @@ -82,9 +86,9 @@ func (q QValue) Equals(other QValue) bool { return compareBoolArrays(q.Value, other.Value) case QValueKindArrayString: return compareArrayString(q.Value, other.Value) + default: + return false } - - return false } func (q QValue) GoTimeConvert() (string, error) { @@ -250,20 +254,34 @@ func compareString(value1, value2 interface{}) bool { if !ok1 || !ok2 { return false } - if str1 == str2 { - return true + return str1 == str2 +} + +func compareHstore(value1, value2 interface{}) bool { + bytes, err := json.Marshal(value1.(pgtype.Hstore)) + if err != nil { + panic(err) } + return string(bytes) == value2.(string) +} - // Catch matching HStore - parsedHstore1, err := hstore_util.ParseHstore(str1) - if err == nil && parsedHstore1 == str2 { - return true +func compareGeometry(value1, value2 interface{}) bool { + geo1, ok := value1.(*geom.Geom) + if !ok { + panic("value1 not geometry") } - // Catch matching WKB(in Postgres)-WKT(in destination) geo values - geoConvertedWKT, err := geo.GeoValidate(str1) + str2, ok := value2.(string) + if !ok { + panic("value2 not string") + } + + geo2, err := geom.NewGeomFromWKT(str2) + if err != nil { + panic(err) + } - return err == nil && geo.GeoCompare(geoConvertedWKT, str2) + return geo1.Equals(geo2) } func compareStruct(value1, value2 interface{}) bool {