Skip to content

Commit

Permalink
add support for vtgate traffic mirroring (queryserving) (#15992)
Browse files Browse the repository at this point in the history
Signed-off-by: Max Englander <[email protected]>
Signed-off-by: Max Englander <[email protected]>
Signed-off-by: Andres Taylor <[email protected]>
Co-authored-by: Deepthi Sigireddi <[email protected]>
Co-authored-by: Andres Taylor <[email protected]>
Co-authored-by: Harshit Gangal <[email protected]>
  • Loading branch information
4 people authored Aug 28, 2024
1 parent d0d2679 commit fdb7f30
Show file tree
Hide file tree
Showing 47 changed files with 2,541 additions and 110 deletions.
186 changes: 183 additions & 3 deletions go/test/endtoend/vtgate/queries/benchmark/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ package dml

import (
"fmt"
"maps"
"math/rand/v2"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
mapsx "golang.org/x/exp/maps"
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/test/endtoend/utils"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

type testQuery struct {
Expand Down Expand Up @@ -127,7 +135,7 @@ func BenchmarkShardedTblNoLookup(b *testing.B) {
}
for _, rows := range []int{1, 10, 100, 500, 1000, 5000, 10000} {
insStmt := tq.getInsertQuery(rows)
b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) {
b.Run(fmt.Sprintf("4-shards-%d-rows", rows), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = utils.Exec(b, conn, insStmt)
}
Expand All @@ -150,7 +158,7 @@ func BenchmarkShardedTblUpdateIn(b *testing.B) {
_ = utils.Exec(b, conn, insStmt)
for _, rows := range []int{1, 10, 100, 500, 1000, 5000, 10000} {
updStmt := tq.getUpdateQuery(rows)
b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) {
b.Run(fmt.Sprintf("4-shards-%d-rows", rows), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = utils.Exec(b, conn, updStmt)
}
Expand All @@ -168,7 +176,7 @@ func BenchmarkShardedTblDeleteIn(b *testing.B) {
insStmt := tq.getInsertQuery(rows)
_ = utils.Exec(b, conn, insStmt)
delStmt := tq.getDeleteQuery(rows)
b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) {
b.Run(fmt.Sprintf("4-shards-%d-rows", rows), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = utils.Exec(b, conn, delStmt)
}
Expand Down Expand Up @@ -197,3 +205,175 @@ func BenchmarkShardedAggrPushDown(b *testing.B) {
}
}
}

var mirrorInitOnce sync.Once

func BenchmarkMirror(b *testing.B) {
const numRows = 10000

conn, closer := start(b)
defer closer()

// Each time this BenchmarkMirror runs, use a different source of
// randomness. But use the same source of randomness across test cases and
// mirror percentages sub test cases.
pcg := rand.NewPCG(rand.Uint64(), rand.Uint64())

ksTables := map[string]string{
sKs2: "mirror_tbl1",
sKs3: "mirror_tbl2",
}
targetKeyspaces := mapsx.Keys(ksTables)

mirrorInitOnce.Do(func() {
b.Logf("seeding database for benchmark...")

for i := 0; i < numRows; i++ {
_, err := conn.ExecuteFetch(
fmt.Sprintf("INSERT INTO %s.mirror_tbl1(id) VALUES(%d)", sKs1, i), -1, false)
require.NoError(b, err)

_, err = conn.ExecuteFetch(
fmt.Sprintf("INSERT INTO %s.mirror_tbl2(id) VALUES(%d)", sKs1, i), -1, false)
require.NoError(b, err)
}

_, err := conn.ExecuteFetch(
fmt.Sprintf("SELECT COUNT(id) FROM %s.%s", sKs1, "mirror_tbl1"), 1, false)
require.NoError(b, err)

b.Logf("finished (inserted %d rows)", numRows)

b.Logf("using MoveTables to copy data from source keyspace to target keyspaces")

// Set up MoveTables workflows, which is (at present) the only way to set up
// mirror rules.
for tks, tbl := range ksTables {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"MoveTables", "--target-keyspace", tks, "--workflow", fmt.Sprintf("%s2%s", sKs1, tks),
"create", "--source-keyspace", sKs1, "--tables", tbl)
require.NoError(b, err, output)
}

// Wait for tables to be copied from source to targets.
pending := make(map[string]string, len(ksTables))
maps.Copy(pending, ksTables)
for len(pending) > 0 {
for tks := range ksTables {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"Workflow", "--keyspace", tks, "show", "--workflow", fmt.Sprintf("%s2%s", sKs1, tks))
require.NoError(b, err, output)

var response vtctldatapb.GetWorkflowsResponse
require.NoError(b, protojson.Unmarshal([]byte(output), &response))

require.Len(b, response.Workflows, 1)
workflow := response.Workflows[0]

require.Len(b, workflow.ShardStreams, 4 /*shards*/)
for _, ss := range workflow.ShardStreams {
for _, s := range ss.Streams {
if s.State == "Running" {
delete(pending, tks)
} else {
b.Logf("waiting for workflow %s.%s stream %s=>%s to be running; last state: %s",
workflow.Target, workflow.Name, s.BinlogSource.Shard, s.Shard, s.State)
time.Sleep(1 * time.Second)
}
}
}
}
}
})

testCases := []struct {
name string
run func(*testing.B, *rand.Rand)
}{
{
name: "point select, { sks1 => sks2 }.mirror_tbl1",
run: func(b *testing.B, rnd *rand.Rand) {
for i := 0; i < b.N; i++ {
id := rnd.Int32N(numRows)
_, err := conn.ExecuteFetch(fmt.Sprintf(
"SELECT t1.id FROM %s.mirror_tbl1 AS t1 WHERE t1.id = %d",
sKs1, id,
), 1, false)
if err != nil {
b.Error(err)
}
}
},
},
{
name: "point select, { sks1 => sks2 }.mirror_tbl1, { sks1 => sks3 }.mirror_tbl2",
run: func(b *testing.B, rnd *rand.Rand) {
for i := 0; i < b.N; i++ {
id := rnd.Int32N(numRows)
_, err := conn.ExecuteFetch(fmt.Sprintf(
"SELECT t1.id, t2.id FROM %s.mirror_tbl1 AS t1, %s.mirror_tbl2 AS t2 WHERE t1.id = %d AND t2.id = %d",
sKs1, sKs1, id, id,
), 1, false)
if err != nil {
b.Error(err)
}
}
},
},
}

for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
b.Run("mirror 0%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 0)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 1%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 1)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 5%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 5)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 10%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 10)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 25%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 25)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 50%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 50)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 100%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 100)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})
})
}
}

func mirrorTraffic(b *testing.B, targetKeyspaces []string, percent float32) {
for _, tks := range targetKeyspaces {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"MoveTables", "--target-keyspace", tks, "--workflow", fmt.Sprintf("%s2%s", sKs1, tks),
"mirrortraffic", "--percent", fmt.Sprintf("%.02f", percent))
require.NoError(b, err, output)
}
}
108 changes: 79 additions & 29 deletions go/test/endtoend/vtgate/queries/benchmark/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"context"
_ "embed"
"flag"
"fmt"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -34,36 +36,34 @@ var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
sKs = "sks"
uKs = "uks"
sKs1 = "sks1"
sKs2 = "sks2"
sKs3 = "sks3"
cell = "test"

//go:embed sharded_schema.sql
sSchemaSQL string
//go:embed sharded_schema1.sql
sSchemaSQL1 string

//go:embed vschema.json
sVSchema string
)
//go:embed vschema1.json
sVSchema1 string

var (
shards4 = []string{
"-40", "40-80", "80-c0", "c0-",
}
//go:embed sharded_schema2.sql
sSchemaSQL2 string

shards8 = []string{
"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-",
}
//go:embed vschema2.json
sVSchema2 string

shards16 = []string{
"-10", "10-20", "20-30", "30-40", "40-50", "50-60", "60-70", "70-80", "80-90", "90-a0", "a0-b0", "b0-c0", "c0-d0", "d0-e0", "e0-f0", "f0-",
}
//go:embed sharded_schema3.sql
sSchemaSQL3 string

shards32 = []string{
"-05", "05-10", "10-15", "15-20", "20-25", "25-30", "30-35", "35-40", "40-45", "45-50", "50-55", "55-60", "60-65", "65-70", "70-75", "75-80",
"80-85", "85-90", "90-95", "95-a0", "a0-a5", "a5-b0", "b0-b5", "b5-c0", "c0-c5", "c5-d0", "d0-d5", "d5-e0", "e0-e5", "e5-f0", "f0-f5", "f5-",
}
//go:embed vschema3.json
sVSchema3 string
)

var shards4 = []string{
"-40", "40-80", "80-c0", "c0-",
}

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
Expand All @@ -78,14 +78,38 @@ func TestMain(m *testing.M) {
return 1
}

// Start sharded keyspace
sKeyspace := &cluster.Keyspace{
Name: sKs,
SchemaSQL: sSchemaSQL,
VSchema: sVSchema,
// Start sharded keyspace 1
sKeyspace1 := &cluster.Keyspace{
Name: sKs1,
SchemaSQL: sSchemaSQL1,
VSchema: sVSchema1,
}

err = clusterInstance.StartKeyspace(*sKeyspace, shards4, 0, false)
err = clusterInstance.StartKeyspace(*sKeyspace1, shards4, 0, false)
if err != nil {
return 1
}

// Start sharded keyspace 2
sKeyspace2 := &cluster.Keyspace{
Name: sKs2,
SchemaSQL: sSchemaSQL2,
VSchema: sVSchema2,
}

err = clusterInstance.StartKeyspace(*sKeyspace2, shards4, 0, false)
if err != nil {
return 1
}

// Start sharded keyspace 3
sKeyspace3 := &cluster.Keyspace{
Name: sKs3,
SchemaSQL: sSchemaSQL3,
VSchema: sVSchema3,
}

err = clusterInstance.StartKeyspace(*sKeyspace3, shards4, 0, false)
if err != nil {
return 1
}
Expand All @@ -96,7 +120,7 @@ func TestMain(m *testing.M) {
return 1
}

vtParams = clusterInstance.GetVTParams(sKs)
vtParams = clusterInstance.GetVTParams("@primary")

return m.Run()
}()
Expand All @@ -108,12 +132,38 @@ func start(b *testing.B) (*mysql.Conn, func()) {
require.NoError(b, err)

deleteAll := func() {
tables := []string{"tbl_no_lkp_vdx"}
tables := []string{
fmt.Sprintf("%s.tbl_no_lkp_vdx", sKs1),
fmt.Sprintf("%s.mirror_tbl1", sKs1),
fmt.Sprintf("%s.mirror_tbl2", sKs1),
fmt.Sprintf("%s.mirror_tbl1", sKs2),
fmt.Sprintf("%s.mirror_tbl2", sKs3),
}
for _, table := range tables {
_, _ = utils.ExecAllowError(b, conn, "delete from "+table)
}
}

// Make sure all keyspaces are serving.
pending := map[string]string{
sKs1: "mirror_tbl1",
sKs2: "mirror_tbl1",
sKs3: "mirror_tbl2",
}
for len(pending) > 0 {
for ks, tbl := range pending {
_, err := conn.ExecuteFetch(
fmt.Sprintf("SELECT COUNT(id) FROM %s.%s", ks, tbl), 1, false)
if err != nil {
b.Logf("waiting for keyspace %s to be serving; last error: %v", ks, err)
time.Sleep(1 * time.Second)
} else {
delete(pending, ks)
}
}
}

// Delete any pre-existing data.
deleteAll()

return conn, func() {
Expand Down
Loading

0 comments on commit fdb7f30

Please sign in to comment.