diff --git a/dev-peerdb.sh b/dev-peerdb.sh old mode 100644 new mode 100755 diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 142f353146..b90e9d779b 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -539,14 +539,24 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma if ok { customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { - err := GeoValidate(string(data)) + wkt, err := GeoValidate(string(data)) if err != nil { - return &qvalue.QValue{Kind: customQKind, - Value: nil}, nil + return &qvalue.QValue{ + Kind: customQKind, + Value: nil, + }, nil + } else { + return &qvalue.QValue{ + Kind: customQKind, + Value: wkt, + }, nil } + } else { + return &qvalue.QValue{ + Kind: customQKind, + Value: string(data), + }, nil } - return &qvalue.QValue{Kind: customQKind, - Value: string(data)}, nil } return &qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 42ed6a1ada..8c83beeaa5 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -436,9 +436,11 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { wkbString, ok := values[i].(string) - err := GeoValidate(wkbString) + wkt, err := GeoValidate(wkbString) if err != nil || !ok { values[i] = nil + } else { + values[i] = wkt } } customTypeVal := qvalue.QValue{ diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index cc35db7cad..2d0f5b2c2c 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -407,24 +407,28 @@ func customTypeToQKind(typeName string) qvalue.QValueKind { return qValueKind } -func GeoValidate(hexWkb string) error { - log.Infof("Validating geometry shape %s", hexWkb) +// returns the WKT representation of the geometry object if it is valid +func GeoValidate(hexWkb string) (string, error) { // Decode the WKB hex string into binary wkb, hexErr := hex.DecodeString(hexWkb) if hexErr != nil { log.Warnf("Ignoring invalid WKB: %s", hexWkb) - return hexErr + return "", hexErr } + // UnmarshalWKB performs geometry validation along with WKB parsing geometryObject, geoErr := geom.NewGeomFromWKB(wkb) if geoErr != nil { log.Warnf("Ignoring invalid geometry WKB %s: %v", hexWkb, geoErr) - return geoErr + return "", geoErr } + invalidReason := geometryObject.IsValidReason() if invalidReason != "Valid Geometry" { log.Warnf("Ignoring invalid geometry shape %s: %s", hexWkb, invalidReason) - return errors.New(invalidReason) + return "", errors.New(invalidReason) } - return nil + + wkt := geometryObject.ToWKT() + return wkt, nil } diff --git a/run-peerdb.sh b/run-peerdb.sh old mode 100644 new mode 100755