From 028d4d2f64b77136e4c072d628f2d749591ec87d Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 9 Jan 2024 13:05:37 +0530 Subject: [PATCH] handle null in merge,add tests --- .../bigquery/merge_statement_generator.go | 2 +- flow/e2e/bigquery/bigquery_helper.go | 29 ++++++++++ flow/e2e/bigquery/peer_flow_bq_test.go | 55 ++++++++++++++++++- flow/model/model.go | 51 +++++++++++++++++ 4 files changed, 135 insertions(+), 2 deletions(-) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 3582f5a2f6..01affcddfd 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -58,7 +58,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString: castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ - "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element) AS `%s`", + "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element WHERE element IS NOT null) AS `%s`", bqType, colName, shortCol) case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint: castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`", diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index c1487e01f6..5851bc98f6 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -396,6 +396,35 @@ func (b *BigQueryTestHelper) CheckNull(tableName string, ColName []string) (bool } } +// CheckDoubleValues selects ColName from tableName and returns true only when column 0 is nil and column 1 does not assert to a non-empty []float64 in the row left after iteration (presumably the last one); NaN and Inf doubles are expected to be null. NOTE(review): row is indexed without a length check and the ok result of the []float64 assertion is discarded. +func (b *BigQueryTestHelper) CheckDoubleValues(tableName string, ColName []string) (bool, error) { + csep := strings.Join(ColName, ",") + command := fmt.Sprintf("SELECT %s FROM `%s.%s`", + csep, 
b.Config.DatasetId, tableName) + it, err := b.client.Query(command).Read(context.Background()) + if err != nil { + return false, fmt.Errorf("failed to run command: %w", err) + } + + var row []bigquery.Value + for { + err := it.Next(&row) + if err == iterator.Done { + break + } + if err != nil { + return false, fmt.Errorf("failed to iterate over query results: %w", err) + } + } + + floatArr, _ := row[1].([]float64) + if row[0] != nil || len(floatArr) > 0 { + return false, nil + } + + return true, nil +} + func qValueKindToBqColTypeString(val qvalue.QValueKind) (string, error) { switch val { case qvalue.QValueKindInt16: diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 9b80f51ccf..c73e2f9a62 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -744,7 +744,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { txid_current_snapshot(), '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), ARRAY[10299301,2579827], - ARRAY[0.0003, 8902.0092], + ARRAY[0.0003, 8902.0092, 'NaN'], ARRAY['hello','bye'],'happy'; `, srcTableName)) e2e.EnvNoError(s.t, env, err) @@ -776,6 +776,59 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { require.NoError(s.t, err) } +func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + srcTableName := s.attachSchemaSuffix("test_nans_bq") + dstTableName := "test_nans_bq" + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 double precision,c2 double precision[]); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_nans_bq"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.bqHelper.Peer, + CdcStagingPath: "", + } + + 
flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + require.NoError(s.t, err) + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 1, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and insert one row holding NaN, Infinity and -Infinity doubles (NOTE(review): the goroutine assigns the enclosing function's err) + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + /* insert NaN into c1 and {NaN, Infinity, -Infinity} into c2 */ + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s SELECT 2, 'NaN'::double precision, '{NaN, Infinity, -Infinity}'; + `, srcTableName)) + e2e.EnvNoError(s.t, env, err) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + require.True(s.t, env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error (NOTE(review): err.Error() assumes err is non-nil here) + require.Contains(s.t, err.Error(), "continue as new") + + // check via CheckDoubleValues that c1 is null and c2 holds no float values on the BigQuery side + good, err := s.bqHelper.CheckDoubleValues(dstTableName, []string{"c1", "c2"}) + require.NoError(s.t, err) + require.True(s.t, good) +} + func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) diff --git a/flow/model/model.go b/flow/model/model.go index 10c04d599c..15846b6d74 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/big" "slices" "sync/atomic" @@ -167,6 +168,56 @@ func (r *RecordItems) toMap() (map[string]interface{}, error) { return nil, errors.New("expected *big.Rat value") } jsonStruct[col] = bigRat.FloatString(9) + case qvalue.QValueKindFloat64: + floatVal, ok := v.Value.(float64) + if !ok { + return nil, errors.New("expected float64 value") + } + if math.IsNaN(floatVal) || math.IsInf(floatVal, 0) { + jsonStruct[col] = nil + } else { + jsonStruct[col] = floatVal + } + case qvalue.QValueKindFloat32: + floatVal, ok 
:= v.Value.(float32) + if !ok { + return nil, errors.New("expected float32 value") + } + if math.IsNaN(float64(floatVal)) || math.IsInf(float64(floatVal), 0) { + jsonStruct[col] = nil + } else { + jsonStruct[col] = floatVal + } + case qvalue.QValueKindArrayFloat64: + floatArr, ok := v.Value.([]float64) + if !ok { + return nil, errors.New("expected []float64 value") + } + + nullableFloatArr := make([]interface{}, 0, len(floatArr)) + for _, val := range floatArr { + if math.IsNaN(val) || math.IsInf(val, 0) { + nullableFloatArr = append(nullableFloatArr, nil) + } else { + nullableFloatArr = append(nullableFloatArr, val) + } + } + jsonStruct[col] = nullableFloatArr + case qvalue.QValueKindArrayFloat32: + floatArr, ok := v.Value.([]float32) + if !ok { + return nil, errors.New("expected []float32 value") + } + nullableFloatArr := make([]interface{}, 0, len(floatArr)) + for _, val := range floatArr { + if math.IsNaN(float64(val)) || math.IsInf(float64(val), 0) { + nullableFloatArr = append(nullableFloatArr, nil) + } else { + nullableFloatArr = append(nullableFloatArr, val) + } + } + jsonStruct[col] = nullableFloatArr + default: jsonStruct[col] = v.Value }