diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go
index eb8ebb6177..cca68cdda7 100644
--- a/flow/connectors/bigquery/merge_stmt_generator.go
+++ b/flow/connectors/bigquery/merge_stmt_generator.go
@@ -2,6 +2,7 @@ package connbigquery
 
 import (
 	"fmt"
+	"slices"
 	"strings"
 
 	"cloud.google.com/go/bigquery"
@@ -100,6 +101,22 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 		m.syncBatchID, m.dstTableName)
 }
 
+func (m *mergeStmtGenerator) transformedPkeyStrings() []string {
+	pkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
+	for _, col := range m.normalizedTableSchema.Columns {
+		if slices.Contains(m.normalizedTableSchema.PrimaryKeyColumns, col.Name) {
+			pkeyCol := col.Name
+			switch qvalue.QValueKind(col.Type) {
+			case qvalue.QValueKindJSON:
+				pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(%s)", m.shortColumn[pkeyCol]))
+			default:
+				pkeys = append(pkeys, m.shortColumn[pkeyCol])
+			}
+		}
+	}
+	return pkeys
+}
+
 // generateDeDupedCTE generates a de-duped CTE.
 func (m *mergeStmtGenerator) generateDeDupedCTE() string {
 	const cte = `_dd AS (
@@ -111,11 +128,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
 			WHERE _peerdb_rank=1
 	) SELECT * FROM _dd`
-	shortPkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
-	for _, pkeyCol := range m.normalizedTableSchema.PrimaryKeyColumns {
-		shortPkeys = append(shortPkeys, m.shortColumn[pkeyCol])
-	}
-
+	shortPkeys := m.transformedPkeyStrings()
 	pkeyColsStr := fmt.Sprintf("(CONCAT(%s))",
 		strings.Join(shortPkeys, ", '_peerdb_concat_', "))
 
 	return fmt.Sprintf(cte, pkeyColsStr)
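Context for the generator change above: BigQuery's `CONCAT` accepts only STRING (or BYTES) arguments, and JSON values cannot be concatenated or compared directly, so a JSON primary-key column cannot feed the dedup partition key as-is; wrapping it in `TO_JSON_STRING(...)` serializes it to a STRING first. Below is a minimal, self-contained sketch of the key expression this produces. The `buildDedupKey` helper and the `_c0`..`_c3` short-column aliases are illustrative stand-ins, not code from the repository:

```go
package main

import (
	"fmt"
	"strings"
)

// buildDedupKey mirrors the logic of transformedPkeyStrings plus the CONCAT
// assembly in generateDeDupedCTE: JSON key columns are wrapped in
// TO_JSON_STRING so they become STRINGs that CONCAT can consume.
func buildDedupKey(shortCols, kinds []string) string {
	parts := make([]string, 0, len(shortCols))
	for i, col := range shortCols {
		if kinds[i] == "json" { // stand-in for qvalue.QValueKindJSON
			parts = append(parts, fmt.Sprintf("TO_JSON_STRING(%s)", col))
		} else {
			parts = append(parts, col)
		}
	}
	return fmt.Sprintf("(CONCAT(%s))", strings.Join(parts, ", '_peerdb_concat_', "))
}

func main() {
	// Shape of the partition key for a table whose key columns include a
	// JSON column aliased _c1:
	fmt.Println(buildDedupKey(
		[]string{"_c0", "_c1", "_c2", "_c3"},
		[]string{"numeric", "json", "string", "string"},
	))
	// (CONCAT(_c0, '_peerdb_concat_', TO_JSON_STRING(_c1), '_peerdb_concat_', _c2, '_peerdb_concat_', _c3))
}
```

Without the wrapper, the generated statement fails on tables whose key includes a JSON column, since the JSON value can be neither concatenated nor partitioned on.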
s, "normalize inserts", dstTableName, "id,key,value,j") + env.CancelWorkflow() + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.RequireEnvCanceled(s.t, env) +}