Skip to content

Commit

Permalink
Multi-tenant MoveTables: allow switching replica/rdonly traffic separ…
Browse files Browse the repository at this point in the history
…ately before switching primary traffic (#15768)

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Apr 27, 2024
1 parent 5fd70c4 commit 1e143c3
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 104 deletions.
44 changes: 44 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqlescape"
Expand Down Expand Up @@ -284,6 +285,7 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da
require.NoError(t, err)
require.NotNil(t, qr)
if wantRes == fmt.Sprintf("%v", qr.Rows) {
log.Infof("waitForRowCountInTablet: found %d rows in table %s on tablet %s", want, table, vttablet.Name)
return
}
select {
Expand Down Expand Up @@ -986,3 +988,45 @@ func getCellNames(cells []*Cell) string {
}
return strings.Join(cellNames, ",")
}

// VExplainPlan is the struct that represents the json output of a vexplain query.
type VExplainPlan struct {
OperatorType string
Variant string
Keyspace VExplainKeyspace
FieldQuery string
Query string
Table string
}

type VExplainKeyspace struct {
Name string
Sharded bool
}

// vexplain runs vexplain on the given query and returns the plan. Useful for validating routing rules.
func vexplain(t *testing.T, database, query string) *VExplainPlan {
vtgateConn := vc.GetVTGateConn(t)
defer vtgateConn.Close()

qr := execVtgateQuery(t, vtgateConn, database, fmt.Sprintf("vexplain %s", query))
require.NotNil(t, qr)
require.Equal(t, 1, len(qr.Rows))
json := qr.Rows[0][0].ToString()

var plan VExplainPlan
require.NoError(t, json2.Unmarshal([]byte(json), &plan))
return &plan
}

// confirmKeyspacesRoutedTo confirms that the given keyspaces are routed as expected for the given tablet types, using vexplain.
func confirmKeyspacesRoutedTo(t *testing.T, keyspace string, routedKeyspace, table string, tabletTypes []string) {
if len(tabletTypes) == 0 {
tabletTypes = []string{"primary", "replica", "rdonly"}
}
for _, tt := range tabletTypes {
database := fmt.Sprintf("%s@%s", keyspace, tt)
plan := vexplain(t, database, fmt.Sprintf("select * from %s.%s", keyspace, table))
require.Equalf(t, routedKeyspace, plan.Keyspace.Name, "for database %s, keyspace %v, tabletType %s", database, keyspace, tt)
}
}
109 changes: 64 additions & 45 deletions go/test/endtoend/vreplication/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
Expand Down Expand Up @@ -132,13 +131,6 @@ func TestMultiTenantSimple(t *testing.T) {
_, err = vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, sourceKeyspace, "0", stVSchema, stSchema, 1, 0, getInitialTabletIdForTenant(tenantId), nil)
require.NoError(t, err)

targetPrimary := vc.getPrimaryTablet(t, targetKeyspace, "0")
sourcePrimary := vc.getPrimaryTablet(t, sourceKeyspace, "0")
primaries := map[string]*cluster.VttabletProcess{
"target": targetPrimary,
"source": sourcePrimary,
}

vtgateConn, closeConn := getVTGateConn()
defer closeConn()
numRows := 10
Expand All @@ -164,64 +156,90 @@ func TestMultiTenantSimple(t *testing.T) {
},
})

preSwitchRules := &vschemapb.KeyspaceRoutingRules{
// Expected keyspace routing rules on creation of the workflow.
initialRules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "s1"},
{FromKeyspace: "s1@rdonly", ToKeyspace: "s1"},
{FromKeyspace: "s1@replica", ToKeyspace: "s1"},
},
}
postSwitchRules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "mt"},
},
}
rulesMap := map[string]*vschemapb.KeyspaceRoutingRules{
"pre": preSwitchRules,
"post": postSwitchRules,
}

require.Zero(t, len(getKeyspaceRoutingRules(t, vc).Rules))
mt.Create()
validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, false)
// Note: we cannot insert into the target keyspace since that is never routed to the source keyspace.
confirmKeyspacesRoutedTo(t, sourceKeyspace, "s1", "t1", nil)
validateKeyspaceRoutingRules(t, vc, initialRules)

lastIndex = insertRows(lastIndex, sourceKeyspace)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, mt.workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
mt.SwitchReadsAndWrites()
validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, true)

mt.SwitchReads()
confirmOnlyReadsSwitched(t)

mt.SwitchWrites()
confirmBothReadsAndWritesSwitched(t)

// Note: here we have already switched, and we can insert into the target keyspace, and it should get reverse
// replicated to the source keyspace. The source keyspace is routed to the target keyspace at this point.
lastIndex = insertRows(lastIndex, sourceKeyspace)
sourceTablet := vc.getPrimaryTablet(t, sourceKeyspace, "0")
require.NotNil(t, sourceTablet)
// Wait for the rows to be reverse replicated to the source keyspace.
waitForRowCountInTablet(t, sourceTablet, sourceKeyspace, "t1", int(lastIndex))

mt.Complete()
require.Zero(t, len(getKeyspaceRoutingRules(t, vc).Rules))
// Targeting to target keyspace should start working now. Upto this point we had to target the source keyspace.
lastIndex = insertRows(lastIndex, targetKeyspace)

actualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1"))
log.Infof("Migration completed, total rows in target: %d", actualRowsInserted)
require.Equal(t, lastIndex, int64(actualRowsInserted))

}

// If switched, queries with source qualifiers should execute on target, else on source. Confirm that
// the routing rules are as expected and that the query executes on the expected tablet.
func validateKeyspaceRoutingRules(t *testing.T, vc *VitessCluster, primaries map[string]*cluster.VttabletProcess, rulesMap map[string]*vschemapb.KeyspaceRoutingRules, switched bool) {
currentRules := getKeyspaceRoutingRules(t, vc)
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
queryTemplate := "select count(*) from %s.t1"
matchQuery := "select count(*) from t1"
func confirmOnlyReadsSwitched(t *testing.T) {
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"rdonly", "replica"})
confirmKeyspacesRoutedTo(t, "s1", "s1", "t1", []string{"primary"})
rules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "s1"},
{FromKeyspace: "s1@rdonly", ToKeyspace: "mt"},
{FromKeyspace: "s1@replica", ToKeyspace: "mt"},
},
}
validateKeyspaceRoutingRules(t, vc, rules)
}

validateQueryRoute := func(qualifier, dest string) {
query := fmt.Sprintf(queryTemplate, qualifier)
assertQueryExecutesOnTablet(t, vtgateConn, primaries[dest], "", query, matchQuery)
log.Infof("query %s executed on %s", query, dest)
func confirmOnlyWritesSwitched(t *testing.T) {
confirmKeyspacesRoutedTo(t, "s1", "s1", "t1", []string{"rdonly", "replica"})
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"primary"})
rules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "mt"},
{FromKeyspace: "s1@rdonly", ToKeyspace: "s1"},
{FromKeyspace: "s1@replica", ToKeyspace: "s1"},
},
}
validateKeyspaceRoutingRules(t, vc, rules)
}

if switched {
require.ElementsMatch(t, rulesMap["post"].Rules, currentRules.Rules)
validateQueryRoute("mt", "target")
validateQueryRoute("s1", "target")
} else {
require.ElementsMatch(t, rulesMap["pre"].Rules, currentRules.Rules)
// Note that with multi-tenant migration, we cannot redirect the target keyspace since
// there are multiple source keyspaces and the target has the aggregate of all the tenants.
validateQueryRoute("mt", "target")
validateQueryRoute("s1", "source")
func confirmBothReadsAndWritesSwitched(t *testing.T) {
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"rdonly", "replica"})
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"primary"})
rules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "s1", ToKeyspace: "mt"},
{FromKeyspace: "s1@rdonly", ToKeyspace: "mt"},
{FromKeyspace: "s1@replica", ToKeyspace: "mt"},
},
}
validateKeyspaceRoutingRules(t, vc, rules)
}

func validateKeyspaceRoutingRules(t *testing.T, vc *VitessCluster, expectedRules *vschemapb.KeyspaceRoutingRules) {
currentRules := getKeyspaceRoutingRules(t, vc)
require.ElementsMatch(t, expectedRules.Rules, currentRules.Rules)
}

func getSourceKeyspace(tenantId int64) string {
Expand Down Expand Up @@ -280,7 +298,7 @@ func TestMultiTenantComplex(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
t.Run("Verify all rows have been migrated", func(t *testing.T) {
numAdditionalInsertSets := 2 // during the SwitchTraffic stop
numAdditionalInsertSets := 2 /* during the SwitchTraffic stop */ + 1 /* after Complete */
totalRowsInsertedPerTenant := numInitialRowsPerTenant + numAdditionalRowsPerTenant*numAdditionalInsertSets
totalRowsInserted := totalRowsInsertedPerTenant * numTenants
totalActualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", mtm.targetKeyspace, "t1"))
Expand Down Expand Up @@ -398,6 +416,7 @@ func (mtm *multiTenantMigration) switchTraffic(tenantId int64) {
func (mtm *multiTenantMigration) complete(tenantId int64) {
mt := mtm.getActiveMoveTables(tenantId)
mt.Complete()
mtm.insertSomeData(mtm.t, tenantId, mtm.targetKeyspace, numAdditionalRowsPerTenant)
vtgateConn := vc.GetVTGateConn(mtm.t)
defer vtgateConn.Close()
waitForQueryResult(mtm.t, vtgateConn, "",
Expand Down
26 changes: 14 additions & 12 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (vmt *VtctlMoveTables) ReverseReadsAndWrites() {
}

func (vmt *VtctlMoveTables) Show() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand All @@ -148,12 +148,12 @@ func (vmt *VtctlMoveTables) exec(action string) {
require.NoError(vmt.vc.t, err)
}
func (vmt *VtctlMoveTables) SwitchReads() {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (vmt *VtctlMoveTables) SwitchWrites() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand Down Expand Up @@ -232,13 +232,15 @@ func (v VtctldMoveTables) Show() {
}

func (v VtctldMoveTables) SwitchReads() {
//TODO implement me
panic("implement me")
args := []string{"SwitchTraffic", "--tablet-types=rdonly,replica"}
args = append(args, v.switchFlags...)
v.exec(args...)
}

func (v VtctldMoveTables) SwitchWrites() {
//TODO implement me
panic("implement me")
args := []string{"SwitchTraffic", "--tablet-types=primary"}
args = append(args, v.switchFlags...)
v.exec(args...)
}

func (v VtctldMoveTables) Cancel() {
Expand Down Expand Up @@ -327,7 +329,7 @@ func (vrs *VtctlReshard) ReverseReadsAndWrites() {
}

func (vrs *VtctlReshard) Show() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand All @@ -339,12 +341,12 @@ func (vrs *VtctlReshard) exec(action string) {
}

func (vrs *VtctlReshard) SwitchReads() {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (vrs *VtctlReshard) SwitchWrites() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand Down Expand Up @@ -421,12 +423,12 @@ func (v VtctldReshard) Show() {
}

func (v VtctldReshard) SwitchReads() {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (v VtctldReshard) SwitchWrites() {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand Down
Loading

0 comments on commit 1e143c3

Please sign in to comment.