Geospatial Support for BigQuery (#963)
Similar to #516 

Leverages https://cloud.google.com/bigquery/docs/geospatial-data to
implement syncing of Postgres' PostGIS types to BigQuery's GEOGRAPHY
data type - for QRep, initial load, and CDC.
In `qrep_avro_sync`, we now have a function that transforms the data on
the staging table before copying it to the destination - a feature that
was needed here and makes it easier to support more data types in QRep
for BigQuery.
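A minimal sketch of what that transformation produces, assuming a hypothetical destination schema (the real function, `getTransformedColumns`, appears in the `qrep_avro_sync.go` diff below):

```go
package main

import (
	"fmt"
	"strings"

	"cloud.google.com/go/bigquery"
)

// Condensed copy of getTransformedColumns (see the diff below): GEOGRAPHY
// columns are wrapped in ST_GEOGFROMTEXT so the WKT strings staged via Avro
// are parsed into real GEOGRAPHY values when copying to the destination;
// PeerDB metadata columns are skipped.
func getTransformedColumns(meta *bigquery.TableMetadata, syncedAtCol, softDeleteCol string) []string {
	cols := make([]string, 0, len(meta.Schema))
	for _, col := range meta.Schema {
		if col.Name == syncedAtCol || col.Name == softDeleteCol {
			continue
		}
		if col.Type == bigquery.GeographyFieldType {
			cols = append(cols, fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", col.Name, col.Name))
		} else {
			cols = append(cols, fmt.Sprintf("`%s`", col.Name))
		}
	}
	return cols
}

func main() {
	// Hypothetical destination schema, for illustration only.
	meta := &bigquery.TableMetadata{Schema: bigquery.Schema{
		{Name: "id", Type: bigquery.IntegerFieldType},
		{Name: "shape", Type: bigquery.GeographyFieldType},
	}}
	selector := strings.Join(getTransformedColumns(meta, "_peerdb_synced_at", ""), ", ")
	fmt.Println(selector) // `id`, ST_GEOGFROMTEXT(`shape`) AS `shape`
}
```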

Tests added for QRep and CDC
Amogh-Bharadwaj authored Jan 2, 2024
1 parent 9a6cf9f commit 7ade9aa
Showing 9 changed files with 124 additions and 28 deletions.
3 changes: 3 additions & 0 deletions flow/connectors/bigquery/merge_statement_generator.go
@@ -56,6 +56,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
bqType, colName, colName)
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
colName, bqType, colName)
// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
// json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added,
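For a hypothetical geography column named `shape`, the new case above renders a cast along these lines in the flattened CTE (a sketch of the Sprintf output, not output captured from a real mirror):

```go
package main

import "fmt"

func main() {
	// Mirrors the Sprintf in the new geospatial case above; "shape" and the
	// GEOGRAPHY bqType are hypothetical placeholders for illustration.
	colName, bqType := "shape", "GEOGRAPHY"
	castStmt := fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
		colName, bqType, colName)
	fmt.Println(castStmt)
	// CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.shape')) AS GEOGRAPHY) AS `shape`
}
```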
22 changes: 20 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -110,6 +110,22 @@ func (s *QRepAvroSyncMethod) SyncRecords(
return numRecords, nil
}

func getTransformedColumns(dstTableMetadata *bigquery.TableMetadata, syncedAtCol string, softDeleteCol string) []string {
transformedColumns := make([]string, 0, len(dstTableMetadata.Schema))
for _, col := range dstTableMetadata.Schema {
if col.Name == syncedAtCol || col.Name == softDeleteCol {
continue
}
if col.Type == bigquery.GeographyFieldType {
transformedColumns = append(transformedColumns,
fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", col.Name, col.Name))
} else {
transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name))
}
}
return transformedColumns
}

func (s *QRepAvroSyncMethod) SyncQRepRecords(
flowJobName string,
dstTableName string,
@@ -151,7 +167,9 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
)
bqClient := s.connector.client

selector := "*"
transformedColumns := getTransformedColumns(dstTableMetadata, syncedAtCol, softDeleteCol)
selector := strings.Join(transformedColumns, ", ")

if softDeleteCol != "" { // PeerDB column
selector += ", FALSE"
}
@@ -262,7 +280,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
}

switch bqField.Type {
case bigquery.StringFieldType:
case bigquery.StringFieldType, bigquery.GeographyFieldType:
return considerRepeated("string", bqField.Repeated), nil
case bigquery.BytesFieldType:
return "bytes", nil
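The net effect of the Avro change above: GEOGRAPHY fields are staged as plain Avro strings carrying WKT, which the staging-table transform then parses with ST_GEOGFROMTEXT. A condensed, self-contained sketch of the mapping (the real GetAvroType handles many more field types):

```go
package main

import "fmt"

// Sketch of the GetAvroType change above: GEOGRAPHY fields are declared as
// Avro "string" so their WKT text can be loaded into the staging table,
// where ST_GEOGFROMTEXT later parses them (see getTransformedColumns).
func avroTypeForBQ(fieldType string) (string, error) {
	switch fieldType {
	case "STRING", "GEOGRAPHY":
		return "string", nil
	case "BYTES":
		return "bytes", nil
	default:
		return "", fmt.Errorf("field type not covered by this sketch: %s", fieldType)
	}
}

func main() {
	t, err := avroTypeForBQ("GEOGRAPHY")
	if err != nil {
		panic(err)
	}
	fmt.Println(t) // string
}
```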
4 changes: 3 additions & 1 deletion flow/connectors/bigquery/qvalue_convert.go
@@ -46,6 +46,8 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
return bigquery.IntegerFieldType
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64:
return bigquery.FloatFieldType
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
return bigquery.GeographyFieldType
// rest will be strings
default:
return bigquery.StringFieldType
@@ -76,7 +78,7 @@ func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) (qvalue.QValueKind,
case bigquery.NumericFieldType:
return qvalue.QValueKindNumeric, nil
case bigquery.GeographyFieldType:
return qvalue.QValueKindString, nil
return qvalue.QValueKindGeography, nil
default:
return "", fmt.Errorf("unsupported bigquery field type: %v", fieldType)
}
1 change: 0 additions & 1 deletion flow/connectors/postgres/qvalue_convert.go
@@ -420,7 +420,6 @@ func GeoValidate(hexWkb string) (string, error) {
// UnmarshalWKB performs geometry validation along with WKB parsing
geometryObject, geoErr := geom.NewGeomFromWKB(wkb)
if geoErr != nil {
slog.Warn(fmt.Sprintf("Ignoring invalid geometry WKB %s: %v", hexWkb, geoErr))
return "", geoErr
}

8 changes: 6 additions & 2 deletions flow/e2e/bigquery/bigquery_helper.go
@@ -168,11 +168,15 @@ func (b *BigQueryTestHelper) RunCommand(command string) error {

// countRows(tableName) returns the number of rows in the given table.
func (b *BigQueryTestHelper) countRows(tableName string) (int, error) {
return b.countRowsWithDataset(b.datasetName, tableName)
return b.countRowsWithDataset(b.datasetName, tableName, "")
}

func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string) (int, error) {
func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, nonNullCol string) (int, error) {
command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s`", dataset, tableName)
if nonNullCol != "" {
command = fmt.Sprintf("SELECT COUNT(CASE WHEN %s IS NOT NULL THEN 1 END) AS non_null_count FROM `%s.%s`;",
nonNullCol, dataset, tableName)
}
it, err := b.client.Query(command).Read(context.Background())
if err != nil {
return 0, fmt.Errorf("failed to run command: %w", err)
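With a non-null column supplied, the helper now issues a query along these lines (the dataset, table, and column names below are hypothetical placeholders):

```go
package main

import "fmt"

func main() {
	// Mirrors the query built in countRowsWithDataset above.
	dataset, tableName, nonNullCol := "e2e_dataset", "test_invalid_geo_bq_avro_cdc", "line"
	command := fmt.Sprintf("SELECT COUNT(CASE WHEN %s IS NOT NULL THEN 1 END) AS non_null_count FROM `%s.%s`;",
		nonNullCol, dataset, tableName)
	fmt.Println(command)
	// SELECT COUNT(CASE WHEN line IS NOT NULL THEN 1 END) AS non_null_count
	// FROM `e2e_dataset.test_invalid_geo_bq_avro_cdc`;
}
```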
86 changes: 85 additions & 1 deletion flow/e2e/bigquery/peer_flow_bq_test.go
@@ -744,6 +744,90 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_invalid_geo_bq_avro_cdc")
dstTableName := "test_invalid_geo_bq_avro_cdc"

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
line GEOMETRY(LINESTRING) NOT NULL,
"polyPoly" GEOGRAPHY(POLYGON) NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_invalid_geo_bq_avro_cdc"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert 10 rows into the source table
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 4 invalid shapes and 6 valid shapes into the source table
for i := 0; i < 4; i++ {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,"polyPoly") VALUES ($1,$2)
`, srcTableName), "010200000001000000000000000000F03F0000000000000040",
"0103000020e6100000010000000c0000001a8361d35dc64140afdb8d2b1bc3c9bf1b8ed4685fc641405ba64c"+
"579dc2c9bf6a6ad95a5fc64140cd82767449c2c9bf9570fbf85ec641408a07944db9c2c9bf729a18a55ec6414021b8b748c7c2c9bfba46de4c"+
"5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+
"c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf",
)
require.NoError(s.t, err)
}
s.t.Log("Inserted 4 invalid geography rows into the source table")
for i := 4; i < 10; i++ {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,"polyPoly") VALUES ($1,$2)
`, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040",
"010300000001000000050000000000000000000000000000000000000000000000"+
"00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+
"00f03f000000000000000000000000000000000000000000000000")
require.NoError(s.t, err)
}
s.t.Log("Inserted 6 valid geography rows and 10 total rows into source")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

// We inserted 4 invalid shapes into each geo column.
// They should have been filtered out as NULL on the destination.
lineCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "line")
require.NoError(s.t, err)

polyCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "`polyPoly`")
require.NoError(s.t, err)

require.Equal(s.t, 6, lineCount)
require.Equal(s.t, 6, polyCount)

env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
@@ -1248,7 +1332,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {

count1, err := s.bqHelper.countRows(dstTable1Name)
require.NoError(s.t, err)
count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name)
count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name, "")
require.NoError(s.t, err)

require.Equal(s.t, 1, count1)
Expand Down
15 changes: 2 additions & 13 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
@@ -16,16 +16,6 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int)
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) {
schema := e2e.GetOwnersSchema()
err := s.bqHelper.CreateTable(dstTable, schema)

// fail if table creation fails
require.NoError(s.t, err)

s.t.Logf("created table on bigquery: %s.%s. %v", s.bqHelper.Config.DatasetId, dstTable, err)
}

func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) {
// read rows from source table
pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart")
@@ -52,9 +42,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {

numRows := 10

tblName := "test_qrep_flow_avro"
tblName := "test_qrep_flow_avro_bq"
s.setupSourceTable(tblName, numRows)
s.setupBQDestinationTable(tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
s.bqSuffix, tblName)
@@ -65,7 +54,7 @@
query,
s.bqHelper.Peer,
"",
false,
true,
"")
require.NoError(s.t, err)
e2e.RunQrepFlowWorkflow(env, qrepConfig)
3 changes: 0 additions & 3 deletions flow/e2e/snowflake/peer_flow_sf_test.go
@@ -325,9 +325,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
require.NoError(s.t, err)
require.Equal(s.t, 6, polyCount)

// TODO: verify that the data is correctly synced to the destination table
// on the bigquery side

env.AssertExpectations(s.t)
}

10 changes: 5 additions & 5 deletions flow/e2e/test_utils.go
@@ -155,8 +155,8 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err
"f7 jsonb",
"f8 smallint",
}
if strings.Contains(tableName, "sf") {
tblFields = append(tblFields, "geometry_point geometry(point)",
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
tblFields = append(tblFields, `"geometryPoint" geometry(point)`,
"geography_point geography(point)",
"geometry_linestring geometry(linestring)",
"geography_linestring geography(linestring)",
@@ -197,7 +197,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
id := uuid.New().String()
ids = append(ids, id)
geoValues := ""
if strings.Contains(tableName, "sf") {
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
geoValues = `,'POINT(1 2)','POINT(40.7128 -74.0060)',
'LINESTRING(0 0, 1 1, 2 2)',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)',
@@ -222,8 +222,8 @@
}

geoColumns := ""
if strings.Contains(tableName, "sf") {
geoColumns = ",geometry_point, geography_point," +
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
geoColumns = `,"geometryPoint", geography_point,` +
"geometry_linestring, geography_linestring," +
"geometry_polygon, geography_polygon"
}
