Skip to content

Commit

Permalink
set srid for geometry
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 20, 2024
1 parent 966e4a5 commit 30411d0
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 3 deletions.
12 changes: 12 additions & 0 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ func (g *GenericSQLQueryExecutor) CountNonNullRows(
return count.Int64, err
}

func (g *GenericSQLQueryExecutor) CountSRIDs(
ctx context.Context,
schemaName string,
tableName string,
columnName string,
) (int64, error) {
var count pgtype.Int8
err := g.db.QueryRowxContext(ctx, "SELECT COUNT(CASE WHEN "+columnName+
" <> 0 THEN 1 END) AS not_zero FROM "+schemaName+"."+tableName).Scan(&count)
return count.Int64, err
}

func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (model.QField, error) {
qvKind, ok := g.dbtypeToQValueKind[ct.DatabaseTypeName()]
if !ok {
Expand Down
17 changes: 14 additions & 3 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
for range 6 {
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,poly) VALUES ($1,$2)
`, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040",
`, srcTableName), "SRID=5678;010200000002000000000000000000F03F000000000000004000000000000008400000000000001040",
"010300000001000000050000000000000000000000000000000000000000000000"+
"00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+
"00f03f000000000000000000000000000000000000000000000000")
Expand All @@ -143,6 +143,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
return false
}

// Make sure SRIDs are set
sridCount, err := s.sfHelper.CountSRIDs("test_invalid_geo_sf_avro_cdc", "line")
if err != nil {
return false
}

polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly")
if err != nil {
return false
Expand All @@ -151,9 +157,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
if lineCount != 6 || polyCount != 6 {
s.t.Logf("wrong counts, expect 6 lines 6 polies, not %d lines %d polies", lineCount, polyCount)
return false
} else {
return true
}

if sridCount != 6 {
s.t.Logf("there are some srids that are 0, expected 6 non-zero srids, got %d non-zero srids", sridCount)
return false
}

return true
})
env.Cancel()

Expand Down
9 changes: 9 additions & 0 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ func (s *SnowflakeTestHelper) CountNonNullRows(tableName string, columnName stri
return int(res), nil
}

func (s *SnowflakeTestHelper) CountSRIDs(tableName string, columnName string) (int, error) {
res, err := s.testClient.CountSRIDs(context.Background(), s.testSchemaName, tableName, columnName)
if err != nil {
return 0, err
}

return int(res), nil
}

func (s *SnowflakeTestHelper) CheckNull(tableName string, colNames []string) (bool, error) {
return s.testClient.CheckNull(context.Background(), s.testSchemaName, tableName, colNames)
}
Expand Down
1 change: 1 addition & 0 deletions flow/geo/geo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func GeoValidate(hexWkb string) (string, error) {
}

wkt := geometryObject.ToWKT()
wkt = fmt.Sprintf("SRID=%d;%s", geometryObject.SRID(), wkt)
return wkt, nil
}

Expand Down

0 comments on commit 30411d0

Please sign in to comment.