diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index b109cc0e996..e952670f6ab 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -32,6 +32,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/json2" + "github.com/buger/jsonparser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -463,7 +465,7 @@ func validateDryRunResults(t *testing.T, output string, want []string) { w = strings.TrimSpace(w[1:]) result := strings.HasPrefix(g, w) match = result - //t.Logf("Partial match |%v|%v|%v\n", w, g, match) + // t.Logf("Partial match |%v|%v|%v\n", w, g, match) } else { match = g == w } @@ -831,3 +833,33 @@ func (lg *loadGenerator) waitForCount(want int64) { } } } + +// VExplainPlan is the struct that represents the json output of a vexplain query. +type VExplainPlan struct { + OperatorType string `json:"OperatorType"` + Variant string `json:"Variant"` + Keyspace VExplainKeyspace `json:"Keyspace"` + FieldQuery string `json:"FieldQuery"` + Query string `json:"Query"` + Table string `json:"TableName"` +} + +type VExplainKeyspace struct { + Name string `json:"Name"` + Sharded bool `json:"Sharded"` +} + +// vexplain runs vexplain on the given query and returns the plan. Useful for validating routing rules. +func vexplain(t *testing.T, database, query string) *VExplainPlan { + vtgateConn := vc.GetVTGateConn(t) + defer vtgateConn.Close() + + qr := execVtgateQuery(t, vtgateConn, database, fmt.Sprintf("vexplain %s", query)) + require.NotNil(t, qr) + require.Equal(t, 1, len(qr.Rows)) + json := qr.Rows[0][0].ToString() + + var plan VExplainPlan + require.NoError(t, json2.Unmarshal([]byte(json), &plan)) + return &plan +} diff --git a/go/test/endtoend/vreplication/reference_test.go b/go/test/endtoend/vreplication/reference_test.go new file mode 100644 index 00000000000..40a8b1e39c3 --- /dev/null +++ b/go/test/endtoend/vreplication/reference_test.go @@ -0,0 +1,131 @@ +package vreplication + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +const ( + uksSchema = ` +create table product (id int, mfg_id int, cat_id int, name varchar(128), primary key(id)); +create table cat (id int, name varchar(128), primary key(id)); +create table mfg (id int, name varchar(128), primary key(id)); +` + sksSchema = ` +create table product (id int, mfg_id int, cat_id int, name varchar(128), primary key(id)); +create table cat (id int, name varchar(128), primary key(id)); +create table mfg2 (id int, name varchar(128), primary key(id)); +` + uksVSchema = ` +{ + "sharded": false, + "tables": { + "product": {}, + "cat": {}, + "mfg": {} + } +}` + + sksVSchema = ` +{ + "sharded": true, + "tables": { + "product": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ] + }, + "cat": { + "type": "reference", + "source": "uks.cat" + }, + "mfg2": { + "type": "reference", + "source": "uks.mfg" + } + }, + "vindexes": { + "hash": { + "type": "hash" + } + } +}` + materializeCatSpec = ` +{ + "workflow": "wfCat", + "source_keyspace": "uks", + "target_keyspace": "sks", + "table_settings": [ {"target_table": "cat", "source_expression": "select id, name from cat" }] +}` + materializeMfgSpec = ` +{ + "workflow": "wfMfg", + "source_keyspace": "uks", + "target_keyspace": "sks", + "table_settings": [ {"target_table": "mfg2", "source_expression": "select id, name from mfg" }] +}` + initializeTables = ` +use uks; +insert into product values (1, 1, 1, 'p1'); +insert into product values (2, 2, 2, 'p2'); +insert into product values (3, 3, 3, 'p3'); +insert into cat values (1, 'c1'); +insert into cat values (2, 'c2'); +insert into cat values (3, 'c3'); +insert into mfg values (1, 'm1'); +insert into mfg values (2, 'm2'); +insert into mfg values (3, 'm3'); +` +) + +func TestReferenceTableMaterializationAndRouting(t *testing.T) { + var err error + defaultCellName := "zone1" + allCells := []string{defaultCellName} + allCellNames = defaultCellName + vc = NewVitessCluster(t, "TestReferenceTableMaterializationAndRouting", allCells, mainClusterConfig) + defer vc.TearDown(t) + defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets + defer func() { defaultReplicas = 1 }() + uks := "uks" + sks := "sks" + + defaultCell := vc.Cells[defaultCellName] + vc.AddKeyspace(t, []*Cell{defaultCell}, uks, "0", uksVSchema, uksSchema, defaultReplicas, defaultRdonly, 100, nil) + vc.AddKeyspace(t, []*Cell{defaultCell}, sks, "-80,80-", sksVSchema, sksSchema, defaultReplicas, defaultRdonly, 200, nil) + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + verifyClusterHealth(t, vc) + _, _, err = vtgateConn.ExecuteFetchMulti(initializeTables, 0, false) + require.NoError(t, err) + materialize(t, materializeCatSpec, false) + materialize(t, materializeMfgSpec, false) + + tabDash80 := vc.getPrimaryTablet(t, sks, "-80") + tab80Dash := vc.getPrimaryTablet(t, sks, "80-") + catchup(t, tabDash80, "wfCat", "Materialize Category") + catchup(t, tab80Dash, "wfCat", "Materialize Category") + catchup(t, tabDash80, "wfMfg", "Materialize Manufacturer") + catchup(t, tab80Dash, "wfMfg", "Materialize Manufacturer") + + vtgateConn.Close() + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + waitForRowCount(t, vtgateConn, sks, "cat", 3) + waitForRowCount(t, vtgateConn, sks, "mfg2", 3) + + insertQuery := "insert into mfg values (4, 'm4')" + _, err = vtgateConn.ExecuteFetch(insertQuery, 0, false) + require.Contains(t, err.Error(), "table mfg not found") + + insertQuery = "insert into mfg2 values (4, 'm4')" + _, err = vtgateConn.ExecuteFetch(insertQuery, 0, false) + require.Contains(t, err.Error(), "Table 'vt_uks.mfg2' doesn't exist") + + insertQuery = "insert into uks.mfg values (4, 'm4')" + _, err = vtgateConn.ExecuteFetch(insertQuery, 0, false) + require.NoError(t, err) +}