diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index f06044ae6b..b2bc5144f1 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { switch qvalue.QValueKind(colType) { case qvalue.QValueKindJSON: // if the type is JSON, then just extract JSON - castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`", + castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`", colName, bqType, colName) // expecting data in BASE64 format case qvalue.QValueKindBytes, qvalue.QValueKindBit: diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 7f7b1d5b83..ceb3b38402 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -122,7 +122,7 @@ func getTransformedColumns(dstTableMetadata *bigquery.TableMetadata, syncedAtCol fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", col.Name, col.Name)) case bigquery.JSONFieldType: transformedColumns = append(transformedColumns, - fmt.Sprintf("PARSE_JSON(`%s`) AS `%s`", col.Name, col.Name)) + fmt.Sprintf("PARSE_JSON(`%s`,wide_number_mode=>'round') AS `%s`", col.Name, col.Name)) case bigquery.DateFieldType: transformedColumns = append(transformedColumns, fmt.Sprintf("CAST(`%s` AS DATE) AS `%s`", col.Name, col.Name)) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 601172faa4..5da81a3e3e 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -743,7 +743,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { true,random_bytea(32),'s','test','1.1.10.2'::cidr, CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, - '{"sai":1}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, + '{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, txid_current_snapshot(), @@ -777,7 +777,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { require.True(s.t, noNulls) // check if JSON on bigquery side is a good JSON - err = s.checkJSONValue(dstTableName, "c17", "sai", "1") + err = s.checkJSONValue(dstTableName, "c17", "sai", "-8.021390374331551") require.NoError(s.t, err) env.AssertExpectations(s.t) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index a669691090..835e367f69 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -281,7 +281,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro 1.2345, false, 12345, '%s', 12345, 1, '%s', CURRENT_TIMESTAMP, 'refID', CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012], - ARRAY['varchar1', 'varchar2'], '{"key": 8.5}', + ARRAY['varchar1', 'varchar2'], '{"key": -8.02139037433155}', '[{"key1": "value1", "key2": "value2", "key3": "value3"}]', '{"key": "value"}', 15, CURRENT_DATE %s )`,