Skip to content

Commit

Permalink
return WKT(s) as a result of GeoValidate
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 24, 2023
1 parent 7b566b0 commit ce7b9a5
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
Empty file modified dev-peerdb.sh
100644 → 100755
Empty file.
15 changes: 10 additions & 5 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,16 +537,21 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma

typeName, ok := p.customTypeMapping[dataType]
if ok {
var wkt string
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
err := GeoValidate(string(data))
wkt, err = GeoValidate(string(data))
if err != nil {
return &qvalue.QValue{Kind: customQKind,
Value: nil}, nil
return &qvalue.QValue{
Kind: customQKind,
Value: nil,
}, nil
}
}
return &qvalue.QValue{Kind: customQKind,
Value: string(data)}, nil
return &qvalue.QValue{
Kind: customQKind,
Value: wkt,
}, nil
}

return &qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,11 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
wkbString, ok := values[i].(string)
err := GeoValidate(wkbString)
wkt, err := GeoValidate(wkbString)
if err != nil || !ok {
values[i] = nil
} else {
values[i] = wkt
}
}
customTypeVal := qvalue.QValue{
Expand Down
16 changes: 10 additions & 6 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,24 +407,28 @@ func customTypeToQKind(typeName string) qvalue.QValueKind {
return qValueKind
}

func GeoValidate(hexWkb string) error {
log.Infof("Validating geometry shape %s", hexWkb)
// returns the WKT representation of the geometry object if it is valid
func GeoValidate(hexWkb string) (string, error) {
// Decode the WKB hex string into binary
wkb, hexErr := hex.DecodeString(hexWkb)
if hexErr != nil {
log.Warnf("Ignoring invalid WKB: %s", hexWkb)
return hexErr
return "", hexErr
}

// UnmarshalWKB performs geometry validation along with WKB parsing
geometryObject, geoErr := geom.NewGeomFromWKB(wkb)
if geoErr != nil {
log.Warnf("Ignoring invalid geometry WKB %s: %v", hexWkb, geoErr)
return geoErr
return "", geoErr
}

invalidReason := geometryObject.IsValidReason()
if invalidReason != "Valid Geometry" {
log.Warnf("Ignoring invalid geometry shape %s: %s", hexWkb, invalidReason)
return errors.New(invalidReason)
return "", errors.New(invalidReason)
}
return nil

wkt := geometryObject.ToWKT()
return wkt, nil
}
Empty file modified run-peerdb.sh
100644 → 100755
Empty file.

0 comments on commit ce7b9a5

Please sign in to comment.