Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Geospatial Support for BigQuery #963

Merged
merged 4 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
bqType, colName, colName)
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
colName, bqType, colName)
// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
// json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added,
Expand Down
22 changes: 20 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,22 @@ func (s *QRepAvroSyncMethod) SyncRecords(
return numRecords, nil
}

func getTransformedColumns(dstTableMetadata *bigquery.TableMetadata, syncedAtCol string, softDeleteCol string) []string {
transformedColumns := make([]string, 0, len(dstTableMetadata.Schema))
for _, col := range dstTableMetadata.Schema {
if col.Name == syncedAtCol || col.Name == softDeleteCol {
continue
}
if col.Type == bigquery.GeographyFieldType {
transformedColumns = append(transformedColumns,
fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", col.Name, col.Name))
} else {
transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name))
}
}
return transformedColumns
}

func (s *QRepAvroSyncMethod) SyncQRepRecords(
flowJobName string,
dstTableName string,
Expand Down Expand Up @@ -151,7 +167,9 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
)
bqClient := s.connector.client

selector := "*"
transformedColumns := getTransformedColumns(dstTableMetadata, syncedAtCol, softDeleteCol)
selector := strings.Join(transformedColumns, ", ")

if softDeleteCol != "" { // PeerDB column
selector += ", FALSE"
}
Expand Down Expand Up @@ -262,7 +280,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
}

switch bqField.Type {
case bigquery.StringFieldType:
case bigquery.StringFieldType, bigquery.GeographyFieldType:
return considerRepeated("string", bqField.Repeated), nil
case bigquery.BytesFieldType:
return "bytes", nil
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
return bigquery.IntegerFieldType
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64:
return bigquery.FloatFieldType
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
return bigquery.GeographyFieldType
// rest will be strings
default:
return bigquery.StringFieldType
Expand Down Expand Up @@ -76,7 +78,7 @@ func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) (qvalue.QValueKind,
case bigquery.NumericFieldType:
return qvalue.QValueKindNumeric, nil
case bigquery.GeographyFieldType:
return qvalue.QValueKindString, nil
return qvalue.QValueKindGeography, nil
default:
return "", fmt.Errorf("unsupported bigquery field type: %v", fieldType)
}
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,6 @@ func GeoValidate(hexWkb string) (string, error) {
// UnmarshalWKB performs geometry validation along with WKB parsing
geometryObject, geoErr := geom.NewGeomFromWKB(wkb)
if geoErr != nil {
slog.Warn(fmt.Sprintf("Ignoring invalid geometry WKB %s: %v", hexWkb, geoErr))
return "", geoErr
}

Expand Down
8 changes: 6 additions & 2 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,15 @@ func (b *BigQueryTestHelper) RunCommand(command string) error {

// countRows(tableName) returns the number of rows in the given table.
func (b *BigQueryTestHelper) countRows(tableName string) (int, error) {
return b.countRowsWithDataset(b.datasetName, tableName)
return b.countRowsWithDataset(b.datasetName, tableName, "")
}

func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string) (int, error) {
func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, nonNullCol string) (int, error) {
command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s`", dataset, tableName)
if nonNullCol != "" {
command = fmt.Sprintf("SELECT COUNT(CASE WHEN " + nonNullCol +
" IS NOT NULL THEN 1 END) AS non_null_count FROM `" + dataset + "`.`" + tableName + "`;")
}
it, err := b.client.Query(command).Read(context.Background())
if err != nil {
return 0, fmt.Errorf("failed to run command: %w", err)
Expand Down
90 changes: 87 additions & 3 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,90 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_invalid_geo_bq_avro_cdc")
dstTableName := "test_invalid_geo_bq_avro_cdc"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
line GEOMETRY(LINESTRING) NOT NULL,
poly GEOGRAPHY(POLYGON) NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_invalid_geo_bq_avro_cdc"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert 10 rows into the source table
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 4 invalid shapes and 6 valid shapes into the source table
for i := 0; i < 4; i++ {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,poly) VALUES ($1,$2)
`, srcTableName), "010200000001000000000000000000F03F0000000000000040",
"0103000020e6100000010000000c0000001a8361d35dc64140afdb8d2b1bc3c9bf1b8ed4685fc641405ba64c"+
"579dc2c9bf6a6ad95a5fc64140cd82767449c2c9bf9570fbf85ec641408a07944db9c2c9bf729a18a55ec6414021b8b748c7c2c9bfba46de4c"+
"5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+
"c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf",
)
require.NoError(s.t, err)
}
s.t.Log("Inserted 4 invalid geography rows into the source table")
for i := 4; i < 10; i++ {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,poly) VALUES ($1,$2)
`, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040",
"010300000001000000050000000000000000000000000000000000000000000000"+
"00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+
"00f03f000000000000000000000000000000000000000000000000")
require.NoError(s.t, err)
}
s.t.Log("Inserted 6 valid geography rows and 10 total rows into source")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

// We inserted 4 invalid shapes in each.
// They should have been filtered out as null on destination
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
lineCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "line")
require.NoError(s.t, err)

Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
polyCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "poly")
require.NoError(s.t, err)

require.Equal(s.t, 6, lineCount)
require.Equal(s.t, 6, polyCount)

env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down Expand Up @@ -900,7 +984,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
Expand Down Expand Up @@ -1120,7 +1204,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
Expand Down Expand Up @@ -1251,7 +1335,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {

count1, err := s.bqHelper.countRows(dstTable1Name)
require.NoError(s.t, err)
count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name)
count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name, "")
require.NoError(s.t, err)

s.Equal(1, count1)
Expand Down
15 changes: 2 additions & 13 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int)
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) {
schema := e2e.GetOwnersSchema()
err := s.bqHelper.CreateTable(dstTable, schema)

// fail if table creation fails
require.NoError(s.t, err)

s.t.Logf("created table on bigquery: %s.%s. %v", s.bqHelper.Config.DatasetId, dstTable, err)
}

func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) {
// read rows from source table
pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart")
Expand All @@ -52,9 +42,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {

numRows := 10

tblName := "test_qrep_flow_avro"
tblName := "test_qrep_flow_avro_bq"
s.setupSourceTable(tblName, numRows)
s.setupBQDestinationTable(tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
s.bqSuffix, tblName)
Expand All @@ -65,7 +54,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
query,
s.bqHelper.Peer,
"",
false,
true,
"")
require.NoError(s.t, err)
e2e.RunQrepFlowWorkflow(env, qrepConfig)
Expand Down
3 changes: 0 additions & 3 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
require.NoError(s.t, err)
s.Equal(6, polyCount)

// TODO: verify that the data is correctly synced to the destination table
// on the bigquery side

env.AssertExpectations(s.t)
}

Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err
"f7 jsonb",
"f8 smallint",
}
if strings.Contains(tableName, "sf") {
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
tblFields = append(tblFields, "geometry_point geometry(point)",
"geography_point geography(point)",
"geometry_linestring geometry(linestring)",
Expand Down Expand Up @@ -197,7 +197,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
id := uuid.New().String()
ids = append(ids, id)
geoValues := ""
if strings.Contains(tableName, "sf") {
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
geoValues = `,'POINT(1 2)','POINT(40.7128 -74.0060)',
'LINESTRING(0 0, 1 1, 2 2)',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)',
Expand All @@ -222,7 +222,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
}

geoColumns := ""
if strings.Contains(tableName, "sf") {
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
geoColumns = ",geometry_point, geography_point," +
"geometry_linestring, geography_linestring," +
"geometry_polygon, geography_polygon"
Expand Down
Loading