Skip to content

Commit

Permalink
fix comparisons for null jsons and geo strings
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 3, 2024
1 parent 2cfcd10 commit fad17cd
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 32 deletions.
7 changes: 4 additions & 3 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/cdc_records"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/geo"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -362,8 +363,8 @@ func (p *PostgresCDCSource) consumeStream(
if retryAttemptForWALSegmentRemoved > maxRetriesForWalSegmentRemoved {
return fmt.Errorf("max retries for WAL segment removed exceeded: %+v", errMsg)
} else {
p.logger.Warn(fmt.Sprintf(
"WAL segment removed, restarting replication retrying in 30 seconds..."),
p.logger.Warn(
"WAL segment removed, restarting replication retrying in 30 seconds...",
slog.Any("error", errMsg), slog.Int("retryAttempt", retryAttemptForWALSegmentRemoved))
time.Sleep(30 * time.Second)
continue
Expand Down Expand Up @@ -761,7 +762,7 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
if ok {
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
wkt, err := GeoValidate(string(data))
wkt, err := geo.GeoValidate(string(data))
if err != nil {
return qvalue.QValue{
Kind: customQKind,
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/geo"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -446,7 +447,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
wkbString, ok := values[i].(string)
wkt, err := GeoValidate(wkbString)
wkt, err := geo.GeoValidate(wkbString)
if err != nil || !ok {
values[i] = nil
} else {
Expand Down
28 changes: 0 additions & 28 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package connpostgres

import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand All @@ -14,8 +13,6 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/jackc/pgx/v5/pgtype"
"github.com/lib/pq/oid"

geom "github.com/twpayne/go-geos"
)

func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind {
Expand Down Expand Up @@ -407,28 +404,3 @@ func customTypeToQKind(typeName string) qvalue.QValueKind {
}
return qValueKind
}

// GeoValidate takes a hex-encoded WKB string and returns the WKT
// representation of the geometry it describes, provided the geometry is valid.
// It returns an empty string and a non-nil error when the hex decoding fails,
// when the WKB cannot be parsed, or when the geometry is reported as invalid.
func GeoValidate(hexWkb string) (string, error) {
// Decode the WKB hex string into binary
wkb, hexErr := hex.DecodeString(hexWkb)
if hexErr != nil {
slog.Warn(fmt.Sprintf("Ignoring invalid WKB: %s", hexWkb))
return "", hexErr
}

// Parse the binary WKB into a geometry object; a parse failure is returned
// to the caller without being logged here.
geometryObject, geoErr := geom.NewGeomFromWKB(wkb)
if geoErr != nil {
return "", geoErr
}

// Any reason string other than "Valid Geometry" is treated as a validation
// failure and becomes the text of the returned error.
invalidReason := geometryObject.IsValidReason()
if invalidReason != "Valid Geometry" {
slog.Warn(fmt.Sprintf("Ignoring invalid geometry shape %s: %s", hexWkb, invalidReason))
return "", errors.New(invalidReason)
}

// Geometry passed validation: hand back its WKT form.
wkt := geometryObject.ToWKT()
return wkt, nil
}
52 changes: 52 additions & 0 deletions flow/geo/geo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package geo

import (
"encoding/hex"
"errors"
"fmt"
"log/slog"

geom "github.com/twpayne/go-geos"
)

// GeoValidate decodes a hex-encoded WKB string, checks that it describes a
// valid geometry, and returns that geometry's WKT representation.
//
// A non-nil error is returned when hexWkb is not valid hex, when the decoded
// bytes cannot be parsed as WKB, or when the parsed geometry is reported as
// invalid. The returned string is empty whenever the error is non-nil.
func GeoValidate(hexWkb string) (string, error) {
	// Decode the WKB hex string into binary.
	wkb, hexErr := hex.DecodeString(hexWkb)
	if hexErr != nil {
		slog.Warn(fmt.Sprintf("Ignoring invalid WKB: %s", hexWkb))
		// Wrap so callers can tell which stage failed while errors.Is/As
		// still reach the underlying hex error.
		return "", fmt.Errorf("decoding hex WKB: %w", hexErr)
	}

	// Parse the binary WKB into a geometry object.
	geometryObject, geoErr := geom.NewGeomFromWKB(wkb)
	if geoErr != nil {
		return "", fmt.Errorf("parsing WKB: %w", geoErr)
	}

	// Any reason string other than "Valid Geometry" is treated as a
	// validation failure and becomes the text of the returned error.
	invalidReason := geometryObject.IsValidReason()
	if invalidReason != "Valid Geometry" {
		slog.Warn(fmt.Sprintf("Ignoring invalid geometry shape %s: %s", hexWkb, invalidReason))
		return "", errors.New(invalidReason)
	}

	return geometryObject.ToWKT(), nil
}

// GeoCompare reports whether the geometries parsed from the two WKT strings
// are equal according to the geometry type's Equals method. It returns false
// if either string cannot be parsed as WKT.
func GeoCompare(wkt1, wkt2 string) bool {
	left, err := geom.NewGeomFromWKT(wkt1)
	if err != nil {
		return false
	}

	right, err := geom.NewGeomFromWKT(wkt2)
	if err != nil {
		return false
	}

	// Both inputs parsed; the result is whatever Equals reports.
	return left.Equals(right)
}
18 changes: 18 additions & 0 deletions flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"time"

"github.com/PeerDB-io/peer-flow/geo"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -204,6 +205,14 @@ func compareString(value1, value2 interface{}) bool {
str1, ok1 := value1.(string)
str2, ok2 := value2.(string)

// Catch matching WKB(in Postgres)-WKT(in destination) geo values
if ok1 && ok2 {
geoConvertedWKT, err := geo.GeoValidate(str1)
if err == nil && geo.GeoCompare(geoConvertedWKT, str2) {
return true
}
}

return ok1 && ok2 && str1 == str2
}

Expand Down Expand Up @@ -267,6 +276,10 @@ func compareNumericArrays(value1, value2 interface{}) bool {
return true
}

if value1 == nil && value2 == "" {
return true
}

// Helper function to convert a value to float64
convertToFloat64 := func(val interface{}) []float64 {
switch v := val.(type) {
Expand Down Expand Up @@ -321,6 +334,11 @@ func compareArrayString(value1, value2 interface{}) bool {
return true
}

// nulls end up as empty 'variants' in snowflake
if value1 == nil && value2 == "" {
return true
}

array1, ok1 := value1.([]string)
array2, ok2 := value2.([]string)

Expand Down

0 comments on commit fad17cd

Please sign in to comment.