diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index d5de8947c5..fcb3e64174 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/cdc_records" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/geo" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -362,8 +363,8 @@ func (p *PostgresCDCSource) consumeStream( if retryAttemptForWALSegmentRemoved > maxRetriesForWalSegmentRemoved { return fmt.Errorf("max retries for WAL segment removed exceeded: %+v", errMsg) } else { - p.logger.Warn(fmt.Sprintf( - "WAL segment removed, restarting replication retrying in 30 seconds..."), + p.logger.Warn( + "WAL segment removed, restarting replication retrying in 30 seconds...", slog.Any("error", errMsg), slog.Int("retryAttempt", retryAttemptForWALSegmentRemoved)) time.Sleep(30 * time.Second) continue @@ -761,7 +762,7 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma if ok { customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { - wkt, err := GeoValidate(string(data)) + wkt, err := geo.GeoValidate(string(data)) if err != nil { return qvalue.QValue{ Kind: customQKind, diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 75e258e70d..bb07fbb98f 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -7,6 +7,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/geo" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -446,7 +447,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { wkbString, ok := values[i].(string) - wkt, err := GeoValidate(wkbString) + wkt, err := geo.GeoValidate(wkbString) if err != nil || !ok { values[i] = nil } else { diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index aafa00073e..0037a43fa1 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -1,7 +1,6 @@ package connpostgres import ( - "encoding/hex" "encoding/json" "errors" "fmt" @@ -14,8 +13,6 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pgx/v5/pgtype" "github.com/lib/pq/oid" - - geom "github.com/twpayne/go-geos" ) func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { @@ -407,28 +404,3 @@ func customTypeToQKind(typeName string) qvalue.QValueKind { } return qValueKind } - -// returns the WKT representation of the geometry object if it is valid -func GeoValidate(hexWkb string) (string, error) { - // Decode the WKB hex string into binary - wkb, hexErr := hex.DecodeString(hexWkb) - if hexErr != nil { - slog.Warn(fmt.Sprintf("Ignoring invalid WKB: %s", hexWkb)) - return "", hexErr - } - - // UnmarshalWKB performs geometry validation along with WKB parsing - geometryObject, geoErr := geom.NewGeomFromWKB(wkb) - if geoErr != nil { - return "", geoErr - } - - invalidReason := geometryObject.IsValidReason() - if invalidReason != "Valid Geometry" { - slog.Warn(fmt.Sprintf("Ignoring invalid geometry shape %s: %s", hexWkb, invalidReason)) - return "", errors.New(invalidReason) - } - - wkt := geometryObject.ToWKT() - return wkt, nil -} diff --git a/flow/geo/geo.go b/flow/geo/geo.go new file mode 100644 index 0000000000..12f5a3107d --- /dev/null +++ b/flow/geo/geo.go @@ -0,0 +1,52 @@ +package geo + +import ( + "encoding/hex" + "errors" + "fmt" + "log/slog" + + geom "github.com/twpayne/go-geos" +) + +// returns the WKT representation of the geometry object if it is valid +func GeoValidate(hexWkb string) (string, error) { + // Decode the WKB hex string into binary + wkb, hexErr := hex.DecodeString(hexWkb) + if hexErr != nil { + slog.Warn(fmt.Sprintf("Ignoring invalid WKB: %s", hexWkb)) + return "", hexErr + } + + // UnmarshalWKB performs geometry validation along with WKB parsing + geometryObject, geoErr := geom.NewGeomFromWKB(wkb) + if geoErr != nil { + return "", geoErr + } + + invalidReason := geometryObject.IsValidReason() + if invalidReason != "Valid Geometry" { + slog.Warn(fmt.Sprintf("Ignoring invalid geometry shape %s: %s", hexWkb, invalidReason)) + return "", errors.New(invalidReason) + } + + wkt := geometryObject.ToWKT() + return wkt, nil +} + +// compares WKTs +func GeoCompare(wkt1, wkt2 string) bool { + geom1, geoErr := geom.NewGeomFromWKT(wkt1) + if geoErr != nil { + return false + } + + geom2, geoErr := geom.NewGeomFromWKT(wkt2) + if geoErr != nil { + return false + } + + // Compare the two geometries + compareResult := geom1.Equals(geom2) + return compareResult +} diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 4d80b0fb79..ae328ed922 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -9,6 +9,7 @@ import ( "strconv" "time" + "github.com/PeerDB-io/peer-flow/geo" "github.com/google/uuid" ) @@ -204,6 +205,14 @@ func compareString(value1, value2 interface{}) bool { str1, ok1 := value1.(string) str2, ok2 := value2.(string) + // Catch matching WKB(in Postgres)-WKT(in destination) geo values + if ok1 && ok2 { + geoConvertedWKT, err := geo.GeoValidate(str1) + if err == nil && geo.GeoCompare(geoConvertedWKT, str2) { + return true + } + } + return ok1 && ok2 && str1 == str2 } @@ -267,6 +276,10 @@ func compareNumericArrays(value1, value2 interface{}) bool { return true } + if value1 == nil && value2 == "" { + return true + } + // Helper function to convert a value to float64 convertToFloat64 := func(val interface{}) []float64 { switch v := val.(type) { @@ -321,6 +334,11 @@ func compareArrayString(value1, value2 interface{}) bool { return true } + // nulls end up as empty 'variants' in snowflake + if value1 == nil && value2 == "" { + return true + } + array1, ok1 := value1.([]string) array2, ok2 := value2.([]string)