From 8634f4f5b3ea83774b61526eb37820f537d0eb95 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 3 Jan 2024 02:28:42 +0530 Subject: [PATCH 1/4] implements geography datatype support for bigquery --- .../bigquery/merge_statement_generator.go | 3 + flow/connectors/bigquery/qrep_avro_sync.go | 21 ++++- flow/connectors/bigquery/qvalue_convert.go | 4 +- flow/connectors/postgres/qvalue_convert.go | 1 - flow/e2e/bigquery/bigquery_helper.go | 8 +- flow/e2e/bigquery/peer_flow_bq_test.go | 88 ++++++++++++++++++- flow/e2e/bigquery/qrep_flow_bq_test.go | 15 +--- flow/e2e/test_utils.go | 6 +- 8 files changed, 123 insertions(+), 23 deletions(-) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 2648b3ea6a..204572d605 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -56,6 +56,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element) AS `%s`", bqType, colName, colName) + case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint: + castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`", + colName, bqType, colName) // MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64) // Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true} // json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added, diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 506be083c8..61b6318ef3 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -110,6 +110,21 @@ func (s *QRepAvroSyncMethod) SyncRecords( return numRecords, nil } +func getTransformedColumns(dstTableMetadata *bigquery.TableMetadata, syncedAtCol string, softDeleteCol string) []string { + transformedColumns := make([]string, 0, len(dstTableMetadata.Schema)) + for _, col := range dstTableMetadata.Schema { + if col.Name == syncedAtCol || col.Name == softDeleteCol { + continue + } + if col.Type == bigquery.GeographyFieldType { + transformedColumns = append(transformedColumns, fmt.Sprintf("ST_GEOGFROMTEXT(%s) AS `%s`", col.Name, col.Name)) + } else { + transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name)) + } + } + return transformedColumns +} + func (s *QRepAvroSyncMethod) SyncQRepRecords( flowJobName string, dstTableName string, @@ -151,7 +166,9 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( ) bqClient := s.connector.client - selector := "*" + transformedColumns := getTransformedColumns(dstTableMetadata, syncedAtCol, softDeleteCol) + selector := strings.Join(transformedColumns, ", ") + if softDeleteCol != "" { // PeerDB column selector += ", FALSE" } @@ -262,7 +279,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) { } switch bqField.Type { - case bigquery.StringFieldType: + case bigquery.StringFieldType, bigquery.GeographyFieldType: return considerRepeated("string", bqField.Repeated), nil case bigquery.BytesFieldType: return "bytes", nil diff --git a/flow/connectors/bigquery/qvalue_convert.go b/flow/connectors/bigquery/qvalue_convert.go index 727a6b3f88..16ffaff0b1 100644 --- a/flow/connectors/bigquery/qvalue_convert.go +++ b/flow/connectors/bigquery/qvalue_convert.go @@ -46,6 +46,8 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType { return bigquery.IntegerFieldType case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64: return bigquery.FloatFieldType + case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint: + return bigquery.GeographyFieldType // rest will be strings default: return bigquery.StringFieldType @@ -76,7 +78,7 @@ func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) (qvalue.QValueKind, case bigquery.NumericFieldType: return qvalue.QValueKindNumeric, nil case bigquery.GeographyFieldType: - return qvalue.QValueKindString, nil + return qvalue.QValueKindGeography, nil default: return "", fmt.Errorf("unsupported bigquery field type: %v", fieldType) } diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 55a29362f8..aafa00073e 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -420,7 +420,6 @@ func GeoValidate(hexWkb string) (string, error) { // UnmarshalWKB performs geometry validation along with WKB parsing geometryObject, geoErr := geom.NewGeomFromWKB(wkb) if geoErr != nil { - slog.Warn(fmt.Sprintf("Ignoring invalid geometry WKB %s: %v", hexWkb, geoErr)) return "", geoErr } diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index ecc2cce1e8..0e7b4a7b47 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -168,11 +168,15 @@ func (b *BigQueryTestHelper) RunCommand(command string) error { // countRows(tableName) returns the number of rows in the given table. func (b *BigQueryTestHelper) countRows(tableName string) (int, error) { - return b.countRowsWithDataset(b.datasetName, tableName) + return b.countRowsWithDataset(b.datasetName, tableName, "") } -func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string) (int, error) { +func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, nonNullCol string) (int, error) { command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s`", dataset, tableName) + if nonNullCol != "" { + command = fmt.Sprintf("SELECT COUNT(CASE WHEN " + nonNullCol + + " IS NOT NULL THEN 1 END) AS non_null_count FROM `" + dataset + "`.`" + tableName + "`;") + } it, err := b.client.Query(command).Read(context.Background()) if err != nil { return 0, fmt.Errorf("failed to run command: %w", err) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index cc0352fae3..28ca883132 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -747,6 +747,92 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env.AssertExpectations(s.t) } +func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + srcTableName := s.attachSchemaSuffix("test_invalid_geo_bq_avro_cdc") + dstTableName := "test_invalid_geo_bq_avro_cdc" + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + line GEOMETRY(LINESTRING) NOT NULL, + poly GEOGRAPHY(POLYGON) NOT NULL + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_invalid_geo_bq_avro_cdc"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.bqHelper.Peer, + CdcStagingPath: "", + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + require.NoError(s.t, err) + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 10, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert 10 rows into the source table + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + // insert 4 invalid shapes and 6 valid shapes into the source table + for i := 0; i < 4; i++ { + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (line,poly) VALUES ($1,$2) + `, srcTableName), "010200000001000000000000000000F03F0000000000000040", + "0103000020e6100000010000000c0000001a8361d35dc64140afdb8d2b1bc3c9bf1b8ed4685fc641405ba64c"+ + "579dc2c9bf6a6ad95a5fc64140cd82767449c2c9bf9570fbf85ec641408a07944db9c2c9bf729a18a55ec6414021b8b748c7c2c9bfba46de4c"+ + "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ + "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", + ) + require.NoError(s.t, err) + } + s.t.Log("Inserted 4 invalid geography rows into the source table") + for i := 4; i < 10; i++ { + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (line,poly) VALUES ($1,$2) + `, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040", + "010300000001000000050000000000000000000000000000000000000000000000"+ + "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ + "00f03f000000000000000000000000000000000000000000000000") + require.NoError(s.t, err) + } + s.t.Log("Inserted 6 valid geography rows and 10 total rows into source") + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error + require.Contains(s.t, err.Error(), "continue as new") + + // We inserted 4 invalid shapes in each. + // They should have been filtered out as null on destination + lineCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "line") + require.NoError(s.t, err) + s.Equal(6, lineCount) + + polyCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "poly") + require.NoError(s.t, err) + s.Equal(6, polyCount) + + // TODO: verify that the data is correctly synced to the destination table + // on the bigquery side + + env.AssertExpectations(s.t) +} + func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) @@ -1251,7 +1337,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { count1, err := s.bqHelper.countRows(dstTable1Name) require.NoError(s.t, err) - count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name) + count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name, "") require.NoError(s.t, err) s.Equal(1, count1) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index def151071c..bd153b06e6 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -16,16 +16,6 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) require.NoError(s.t, err) } -func (s PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) { - schema := e2e.GetOwnersSchema() - err := s.bqHelper.CreateTable(dstTable, schema) - - // fail if table creation fails - require.NoError(s.t, err) - - s.t.Logf("created table on bigquery: %s.%s. %v", s.bqHelper.Config.DatasetId, dstTable, err) -} - func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") @@ -52,9 +42,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { numRows := 10 - tblName := "test_qrep_flow_avro" + tblName := "test_qrep_flow_avro_bq" s.setupSourceTable(tblName, numRows) - s.setupBQDestinationTable(tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.bqSuffix, tblName) @@ -65,7 +54,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { query, s.bqHelper.Peer, "", - false, + true, "") require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 4674667b97..72b7e45461 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -155,7 +155,7 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err "f7 jsonb", "f8 smallint", } - if strings.Contains(tableName, "sf") { + if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") { tblFields = append(tblFields, "geometry_point geometry(point)", "geography_point geography(point)", "geometry_linestring geometry(linestring)", @@ -197,7 +197,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro id := uuid.New().String() ids = append(ids, id) geoValues := "" - if strings.Contains(tableName, "sf") { + if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") { geoValues = `,'POINT(1 2)','POINT(40.7128 -74.0060)', 'LINESTRING(0 0, 1 1, 2 2)', 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)', @@ -222,7 +222,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro } geoColumns := "" - if strings.Contains(tableName, "sf") { + if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") { geoColumns = ",geometry_point, geography_point," + "geometry_linestring, geography_linestring," + "geometry_polygon, geography_polygon" From 22f8ce531cd936b452a162448e371c5926f33632 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 3 Jan 2024 02:52:53 +0530 Subject: [PATCH 2/4] lint --- flow/e2e/bigquery/peer_flow_bq_test.go | 12 +++++------- flow/e2e/snowflake/peer_flow_sf_test.go | 3 --- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 28ca883132..ae3bb1303f 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -811,7 +811,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -821,14 +821,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { // They should have been filtered out as null on destination lineCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "line") require.NoError(s.t, err) - s.Equal(6, lineCount) polyCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "poly") require.NoError(s.t, err) - s.Equal(6, polyCount) - // TODO: verify that the data is correctly synced to the destination table - // on the bigquery side + require.Equal(s.t, 6, lineCount) + require.Equal(s.t, 6, polyCount) env.AssertExpectations(s.t) } @@ -986,7 +984,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1206,7 +1204,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 4520f929eb..fdd90af546 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -329,9 +329,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { require.NoError(s.t, err) s.Equal(6, polyCount) - // TODO: verify that the data is correctly synced to the destination table - // on the bigquery side - env.AssertExpectations(s.t) } From 25f8a9e8c494d516a9f4bd09b1d388076f736059 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 3 Jan 2024 03:10:01 +0530 Subject: [PATCH 3/4] add quote --- flow/connectors/bigquery/qrep_avro_sync.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 61b6318ef3..df30ee2c6b 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -117,7 +117,8 @@ func getTransformedColumns(dstTableMetadata *bigquery.TableMetadata, syncedAtCol continue } if col.Type == bigquery.GeographyFieldType { - transformedColumns = append(transformedColumns, fmt.Sprintf("ST_GEOGFROMTEXT(%s) AS `%s`", col.Name, col.Name)) + transformedColumns = append(transformedColumns, + fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", col.Name, col.Name)) } else { transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name)) } From d749664079668392f6f690e52e9432ee6ba57875 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 3 Jan 2024 03:22:37 +0530 Subject: [PATCH 4/4] test:mixed case geo col --- flow/e2e/bigquery/bigquery_helper.go | 2 +- flow/e2e/bigquery/peer_flow_bq_test.go | 8 ++++---- flow/e2e/test_utils.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 0e7b4a7b47..c1487e01f6 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -175,7 +175,7 @@ func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, non command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s`", dataset, tableName) if nonNullCol != "" { command = fmt.Sprintf("SELECT COUNT(CASE WHEN " + nonNullCol + - " IS NOT NULL THEN 1 END) AS non_null_count FROM `" + dataset + "`.`" + tableName + "`;") + " IS NOT NULL THEN 1 END) AS non_null_count FROM `" + dataset + "." + tableName + "`;") } it, err := b.client.Query(command).Read(context.Background()) if err != nil { diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index ae3bb1303f..b5d674e9ae 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -758,7 +758,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, line GEOMETRY(LINESTRING) NOT NULL, - poly GEOGRAPHY(POLYGON) NOT NULL + "polyPoly" GEOGRAPHY(POLYGON) NOT NULL ); `, srcTableName)) require.NoError(s.t, err) @@ -786,7 +786,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (line,poly) VALUES ($1,$2) + INSERT INTO %s (line,"polyPoly") VALUES ($1,$2) `, srcTableName), "010200000001000000000000000000F03F0000000000000040", "0103000020e6100000010000000c0000001a8361d35dc64140afdb8d2b1bc3c9bf1b8ed4685fc641405ba64c"+ "579dc2c9bf6a6ad95a5fc64140cd82767449c2c9bf9570fbf85ec641408a07944db9c2c9bf729a18a55ec6414021b8b748c7c2c9bfba46de4c"+ @@ -798,7 +798,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { s.t.Log("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (line,poly) VALUES ($1,$2) + INSERT INTO %s (line,"polyPoly") VALUES ($1,$2) `, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040", "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ @@ -822,7 +822,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { lineCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "line") require.NoError(s.t, err) - polyCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "poly") + polyCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "`polyPoly`") require.NoError(s.t, err) require.Equal(s.t, 6, lineCount) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 72b7e45461..795d191ab6 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -156,7 +156,7 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err "f8 smallint", } if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") { - tblFields = append(tblFields, "geometry_point geometry(point)", + tblFields = append(tblFields, `"geometryPoint" geometry(point)`, "geography_point geography(point)", "geometry_linestring geometry(linestring)", "geography_linestring geography(linestring)", @@ -223,7 +223,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro geoColumns := "" if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") { - geoColumns = ",geometry_point, geography_point," + + geoColumns = `,"geometryPoint", geography_point,` + "geometry_linestring, geography_linestring," + "geometry_polygon, geography_polygon" }