diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go
index 2648b3ea6a..204572d605 100644
--- a/flow/connectors/bigquery/merge_statement_generator.go
+++ b/flow/connectors/bigquery/merge_statement_generator.go
@@ -56,6 +56,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 			castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
 				"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element) AS `%s`",
 				bqType, colName, colName)
+		case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
+			castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
+				colName, bqType, colName)
 		// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
 		// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
 		// json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added,
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index 506be083c8..df30ee2c6b 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -110,6 +110,22 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 	return numRecords, nil
 }
 
+func getTransformedColumns(dstTableMetadata *bigquery.TableMetadata, syncedAtCol string, softDeleteCol string) []string {
+	transformedColumns := make([]string, 0, len(dstTableMetadata.Schema))
+	for _, col := range dstTableMetadata.Schema {
+		if col.Name == syncedAtCol || col.Name == softDeleteCol {
+			continue
+		}
+		if col.Type == bigquery.GeographyFieldType {
+			transformedColumns = append(transformedColumns,
+				fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", col.Name, col.Name))
+		} else {
+			transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name))
+		}
+	}
+	return transformedColumns
+}
+
 func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	flowJobName string,
 	dstTableName string,
@@ -151,7 +167,9 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	)
 	bqClient := s.connector.client
 
-	selector := "*"
+	transformedColumns := getTransformedColumns(dstTableMetadata, syncedAtCol, softDeleteCol)
+	selector := strings.Join(transformedColumns, ", ")
+
 	if softDeleteCol != "" { // PeerDB column
 		selector += ", FALSE"
 	}
@@ -262,7 +280,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
 	}
 
 	switch bqField.Type {
-	case bigquery.StringFieldType:
+	case bigquery.StringFieldType, bigquery.GeographyFieldType:
 		return considerRepeated("string", bqField.Repeated), nil
 	case bigquery.BytesFieldType:
 		return "bytes", nil
diff --git a/flow/connectors/bigquery/qvalue_convert.go b/flow/connectors/bigquery/qvalue_convert.go
index 727a6b3f88..16ffaff0b1 100644
--- a/flow/connectors/bigquery/qvalue_convert.go
+++ b/flow/connectors/bigquery/qvalue_convert.go
@@ -46,6 +46,8 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
 		return bigquery.IntegerFieldType
 	case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64:
 		return bigquery.FloatFieldType
+	case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
+		return bigquery.GeographyFieldType
 	// rest will be strings
 	default:
 		return bigquery.StringFieldType
@@ -76,7 +78,7 @@ func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) (qvalue.QValueKind, error) {
 	case bigquery.NumericFieldType:
 		return qvalue.QValueKindNumeric, nil
 	case bigquery.GeographyFieldType:
-		return qvalue.QValueKindString, nil
+		return qvalue.QValueKindGeography, nil
 	default:
 		return "", fmt.Errorf("unsupported bigquery field type: %v", fieldType)
 	}
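For review context, a minimal, self-contained sketch (not part of the patch) of the SQL the new generateFlattenedCTE branch emits for a geography column. The column name is hypothetical; bqType is what qValueKindToBigQueryType now returns for the three geo kinds:

```go
package main

import "fmt"

func main() {
	// Assumed inputs: a hypothetical column name, and the BigQuery type
	// the mapping above now returns for geography/geometry/point kinds.
	colName := "polyPoly"
	bqType := "GEOGRAPHY"

	// Same format string as the new case in generateFlattenedCTE: the WKT
	// text stored under _peerdb_data is parsed into a GEOGRAPHY value.
	castStmt := fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
		colName, bqType, colName)
	fmt.Println(castStmt)
	// Output:
	// CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.polyPoly')) AS GEOGRAPHY) AS `polyPoly`
}
```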
diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go
index 55a29362f8..aafa00073e 100644
--- a/flow/connectors/postgres/qvalue_convert.go
+++ b/flow/connectors/postgres/qvalue_convert.go
@@ -420,7 +420,6 @@ func GeoValidate(hexWkb string) (string, error) {
 	// UnmarshalWKB performs geometry validation along with WKB parsing
 	geometryObject, geoErr := geom.NewGeomFromWKB(wkb)
 	if geoErr != nil {
-		slog.Warn(fmt.Sprintf("Ignoring invalid geometry WKB %s: %v", hexWkb, geoErr))
 		return "", geoErr
 	}
 
diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go
index ecc2cce1e8..c1487e01f6 100644
--- a/flow/e2e/bigquery/bigquery_helper.go
+++ b/flow/e2e/bigquery/bigquery_helper.go
@@ -168,11 +168,15 @@ func (b *BigQueryTestHelper) RunCommand(command string) error {
 
 // countRows(tableName) returns the number of rows in the given table.
 func (b *BigQueryTestHelper) countRows(tableName string) (int, error) {
-	return b.countRowsWithDataset(b.datasetName, tableName)
+	return b.countRowsWithDataset(b.datasetName, tableName, "")
 }
 
-func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string) (int, error) {
+func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, nonNullCol string) (int, error) {
 	command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s`", dataset, tableName)
+	if nonNullCol != "" {
+		command = fmt.Sprintf("SELECT COUNT(CASE WHEN %s IS NOT NULL THEN 1 END) AS non_null_count FROM `%s.%s`;",
+			nonNullCol, dataset, tableName)
+	}
 	it, err := b.client.Query(command).Read(context.Background())
 	if err != nil {
 		return 0, fmt.Errorf("failed to run command: %w", err)
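A short sketch (illustration only; dataset and table names are hypothetical) of the two queries the updated countRowsWithDataset helper can now issue. The CASE form is what lets the geo tests count only rows whose geography column survived validation:

```go
package main

import "fmt"

func main() {
	dataset, tableName := "e2e_dataset", "test_invalid_geo_bq_avro_cdc" // hypothetical names

	// nonNullCol == "": plain row count, as before.
	fmt.Printf("SELECT COUNT(*) FROM `%s.%s`\n", dataset, tableName)

	// nonNullCol set: rows whose shape failed validation arrive as NULL on
	// the destination, so counting non-NULL values of that column yields
	// only the valid rows.
	nonNullCol := "line"
	fmt.Printf("SELECT COUNT(CASE WHEN %s IS NOT NULL THEN 1 END) AS non_null_count FROM `%s.%s`;\n",
		nonNullCol, dataset, tableName)
}
```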
diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go
index cc0352fae3..b5d674e9ae 100644
--- a/flow/e2e/bigquery/peer_flow_bq_test.go
+++ b/flow/e2e/bigquery/peer_flow_bq_test.go
@@ -747,6 +747,90 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
 	env.AssertExpectations(s.t)
 }
 
+func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(s.t, env)
+
+	srcTableName := s.attachSchemaSuffix("test_invalid_geo_bq_avro_cdc")
+	dstTableName := "test_invalid_geo_bq_avro_cdc"
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id SERIAL PRIMARY KEY,
+			line GEOMETRY(LINESTRING) NOT NULL,
+			"polyPoly" GEOGRAPHY(POLYGON) NOT NULL
+		);
+	`, srcTableName))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("test_invalid_geo_bq_avro_cdc"),
+		TableNameMapping: map[string]string{srcTableName: dstTableName},
+		PostgresPort:     e2e.PostgresPort,
+		Destination:      s.bqHelper.Peer,
+		CdcStagingPath:   "",
+	}
+
+	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
+	require.NoError(s.t, err)
+
+	limits := peerflow.CDCFlowLimits{
+		ExitAfterRecords: 10,
+		MaxBatchSize:     100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert 10 rows into the source table
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+		// insert 4 invalid shapes and 6 valid shapes into the source table
+		for i := 0; i < 4; i++ {
+			_, err = s.pool.Exec(context.Background(),
+				fmt.Sprintf(`
+			INSERT INTO %s (line,"polyPoly") VALUES ($1,$2)
+			`, srcTableName), "010200000001000000000000000000F03F0000000000000040",
+				"0103000020e6100000010000000c0000001a8361d35dc64140afdb8d2b1bc3c9bf1b8ed4685fc641405ba64c"+
+					"579dc2c9bf6a6ad95a5fc64140cd82767449c2c9bf9570fbf85ec641408a07944db9c2c9bf729a18a55ec6414021b8b748c7c2c9bfba46de4c"+
+					"5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+
+					"c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf",
+			)
+			require.NoError(s.t, err)
+		}
+		s.t.Log("Inserted 4 invalid geography rows into the source table")
+		for i := 4; i < 10; i++ {
+			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			INSERT INTO %s (line,"polyPoly") VALUES ($1,$2)
+			`, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040",
+				"010300000001000000050000000000000000000000000000000000000000000000"+
+					"00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+
+					"00f03f000000000000000000000000000000000000000000000000")
+			require.NoError(s.t, err)
+		}
+		s.t.Log("Inserted 6 valid geography rows and 10 total rows into source")
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
+
+	// Verify workflow completes without error
+	require.True(s.t, env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+
+	// allow only continue as new error
+	require.Contains(s.t, err.Error(), "continue as new")
+
+	// We inserted 4 invalid shapes into each geo column.
+	// They should have been filtered out as NULL on the destination.
+	lineCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "line")
+	require.NoError(s.t, err)
+
+	polyCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "`polyPoly`")
+	require.NoError(s.t, err)
+
+	require.Equal(s.t, 6, lineCount)
+	require.Equal(s.t, 6, polyCount)
+
+	env.AssertExpectations(s.t)
+}
 
 func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
 	env := e2e.NewTemporalTestWorkflowEnvironment()
 	e2e.RegisterWorkflowsAndActivities(s.t, env)
@@ -900,7 +984,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1120,7 +1204,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1251,7 +1335,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
 	count1, err := s.bqHelper.countRows(dstTable1Name)
 	require.NoError(s.t, err)
 
-	count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name)
+	count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name, "")
 	require.NoError(s.t, err)
 
 	s.Equal(1, count1)
diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go
index def151071c..bd153b06e6 100644
--- a/flow/e2e/bigquery/qrep_flow_bq_test.go
+++ b/flow/e2e/bigquery/qrep_flow_bq_test.go
@@ -16,16 +16,6 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) {
 	require.NoError(s.t, err)
 }
 
-func (s PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) {
-	schema := e2e.GetOwnersSchema()
-	err := s.bqHelper.CreateTable(dstTable, schema)
-
-	// fail if table creation fails
-	require.NoError(s.t, err)
-
-	s.t.Logf("created table on bigquery: %s.%s. %v", s.bqHelper.Config.DatasetId, dstTable, err)
-}
-
 func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) {
 	// read rows from source table
 	pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart")
@@ -52,9 +42,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
 
 	numRows := 10
 
-	tblName := "test_qrep_flow_avro"
 	s.setupSourceTable(tblName, numRows)
-	s.setupBQDestinationTable(tblName)
+	tblName := "test_qrep_flow_avro_bq"
 
 	query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
 		s.bqSuffix, tblName)
@@ -65,7 +54,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
 		query,
 		s.bqHelper.Peer,
 		"",
-		false,
+		true,
 		"")
 	require.NoError(s.t, err)
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
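The qrep test above no longer pre-creates the destination table (the deleted setupBQDestinationTable helper); the boolean argument flipped to true presumably lets the connector set up the destination itself. On the load side, getTransformedColumns (added in qrep_avro_sync.go) decides which staged Avro columns need ST_GEOGFROMTEXT. A self-contained sketch with a hypothetical destination schema, reproducing that helper so it compiles on its own:

```go
package main

import (
	"fmt"
	"strings"

	"cloud.google.com/go/bigquery"
)

// Copy of the helper added in qrep_avro_sync.go, reproduced here so the
// sketch is runnable in isolation.
func getTransformedColumns(dstTableMetadata *bigquery.TableMetadata, syncedAtCol string, softDeleteCol string) []string {
	transformedColumns := make([]string, 0, len(dstTableMetadata.Schema))
	for _, col := range dstTableMetadata.Schema {
		if col.Name == syncedAtCol || col.Name == softDeleteCol {
			continue
		}
		if col.Type == bigquery.GeographyFieldType {
			transformedColumns = append(transformedColumns,
				fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", col.Name, col.Name))
		} else {
			transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name))
		}
	}
	return transformedColumns
}

func main() {
	// Hypothetical destination schema: geography values travel through the
	// Avro stage as WKT strings (GetAvroType maps GEOGRAPHY to "string")
	// and are parsed back into GEOGRAPHY on insert.
	meta := &bigquery.TableMetadata{Schema: bigquery.Schema{
		{Name: "id", Type: bigquery.IntegerFieldType},
		{Name: "geometryPoint", Type: bigquery.GeographyFieldType},
		{Name: "_PEERDB_SYNCED_AT", Type: bigquery.TimestampFieldType}, // hypothetical synced-at column name
	}}

	selector := strings.Join(getTransformedColumns(meta, "_PEERDB_SYNCED_AT", ""), ", ")
	fmt.Println(selector)
	// `id`, ST_GEOGFROMTEXT(`geometryPoint`) AS `geometryPoint`
}
```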
diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go
index 4520f929eb..fdd90af546 100644
--- a/flow/e2e/snowflake/peer_flow_sf_test.go
+++ b/flow/e2e/snowflake/peer_flow_sf_test.go
@@ -329,9 +329,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
 	require.NoError(s.t, err)
 	s.Equal(6, polyCount)
 
-	// TODO: verify that the data is correctly synced to the destination table
-	// on the bigquery side
-
 	env.AssertExpectations(s.t)
 }
 
diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go
index 4674667b97..795d191ab6 100644
--- a/flow/e2e/test_utils.go
+++ b/flow/e2e/test_utils.go
@@ -155,8 +155,8 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) error {
 		"f7 jsonb",
 		"f8 smallint",
 	}
-	if strings.Contains(tableName, "sf") {
-		tblFields = append(tblFields, "geometry_point geometry(point)",
+	if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
+		tblFields = append(tblFields, `"geometryPoint" geometry(point)`,
 			"geography_point geography(point)",
 			"geometry_linestring geometry(linestring)",
 			"geography_linestring geography(linestring)",
@@ -197,7 +197,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, rowCount int) error {
 		id := uuid.New().String()
 		ids = append(ids, id)
 		geoValues := ""
-		if strings.Contains(tableName, "sf") {
+		if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
 			geoValues = `,'POINT(1 2)','POINT(40.7128 -74.0060)',
 			'LINESTRING(0 0, 1 1, 2 2)',
 			'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)',
@@ -222,8 +222,8 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, rowCount int) error {
 		}
 
 		geoColumns := ""
-		if strings.Contains(tableName, "sf") {
-			geoColumns = ",geometry_point, geography_point," +
+		if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
+			geoColumns = `,"geometryPoint", geography_point,` +
 				"geometry_linestring, geography_linestring," +
 				"geometry_polygon, geography_polygon"
 		}
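One detail worth calling out in the test_utils.go change: geometry_point became the quoted identifier "geometryPoint", which makes Postgres preserve the camelCase name, so the end-to-end tests now also exercise case-sensitive column handling. A sketch (hypothetical table name) of how the gating selects geo columns for both Snowflake and BigQuery table-name suffixes:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical table name; the suffix check mirrors test_utils.go.
	tableName := "test_qrep_flow_avro_bq"

	geoColumns := ""
	if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
		// The quoted "geometryPoint" keeps its camelCase in Postgres; the
		// unquoted names fold to lowercase.
		geoColumns = `,"geometryPoint", geography_point,` +
			"geometry_linestring, geography_linestring," +
			"geometry_polygon, geography_polygon"
	}
	fmt.Println(geoColumns)
}
```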