Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-tenant MoveTables: allow switching replica/rdonly traffic separately before switching primary traffic #15768

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a7b9bf7
Modify vtgate route checks to include tablet types for keyspace routi…
rohit-nayak-ps Apr 19, 2024
7427810
Update e2e MultiTenantSimple test to switch reads and writes separate…
rohit-nayak-ps Apr 21, 2024
f8454c4
Add switching keyspace reads to switcher interface/implementations
rohit-nayak-ps Apr 21, 2024
3161ee3
Add helpers to get state of current route for a workflow
rohit-nayak-ps Apr 21, 2024
ba481c9
Implement switch reads for multi-tenant migrations
rohit-nayak-ps Apr 21, 2024
dabc1c2
Validate route in e2e test using vexplain.
rohit-nayak-ps Apr 22, 2024
7a6d21e
Update logic to find correct route for table using keyspace routing r…
rohit-nayak-ps Apr 22, 2024
ca1c031
Update vtctld logic to route and switch separately for replica/rdonly
rohit-nayak-ps Apr 22, 2024
1ed99f0
Self-review
rohit-nayak-ps Apr 22, 2024
a307f55
Improve check for reads/writes switched
rohit-nayak-ps Apr 23, 2024
201b838
Confirm reverse replication works
rohit-nayak-ps Apr 23, 2024
7404c8f
Refactor how keyspace routing state is set on building workflow state
rohit-nayak-ps Apr 23, 2024
81257be
Tiny refactor
rohit-nayak-ps Apr 23, 2024
bb531af
Don't route target keyspace since multiple tenants are being migrated…
rohit-nayak-ps Apr 23, 2024
d00e8aa
Add rows after complete for concurrent migration test
rohit-nayak-ps Apr 23, 2024
9fa5a63
Address review comment: remove superfluous json specifiers
rohit-nayak-ps Apr 27, 2024
3d3eb22
Address more review comments
rohit-nayak-ps Apr 27, 2024
116a3b4
Fix incorrectly fixed review comment
rohit-nayak-ps Apr 27, 2024
430d2ca
Fix merge issue
rohit-nayak-ps Apr 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"vitess.io/vitess/go/json2"
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqlescape"
Expand Down Expand Up @@ -284,6 +285,7 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da
require.NoError(t, err)
require.NotNil(t, qr)
if wantRes == fmt.Sprintf("%v", qr.Rows) {
log.Infof("waitForRowCountInTablet: found %d rows in table %s on tablet %s", want, table, vttablet.Name)
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
return
}
select {
Expand Down Expand Up @@ -986,3 +988,45 @@ func getCellNames(cells []*Cell) string {
}
return strings.Join(cellNames, ",")
}

// VExplainPlan is the struct that represents the json output of a vexplain query.
type VExplainPlan struct {
OperatorType string
Variant string
Keyspace VExplainKeyspace
FieldQuery string
Query string
Table string
}

type VExplainKeyspace struct {
Name string
Sharded bool
}

// vexplain runs vexplain on the given query and returns the plan. Useful for validating routing rules.
func vexplain(t *testing.T, database, query string) *VExplainPlan {
vtgateConn := vc.GetVTGateConn(t)
defer vtgateConn.Close()

qr := execVtgateQuery(t, vtgateConn, database, fmt.Sprintf("vexplain %s", query))
require.NotNil(t, qr)
require.Equal(t, 1, len(qr.Rows))
json := qr.Rows[0][0].ToString()

var plan VExplainPlan
require.NoError(t, json2.Unmarshal([]byte(json), &plan))
return &plan
}

// confirmKeyspacesRoutedTo confirms that the given keyspaces are routed as expected for the given tablet types, using vexplain.
func confirmKeyspacesRoutedTo(t *testing.T, keyspace string, routedKeyspace, table string, tabletTypes []string) {
if len(tabletTypes) == 0 {
tabletTypes = []string{"primary", "replica", "rdonly"}
}
for _, tt := range tabletTypes {
database := fmt.Sprintf("%s@%s", keyspace, tt)
plan := vexplain(t, database, fmt.Sprintf("select * from %s.%s", keyspace, table))
require.Equalf(t, routedKeyspace, plan.Keyspace.Name, "for database %s, keyspace %v, tabletType %s", database, keyspace, tt)
}
}
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
109 changes: 64 additions & 45 deletions go/test/endtoend/vreplication/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
Expand Down Expand Up @@ -132,13 +131,6 @@ func TestMultiTenantSimple(t *testing.T) {
_, err = vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, sourceKeyspace, "0", stVSchema, stSchema, 1, 0, getInitialTabletIdForTenant(tenantId), nil)
require.NoError(t, err)

targetPrimary := vc.getPrimaryTablet(t, targetKeyspace, "0")
sourcePrimary := vc.getPrimaryTablet(t, sourceKeyspace, "0")
primaries := map[string]*cluster.VttabletProcess{
"target": targetPrimary,
"source": sourcePrimary,
}

vtgateConn, closeConn := getVTGateConn()
defer closeConn()
numRows := 10
Expand All @@ -164,64 +156,90 @@ func TestMultiTenantSimple(t *testing.T) {
},
})

preSwitchRules := &vschemapb.KeyspaceRoutingRules{
// Expected keyspace routing rules on creation of the workflow.
initialRules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "s1"},
{FromKeyspace: "s1@rdonly", ToKeyspace: "s1"},
{FromKeyspace: "s1@replica", ToKeyspace: "s1"},
},
}
postSwitchRules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "mt"},
},
}
rulesMap := map[string]*vschemapb.KeyspaceRoutingRules{
"pre": preSwitchRules,
"post": postSwitchRules,
}

require.Zero(t, len(getKeyspaceRoutingRules(t, vc).Rules))
mt.Create()
validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, false)
// Note: we cannot insert into the target keyspace since that is never routed to the source keyspace.
confirmKeyspacesRoutedTo(t, sourceKeyspace, "s1", "t1", nil)
validateKeyspaceRoutingRules(t, vc, initialRules)

lastIndex = insertRows(lastIndex, sourceKeyspace)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, mt.workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
mt.SwitchReadsAndWrites()
validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, true)

mt.SwitchReads()
confirmOnlyReadsSwitched(t)

mt.SwitchWrites()
confirmBothReadsAndWritesSwitched(t)

// Note: here we have already switched, and we can insert into the target keyspace, and it should get reverse
// replicated to the source keyspace. The source keyspace is routed to the target keyspace at this point.
lastIndex = insertRows(lastIndex, sourceKeyspace)
sourceTablet := vc.getPrimaryTablet(t, sourceKeyspace, "0")
require.NotNil(t, sourceTablet)
// Wait for the rows to be reverse replicated to the source keyspace.
waitForRowCountInTablet(t, sourceTablet, sourceKeyspace, "t1", int(lastIndex))

mt.Complete()
require.Zero(t, len(getKeyspaceRoutingRules(t, vc).Rules))
// Targeting to target keyspace should start working now. Upto this point we had to target the source keyspace.
lastIndex = insertRows(lastIndex, targetKeyspace)

actualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1"))
log.Infof("Migration completed, total rows in target: %d", actualRowsInserted)
require.Equal(t, lastIndex, int64(actualRowsInserted))

}

// If switched, queries with source qualifiers should execute on target, else on source. Confirm that
// the routing rules are as expected and that the query executes on the expected tablet.
func validateKeyspaceRoutingRules(t *testing.T, vc *VitessCluster, primaries map[string]*cluster.VttabletProcess, rulesMap map[string]*vschemapb.KeyspaceRoutingRules, switched bool) {
currentRules := getKeyspaceRoutingRules(t, vc)
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
queryTemplate := "select count(*) from %s.t1"
matchQuery := "select count(*) from t1"
func confirmOnlyReadsSwitched(t *testing.T) {
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"rdonly", "replica"})
confirmKeyspacesRoutedTo(t, "s1", "s1", "t1", []string{"primary"})
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
rules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "s1"},
{FromKeyspace: "s1@rdonly", ToKeyspace: "mt"},
{FromKeyspace: "s1@replica", ToKeyspace: "mt"},
},
}
validateKeyspaceRoutingRules(t, vc, rules)
}

validateQueryRoute := func(qualifier, dest string) {
query := fmt.Sprintf(queryTemplate, qualifier)
assertQueryExecutesOnTablet(t, vtgateConn, primaries[dest], "", query, matchQuery)
log.Infof("query %s executed on %s", query, dest)
func confirmOnlyWritesSwitched(t *testing.T) {
confirmKeyspacesRoutedTo(t, "s1", "s1", "t1", []string{"rdonly", "replica"})
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"primary"})
rules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "mt"},
{FromKeyspace: "s1@rdonly", ToKeyspace: "s1"},
{FromKeyspace: "s1@replica", ToKeyspace: "s1"},
},
}
validateKeyspaceRoutingRules(t, vc, rules)
}

if switched {
require.ElementsMatch(t, rulesMap["post"].Rules, currentRules.Rules)
validateQueryRoute("mt", "target")
validateQueryRoute("s1", "target")
} else {
require.ElementsMatch(t, rulesMap["pre"].Rules, currentRules.Rules)
// Note that with multi-tenant migration, we cannot redirect the target keyspace since
// there are multiple source keyspaces and the target has the aggregate of all the tenants.
validateQueryRoute("mt", "target")
validateQueryRoute("s1", "source")
func confirmBothReadsAndWritesSwitched(t *testing.T) {
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"rdonly", "replica"})
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"primary"})
rules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "mt"},
{FromKeyspace: "s1@rdonly", ToKeyspace: "mt"},
{FromKeyspace: "s1@replica", ToKeyspace: "mt"},
},
}
validateKeyspaceRoutingRules(t, vc, rules)
}

func validateKeyspaceRoutingRules(t *testing.T, vc *VitessCluster, expectedRules *vschemapb.KeyspaceRoutingRules) {
currentRules := getKeyspaceRoutingRules(t, vc)
require.ElementsMatch(t, expectedRules.Rules, currentRules.Rules)
}

func getSourceKeyspace(tenantId int64) string {
Expand Down Expand Up @@ -280,7 +298,7 @@ func TestMultiTenantComplex(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
t.Run("Verify all rows have been migrated", func(t *testing.T) {
numAdditionalInsertSets := 2 // during the SwitchTraffic stop
numAdditionalInsertSets := 2 /* during the SwitchTraffic stop */ + 1 /* after Complete */
totalRowsInsertedPerTenant := numInitialRowsPerTenant + numAdditionalRowsPerTenant*numAdditionalInsertSets
totalRowsInserted := totalRowsInsertedPerTenant * numTenants
totalActualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", mtm.targetKeyspace, "t1"))
Expand Down Expand Up @@ -398,6 +416,7 @@ func (mtm *multiTenantMigration) switchTraffic(tenantId int64) {
func (mtm *multiTenantMigration) complete(tenantId int64) {
mt := mtm.getActiveMoveTables(tenantId)
mt.Complete()
mtm.insertSomeData(mtm.t, tenantId, mtm.targetKeyspace, numAdditionalRowsPerTenant)
vtgateConn := vc.GetVTGateConn(mtm.t)
defer vtgateConn.Close()
waitForQueryResult(mtm.t, vtgateConn, "",
Expand Down
26 changes: 14 additions & 12 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (vmt *VtctlMoveTables) ReverseReadsAndWrites() {
}

func (vmt *VtctlMoveTables) Show() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand All @@ -148,12 +148,12 @@ func (vmt *VtctlMoveTables) exec(action string) {
require.NoError(vmt.vc.t, err)
}
func (vmt *VtctlMoveTables) SwitchReads() {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (vmt *VtctlMoveTables) SwitchWrites() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand Down Expand Up @@ -232,13 +232,15 @@ func (v VtctldMoveTables) Show() {
}

func (v VtctldMoveTables) SwitchReads() {
//TODO implement me
panic("implement me")
args := []string{"SwitchTraffic", "--tablet-types=rdonly,replica"}
args = append(args, v.switchFlags...)
v.exec(args...)
}

func (v VtctldMoveTables) SwitchWrites() {
//TODO implement me
panic("implement me")
args := []string{"SwitchTraffic", "--tablet-types=primary"}
args = append(args, v.switchFlags...)
v.exec(args...)
}

func (v VtctldMoveTables) Cancel() {
Expand Down Expand Up @@ -327,7 +329,7 @@ func (vrs *VtctlReshard) ReverseReadsAndWrites() {
}

func (vrs *VtctlReshard) Show() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand All @@ -339,12 +341,12 @@ func (vrs *VtctlReshard) exec(action string) {
}

func (vrs *VtctlReshard) SwitchReads() {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (vrs *VtctlReshard) SwitchWrites() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand Down Expand Up @@ -421,12 +423,12 @@ func (v VtctldReshard) Show() {
}

func (v VtctldReshard) SwitchReads() {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (v VtctldReshard) SwitchWrites() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand Down
Loading
Loading