From ce7b9a5042352f1e21f5f1fb6b1b7ffb5b478424 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 24 Oct 2023 10:57:38 -0400 Subject: [PATCH 1/2] return WKT(s) as a result of GeoValidate --- dev-peerdb.sh | 0 flow/connectors/postgres/cdc.go | 15 ++++++++++----- flow/connectors/postgres/qrep_query_executor.go | 4 +++- flow/connectors/postgres/qvalue_convert.go | 16 ++++++++++------ run-peerdb.sh | 0 5 files changed, 23 insertions(+), 12 deletions(-) mode change 100644 => 100755 dev-peerdb.sh mode change 100644 => 100755 run-peerdb.sh diff --git a/dev-peerdb.sh b/dev-peerdb.sh old mode 100644 new mode 100755 diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 142f353146..9b9b07afb3 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -537,16 +537,21 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma typeName, ok := p.customTypeMapping[dataType] if ok { + var wkt string customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { - err := GeoValidate(string(data)) + wkt, err = GeoValidate(string(data)) if err != nil { - return &qvalue.QValue{Kind: customQKind, - Value: nil}, nil + return &qvalue.QValue{ + Kind: customQKind, + Value: nil, + }, nil } } - return &qvalue.QValue{Kind: customQKind, - Value: string(data)}, nil + return &qvalue.QValue{ + Kind: customQKind, + Value: wkt, + }, nil } return &qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 42ed6a1ada..8c83beeaa5 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -436,9 +436,11 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { wkbString, ok := values[i].(string) - err := GeoValidate(wkbString) + wkt, err := GeoValidate(wkbString) if err != nil || !ok { values[i] = nil + } else { + values[i] = wkt } } customTypeVal := qvalue.QValue{ diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index cc35db7cad..2d0f5b2c2c 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -407,24 +407,28 @@ func customTypeToQKind(typeName string) qvalue.QValueKind { return qValueKind } -func GeoValidate(hexWkb string) error { - log.Infof("Validating geometry shape %s", hexWkb) +// returns the WKT representation of the geometry object if it is valid +func GeoValidate(hexWkb string) (string, error) { // Decode the WKB hex string into binary wkb, hexErr := hex.DecodeString(hexWkb) if hexErr != nil { log.Warnf("Ignoring invalid WKB: %s", hexWkb) - return hexErr + return "", hexErr } + // UnmarshalWKB performs geometry validation along with WKB parsing geometryObject, geoErr := geom.NewGeomFromWKB(wkb) if geoErr != nil { log.Warnf("Ignoring invalid geometry WKB %s: %v", hexWkb, geoErr) - return geoErr + return "", geoErr } + invalidReason := geometryObject.IsValidReason() if invalidReason != "Valid Geometry" { log.Warnf("Ignoring invalid geometry shape %s: %s", hexWkb, invalidReason) - return errors.New(invalidReason) + return "", errors.New(invalidReason) } - return nil + + wkt := geometryObject.ToWKT() + return wkt, nil } diff --git a/run-peerdb.sh b/run-peerdb.sh old mode 100644 new mode 100755 From 4a3e3dabd588b74fdf4d536f4ebddf870655554a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 24 Oct 2023 11:00:06 -0400 Subject: [PATCH 2/2] fix conditional flow --- flow/connectors/postgres/cdc.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 9b9b07afb3..b90e9d779b 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -537,21 +537,26 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma typeName, ok := p.customTypeMapping[dataType] if ok { - var wkt string customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { - wkt, err = GeoValidate(string(data)) + wkt, err := GeoValidate(string(data)) if err != nil { return &qvalue.QValue{ Kind: customQKind, Value: nil, }, nil + } else { + return &qvalue.QValue{ + Kind: customQKind, + Value: wkt, + }, nil } + } else { + return &qvalue.QValue{ + Kind: customQKind, + Value: string(data), + }, nil } - return &qvalue.QValue{ - Kind: customQKind, - Value: wkt, - }, nil } return &qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil