Skip to content

Commit

Permalink
Adding e2e test to repro error with reference table routing
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Apr 24, 2024
1 parent fd2ae75 commit d325cc1
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 1 deletion.
34 changes: 33 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/json2"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -463,7 +465,7 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
w = strings.TrimSpace(w[1:])
result := strings.HasPrefix(g, w)
match = result
//t.Logf("Partial match |%v|%v|%v\n", w, g, match)
// t.Logf("Partial match |%v|%v|%v\n", w, g, match)
} else {
match = g == w
}
Expand Down Expand Up @@ -831,3 +833,33 @@ func (lg *loadGenerator) waitForCount(want int64) {
}
}
}

// VExplainPlan is the struct that represents the json output of a vexplain query.
type VExplainPlan struct {
OperatorType string `json:"OperatorType"`
Variant string `json:"Variant"`
Keyspace VExplainKeyspace `json:"Keyspace"`
FieldQuery string `json:"FieldQuery"`
Query string `json:"Query"`
Table string `json:"TableName"`
}

type VExplainKeyspace struct {
Name string `json:"Name"`
Sharded bool `json:"Sharded"`
}

// vexplain runs vexplain on the given query and returns the plan. Useful for validating routing rules.
func vexplain(t *testing.T, database, query string) *VExplainPlan {
vtgateConn := vc.GetVTGateConn(t)
defer vtgateConn.Close()

qr := execVtgateQuery(t, vtgateConn, database, fmt.Sprintf("vexplain %s", query))
require.NotNil(t, qr)
require.Equal(t, 1, len(qr.Rows))
json := qr.Rows[0][0].ToString()

var plan VExplainPlan
require.NoError(t, json2.Unmarshal([]byte(json), &plan))
return &plan
}
131 changes: 131 additions & 0 deletions go/test/endtoend/vreplication/reference_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package vreplication

import (
"testing"

"github.com/stretchr/testify/require"
)

const (
uksSchema = `
create table product (id int, mfg_id int, cat_id int, name varchar(128), primary key(id));
create table cat (id int, name varchar(128), primary key(id));
create table mfg (id int, name varchar(128), primary key(id));
`
sksSchema = `
create table product (id int, mfg_id int, cat_id int, name varchar(128), primary key(id));
create table cat (id int, name varchar(128), primary key(id));
create table mfg2 (id int, name varchar(128), primary key(id));
`
uksVSchema = `
{
"sharded": false,
"tables": {
"product": {},
"cat": {},
"mfg": {}
}
}`

sksVSchema = `
{
"sharded": true,
"tables": {
"product": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
},
"cat": {
"type": "reference",
"source": "uks.cat"
},
"mfg2": {
"type": "reference",
"source": "uks.mfg"
}
},
"vindexes": {
"hash": {
"type": "hash"
}
}
}`
materializeCatSpec = `
{
"workflow": "wfCat",
"source_keyspace": "uks",
"target_keyspace": "sks",
"table_settings": [ {"target_table": "cat", "source_expression": "select id, name from cat" }]
}`
materializeMfgSpec = `
{
"workflow": "wfMfg",
"source_keyspace": "uks",
"target_keyspace": "sks",
"table_settings": [ {"target_table": "mfg2", "source_expression": "select id, name from mfg" }]
}`
initializeTables = `
use uks;
insert into product values (1, 1, 1, 'p1');
insert into product values (2, 2, 2, 'p2');
insert into product values (3, 3, 3, 'p3');
insert into cat values (1, 'c1');
insert into cat values (2, 'c2');
insert into cat values (3, 'c3');
insert into mfg values (1, 'm1');
insert into mfg values (2, 'm2');
insert into mfg values (3, 'm3');
`
)

func TestReferenceTableMaterializationAndRouting(t *testing.T) {
var err error
defaultCellName := "zone1"
allCells := []string{defaultCellName}
allCellNames = defaultCellName
vc = NewVitessCluster(t, "TestReferenceTableMaterializationAndRouting", allCells, mainClusterConfig)
defer vc.TearDown(t)
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func() { defaultReplicas = 1 }()
uks := "uks"
sks := "sks"

defaultCell := vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, uks, "0", uksVSchema, uksSchema, defaultReplicas, defaultRdonly, 100, nil)
vc.AddKeyspace(t, []*Cell{defaultCell}, sks, "-80,80-", sksVSchema, sksSchema, defaultReplicas, defaultRdonly, 200, nil)
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
verifyClusterHealth(t, vc)
_, _, err = vtgateConn.ExecuteFetchMulti(initializeTables, 0, false)
require.NoError(t, err)
materialize(t, materializeCatSpec, false)
materialize(t, materializeMfgSpec, false)

tabDash80 := vc.getPrimaryTablet(t, sks, "-80")
tab80Dash := vc.getPrimaryTablet(t, sks, "80-")
catchup(t, tabDash80, "wfCat", "Materialize Category")
catchup(t, tab80Dash, "wfCat", "Materialize Category")
catchup(t, tabDash80, "wfMfg", "Materialize Manufacturer")
catchup(t, tab80Dash, "wfMfg", "Materialize Manufacturer")

vtgateConn.Close()
vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
waitForRowCount(t, vtgateConn, sks, "cat", 3)
waitForRowCount(t, vtgateConn, sks, "mfg2", 3)

insertQuery := "insert into mfg values (4, 'm4')"
_, err = vtgateConn.ExecuteFetch(insertQuery, 0, false)
require.Contains(t, err.Error(), "table mfg not found")

insertQuery = "insert into mfg2 values (4, 'm4')"
_, err = vtgateConn.ExecuteFetch(insertQuery, 0, false)
require.Contains(t, err.Error(), "Table 'vt_uks.mfg2' doesn't exist")

insertQuery = "insert into uks.mfg values (4, 'm4')"
_, err = vtgateConn.ExecuteFetch(insertQuery, 0, false)
require.NoError(t, err)
}

0 comments on commit d325cc1

Please sign in to comment.