diff --git a/go/test/endtoend/vtgate/queries/benchmark/benchmark_test.go b/go/test/endtoend/vtgate/queries/benchmark/benchmark_test.go index b76ae5a35c7..5e3be7c7eff 100644 --- a/go/test/endtoend/vtgate/queries/benchmark/benchmark_test.go +++ b/go/test/endtoend/vtgate/queries/benchmark/benchmark_test.go @@ -18,12 +18,20 @@ package dml import ( "fmt" + "maps" "math/rand/v2" "strconv" "strings" + "sync" "testing" + "time" + + "github.com/stretchr/testify/require" + mapsx "golang.org/x/exp/maps" + "google.golang.org/protobuf/encoding/protojson" "vitess.io/vitess/go/test/endtoend/utils" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) type testQuery struct { @@ -127,7 +135,7 @@ func BenchmarkShardedTblNoLookup(b *testing.B) { } for _, rows := range []int{1, 10, 100, 500, 1000, 5000, 10000} { insStmt := tq.getInsertQuery(rows) - b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) { + b.Run(fmt.Sprintf("4-shards-%d-rows", rows), func(b *testing.B) { for i := 0; i < b.N; i++ { _ = utils.Exec(b, conn, insStmt) } @@ -150,7 +158,7 @@ func BenchmarkShardedTblUpdateIn(b *testing.B) { _ = utils.Exec(b, conn, insStmt) for _, rows := range []int{1, 10, 100, 500, 1000, 5000, 10000} { updStmt := tq.getUpdateQuery(rows) - b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) { + b.Run(fmt.Sprintf("4-shards-%d-rows", rows), func(b *testing.B) { for i := 0; i < b.N; i++ { _ = utils.Exec(b, conn, updStmt) } @@ -168,7 +176,7 @@ func BenchmarkShardedTblDeleteIn(b *testing.B) { insStmt := tq.getInsertQuery(rows) _ = utils.Exec(b, conn, insStmt) delStmt := tq.getDeleteQuery(rows) - b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) { + b.Run(fmt.Sprintf("4-shards-%d-rows", rows), func(b *testing.B) { for i := 0; i < b.N; i++ { _ = utils.Exec(b, conn, delStmt) } @@ -197,3 +205,175 @@ func BenchmarkShardedAggrPushDown(b *testing.B) { } } } + +var mirrorInitOnce sync.Once + +func BenchmarkMirror(b *testing.B) { + const numRows = 10000 + + conn, closer := start(b) + defer closer() + + // Each time this BenchmarkMirror runs, use a different source of + // randomness. But use the same source of randomness across test cases and + // mirror percentages sub test cases. + pcg := rand.NewPCG(rand.Uint64(), rand.Uint64()) + + ksTables := map[string]string{ + sKs2: "mirror_tbl1", + sKs3: "mirror_tbl2", + } + targetKeyspaces := mapsx.Keys(ksTables) + + mirrorInitOnce.Do(func() { + b.Logf("seeding database for benchmark...") + + for i := 0; i < numRows; i++ { + _, err := conn.ExecuteFetch( + fmt.Sprintf("INSERT INTO %s.mirror_tbl1(id) VALUES(%d)", sKs1, i), -1, false) + require.NoError(b, err) + + _, err = conn.ExecuteFetch( + fmt.Sprintf("INSERT INTO %s.mirror_tbl2(id) VALUES(%d)", sKs1, i), -1, false) + require.NoError(b, err) + } + + _, err := conn.ExecuteFetch( + fmt.Sprintf("SELECT COUNT(id) FROM %s.%s", sKs1, "mirror_tbl1"), 1, false) + require.NoError(b, err) + + b.Logf("finished (inserted %d rows)", numRows) + + b.Logf("using MoveTables to copy data from source keyspace to target keyspaces") + + // Set up MoveTables workflows, which is (at present) the only way to set up + // mirror rules. + for tks, tbl := range ksTables { + output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput( + "MoveTables", "--target-keyspace", tks, "--workflow", fmt.Sprintf("%s2%s", sKs1, tks), + "create", "--source-keyspace", sKs1, "--tables", tbl) + require.NoError(b, err, output) + } + + // Wait for tables to be copied from source to targets. + pending := make(map[string]string, len(ksTables)) + maps.Copy(pending, ksTables) + for len(pending) > 0 { + for tks := range ksTables { + output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput( + "Workflow", "--keyspace", tks, "show", "--workflow", fmt.Sprintf("%s2%s", sKs1, tks)) + require.NoError(b, err, output) + + var response vtctldatapb.GetWorkflowsResponse + require.NoError(b, protojson.Unmarshal([]byte(output), &response)) + + require.Len(b, response.Workflows, 1) + workflow := response.Workflows[0] + + require.Len(b, workflow.ShardStreams, 4 /*shards*/) + for _, ss := range workflow.ShardStreams { + for _, s := range ss.Streams { + if s.State == "Running" { + delete(pending, tks) + } else { + b.Logf("waiting for workflow %s.%s stream %s=>%s to be running; last state: %s", + workflow.Target, workflow.Name, s.BinlogSource.Shard, s.Shard, s.State) + time.Sleep(1 * time.Second) + } + } + } + } + } + }) + + testCases := []struct { + name string + run func(*testing.B, *rand.Rand) + }{ + { + name: "point select, { sks1 => sks2 }.mirror_tbl1", + run: func(b *testing.B, rnd *rand.Rand) { + for i := 0; i < b.N; i++ { + id := rnd.Int32N(numRows) + _, err := conn.ExecuteFetch(fmt.Sprintf( + "SELECT t1.id FROM %s.mirror_tbl1 AS t1 WHERE t1.id = %d", + sKs1, id, + ), 1, false) + if err != nil { + b.Error(err) + } + } + }, + }, + { + name: "point select, { sks1 => sks2 }.mirror_tbl1, { sks1 => sks3 }.mirror_tbl2", + run: func(b *testing.B, rnd *rand.Rand) { + for i := 0; i < b.N; i++ { + id := rnd.Int32N(numRows) + _, err := conn.ExecuteFetch(fmt.Sprintf( + "SELECT t1.id, t2.id FROM %s.mirror_tbl1 AS t1, %s.mirror_tbl2 AS t2 WHERE t1.id = %d AND t2.id = %d", + sKs1, sKs1, id, id, + ), 1, false) + if err != nil { + b.Error(err) + } + } + }, + }, + } + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + b.Run("mirror 0%", func(b *testing.B) { + mirrorTraffic(b, targetKeyspaces, 0) + b.ResetTimer() + tc.run(b, rand.New(pcg)) + }) + + b.Run("mirror 1%", func(b *testing.B) { + mirrorTraffic(b, targetKeyspaces, 1) + b.ResetTimer() + tc.run(b, rand.New(pcg)) + }) + + b.Run("mirror 5%", func(b *testing.B) { + mirrorTraffic(b, targetKeyspaces, 5) + b.ResetTimer() + tc.run(b, rand.New(pcg)) + }) + + b.Run("mirror 10%", func(b *testing.B) { + mirrorTraffic(b, targetKeyspaces, 10) + b.ResetTimer() + tc.run(b, rand.New(pcg)) + }) + + b.Run("mirror 25%", func(b *testing.B) { + mirrorTraffic(b, targetKeyspaces, 25) + b.ResetTimer() + tc.run(b, rand.New(pcg)) + }) + + b.Run("mirror 50%", func(b *testing.B) { + mirrorTraffic(b, targetKeyspaces, 50) + b.ResetTimer() + tc.run(b, rand.New(pcg)) + }) + + b.Run("mirror 100%", func(b *testing.B) { + mirrorTraffic(b, targetKeyspaces, 100) + b.ResetTimer() + tc.run(b, rand.New(pcg)) + }) + }) + } +} + +func mirrorTraffic(b *testing.B, targetKeyspaces []string, percent float32) { + for _, tks := range targetKeyspaces { + output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput( + "MoveTables", "--target-keyspace", tks, "--workflow", fmt.Sprintf("%s2%s", sKs1, tks), + "mirrortraffic", "--percent", fmt.Sprintf("%.02f", percent)) + require.NoError(b, err, output) + } +} diff --git a/go/test/endtoend/vtgate/queries/benchmark/main_test.go b/go/test/endtoend/vtgate/queries/benchmark/main_test.go index 6978d0b9428..40a215c8007 100644 --- a/go/test/endtoend/vtgate/queries/benchmark/main_test.go +++ b/go/test/endtoend/vtgate/queries/benchmark/main_test.go @@ -20,8 +20,10 @@ import ( "context" _ "embed" "flag" + "fmt" "os" "testing" + "time" "github.com/stretchr/testify/require" @@ -34,36 +36,34 @@ var ( clusterInstance *cluster.LocalProcessCluster vtParams mysql.ConnParams mysqlParams mysql.ConnParams - sKs = "sks" - uKs = "uks" + sKs1 = "sks1" + sKs2 = "sks2" + sKs3 = "sks3" cell = "test" - //go:embed sharded_schema.sql - sSchemaSQL string + //go:embed sharded_schema1.sql + sSchemaSQL1 string - //go:embed vschema.json - sVSchema string -) + //go:embed vschema1.json + sVSchema1 string -var ( - shards4 = []string{ - "-40", "40-80", "80-c0", "c0-", - } + //go:embed sharded_schema2.sql + sSchemaSQL2 string - shards8 = []string{ - "-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-", - } + //go:embed vschema2.json + sVSchema2 string - shards16 = []string{ - "-10", "10-20", "20-30", "30-40", "40-50", "50-60", "60-70", "70-80", "80-90", "90-a0", "a0-b0", "b0-c0", "c0-d0", "d0-e0", "e0-f0", "f0-", - } + //go:embed sharded_schema3.sql + sSchemaSQL3 string - shards32 = []string{ - "-05", "05-10", "10-15", "15-20", "20-25", "25-30", "30-35", "35-40", "40-45", "45-50", "50-55", "55-60", "60-65", "65-70", "70-75", "75-80", - "80-85", "85-90", "90-95", "95-a0", "a0-a5", "a5-b0", "b0-b5", "b5-c0", "c0-c5", "c5-d0", "d0-d5", "d5-e0", "e0-e5", "e5-f0", "f0-f5", "f5-", - } + //go:embed vschema3.json + sVSchema3 string ) +var shards4 = []string{ + "-40", "40-80", "80-c0", "c0-", +} + func TestMain(m *testing.M) { defer cluster.PanicHandler(nil) flag.Parse() @@ -78,14 +78,38 @@ func TestMain(m *testing.M) { return 1 } - // Start sharded keyspace - sKeyspace := &cluster.Keyspace{ - Name: sKs, - SchemaSQL: sSchemaSQL, - VSchema: sVSchema, + // Start sharded keyspace 1 + sKeyspace1 := &cluster.Keyspace{ + Name: sKs1, + SchemaSQL: sSchemaSQL1, + VSchema: sVSchema1, } - err = clusterInstance.StartKeyspace(*sKeyspace, shards4, 0, false) + err = clusterInstance.StartKeyspace(*sKeyspace1, shards4, 0, false) + if err != nil { + return 1 + } + + // Start sharded keyspace 2 + sKeyspace2 := &cluster.Keyspace{ + Name: sKs2, + SchemaSQL: sSchemaSQL2, + VSchema: sVSchema2, + } + + err = clusterInstance.StartKeyspace(*sKeyspace2, shards4, 0, false) + if err != nil { + return 1 + } + + // Start sharded keyspace 3 + sKeyspace3 := &cluster.Keyspace{ + Name: sKs3, + SchemaSQL: sSchemaSQL3, + VSchema: sVSchema3, + } + + err = clusterInstance.StartKeyspace(*sKeyspace3, shards4, 0, false) if err != nil { return 1 } @@ -96,7 +120,7 @@ func TestMain(m *testing.M) { return 1 } - vtParams = clusterInstance.GetVTParams(sKs) + vtParams = clusterInstance.GetVTParams("@primary") return m.Run() }() @@ -108,12 +132,38 @@ func start(b *testing.B) (*mysql.Conn, func()) { require.NoError(b, err) deleteAll := func() { - tables := []string{"tbl_no_lkp_vdx"} + tables := []string{ + fmt.Sprintf("%s.tbl_no_lkp_vdx", sKs1), + fmt.Sprintf("%s.mirror_tbl1", sKs1), + fmt.Sprintf("%s.mirror_tbl2", sKs1), + fmt.Sprintf("%s.mirror_tbl1", sKs2), + fmt.Sprintf("%s.mirror_tbl2", sKs3), + } for _, table := range tables { _, _ = utils.ExecAllowError(b, conn, "delete from "+table) } } + // Make sure all keyspaces are serving. + pending := map[string]string{ + sKs1: "mirror_tbl1", + sKs2: "mirror_tbl1", + sKs3: "mirror_tbl2", + } + for len(pending) > 0 { + for ks, tbl := range pending { + _, err := conn.ExecuteFetch( + fmt.Sprintf("SELECT COUNT(id) FROM %s.%s", ks, tbl), 1, false) + if err != nil { + b.Logf("waiting for keyspace %s to be serving; last error: %v", ks, err) + time.Sleep(1 * time.Second) + } else { + delete(pending, ks) + } + } + } + + // Delete any pre-existing data. deleteAll() return conn, func() { diff --git a/go/test/endtoend/vtgate/queries/benchmark/sharded_schema.sql b/go/test/endtoend/vtgate/queries/benchmark/sharded_schema1.sql similarity index 76% rename from go/test/endtoend/vtgate/queries/benchmark/sharded_schema.sql rename to go/test/endtoend/vtgate/queries/benchmark/sharded_schema1.sql index 92f63c40f0f..d83ffefa7ec 100644 --- a/go/test/endtoend/vtgate/queries/benchmark/sharded_schema.sql +++ b/go/test/endtoend/vtgate/queries/benchmark/sharded_schema1.sql @@ -30,4 +30,16 @@ create table user_extra not_sharding_key bigint, col varchar(50), primary key (id) -); \ No newline at end of file +); + +create table mirror_tbl1 +( + id bigint not null, + primary key(id) +) Engine = InnoDB; + +create table mirror_tbl2 +( + id bigint not null, + primary key(id) +) Engine = InnoDB; diff --git a/go/test/endtoend/vtgate/queries/benchmark/sharded_schema2.sql b/go/test/endtoend/vtgate/queries/benchmark/sharded_schema2.sql new file mode 100644 index 00000000000..4cba88b68ab --- /dev/null +++ b/go/test/endtoend/vtgate/queries/benchmark/sharded_schema2.sql @@ -0,0 +1,5 @@ +create table mirror_tbl1 +( + id bigint not null, + primary key(id) +) Engine = InnoDB; diff --git a/go/test/endtoend/vtgate/queries/benchmark/sharded_schema3.sql b/go/test/endtoend/vtgate/queries/benchmark/sharded_schema3.sql new file mode 100644 index 00000000000..e7e96f30357 --- /dev/null +++ b/go/test/endtoend/vtgate/queries/benchmark/sharded_schema3.sql @@ -0,0 +1,5 @@ +create table mirror_tbl2 +( + id bigint not null, + primary key(id) +) Engine = InnoDB; diff --git a/go/test/endtoend/vtgate/queries/benchmark/vschema.json b/go/test/endtoend/vtgate/queries/benchmark/vschema1.json similarity index 64% rename from go/test/endtoend/vtgate/queries/benchmark/vschema.json rename to go/test/endtoend/vtgate/queries/benchmark/vschema1.json index efc854af8ff..0e5366ba89f 100644 --- a/go/test/endtoend/vtgate/queries/benchmark/vschema.json +++ b/go/test/endtoend/vtgate/queries/benchmark/vschema1.json @@ -29,6 +29,22 @@ "name": "xxhash" } ] + }, + "mirror_tbl1": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "mirror_tbl2": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] } } -} \ No newline at end of file +} diff --git a/go/test/endtoend/vtgate/queries/benchmark/vschema2.json b/go/test/endtoend/vtgate/queries/benchmark/vschema2.json new file mode 100644 index 00000000000..2ae87ad2ae4 --- /dev/null +++ b/go/test/endtoend/vtgate/queries/benchmark/vschema2.json @@ -0,0 +1,18 @@ +{ + "sharded": true, + "vindexes": { + "xxhash": { + "type": "xxhash" + } + }, + "tables": { + "mirror_tbl1": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + } + } +} diff --git a/go/test/endtoend/vtgate/queries/benchmark/vschema3.json b/go/test/endtoend/vtgate/queries/benchmark/vschema3.json new file mode 100644 index 00000000000..6e71d6bc157 --- /dev/null +++ b/go/test/endtoend/vtgate/queries/benchmark/vschema3.json @@ -0,0 +1,18 @@ +{ + "sharded": true, + "vindexes": { + "xxhash": { + "type": "xxhash" + } + }, + "tables": { + "mirror_tbl2": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + } + } +} diff --git a/go/test/vschemawrapper/vschema_wrapper.go b/go/test/vschemawrapper/vschema_wrapper.go index 4d1c424dda8..a1b87f5569c 100644 --- a/go/test/vschemawrapper/vschema_wrapper.go +++ b/go/test/vschemawrapper/vschema_wrapper.go @@ -213,7 +213,6 @@ func (vw *VSchemaWrapper) TargetDestination(qualifier string) (key.Destination, return nil, nil, 0, vterrors.VT05003(keyspaceName) } return vw.Dest, keyspace.Keyspace, vw.TabletType_, nil - } func (vw *VSchemaWrapper) TabletType() topodatapb.TabletType { @@ -317,7 +316,6 @@ func (vw *VSchemaWrapper) TargetString() string { } func (vw *VSchemaWrapper) WarnUnshardedOnly(_ string, _ ...any) { - } func (vw *VSchemaWrapper) ErrorIfShardedF(keyspace *vindexes.Keyspace, _, errFmt string, params ...any) error { @@ -342,3 +340,17 @@ func (vw *VSchemaWrapper) FindRoutedShard(keyspace, shard string) (string, error func (vw *VSchemaWrapper) IsViewsEnabled() bool { return vw.EnableViews } + +// FindMirrorRule finds the mirror rule for the requested keyspace, table +// name, and the tablet type in the VSchema. +func (vs *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, error) { + destKeyspace, destTabletType, _, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY) + if err != nil { + return nil, err + } + mirrorRule, err := vs.V.FindMirrorRule(destKeyspace, tab.Name.String(), destTabletType) + if err != nil { + return nil, err + } + return mirrorRule, err +} diff --git a/go/vt/schemadiff/semantics.go b/go/vt/schemadiff/semantics.go index cbba8c79497..f12f59ef6ae 100644 --- a/go/vt/schemadiff/semantics.go +++ b/go/vt/schemadiff/semantics.go @@ -79,6 +79,11 @@ func (si *declarativeSchemaInformation) GetForeignKeyChecksState() *bool { return nil } +// FindMirrorRule implements semantics.SchemaInformation. +func (si *declarativeSchemaInformation) FindMirrorRule(tablename sqlparser.TableName) (*vindexes.MirrorRule, error) { + return nil, nil +} + // addTable adds a fake table with an empty column list func (si *declarativeSchemaInformation) addTable(tableName string) { tbl := &vindexes.Table{ diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 246d17f88b5..8fed95f5d51 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -60,6 +60,7 @@ func initialize(ctx context.Context, t *testing.T) (*vtgateconn.VTGateConn, *mys } return gconn, conn, mconn, close } + func TestVStream(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -186,7 +187,7 @@ func TestVStreamCopyBasic(t *testing.T) { Lastpk: qr, }} var shardGtids []*binlogdatapb.ShardGtid - var vgtid = &binlogdatapb.VGtid{} + vgtid := &binlogdatapb.VGtid{} shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ Keyspace: "ks", Shard: "-80", @@ -264,20 +265,14 @@ func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) { defer cancel() conn, err := mysql.Connect(ctx, &vtParams) - if err != nil { - require.NoError(t, err) - } + require.NoError(t, err) defer conn.Close() _, err = conn.ExecuteFetch("insert into t1_copy_all(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) - if err != nil { - require.NoError(t, err) - } + require.NoError(t, err) _, err = conn.ExecuteFetch("insert into t1_copy_all_ks2(id1,id2) values(10,10), (20,20)", 1, false) - if err != nil { - require.NoError(t, err) - } + require.NoError(t, err) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -343,13 +338,11 @@ func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) { gconn, conn, mconn, closeConnections := initialize(ctx, t) defer closeConnections() - var vgtid = &binlogdatapb.VGtid{} + vgtid := &binlogdatapb.VGtid{} vgtid.ShardGtids = []*binlogdatapb.ShardGtid{c.shardGtid} reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) _, _ = conn, mconn - if err != nil { - require.NoError(t, err) - } + require.NoError(t, err) require.NotNil(t, reader) var evs []*binlogdatapb.VEvent var completedEvs []*binlogdatapb.VEvent @@ -426,7 +419,7 @@ func TestVStreamCopyResume(t *testing.T) { } var shardGtids []*binlogdatapb.ShardGtid - var vgtid = &binlogdatapb.VGtid{} + vgtid := &binlogdatapb.VGtid{} shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ Keyspace: "ks", Shard: "-80", @@ -526,7 +519,7 @@ func TestVStreamCurrent(t *testing.T) { defer closeConnections() var shardGtids []*binlogdatapb.ShardGtid - var vgtid = &binlogdatapb.VGtid{} + vgtid := &binlogdatapb.VGtid{} shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ Keyspace: "ks", Shard: "-80", @@ -580,7 +573,7 @@ func TestVStreamSharded(t *testing.T) { defer closeConnections() var shardGtids []*binlogdatapb.ShardGtid - var vgtid = &binlogdatapb.VGtid{} + vgtid := &binlogdatapb.VGtid{} shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ Keyspace: "ks", Shard: "-80", @@ -665,7 +658,6 @@ func TestVStreamSharded(t *testing.T) { t.Fatalf("remote error: %v\n", err) } } - } // TestVStreamCopyTransactions tests that we are properly wrapping @@ -822,9 +814,11 @@ type VEventSorter []*binlogdatapb.VEvent func (v VEventSorter) Len() int { return len(v) } + func (v VEventSorter) Swap(i, j int) { v[i], v[j] = v[j], v[i] } + func (v VEventSorter) Less(i, j int) bool { valsI := v[i].GetRowEvent().RowChanges[0].After if valsI == nil { diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 06aa9f0d6a9..4c0d1009bd1 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1597,6 +1597,24 @@ func (cached *VitessMetadata) CachedSize(alloc bool) int64 { size += hack.RuntimeAllocSize(int64(len(cached.Value))) return size } +func (cached *percentBasedMirror) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(48) + } + // field primitive vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.primitive.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field target vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.target.(cachedObject); ok { + size += cc.CachedSize(true) + } + return size +} //go:nocheckptr func (cached *shardRoute) CachedSize(alloc bool) int64 { diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 5458a384490..498c26db877 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -46,11 +46,15 @@ import ( vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" ) -var testMaxMemoryRows = 100 -var testIgnoreMaxMemoryRows = false +var ( + testMaxMemoryRows = 100 + testIgnoreMaxMemoryRows = false +) -var _ VCursor = (*noopVCursor)(nil) -var _ SessionActions = (*noopVCursor)(nil) +var ( + _ VCursor = (*noopVCursor)(nil) + _ SessionActions = (*noopVCursor)(nil) +) // noopVCursor is used to build other vcursors. type noopVCursor struct { @@ -112,6 +116,10 @@ func (t *noopVCursor) CloneForReplicaWarming(ctx context.Context) VCursor { panic("implement me") } +func (t *noopVCursor) CloneForMirroring(ctx context.Context) VCursor { + panic("implement me") +} + func (t *noopVCursor) ReadTransaction(ctx context.Context, transactionID string) (*querypb.TransactionMetadata, error) { panic("implement me") } @@ -388,8 +396,10 @@ func (t *noopVCursor) GetDBDDLPluginName() string { panic("unimplemented") } -var _ VCursor = (*loggingVCursor)(nil) -var _ SessionActions = (*loggingVCursor)(nil) +var ( + _ VCursor = (*loggingVCursor)(nil) + _ SessionActions = (*loggingVCursor)(nil) +) // loggingVCursor logs requests and allows you to verify // that the correct requests were made. @@ -430,6 +440,10 @@ type loggingVCursor struct { shardSession []*srvtopo.ResolvedShard parser *sqlparser.Parser + + handleMirrorClonesFn func(context.Context) VCursor + onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool) + onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error) } func (f *loggingVCursor) HasCreatedTempTable() { @@ -545,6 +559,13 @@ func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor { return f } +func (f *loggingVCursor) CloneForMirroring(ctx context.Context) VCursor { + if f.handleMirrorClonesFn != nil { + return f.handleMirrorClonesFn(ctx) + } + panic("no mirror clones available") +} + func (f *loggingVCursor) Execute(ctx context.Context, method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { name := "Unknown" switch co { @@ -562,7 +583,12 @@ func (f *loggingVCursor) Execute(ctx context.Context, method string, query strin } func (f *loggingVCursor) ExecuteMultiShard(ctx context.Context, primitive Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) { + f.mu.Lock() + defer f.mu.Unlock() f.log = append(f.log, fmt.Sprintf("ExecuteMultiShard %v%v %v", printResolvedShardQueries(rss, queries), rollbackOnError, canAutocommit)) + if f.onExecuteMultiShardFn != nil { + f.onExecuteMultiShardFn(ctx, primitive, rss, queries, rollbackOnError, canAutocommit) + } res, err := f.nextResult() if err != nil { return nil, []error{err} @@ -583,6 +609,9 @@ func (f *loggingVCursor) ExecuteStandalone(ctx context.Context, primitive Primit func (f *loggingVCursor) StreamExecuteMulti(ctx context.Context, primitive Primitive, query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, rollbackOnError bool, autocommit bool, callback func(reply *sqltypes.Result) error) []error { f.mu.Lock() f.log = append(f.log, fmt.Sprintf("StreamExecuteMulti %s %s", query, printResolvedShardsBindVars(rss, bindVars))) + if f.onStreamExecuteMultiFn != nil { + f.onStreamExecuteMultiFn(ctx, primitive, query, rss, bindVars, rollbackOnError, autocommit, callback) + } r, err := f.nextResult() f.mu.Unlock() if err != nil { @@ -734,6 +763,8 @@ func (f *loggingVCursor) ResolveDestinationsMultiCol(ctx context.Context, keyspa func (f *loggingVCursor) ExpectLog(t *testing.T, want []string) { t.Helper() + f.mu.Lock() + defer f.mu.Unlock() if len(f.log) == 0 && len(want) == 0 { return } @@ -751,6 +782,8 @@ func (f *loggingVCursor) ExpectWarnings(t *testing.T, want []*querypb.QueryWarni } func (f *loggingVCursor) Rewind() { + f.mu.Lock() + defer f.mu.Unlock() f.curShardForKsid = 0 f.curResult = 0 f.log = nil @@ -854,6 +887,7 @@ func (t *noopVCursor) DisableLogging() {} func (t *noopVCursor) GetVExplainLogs() []ExecuteEntry { return nil } + func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) { return nil, nil } diff --git a/go/vt/vtgate/engine/mirror.go b/go/vt/vtgate/engine/mirror.go new file mode 100644 index 00000000000..bfab4cee91d --- /dev/null +++ b/go/vt/vtgate/engine/mirror.go @@ -0,0 +1,152 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "math/rand/v2" + "time" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +type ( + // percentBasedMirror represents the instructions to execute an + // authoritative primitive and, based on whether a die-roll exceeds a + // percentage, to also execute a target Primitive. + percentBasedMirror struct { + percent float32 + primitive Primitive + target Primitive + } +) + +const ( + // maxMirrorTargetLag limits how long a mirror target may continue + // executing after the main primitive has finished. + maxMirrorTargetLag = 100 * time.Millisecond +) + +var _ Primitive = (*percentBasedMirror)(nil) + +// NewPercentBasedMirror creates a Mirror. +func NewPercentBasedMirror(percentage float32, primitive Primitive, target Primitive) Primitive { + return &percentBasedMirror{percentage, primitive, target} +} + +func (m *percentBasedMirror) RouteType() string { + return "Mirror" +} + +func (m *percentBasedMirror) GetKeyspaceName() string { + return m.primitive.GetKeyspaceName() +} + +func (m *percentBasedMirror) GetTableName() string { + return m.primitive.GetTableName() +} + +func (m *percentBasedMirror) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return m.primitive.GetFields(ctx, vcursor, bindVars) +} + +func (m *percentBasedMirror) NeedsTransaction() bool { + return m.primitive.NeedsTransaction() +} + +func (m *percentBasedMirror) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + if !m.percentAtLeastDieRoll() { + return vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) + } + + mirrorCh := make(chan any) + mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag) + defer mirrorCtxCancel() + + go func() { + defer close(mirrorCh) + mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) + // TODO(maxeng) handle error. + _, _ = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields) + }() + + r, err := vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) + + select { + case <-mirrorCh: + // Mirroring completed within the allowed time. + case <-mirrorCtx.Done(): + // Mirroring took too long and was canceled. + } + + return r, err +} + +func (m *percentBasedMirror) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + if !m.percentAtLeastDieRoll() { + return vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) + } + + mirrorCh := make(chan any) + mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag) + defer mirrorCtxCancel() + + go func() { + defer close(mirrorCh) + mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) + // TODO(maxeng) handle error. + _ = mirrorVCursor.StreamExecutePrimitive( + mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result, + ) error { + return nil + }) + }() + + err := vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) + + select { + case <-mirrorCh: + // Mirroring completed within the allowed time. + case <-mirrorCtx.Done(): + // Mirroring took too long and was canceled. + } + + return err +} + +// Inputs is a slice containing the inputs to this Primitive. +// The returned map has additional information about the inputs, that is used in the description. +func (m *percentBasedMirror) Inputs() ([]Primitive, []map[string]any) { + return []Primitive{m.primitive, m.target}, nil +} + +// description is the description, sans the inputs, of this Primitive. +// to get the plan description with all children, use PrimitiveToPlanDescription() +func (m *percentBasedMirror) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "Mirror", + Variant: "PercentBased", + Other: map[string]any{ + "Percent": m.percent, + }, + } +} + +func (m *percentBasedMirror) percentAtLeastDieRoll() bool { + return m.percent >= (rand.Float32() * 100.0) +} diff --git a/go/vt/vtgate/engine/mirror_test.go b/go/vt/vtgate/engine/mirror_test.go new file mode 100644 index 00000000000..b9e442df32d --- /dev/null +++ b/go/vt/vtgate/engine/mirror_test.go @@ -0,0 +1,376 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +func TestMirror(t *testing.T) { + vindex, _ := vindexes.CreateVindex("xxhash", "xxhash_vdx", nil) + + primitive := NewRoute( + Unsharded, + &vindexes.Keyspace{ + Name: "ks1", + }, + "select f.bar from foo f where f.id = 1", + "select 1 from foo f where f.id = 1 and 1 != 1", + ) + + mirrorPrimitive1 := NewRoute( + EqualUnique, + &vindexes.Keyspace{ + Name: "ks2", + Sharded: true, + }, + "select f.bar from foo f where f.id = 1", + "select 1 from foo f where f.id = 1 and 1 != 1", + ) + mirrorPrimitive1.Vindex = vindex.(vindexes.SingleColumn) + mirrorPrimitive1.Values = []evalengine.Expr{ + evalengine.NewLiteralInt(1), + } + + mirror := NewPercentBasedMirror(100, primitive, mirrorPrimitive1) + + mirrorVC := &loggingVCursor{ + shards: []string{"-20", "20-"}, + ksShardMap: map[string][]string{ + "ks2": {"-20", "20-"}, + }, + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "bar", + "varchar", + ), + "hello", + ), + }, + } + + vc := &loggingVCursor{ + shards: []string{"0"}, + ksShardMap: map[string][]string{ + "ks1": {"0"}, + }, + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "bar", + "varchar", + ), + "hello", + ), + }, + handleMirrorClonesFn: func(ctx context.Context) VCursor { + return mirrorVC + }, + } + + t.Run("TryExecute success", func(t *testing.T) { + defer func() { + vc.Rewind() + mirrorVC.Rewind() + }() + + want := vc.results[0] + res, err := mirror.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.Equal(t, want, res) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + }) + + t.Run("TryExecute return primitive error", func(t *testing.T) { + results := vc.results + + defer func() { + vc.Rewind() + vc.results = results + vc.resultErr = nil + mirrorVC.Rewind() + }() + + vc.results = nil + vc.resultErr = fmt.Errorf("return me") + + ctx := context.Background() + res, err := mirror.TryExecute(ctx, vc, map[string]*querypb.BindVariable{}, true) + require.Nil(t, res) + require.Error(t, err) + require.Equal(t, vc.resultErr, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + }) + + t.Run("TryExecute ignore mirror target error", func(t *testing.T) { + results := mirrorVC.results + + defer func() { + vc.Rewind() + mirrorVC.Rewind() + mirrorVC.results = results + mirrorVC.resultErr = nil + }() + + mirrorVC.results = nil + mirrorVC.resultErr = fmt.Errorf("ignore me") + + want := vc.results[0] + res, err := mirror.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.Equal(t, res, want) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + }) + + t.Run("TryExecute slow mirror target", func(t *testing.T) { + defer func() { + vc.Rewind() + vc.onExecuteMultiShardFn = nil + mirrorVC.Rewind() + mirrorVC.onExecuteMultiShardFn = nil + }() + + primitiveLatency := maxMirrorTargetLag * 2 + vc.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + time.Sleep(primitiveLatency) + select { + case <-ctx.Done(): + require.Fail(t, "primitive context done") + default: + } + } + + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + mirrorVC.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + defer wg.Done() + time.Sleep(primitiveLatency + (2 * maxMirrorTargetLag)) + select { + case <-ctx.Done(): + default: + require.Fail(t, "mirror target context not done") + } + } + + want := vc.results[0] + res, err := mirror.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.Equal(t, res, want) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + }) + + t.Run("TryStreamExecute success", func(t *testing.T) { + defer func() { + vc.Rewind() + mirrorVC.Rewind() + }() + + want := vc.results[0] + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Equal(t, want, result) + return nil + }, + ) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + }) + + t.Run("TryStreamExecute return primitive error", func(t *testing.T) { + results := vc.results + + defer func() { + vc.Rewind() + vc.results = results + vc.resultErr = nil + mirrorVC.Rewind() + }() + + vc.results = nil + vc.resultErr = fmt.Errorf("return me") + + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Nil(t, result) + return nil + }, + ) + require.Error(t, err) + require.Equal(t, vc.resultErr, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + }) + + t.Run("TryStreamExecute ignore mirror target error", func(t *testing.T) { + results := mirrorVC.results + + defer func() { + vc.Rewind() + mirrorVC.Rewind() + mirrorVC.results = results + mirrorVC.resultErr = nil + }() + + mirrorVC.results = nil + mirrorVC.resultErr = fmt.Errorf("ignore me") + + want := vc.results[0] + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Equal(t, want, result) + return nil + }, + ) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + }) + + t.Run("TryStreamExecute slow mirror target", func(t *testing.T) { + defer func() { + vc.Rewind() + vc.onStreamExecuteMultiFn = nil + mirrorVC.Rewind() + mirrorVC.onStreamExecuteMultiFn = nil + }() + + primitiveLatency := maxMirrorTargetLag * 2 + vc.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + time.Sleep(primitiveLatency) + select { + case <-ctx.Done(): + require.Fail(t, "primitive context done") + default: + } + } + + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + mirrorVC.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + defer wg.Done() + time.Sleep(primitiveLatency + (2 * maxMirrorTargetLag)) + select { + case <-ctx.Done(): + default: + require.Fail(t, "mirror target context not done") + } + } + + want := vc.results[0] + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Equal(t, want, result) + return nil + }, + ) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + }) +} diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index e2cd4a5aca3..30894b99ab8 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -132,6 +132,9 @@ type ( // CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas CloneForReplicaWarming(ctx context.Context) VCursor + // CloneForMirroring clones the VCursor for re-use in mirroring queries to other keyspaces + CloneForMirroring(ctx context.Context) VCursor + // // ReadTransaction reads the state of the given transaction from the metadata manager ReadTransaction(ctx context.Context, transactionID string) (*querypb.TransactionMetadata, error) diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 51158818ec5..2e95e438c22 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -121,11 +121,13 @@ func TestSelectDBA(t *testing.T) { query, map[string]*querypb.BindVariable{}, ) require.NoError(t, err) - wantQueries = []*querypb.BoundQuery{{Sql: "select count(*) from INFORMATION_SCHEMA.`TABLES` as ist where ist.table_schema = :__vtschemaname /* VARCHAR */ and ist.table_name = :ist_table_name /* VARCHAR */", + wantQueries = []*querypb.BoundQuery{{ + Sql: "select count(*) from INFORMATION_SCHEMA.`TABLES` as ist where ist.table_schema = :__vtschemaname /* VARCHAR */ and ist.table_name = :ist_table_name /* VARCHAR */", BindVariables: map[string]*querypb.BindVariable{ "__vtschemaname": sqltypes.StringBindVariable("performance_schema"), "ist_table_name": sqltypes.StringBindVariable("foo"), - }}} + }, + }} utils.MustMatch(t, wantQueries, sbc1.Queries) sbc1.Queries = nil @@ -135,11 +137,13 @@ func TestSelectDBA(t *testing.T) { query, map[string]*querypb.BindVariable{}, ) require.NoError(t, err) - wantQueries = []*querypb.BoundQuery{{Sql: "select 1 from information_schema.table_constraints where constraint_schema = :__vtschemaname /* VARCHAR */ and table_name = :table_name /* VARCHAR */", + wantQueries = []*querypb.BoundQuery{{ + Sql: "select 1 from information_schema.table_constraints where constraint_schema = :__vtschemaname /* VARCHAR */ and table_name = :table_name /* VARCHAR */", BindVariables: map[string]*querypb.BindVariable{ "__vtschemaname": sqltypes.StringBindVariable("vt_ks"), "table_name": sqltypes.StringBindVariable("user"), - }}} + }, + }} utils.MustMatch(t, wantQueries, sbc1.Queries) sbc1.Queries = nil @@ -149,10 +153,12 @@ func TestSelectDBA(t *testing.T) { query, map[string]*querypb.BindVariable{}, ) require.NoError(t, err) - wantQueries = []*querypb.BoundQuery{{Sql: "select 1 from information_schema.table_constraints where constraint_schema = :__vtschemaname /* VARCHAR */", + wantQueries = []*querypb.BoundQuery{{ + Sql: "select 1 from information_schema.table_constraints where constraint_schema = :__vtschemaname /* VARCHAR */", BindVariables: map[string]*querypb.BindVariable{ "__vtschemaname": sqltypes.StringBindVariable("vt_ks"), - }}} + }, + }} utils.MustMatch(t, wantQueries, sbc1.Queries) } @@ -481,13 +487,15 @@ func TestGen4SelectDBA(t *testing.T) { query, map[string]*querypb.BindVariable{}, ) require.NoError(t, err) - wantQueries = []*querypb.BoundQuery{{Sql: "select count(*) from INFORMATION_SCHEMA.`TABLES` as ist where ist.table_schema = :__vtschemaname /* VARCHAR */ and ist.table_name = :ist_table_name1 /* VARCHAR */", + wantQueries = []*querypb.BoundQuery{{ + Sql: "select count(*) from INFORMATION_SCHEMA.`TABLES` as ist where ist.table_schema = :__vtschemaname /* VARCHAR */ and ist.table_name = :ist_table_name1 /* VARCHAR */", BindVariables: map[string]*querypb.BindVariable{ "ist_table_schema": sqltypes.StringBindVariable("performance_schema"), "__vtschemaname": sqltypes.StringBindVariable("performance_schema"), "ist_table_name": sqltypes.StringBindVariable("foo"), "ist_table_name1": sqltypes.StringBindVariable("foo"), - }}} + }, + }} utils.MustMatch(t, wantQueries, sbc1.Queries) sbc1.Queries = nil @@ -497,26 +505,30 @@ func TestGen4SelectDBA(t *testing.T) { query, map[string]*querypb.BindVariable{}, ) require.NoError(t, err) - wantQueries = []*querypb.BoundQuery{{Sql: "select :vtg1 /* INT64 */ from information_schema.table_constraints where constraint_schema = :__vtschemaname /* VARCHAR */ and table_name = :table_name1 /* VARCHAR */", + wantQueries = []*querypb.BoundQuery{{ + Sql: "select :vtg1 /* INT64 */ from information_schema.table_constraints where constraint_schema = :__vtschemaname /* VARCHAR */ and table_name = :table_name1 /* VARCHAR */", BindVariables: map[string]*querypb.BindVariable{ "vtg1": sqltypes.Int64BindVariable(1), "constraint_schema": sqltypes.StringBindVariable("vt_ks"), "table_name": sqltypes.StringBindVariable("user"), "__vtschemaname": sqltypes.StringBindVariable("vt_ks"), "table_name1": sqltypes.StringBindVariable("user"), - }}} + }, + }} utils.MustMatch(t, wantQueries, sbc1.Queries) sbc1.Queries = nil query = "select 1 from information_schema.table_constraints where constraint_schema = 'vt_ks'" _, err = executor.Execute(context.Background(), nil, "TestSelectDBA", NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}), query, map[string]*querypb.BindVariable{}) require.NoError(t, err) - wantQueries = []*querypb.BoundQuery{{Sql: "select :vtg1 /* INT64 */ from information_schema.table_constraints where constraint_schema = :__vtschemaname /* VARCHAR */", + wantQueries = []*querypb.BoundQuery{{ + Sql: "select :vtg1 /* INT64 */ from information_schema.table_constraints where constraint_schema = :__vtschemaname /* VARCHAR */", BindVariables: map[string]*querypb.BindVariable{ "vtg1": sqltypes.Int64BindVariable(1), "constraint_schema": sqltypes.StringBindVariable("vt_ks"), "__vtschemaname": sqltypes.StringBindVariable("vt_ks"), - }}} + }, + }} utils.MustMatch(t, wantQueries, sbc1.Queries) sbc1.Queries = nil @@ -526,11 +538,13 @@ func TestGen4SelectDBA(t *testing.T) { query, map[string]*querypb.BindVariable{}, ) require.NoError(t, err) - wantQueries = []*querypb.BoundQuery{{Sql: "select t.table_schema, t.table_name, c.column_name, c.column_type from information_schema.`tables` as t, information_schema.`columns` as c where t.table_schema = :__vtschemaname /* VARCHAR */ and c.table_schema = :__vtschemaname /* VARCHAR */ and c.table_schema = t.table_schema and c.table_name = t.table_name order by t.table_schema asc, t.table_name asc, c.column_name asc", + wantQueries = []*querypb.BoundQuery{{ + Sql: "select t.table_schema, t.table_name, c.column_name, c.column_type from information_schema.`tables` as t, information_schema.`columns` as c where t.table_schema = :__vtschemaname /* VARCHAR */ and c.table_schema = :__vtschemaname /* VARCHAR */ and c.table_schema = t.table_schema and c.table_name = t.table_name order by t.table_schema asc, t.table_name asc, c.column_name asc", BindVariables: map[string]*querypb.BindVariable{ "t_table_schema": sqltypes.StringBindVariable("TestExecutor"), "__replacevtschemaname": sqltypes.Int64BindVariable(1), - }}} + }, + }} utils.MustMatch(t, wantQueries, sbc1.Queries) } @@ -1088,7 +1102,6 @@ func TestSelectDatabase(t *testing.T) { } require.NoError(t, err) utils.MustMatch(t, wantResult, result, "Mismatch") - } func TestSelectBindvars(t *testing.T) { @@ -3205,7 +3218,6 @@ func TestLockReserve(t *testing.T) { _, err := exec(executor, session, "select get_lock('lock name', 10) from dual") require.NoError(t, err) require.NotNil(t, session.LockSession) - } func TestSelectFromInformationSchema(t *testing.T) { @@ -4395,3 +4407,60 @@ func TestSysVarGlobalAndSession(t *testing.T) { require.NoError(t, err) require.Equal(t, `[[UINT64(20)]]`, fmt.Sprintf("%v", qr.Rows)) } + +func BenchmarkSelectMirror(b *testing.B) { + ctx := context.Background() + cell := "aa" + sql := fmt.Sprintf("select id from %s.user where id = 1", KsTestUnsharded) + + currentSandboxMirrorRules := sandboxMirrorRules + b.Cleanup(func() { + setSandboxMirrorRules(currentSandboxMirrorRules) + }) + + // Don't use createExecutorEnv. Doesn't work with benchmarks because of + // utils.EnsureNoLeak. + createBenchmarkExecutor := func(b *testing.B) (context.Context, *Executor) { + ctx, cancel := context.WithCancel(ctx) + b.Cleanup(cancel) + hc := discovery.NewFakeHealthCheck(nil) + u := createSandbox(KsTestUnsharded) + s := createSandbox(KsTestSharded) + s.VSchema = executorVSchema + u.VSchema = unshardedVSchema + serv := newSandboxForCells(ctx, []string{cell}) + resolver := newTestResolver(ctx, hc, serv, cell) + shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"} + for _, shard := range shards { + hc.AddTestTablet(cell, shard, 1, KsTestSharded, shard, topodatapb.TabletType_PRIMARY, true, 1, nil) + } + hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) + return ctx, createExecutor(ctx, serv, cell, resolver) + } + + for _, percent := range []float32{0, 1, 5, 10, 25, 50, 100} { + b.Run(fmt.Sprintf("mirror %.2f%%", percent), func(b *testing.B) { + setSandboxMirrorRules(fmt.Sprintf(`{ + "rules": [ + { + "from_table": "%s.user", + "to_table": "%s.user", + "percent": %.2f + } + ] + }`, KsTestUnsharded, KsTestSharded, percent)) + + ctx, executor := createBenchmarkExecutor(b) + session := &vtgatepb.Session{ + TargetString: "@primary", + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + executorExec(ctx, executor, session, sql, nil) + } + b.StopTimer() + }) + } +} diff --git a/go/vt/vtgate/planbuilder/operator_transformers.go b/go/vt/vtgate/planbuilder/operator_transformers.go index b4aaf6fc64d..458c8c5e1c3 100644 --- a/go/vt/vtgate/planbuilder/operator_transformers.go +++ b/go/vt/vtgate/planbuilder/operator_transformers.go @@ -79,11 +79,29 @@ func transformToPrimitive(ctx *plancontext.PlanningContext, op operators.Operato return transformDMLWithInput(ctx, op) case *operators.RecurseCTE: return transformRecurseCTE(ctx, op) + case *operators.PercentBasedMirror: + return transformPercentBasedMirror(ctx, op) } return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToPrimitive)", op)) } +func transformPercentBasedMirror(ctx *plancontext.PlanningContext, op *operators.PercentBasedMirror) (engine.Primitive, error) { + primitive, err := transformToPrimitive(ctx, op.Operator) + if err != nil { + return nil, err + } + + target, err := transformToPrimitive(ctx.UseMirror(), op.Target) + // Mirroring is best-effort. If we encounter an error while building the + // mirror target primitive, proceed without mirroring. + if err != nil { + return primitive, nil + } + + return engine.NewPercentBasedMirror(op.Percent, primitive, target), nil +} + func transformDMLWithInput(ctx *plancontext.PlanningContext, op *operators.DMLWithInput) (engine.Primitive, error) { input, err := transformToPrimitive(ctx, op.Source) if err != nil { @@ -290,7 +308,6 @@ func transformFkVerify(ctx *plancontext.PlanningContext, fkv *operators.FkVerify Verify: verify, Exec: inputLP, }, nil - } func transformAggregator(ctx *plancontext.PlanningContext, op *operators.Aggregator) (engine.Primitive, error) { @@ -453,7 +470,6 @@ func getEvalEngineExpr(ctx *plancontext.PlanningContext, pe *operators.ProjExpr) default: return nil, vterrors.VT13001("project not planned for: %s", pe.String()) } - } // newSimpleProjection creates a simple projections diff --git a/go/vt/vtgate/planbuilder/operators/ast_to_op.go b/go/vt/vtgate/planbuilder/operators/ast_to_op.go index 4c075f480d3..86d1c6197d4 100644 --- a/go/vt/vtgate/planbuilder/operators/ast_to_op.go +++ b/go/vt/vtgate/planbuilder/operators/ast_to_op.go @@ -26,8 +26,10 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" ) -const foreignKeyConstraintValues = "fkc_vals" -const foreignKeyUpdateExpr = "fkc_upd" +const ( + foreignKeyConstraintValues = "fkc_vals" + foreignKeyUpdateExpr = "fkc_upd" +) // translateQueryToOp creates an operator tree that represents the input SELECT or UNION query func translateQueryToOp(ctx *plancontext.PlanningContext, selStmt sqlparser.Statement) Operator { @@ -47,6 +49,19 @@ func translateQueryToOp(ctx *plancontext.PlanningContext, selStmt sqlparser.Stat } } +func translateQueryToOpWithMirroring(ctx *plancontext.PlanningContext, stmt sqlparser.Statement) Operator { + op := translateQueryToOp(ctx, stmt) + + if selStmt, ok := stmt.(sqlparser.SelectStatement); ok { + if mi := ctx.SemTable.GetMirrorInfo(); mi.Percent > 0 { + mirrorOp := translateQueryToOp(ctx.UseMirror(), selStmt) + op = NewPercentBasedMirror(mi.Percent, op, mirrorOp) + } + } + + return op +} + func createOperatorFromSelect(ctx *plancontext.PlanningContext, sel *sqlparser.Select) Operator { op := crossJoin(ctx, sel.From) @@ -275,6 +290,20 @@ func getOperatorFromAliasedTableExpr(ctx *plancontext.PlanningContext, tableExpr qg := newQueryGraph() isInfSchema := tableInfo.IsInfSchema() + if ctx.IsMirrored() { + if mr := tableInfo.GetMirrorRule(); mr != nil { + newTbl := sqlparser.Clone(tbl) + newTbl.Qualifier = sqlparser.NewIdentifierCS(mr.Table.Keyspace.Name) + newTbl.Name = mr.Table.Name + if newTbl.Name.String() != tbl.Name.String() { + tableExpr = sqlparser.Clone(tableExpr) + tableExpr.As = tbl.Name + } + tbl = newTbl + } else { + panic(vterrors.VT13001(fmt.Sprintf("unable to find mirror rule for table: %T", tbl))) + } + } qt := &QueryTable{Alias: tableExpr, Table: tbl, ID: tableID, IsInfSchema: isInfSchema} qg.Tables = append(qg.Tables, qt) return qg diff --git a/go/vt/vtgate/planbuilder/operators/horizon.go b/go/vt/vtgate/planbuilder/operators/horizon.go index 70186c062b9..3fb72df91b4 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon.go +++ b/go/vt/vtgate/planbuilder/operators/horizon.go @@ -18,6 +18,7 @@ package operators import ( "errors" + "fmt" "slices" "vitess.io/vitess/go/vt/sqlparser" @@ -207,7 +208,7 @@ func (h *Horizon) getQP(ctx *plancontext.PlanningContext) *QueryProjection { } func (h *Horizon) ShortDescription() string { - return h.Alias + return fmt.Sprintf("Horizon (Alias: %s)", h.Alias) } func (h *Horizon) introducesTableID() semantics.TableSet { diff --git a/go/vt/vtgate/planbuilder/operators/mirror.go b/go/vt/vtgate/planbuilder/operators/mirror.go new file mode 100644 index 00000000000..3ab1d66e70d --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/mirror.go @@ -0,0 +1,104 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operators + +import ( + "fmt" + + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" +) + +type ( + PercentBasedMirror struct { + Percent float32 + Operator Operator + Target Operator + } +) + +var _ Operator = (*PercentBasedMirror)(nil) + +func NewPercentBasedMirror(percent float32, operator, target Operator) *PercentBasedMirror { + return &PercentBasedMirror{ + percent, + operator, + target, + } +} + +// Clone will return a copy of this operator, protected so changed to the original will not impact the clone +func (m *PercentBasedMirror) Clone(inputs []Operator) Operator { + cloneMirror := *m + cloneMirror.SetInputs(inputs) + return &cloneMirror +} + +// Inputs returns the inputs for this operator +func (m *PercentBasedMirror) Inputs() []Operator { + return []Operator{ + m.Operator, + m.Target, + } +} + +// SetInputs changes the inputs for this op +func (m *PercentBasedMirror) SetInputs(inputs []Operator) { + if len(inputs) < 2 { + panic(vterrors.VT13001("unexpected number of inputs for PercentBasedMirror operator")) + } + m.Operator = inputs[0] + m.Target = inputs[1] +} + +// AddPredicate is used to push predicates. It pushed it as far down as is possible in the tree. +// If we encounter a join and the predicate depends on both sides of the join, the predicate will be split into two parts, +// where data is fetched from the LHS of the join to be used in the evaluation on the RHS +// TODO: we should remove this and replace it with rewriters +func (m *PercentBasedMirror) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator { + panic(vterrors.VT13001("not supported")) +} + +func (m *PercentBasedMirror) AddColumn(ctx *plancontext.PlanningContext, reuseExisting bool, addToGroupBy bool, expr *sqlparser.AliasedExpr) int { + panic(vterrors.VT13001("not supported")) +} + +func (m *PercentBasedMirror) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int { + return m.Operator.FindCol(ctx, expr, underRoute) +} + +func (m *PercentBasedMirror) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr { + return m.Operator.GetColumns(ctx) +} + +func (m *PercentBasedMirror) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs { + return m.Operator.GetSelectExprs(ctx) +} + +func (m *PercentBasedMirror) ShortDescription() string { + return fmt.Sprintf("PercentBasedMirror (%.02f%%)", m.Percent) +} + +func (m *PercentBasedMirror) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy { + return m.Operator.GetOrdering(ctx) +} + +// AddWSColumn implements Operator. +func (m *PercentBasedMirror) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int { + panic(vterrors.VT13001("not supported")) +} diff --git a/go/vt/vtgate/planbuilder/operators/plan_query.go b/go/vt/vtgate/planbuilder/operators/plan_query.go index 4d371942c26..40c27d03126 100644 --- a/go/vt/vtgate/planbuilder/operators/plan_query.go +++ b/go/vt/vtgate/planbuilder/operators/plan_query.go @@ -60,7 +60,7 @@ type ( func PlanQuery(ctx *plancontext.PlanningContext, stmt sqlparser.Statement) (result Operator, err error) { defer PanicHandler(&err) - op := translateQueryToOp(ctx, stmt) + op := translateQueryToOpWithMirroring(ctx, stmt) if DebugOperatorTree { fmt.Println("Initial tree:") diff --git a/go/vt/vtgate/planbuilder/operators/query_planning.go b/go/vt/vtgate/planbuilder/operators/query_planning.go index 0f2445a22e7..f930a7f4f76 100644 --- a/go/vt/vtgate/planbuilder/operators/query_planning.go +++ b/go/vt/vtgate/planbuilder/operators/query_planning.go @@ -110,6 +110,13 @@ func runRewriters(ctx *plancontext.PlanningContext, root Operator) Operator { } } + if pbm, ok := root.(*PercentBasedMirror); ok { + pbm.SetInputs([]Operator{ + runRewriters(ctx, pbm.Operator), + runRewriters(ctx.UseMirror(), pbm.Target), + }) + } + return FixedPointBottomUp(root, TableID, visitor, stopAtRoute) } @@ -736,7 +743,6 @@ func pushFilterUnderProjection(ctx *plancontext.PlanningContext, filter *Filter, } } return Swap(filter, projection, "push filter under projection") - } func tryPushDistinct(in *Distinct) (Operator, *ApplyResult) { diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index b5dbed0ceb9..79536483970 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -591,6 +591,32 @@ func (s *planTestSuite) TestOtherPlanningFromFile() { s.testFile("other_admin_cases.json", vschema, false) } +func (s *planTestSuite) TestMirrorPlanning() { + vschema := &vschemawrapper.VSchemaWrapper{ + V: loadSchema(s.T(), "vschemas/mirror_schema.json", true), + TabletType_: topodatapb.TabletType_PRIMARY, + SysVarEnabled: true, + TestBuilder: TestBuilder, + Env: vtenv.NewTestEnv(), + } + + s.testFile("mirror_cases.json", vschema, false) +} + +func (s *planTestSuite) TestOneMirror() { + reset := operators.EnableDebugPrinting() + defer reset() + vschema := &vschemawrapper.VSchemaWrapper{ + V: loadSchema(s.T(), "vschemas/mirror_schema.json", true), + TabletType_: topodatapb.TabletType_PRIMARY, + SysVarEnabled: true, + TestBuilder: TestBuilder, + Env: vtenv.NewTestEnv(), + } + + s.testFile("onecase.json", vschema, false) +} + func loadSchema(t testing.TB, filename string, setCollation bool) *vindexes.VSchema { formal, err := vindexes.LoadFormal(locateFile(filename)) require.NoError(t, err) @@ -839,6 +865,35 @@ func BenchmarkSelectVsDML(b *testing.B) { }) } +func BenchmarkBaselineVsMirrored(b *testing.B) { + baseline := loadSchema(b, "vschemas/mirror_schema.json", true) + baseline.MirrorRules = map[string]*vindexes.MirrorRule{} + baselineVschema := &vschemawrapper.VSchemaWrapper{ + V: baseline, + SysVarEnabled: true, + Version: Gen4, + Env: vtenv.NewTestEnv(), + } + + mirroredSchema := loadSchema(b, "vschemas/mirror_schema.json", true) + mirroredVschema := &vschemawrapper.VSchemaWrapper{ + V: mirroredSchema, + SysVarEnabled: true, + Version: Gen4, + Env: vtenv.NewTestEnv(), + } + + cases := readJSONTests("mirror_cases.json") + + b.Run("Baseline", func(b *testing.B) { + benchmarkPlanner(b, Gen4, cases, baselineVschema) + }) + + b.Run("Mirrored", func(b *testing.B) { + benchmarkPlanner(b, Gen4, cases, mirroredVschema) + }) +} + func benchmarkPlanner(b *testing.B, version plancontext.PlannerVersion, testCases []planTest, vschema *vschemawrapper.VSchemaWrapper) { b.ReportAllocs() for n := 0; n < b.N; n++ { diff --git a/go/vt/vtgate/planbuilder/plancontext/planning_context.go b/go/vt/vtgate/planbuilder/plancontext/planning_context.go index 00ac889c082..66be6a4c71d 100644 --- a/go/vt/vtgate/planbuilder/plancontext/planning_context.go +++ b/go/vt/vtgate/planbuilder/plancontext/planning_context.go @@ -70,6 +70,12 @@ type PlanningContext struct { // This is a stack of CTEs being built. It's used when we have CTEs inside CTEs, // to remember which is the CTE currently being assembled CurrentCTE []*ContextCTE + + // mirror contains a mirrored clone of this planning context. + mirror *PlanningContext + + // isMirrored indicates that mirrored tables should be used. + isMirrored bool } // CreatePlanningContext initializes a new PlanningContext with the given parameters. @@ -381,6 +387,10 @@ func (ctx *PlanningContext) ContainsAggr(e sqlparser.SQLNode) (hasAggr bool) { return } +func (ctx *PlanningContext) IsMirrored() bool { + return ctx.isMirrored +} + type ContextCTE struct { *semantics.CTE Id semantics.TableSet @@ -420,3 +430,30 @@ func (ctx *PlanningContext) ActiveCTE() *ContextCTE { } return ctx.CurrentCTE[len(ctx.CurrentCTE)-1] } + +func (ctx *PlanningContext) UseMirror() *PlanningContext { + if ctx.isMirrored { + panic(vterrors.VT13001("cannot mirror already mirrored planning context")) + } + if ctx.mirror != nil { + return ctx.mirror + } + ctx.mirror = &PlanningContext{ + ctx.ReservedVars, + ctx.SemTable, + ctx.VSchema, + map[sqlparser.Expr][]sqlparser.Expr{}, + map[sqlparser.Expr]any{}, + ctx.PlannerVersion, + map[sqlparser.Expr]string{}, + ctx.VerifyAllFKs, + ctx.MergedSubqueries, + ctx.CurrentPhase, + ctx.Statement, + ctx.OuterTables, + ctx.CurrentCTE, + nil, + true, + } + return ctx.mirror +} diff --git a/go/vt/vtgate/planbuilder/plancontext/planning_context_test.go b/go/vt/vtgate/planbuilder/plancontext/planning_context_test.go index 3ab58cba724..d7315f376b6 100644 --- a/go/vt/vtgate/planbuilder/plancontext/planning_context_test.go +++ b/go/vt/vtgate/planbuilder/plancontext/planning_context_test.go @@ -365,4 +365,9 @@ func (v *vschema) GetAggregateUDFs() []string { panic("implement me") } +// FindMirrorRule implements VSchema. +func (v *vschema) FindMirrorRule(tablename sqlparser.TableName) (*vindexes.MirrorRule, error) { + panic("unimplemented") +} + var _ VSchema = (*vschema)(nil) diff --git a/go/vt/vtgate/planbuilder/plancontext/vschema.go b/go/vt/vtgate/planbuilder/plancontext/vschema.go index 8ac4c57bfd7..6e92ad0d83b 100644 --- a/go/vt/vtgate/planbuilder/plancontext/vschema.go +++ b/go/vt/vtgate/planbuilder/plancontext/vschema.go @@ -96,6 +96,10 @@ type VSchema interface { // GetAggregateUDFs returns the list of aggregate UDFs. GetAggregateUDFs() []string + + // FindMirrorRule finds the mirror rule for the requested keyspace, table + // name, and the tablet type in the VSchema. + FindMirrorRule(tablename sqlparser.TableName) (*vindexes.MirrorRule, error) } // PlannerNameToVersion returns the numerical representation of the planner diff --git a/go/vt/vtgate/planbuilder/select.go b/go/vt/vtgate/planbuilder/select.go index 6927c5315ac..9cc1c8efe06 100644 --- a/go/vt/vtgate/planbuilder/select.go +++ b/go/vt/vtgate/planbuilder/select.go @@ -205,7 +205,7 @@ func newBuildSelectPlan( return nil, nil, err } - if ks, _ := ctx.SemTable.SingleUnshardedKeyspace(); ks != nil { + if ks, ok := ctx.SemTable.CanTakeSelectUnshardedShortcut(); ok { plan, tablesUsed, err = selectUnshardedShortcut(ctx, selStmt, ks) if err != nil { return nil, nil, err @@ -214,7 +214,6 @@ func newBuildSelectPlan( return plan, tablesUsed, err } - // From this point on, we know it is not an unsharded query and return the NotUnshardedErr if there is any if ctx.SemTable.NotUnshardedErr != nil { return nil, nil, ctx.SemTable.NotUnshardedErr } diff --git a/go/vt/vtgate/planbuilder/testdata/mirror_cases.json b/go/vt/vtgate/planbuilder/testdata/mirror_cases.json new file mode 100644 index 00000000000..2466b3dca12 --- /dev/null +++ b/go/vt/vtgate/planbuilder/testdata/mirror_cases.json @@ -0,0 +1,397 @@ +[ + { + "comment": "select unsharded, qualified, table mirrored to unsharded table", + "query": "select t1.id from unsharded_src1.t1 where t1.id = 1", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id from unsharded_src1.t1 where t1.id = 1", + "Instructions": { + "OperatorType": "Mirror", + "Variant": "PercentBased", + "Percent": 1, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1 where t1.id = 1", + "Table": "t1" + }, + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_dst1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1 where t1.id = 1", + "Table": "t1" + } + ] + }, + "TablesUsed": [ + "unsharded_dst1.t1", + "unsharded_src1.t1" + ] + } + }, + { + "comment": "select unsharded, qualified, table mirrored to unsharded table with zero percentage", + "query": "select t3.id from unsharded_src1.t3 where t3.id = 1", + "plan": { + "QueryType": "SELECT", + "Original": "select t3.id from unsharded_src1.t3 where t3.id = 1", + "Instructions": { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t3.id from t3 where 1 != 1", + "Query": "select t3.id from t3 where t3.id = 1", + "Table": "t3" + }, + "TablesUsed": [ + "unsharded_src1.t3" + ] + } + }, + { + "comment": "select unsharded, qualified, table mirrored to sharded table", + "query": "select t2.id from unsharded_src1.t2 where t2.id = 1", + "plan": { + "QueryType": "SELECT", + "Original": "select t2.id from unsharded_src1.t2 where t2.id = 1", + "Instructions": { + "OperatorType": "Mirror", + "Variant": "PercentBased", + "Percent": 2, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t2.id from t2 where 1 != 1", + "Query": "select t2.id from t2 where t2.id = 1", + "Table": "t2" + }, + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "sharded_dst1", + "Sharded": true + }, + "FieldQuery": "select t2.id from t1 as t2 where 1 != 1", + "Query": "select t2.id from t1 as t2 where t2.id = 1", + "Table": "t1", + "Values": [ + "1" + ], + "Vindex": "xxhash" + } + ] + }, + "TablesUsed": [ + "sharded_dst1.t1", + "unsharded_src1.t2" + ] + } + }, + { + "comment": "select two unsharded, qualified, tables, one mirrored to unsharded table, other to sharded table", + "query": "select t1.id, t2.id from unsharded_src1.t1, unsharded_src1.t2 where t1.id = t2.id", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id, t2.id from unsharded_src1.t1, unsharded_src1.t2 where t1.id = t2.id", + "Instructions": { + "OperatorType": "Mirror", + "Variant": "PercentBased", + "Percent": 1, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t1.id, t2.id from t1, t2 where 1 != 1", + "Query": "select t1.id, t2.id from t1, t2 where t1.id = t2.id", + "Table": "t1, t2" + }, + { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0", + "JoinVars": { + "t1_id1": 0 + }, + "TableName": "t1_t1", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_dst1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1", + "Table": "t1" + }, + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "sharded_dst1", + "Sharded": true + }, + "FieldQuery": "select t2.id from t1 as t2 where 1 != 1", + "Query": "select t2.id from t1 as t2 where t2.id = :t1_id1", + "Table": "t1", + "Values": [ + ":t1_id1" + ], + "Vindex": "xxhash" + } + ] + } + ] + }, + "TablesUsed": [ + "sharded_dst1.t1", + "unsharded_dst1.t1", + "unsharded_src1.t1", + "unsharded_src1.t2" + ] + } + }, + { + "comment": "union of selects from unsharded, qualified, tables, one mirrored to unsharded table, other to sharded table", + "query": "select t1.id from unsharded_src1.t1 union select t2.id from unsharded_src1.t2", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id from unsharded_src1.t1 union select t2.id from unsharded_src1.t2", + "Instructions": { + "OperatorType": "Mirror", + "Variant": "PercentBased", + "Percent": 1, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1 union select t2.id from t2 where 1 != 1", + "Query": "select t1.id from t1 union select t2.id from t2", + "Table": "t1, t2" + }, + { + "OperatorType": "Distinct", + "Collations": [ + "(0:1)" + ], + "Inputs": [ + { + "OperatorType": "Concatenate", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_dst1", + "Sharded": false + }, + "FieldQuery": "select dt.c0 as id, weight_string(dt.c0) from (select t1.id from t1 where 1 != 1) as dt(c0) where 1 != 1", + "Query": "select dt.c0 as id, weight_string(dt.c0) from (select distinct t1.id from t1) as dt(c0)", + "Table": "t1" + }, + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "sharded_dst1", + "Sharded": true + }, + "FieldQuery": "select dt.c0 as id, weight_string(dt.c0) from (select t2.id from t1 as t2 where 1 != 1) as dt(c0) where 1 != 1", + "Query": "select dt.c0 as id, weight_string(dt.c0) from (select distinct t2.id from t1 as t2) as dt(c0)", + "Table": "t1" + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "sharded_dst1.t1", + "unsharded_dst1.t1", + "unsharded_src1.t1", + "unsharded_src1.t2" + ] + } + }, + { + "comment": "inserts are not mirrored", + "query": "insert into unsharded_src1.t1 (id) values(1)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into unsharded_src1.t1 (id) values(1)", + "Instructions": { + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "insert into t1(id) values (1)", + "TableName": "t1" + }, + "TablesUsed": [ + "unsharded_src1.t1" + ] + } + }, + { + "comment": "updates are not mirrored", + "query": "update unsharded_src1.t1 set data = 'a' where id = 1", + "plan": { + "QueryType": "UPDATE", + "Original": "update unsharded_src1.t1 set data = 'a' where id = 1", + "Instructions": { + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update t1 set `data` = 'a' where id = 1", + "Table": "t1" + }, + "TablesUsed": [ + "unsharded_src1.t1" + ] + } + }, + { + "comment": "deletes are not mirrored", + "query": "delete from unsharded_src1.t1 where id = 1", + "plan": { + "QueryType": "DELETE", + "Original": "delete from unsharded_src1.t1 where id = 1", + "Instructions": { + "OperatorType": "Delete", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "delete from t1 where id = 1", + "Table": "t1" + }, + "TablesUsed": [ + "unsharded_src1.t1" + ] + } + }, + { + "comment": "self-mirror is not allowed", + "query": "select t1.id from unsharded_src2.t1", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id from unsharded_src2.t1", + "Instructions": { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src2", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1", + "Table": "t1" + }, + "TablesUsed": [ + "unsharded_src2.t1" + ] + } + }, + { + "comment": "chained mirror is not allowed", + "query": "select t2.id from unsharded_src2.t2", + "plan": { + "QueryType": "SELECT", + "Original": "select t2.id from unsharded_src2.t2", + "Instructions": { + "OperatorType": "Mirror", + "Variant": "PercentBased", + "Percent": 4, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src2", + "Sharded": false + }, + "FieldQuery": "select t2.id from t2 where 1 != 1", + "Query": "select t2.id from t2", + "Table": "t2" + }, + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_dst2", + "Sharded": false + }, + "FieldQuery": "select t2.id from t2 where 1 != 1", + "Query": "select t2.id from t2", + "Table": "t2" + } + ] + }, + "TablesUsed": [ + "unsharded_dst2.t2", + "unsharded_src2.t2" + ] + } + }, + { + "comment": "circular mirror is not allowed", + "query": "select t1.id from unsharded_src3.t1", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id from unsharded_src3.t1", + "Instructions": { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src3", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1", + "Table": "t1" + }, + "TablesUsed": [ + "unsharded_src3.t1" + ] + } + } +] diff --git a/go/vt/vtgate/planbuilder/testdata/vschemas/mirror_schema.json b/go/vt/vtgate/planbuilder/testdata/vschemas/mirror_schema.json new file mode 100644 index 00000000000..4feaa09c126 --- /dev/null +++ b/go/vt/vtgate/planbuilder/testdata/vschemas/mirror_schema.json @@ -0,0 +1,103 @@ +{ + "mirror_rules": { + "rules": [ + { + "from_table": "unsharded_src1.t1", + "to_table": "unsharded_dst1.t1", + "percent": 1 + }, + { + "from_table": "unsharded_src1.t2", + "to_table": "sharded_dst1.t1", + "percent": 2 + }, + { + "from_table": "unsharded_src2.t1", + "to_table": "unsharded_src2.t1", + "percent": 3 + }, + { + "from_table": "unsharded_src2.t2", + "to_table": "unsharded_dst2.t2", + "percent": 4 + }, + { + "from_table": "unsharded_dst2.t2", + "to_table": "unsharded_dst3.t2", + "percent": 5 + }, + { + "from_table": "unsharded_src3.t1", + "to_table": "unsharded_dst4.t1", + "percent": 6 + }, + { + "from_table": "unsharded_dst4.t2", + "to_table": "unsharded_src3.t2", + "percent": 7 + }, + { + "from_table": "sharded_src1.t1", + "to_table": "sharded_dst1.t1", + "percent": 8 + }, + { + "from_table": "unsharded_src1.t3", + "to_table": "unsharded_dst1.t2", + "percent": 0 + } + ] + }, + "keyspaces": { + "main": { + "sharded": false, + "tables": {} + }, + "unsharded_src1": { + "sharded": false, + "tables": {} + }, + "unsharded_src2": { + "sharded": false, + "tables": {} + }, + "unsharded_src3": { + "sharded": false, + "tables": {} + }, + "unsharded_dst1": { + "sharded": false, + "tables": {} + }, + "unsharded_dst2": { + "sharded": false, + "tables": {} + }, + "unsharded_dst3": { + "sharded": false, + "tables": {} + }, + "unsharded_dst4": { + "sharded": false, + "tables": {} + }, + "sharded_dst1": { + "sharded": true, + "vindexes": { + "xxhash": { + "type": "xxhash" + } + }, + "tables": { + "t1": { + "columnVindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + } + } + } + } +} diff --git a/go/vt/vtgate/sandbox_test.go b/go/vt/vtgate/sandbox_test.go index 70b96a63126..fa5ffbbffd8 100644 --- a/go/vt/vtgate/sandbox_test.go +++ b/go/vt/vtgate/sandbox_test.go @@ -48,6 +48,7 @@ const ( func init() { ksToSandbox = make(map[string]*sandbox) + sandboxMirrorRules = `{"rules":[]}` createSandbox(KsTestSharded) createSandbox(KsTestUnsharded) createSandbox(KsTestBadVSchema) @@ -57,6 +58,7 @@ func init() { var sandboxMu sync.Mutex var ksToSandbox map[string]*sandbox +var sandboxMirrorRules string func createSandbox(keyspace string) *sandbox { sandboxMu.Lock() @@ -86,9 +88,20 @@ func getSandboxSrvVSchema() *vschemapb.SrvVSchema { } result.Keyspaces[keyspace] = &vs } + var mrs vschemapb.MirrorRules + if err := json2.Unmarshal([]byte(sandboxMirrorRules), &mrs); err != nil { + panic(err) + } + result.MirrorRules = &mrs return result } +func setSandboxMirrorRules(mirrorRules string) { + sandboxMu.Lock() + defer sandboxMu.Unlock() + sandboxMirrorRules = mirrorRules +} + type sandbox struct { // Use sandmu to access the variables below sandmu sync.Mutex diff --git a/go/vt/vtgate/semantics/FakeSI.go b/go/vt/vtgate/semantics/FakeSI.go index 1ca6718f1a8..cb1b9cec094 100644 --- a/go/vt/vtgate/semantics/FakeSI.go +++ b/go/vt/vtgate/semantics/FakeSI.go @@ -85,3 +85,8 @@ func (s *FakeSI) KeyspaceError(keyspace string) error { func (s *FakeSI) GetAggregateUDFs() []string { return s.UDFs } + +// FindMirrorRule implements SchemaInformation. +func (s *FakeSI) FindMirrorRule(tablename sqlparser.TableName) (*vindexes.MirrorRule, error) { + return nil, nil +} diff --git a/go/vt/vtgate/semantics/analyzer.go b/go/vt/vtgate/semantics/analyzer.go index ec42f638629..0a9d2480d9b 100644 --- a/go/vt/vtgate/semantics/analyzer.go +++ b/go/vt/vtgate/semantics/analyzer.go @@ -387,7 +387,14 @@ func (a *analyzer) reAnalyze(statement sqlparser.SQLNode) error { // canShortCut checks if we are dealing with a single unsharded keyspace and no tables that have managed foreign keys // if so, we can stop the analyzer early func (a *analyzer) canShortCut(statement sqlparser.Statement) (canShortCut bool) { - ks, _ := singleUnshardedKeyspace(a.earlyTables.Tables) + var ks *vindexes.Keyspace + switch statement.(type) { + case sqlparser.SelectStatement: + ks, canShortCut = canTakeSelectUnshardedShortcut(a.earlyTables.Tables) + default: + ks, canShortCut = canTakeUnshardedShortcut(a.earlyTables.Tables) + } + a.singleUnshardedKeyspace = ks != nil if !a.singleUnshardedKeyspace { return false diff --git a/go/vt/vtgate/semantics/cte_table.go b/go/vt/vtgate/semantics/cte_table.go index 320189ff871..498fc5076c1 100644 --- a/go/vt/vtgate/semantics/cte_table.go +++ b/go/vt/vtgate/semantics/cte_table.go @@ -143,6 +143,11 @@ func (cte *CTETable) getTableSet(org originable) TableSet { return org.tableSetFor(cte.ASTNode) } +// GetMirrorRule implements TableInfo. +func (cte *CTETable) GetMirrorRule() *vindexes.MirrorRule { + return nil +} + type CTE struct { Name string Query sqlparser.SelectStatement diff --git a/go/vt/vtgate/semantics/derived_table.go b/go/vt/vtgate/semantics/derived_table.go index 684966f8ac8..fc7e1cb391c 100644 --- a/go/vt/vtgate/semantics/derived_table.go +++ b/go/vt/vtgate/semantics/derived_table.go @@ -195,3 +195,8 @@ func (dt *DerivedTable) checkForDuplicates() error { } return nil } + +// GetMirrorRule implements TableInfo. +func (dt *DerivedTable) GetMirrorRule() *vindexes.MirrorRule { + return nil +} diff --git a/go/vt/vtgate/semantics/info_schema.go b/go/vt/vtgate/semantics/info_schema.go index 11e577f3fa7..127f4a00960 100644 --- a/go/vt/vtgate/semantics/info_schema.go +++ b/go/vt/vtgate/semantics/info_schema.go @@ -1603,11 +1603,15 @@ type infoSchemaWithColumns struct { infoSchemaData map[string][]vindexes.Column } +var _ SchemaInformation = (*infoSchemaWithColumns)(nil) + // MySQLVersion implements SchemaInformation. // We cache this information, since these are maps that are not changed -var infoSchema57 = getInfoSchema57() -var infoSchema80 = getInfoSchema80() +var ( + infoSchema57 = getInfoSchema57() + infoSchema80 = getInfoSchema80() +) // newSchemaInfo returns a SchemaInformation that has the column information for all info_schema tables func newSchemaInfo(inner SchemaInformation) SchemaInformation { @@ -1665,3 +1669,8 @@ func (i *infoSchemaWithColumns) KeyspaceError(keyspace string) error { func (i *infoSchemaWithColumns) GetAggregateUDFs() []string { return i.inner.GetAggregateUDFs() } + +// FindMirrorRule implements SchemaInformation. +func (i *infoSchemaWithColumns) FindMirrorRule(tablename sqlparser.TableName) (*vindexes.MirrorRule, error) { + return i.inner.FindMirrorRule(tablename) +} diff --git a/go/vt/vtgate/semantics/real_table.go b/go/vt/vtgate/semantics/real_table.go index 33c15b749f9..64f3ac5f3f0 100644 --- a/go/vt/vtgate/semantics/real_table.go +++ b/go/vt/vtgate/semantics/real_table.go @@ -35,6 +35,7 @@ type RealTable struct { Table *vindexes.Table CTE *CTE VindexHint *sqlparser.IndexHint + MirrorRule *vindexes.MirrorRule isInfSchema bool collationEnv *collations.Environment cache map[string]dependencies @@ -224,3 +225,8 @@ func (r *RealTable) Name() (sqlparser.TableName, error) { func (r *RealTable) matches(name sqlparser.TableName) bool { return (name.Qualifier.IsEmpty() || name.Qualifier.String() == r.dbName) && r.tableName == name.Name.String() } + +// GetMirrorRule implements TableInfo. +func (r *RealTable) GetMirrorRule() *vindexes.MirrorRule { + return r.MirrorRule +} diff --git a/go/vt/vtgate/semantics/semantic_table.go b/go/vt/vtgate/semantics/semantic_table.go index 6738546fe37..f9856a901a6 100644 --- a/go/vt/vtgate/semantics/semantic_table.go +++ b/go/vt/vtgate/semantics/semantic_table.go @@ -63,6 +63,9 @@ type ( dependencies(colName string, org originable) (dependencies, error) getExprFor(s string) (sqlparser.Expr, error) getTableSet(org originable) TableSet + + // GetMirrorRule returns the vschema mirror rule for this TableInfo + GetMirrorRule() *vindexes.MirrorRule } // ColumnInfo contains information about columns @@ -86,6 +89,12 @@ type ( RecursiveCTE bool } + // MirrorInfo stores information used to produce mirror + // operators. + MirrorInfo struct { + Percent float32 + } + // SemTable contains semantic analysis information about the query. SemTable struct { // Tables stores information about the tables in the query, including derived tables @@ -163,6 +172,7 @@ type ( GetForeignKeyChecksState() *bool KeyspaceError(keyspace string) error GetAggregateUDFs() []string + FindMirrorRule(tablename sqlparser.TableName) (*vindexes.MirrorRule, error) } shortCut = int @@ -174,10 +184,8 @@ const ( dependsOnKeyspace ) -var ( - // ErrNotSingleTable refers to an error happening when something should be used only for single tables - ErrNotSingleTable = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] should only be used for single tables") -) +// ErrNotSingleTable refers to an error happening when something should be used only for single tables +var ErrNotSingleTable = vterrors.VT13001("should only be used for single tables") // CopyDependencies copies the dependencies from one expression into the other func (st *SemTable) CopyDependencies(from, to sqlparser.Expr) { @@ -734,7 +742,6 @@ func RewriteDerivedTableExpression(expr sqlparser.Expr, vt TableInfo) sqlparser. col := *node col.Qualifier = sqlparser.TableName{} cursor.Replace(&col) - }, nil).(sqlparser.Expr) } @@ -797,6 +804,7 @@ func singleUnshardedKeyspace(tableInfos []TableInfo) (ks *vindexes.Keyspace, tab tables = append(tables, vtbl) } + return ks, tables } @@ -980,3 +988,45 @@ func (st *SemTable) NewTableId() TableSet { st.Tables = append(st.Tables, nil) return tableID } + +func (st *SemTable) CanTakeSelectUnshardedShortcut() (*vindexes.Keyspace, bool) { + return canTakeSelectUnshardedShortcut(st.Tables) +} + +func (st *SemTable) CanTakeUnshardedShortcut() (*vindexes.Keyspace, bool) { + return canTakeUnshardedShortcut(st.Tables) +} + +func canTakeUnshardedShortcut(tableInfos []TableInfo) (*vindexes.Keyspace, bool) { + uks, _ := singleUnshardedKeyspace(tableInfos) + return uks, uks != nil +} + +func canTakeSelectUnshardedShortcut(tableInfos []TableInfo) (*vindexes.Keyspace, bool) { + if mi := mirrorInfo(tableInfos); mi.Percent > 0 { + return nil, false + } + return canTakeUnshardedShortcut(tableInfos) +} + +func (st *SemTable) GetMirrorInfo() MirrorInfo { + return mirrorInfo(st.Tables) +} + +// mirrorInfo looks through all tables with mirror rules defined, and returns a +// MirrorInfo containing the lowest mirror percentage found across all rules. +// +// The idea here is that if you have two tables with mirror rules both involved +// in a query, and one of those rules is 1% while the other is 100%, to mirror +// the query with 1% chance. +func mirrorInfo(tableInfos []TableInfo) MirrorInfo { + mi := MirrorInfo{} + for _, t := range tableInfos { + if mr := t.GetMirrorRule(); mr != nil { + if mi.Percent == 0 || mr.Percent < mi.Percent { + mi.Percent = mr.Percent + } + } + } + return mi +} diff --git a/go/vt/vtgate/semantics/semantic_table_test.go b/go/vt/vtgate/semantics/semantic_table_test.go index 84f8cec6cf9..1f324215326 100644 --- a/go/vt/vtgate/semantics/semantic_table_test.go +++ b/go/vt/vtgate/semantics/semantic_table_test.go @@ -464,7 +464,7 @@ func TestRemoveParentForeignKey(t *testing.T) { }, }, }, - expectedErr: "[BUG] should only be used for single tables", + expectedErr: "VT13001: [BUG] should only be used for single tables", }, } for _, tt := range tests { @@ -716,7 +716,7 @@ func TestRemoveNonRequiredForeignKeys(t *testing.T) { SingleTableSet(0).Merge(SingleTableSet(1)): {}, }, }, - expectedErr: "[BUG] should only be used for single tables", + expectedErr: "VT13001: [BUG] should only be used for single tables", }, { name: "Error - Reading table info for child foreign keys", @@ -734,7 +734,7 @@ func TestRemoveNonRequiredForeignKeys(t *testing.T) { }, parentForeignKeysInvolved: map[TableSet][]vindexes.ParentFKInfo{}, }, - expectedErr: "[BUG] should only be used for single tables", + expectedErr: "VT13001: [BUG] should only be used for single tables", }, } for _, tt := range tests { diff --git a/go/vt/vtgate/semantics/table_collector.go b/go/vt/vtgate/semantics/table_collector.go index 16285307846..191d9c3b38e 100644 --- a/go/vt/vtgate/semantics/table_collector.go +++ b/go/vt/vtgate/semantics/table_collector.go @@ -532,11 +532,20 @@ func (etc *earlyTableCollector) createTable( return nil, err } + mr, err := etc.si.FindMirrorRule(t) + if err != nil { + // Mirroring is best effort. If we get an error while mirroring, keep going + // as if mirroring was disabled. We don't want to interrupt production work + // because of an issue with mirroring. + mr = nil + } + table := &RealTable{ tableName: alias.As.String(), ASTNode: alias, Table: tbl, VindexHint: hint, + MirrorRule: mr, isInfSchema: isInfSchema, collationEnv: etc.si.Environment().CollationEnv(), } diff --git a/go/vt/vtgate/semantics/vindex_table.go b/go/vt/vtgate/semantics/vindex_table.go index b598c93f36a..c8ef271af5d 100644 --- a/go/vt/vtgate/semantics/vindex_table.go +++ b/go/vt/vtgate/semantics/vindex_table.go @@ -84,3 +84,8 @@ func (v *VindexTable) getColumns(ignoreInvisbleCol bool) []ColumnInfo { func (v *VindexTable) IsInfSchema() bool { return v.Table.IsInfSchema() } + +// GetMirrorRule implements TableInfo. +func (v *VindexTable) GetMirrorRule() *vindexes.MirrorRule { + return nil +} diff --git a/go/vt/vtgate/semantics/vtable.go b/go/vt/vtgate/semantics/vtable.go index 14519a7e938..6cd7e34aecc 100644 --- a/go/vt/vtgate/semantics/vtable.go +++ b/go/vt/vtgate/semantics/vtable.go @@ -175,3 +175,8 @@ func selectExprsToInfos( } return } + +// GetMirrorRule implements TableInfo. +func (v *vTableInfo) GetMirrorRule() *vindexes.MirrorRule { + return nil +} diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index a2055e57557..d7a426e41d4 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -58,10 +58,12 @@ import ( "vitess.io/vitess/go/vt/vtgate/vtgateservice" ) -var _ engine.VCursor = (*vcursorImpl)(nil) -var _ plancontext.VSchema = (*vcursorImpl)(nil) -var _ iExecute = (*Executor)(nil) -var _ vindexes.VCursor = (*vcursorImpl)(nil) +var ( + _ engine.VCursor = (*vcursorImpl)(nil) + _ plancontext.VSchema = (*vcursorImpl)(nil) + _ iExecute = (*Executor)(nil) + _ vindexes.VCursor = (*vcursorImpl)(nil) +) // vcursor_impl needs these facilities to be able to be able to execute queries for vindexes type iExecute interface { @@ -922,7 +924,6 @@ func (vc *vcursorImpl) SetPriority(priority string) { } else if vc.safeSession.Options != nil && vc.safeSession.Options.Priority != "" { vc.safeSession.Options.Priority = "" } - } // SetConsolidator implements the SessionActions interface @@ -1102,6 +1103,23 @@ func (vc *vcursorImpl) GetAggregateUDFs() []string { return vc.vschema.GetAggregateUDFs() } +// FindMirrorRule finds the mirror rule for the requested table name and +// VSchema tablet type. +func (vc *vcursorImpl) FindMirrorRule(name sqlparser.TableName) (*vindexes.MirrorRule, error) { + destKeyspace, destTabletType, _, err := vc.executor.ParseDestinationTarget(name.Qualifier.String()) + if err != nil { + return nil, err + } + if destKeyspace == "" { + destKeyspace = vc.keyspace + } + mirrorRule, err := vc.vschema.FindMirrorRule(destKeyspace, name.Name.String(), destTabletType) + if err != nil { + return nil, err + } + return mirrorRule, err +} + // ParseDestinationTarget parses destination target string and sets default keyspace if possible. func parseDestinationTarget(targetString string, vschema *vindexes.VSchema) (string, topodatapb.TabletType, key.Destination, error) { destKeyspace, destTabletType, dest, err := topoprotopb.ParseDestination(targetString, defaultTabletType) @@ -1162,7 +1180,6 @@ func (vc *vcursorImpl) ExecuteVSchema(ctx context.Context, keyspace string, vsch allowed := vschemaacl.Authorized(user) if !allowed { return vterrors.NewErrorf(vtrpcpb.Code_PERMISSION_DENIED, vterrors.AccessDeniedError, "User '%s' is not authorized to perform vschema operations", user.GetUsername()) - } // Resolve the keyspace either from the table qualifier or the target keyspace @@ -1179,7 +1196,6 @@ func (vc *vcursorImpl) ExecuteVSchema(ctx context.Context, keyspace string, vsch ks := srvVschema.Keyspaces[ksName] ks, err := topotools.ApplyVSchemaDDL(ksName, ks, vschemaDDL) - if err != nil { return err } @@ -1187,7 +1203,6 @@ func (vc *vcursorImpl) ExecuteVSchema(ctx context.Context, keyspace string, vsch srvVschema.Keyspaces[ksName] = ks return vc.vm.UpdateVSchema(ctx, ksName, srvVschema) - } func (vc *vcursorImpl) MessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, tableName string, callback func(*sqltypes.Result) error) error { @@ -1320,6 +1335,7 @@ func (vc *vcursorImpl) VExplainLogging() { func (vc *vcursorImpl) GetVExplainLogs() []engine.ExecuteEntry { return vc.safeSession.logging.GetLogs() } + func (vc *vcursorImpl) FindRoutedShard(keyspace, shard string) (keyspaceName string, err error) { return vc.vschema.FindRoutedShard(keyspace, shard) } @@ -1388,6 +1404,37 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso return v } +func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { + callerId := callerid.EffectiveCallerIDFromContext(ctx) + immediateCallerId := callerid.ImmediateCallerIDFromContext(ctx) + + clonedCtx := callerid.NewContext(ctx, callerId, immediateCallerId) + + v := &vcursorImpl{ + safeSession: NewAutocommitSession(vc.safeSession.Session), + keyspace: vc.keyspace, + tabletType: vc.tabletType, + destination: vc.destination, + marginComments: vc.marginComments, + executor: vc.executor, + resolver: vc.resolver, + topoServer: vc.topoServer, + logStats: &logstats.LogStats{Ctx: clonedCtx}, + collation: vc.collation, + ignoreMaxMemoryRows: vc.ignoreMaxMemoryRows, + vschema: vc.vschema, + vm: vc.vm, + semTable: vc.semTable, + warnShardedOnly: vc.warnShardedOnly, + warnings: vc.warnings, + pv: vc.pv, + } + + v.marginComments.Trailing += "/* mirror query */" + + return v +} + // UpdateForeignKeyChecksState updates the foreign key checks state of the vcursor. func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) { // Initialize the state to unspecified. diff --git a/go/vt/vtgate/vindexes/vschema.go b/go/vt/vtgate/vindexes/vschema.go index 19b3a9039d4..3852bbfcde3 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -63,6 +63,7 @@ const ( // VSchema represents the denormalized version of SrvVSchema, // used for building routing plans. type VSchema struct { + MirrorRules map[string]*MirrorRule `json:"mirror_rules"` RoutingRules map[string]*RoutingRule `json:"routing_rules"` // globalTables contains the name of all tables in all keyspaces. If the @@ -79,13 +80,34 @@ type VSchema struct { created time.Time } +// MirrorRule represents one mirror rule. +type MirrorRule struct { + Error error + Percent float32 `json:"percent,omitempty"` + Table *Table `json:"table,omitempty"` +} + +// MarshalJSON returns a JSON representation of MirrorRule. +func (mr *MirrorRule) MarshalJSON() ([]byte, error) { + if mr.Error != nil { + return json.Marshal(mr.Error.Error()) + } + return json.Marshal(struct { + Percent float32 + Table *Table + }{ + Percent: mr.Percent, + Table: mr.Table, + }) +} + // RoutingRule represents one routing rule. type RoutingRule struct { Tables []*Table Error error } -// MarshalJSON returns a JSON representation of Column. +// MarshalJSON returns a JSON representation of RoutingRule. func (rr *RoutingRule) MarshalJSON() ([]byte, error) { if rr.Error != nil { return json.Marshal(rr.Error.Error()) @@ -324,6 +346,7 @@ func (source *Source) String() string { // BuildVSchema builds a VSchema from a SrvVSchema. func BuildVSchema(source *vschemapb.SrvVSchema, parser *sqlparser.Parser) (vschema *VSchema) { vschema = &VSchema{ + MirrorRules: make(map[string]*MirrorRule), RoutingRules: make(map[string]*RoutingRule), globalTables: make(map[string]*Table), uniqueVindexes: make(map[string]Vindex), @@ -338,6 +361,7 @@ func BuildVSchema(source *vschemapb.SrvVSchema, parser *sqlparser.Parser) (vsche buildRoutingRule(source, vschema, parser) buildShardRoutingRule(source, vschema) buildKeyspaceRoutingRule(source, vschema) + buildMirrorRule(source, vschema, parser) // Resolve auto-increments after routing rules are built since sequence tables also obey routing rules. resolveAutoIncrement(source, vschema, parser) return vschema @@ -895,7 +919,7 @@ func escapeQualifiedTable(qualifiedTableName string) (string, error) { } func extractTableParts(tableName string, allowUnqualified bool) (string, string, error) { - errMsgFormat := "invalid table name: %s, it must be of the " + errMsgFormat := "invalid table name: '%s', it must be of the " if allowUnqualified { errMsgFormat = errMsgFormat + "unqualified form or the " } @@ -914,7 +938,6 @@ func extractTableParts(tableName string, allowUnqualified bool) (string, string, } // Using fmt.Errorf instead of vterrors here because this error is always wrapped in vterrors. return "", "", fmt.Errorf(errMsgFormat, tableName) - } func parseTable(tableName string) (sqlparser.TableName, error) { @@ -972,7 +995,6 @@ outer: } toKeyspace, toTableName, err := parser.ParseTable(toTable) - if err != nil { vschema.RoutingRules[rule.FromTable] = &RoutingRule{ Error: err, @@ -1025,6 +1047,216 @@ func buildKeyspaceRoutingRule(source *vschemapb.SrvVSchema, vschema *VSchema) { vschema.KeyspaceRoutingRules = rulesMap } +func buildMirrorRule(source *vschemapb.SrvVSchema, vschema *VSchema, parser *sqlparser.Parser) { + if source.MirrorRules == nil { + return + } + + // Used to validate no mirror chains exist. + fromTableKeyspaces := make(map[string]string) + toKeyspaces := make(map[string]struct{}) + + for _, rule := range source.MirrorRules.Rules { + toTable := rule.ToTable + + // + // Forbid duplicate FromTables expressions. + // + + if _, ok := vschema.MirrorRules[rule.FromTable]; ok { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_ALREADY_EXISTS, + "from table: duplicate rule for entry '%s'", + rule.FromTable, + ), + } + continue + } + + // + // Parse and validate FromTable. + // + + // Separate tablet-type from rest of FromTable. + fromTableParts := strings.Split(rule.FromTable, "@") + if len(fromTableParts) == 0 { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: invalid table name: '%s'", + rule.FromTable, + ), + } + } + + // Escape and parse the FromTable, without table-type specifier. + fromTable, err := escapeQualifiedTable(fromTableParts[0]) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: %s", + err.Error(), + ), + } + continue + } + fromKeyspace, fromTableName, err := parser.ParseTable(fromTable) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: invalid table name: '%s'", + err.Error(), + ), + } + continue + } + + // Find the from table. + _, err = vschema.FindTable(fromKeyspace, fromTableName) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: %s", + err.Error(), + ), + } + continue + } + + // Validate the table-type, if specified. + if len(fromTableParts) > 1 { + fromTabletTypeSuffix := "@" + strings.Join(fromTableParts[1:], "") + var ok bool + for _, tabletTypeSuffix := range TabletTypeSuffix { + if tabletTypeSuffix == fromTabletTypeSuffix { + ok = true + break + } + } + if !ok { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: invalid tablet type: '%s'", + rule.FromTable, + ), + } + continue + } + } + + // + // Parse and validate ToTable. + // + + // Forbid tablet-type specifier. + toTableParts := strings.Split(toTable, "@") + if len(toTableParts) != 1 || toTableParts[0] == "@" { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: tablet type may not be specified: '%s'", + rule.ToTable, + ), + } + continue + } + + // Escape and parse the table. + toTable, err = escapeQualifiedTable(toTable) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: %s", + err.Error(), + ), + } + continue + } + toKeyspace, toTableName, err := parser.ParseTable(toTable) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: invalid table name: '%s'", + rule.ToTable, + ), + } + continue + } + + // Forbid self-mirroring. + if fromKeyspace == toKeyspace { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: cannot reside in same keyspace as from table", + ), + } + continue + } + + // + // Find table in VSchema. + // + + t, err := vschema.FindTable(toKeyspace, toTableName) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: %s", + err.Error(), + ), + } + continue + } + + // + // Return non-error mirror rule. + // + + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Table: t, + Percent: rule.Percent, + } + + // + // Save some info for validating no mirror chains exist + // + + fromTableKeyspaces[rule.FromTable] = fromKeyspace + toKeyspaces[toKeyspace] = struct{}{} + } + + // Forbid mirror chains. Keyspaces which are the target of a mirror rule + // may not be the source of another. + for fromTable, rule := range vschema.MirrorRules { + if rule.Error != nil { + continue + } + fromKeyspace, ok := fromTableKeyspaces[fromTable] + if !ok { + rule.Error = vterrors.Errorf( + vtrpcpb.Code_INTERNAL, + "[BUG] from table: failed to determine keyspace", + ) + continue + } + if _, ok := toKeyspaces[fromKeyspace]; ok { + rule.Error = vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "mirror chaining is not allowed", + ) + } + } +} + // FindTable returns a pointer to the Table. If a keyspace is specified, only tables // from that keyspace are searched. If the specified keyspace is unsharded // and no tables matched, it's considered valid: FindTable will construct a table @@ -1325,6 +1557,28 @@ func (vschema *VSchema) GetAggregateUDFs() (udfs []string) { return } +// FindMirrorRule finds a mirror rule from the keyspace, table name and +// tablet type. +func (vschema *VSchema) FindMirrorRule(keyspace, tablename string, tabletType topodatapb.TabletType) (*MirrorRule, error) { + qualified := tablename + if keyspace != "" { + qualified = keyspace + "." + tablename + } + fqtn := qualified + TabletTypeSuffix[tabletType] + // First look for a fully qualified table name: keyspace.table@tablet_type. + // Then look for one without tablet type: keyspace.table. + for _, name := range []string{fqtn, qualified} { + mr, ok := vschema.MirrorRules[name] + if ok { + if mr.Error != nil { + return nil, mr.Error + } + return mr, nil + } + } + return nil, nil +} + // ByCost provides the interface needed for ColumnVindexes to // be sorted by cost order. type ByCost []*ColumnVindex diff --git a/go/vt/vtgate/vindexes/vschema_test.go b/go/vt/vtgate/vindexes/vschema_test.go index 7761b6ae8ab..25f8e135698 100644 --- a/go/vt/vtgate/vindexes/vschema_test.go +++ b/go/vt/vtgate/vindexes/vschema_test.go @@ -838,6 +838,7 @@ func TestVSchemaRoutingRules(t *testing.T) { Keyspace: ks2, } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{ "rt1": { Error: errors.New("table rt1 has more than one target: [ks1.t1 ks2.t2]"), @@ -852,10 +853,10 @@ func TestVSchemaRoutingRules(t *testing.T) { Error: errors.New("duplicate rule for entry dup"), }, "badname": { - Error: errors.New("invalid table name: t1.t2.t3, it must be of the qualified form . (dots are not allowed in either name)"), + Error: errors.New("invalid table name: 't1.t2.t3', it must be of the qualified form . (dots are not allowed in either name)"), }, "unqualified": { - Error: errors.New("invalid table name: t1, it must be of the qualified form . (dots are not allowed in either name)"), + Error: errors.New("invalid table name: 't1', it must be of the qualified form . (dots are not allowed in either name)"), }, "badkeyspace": { Error: errors.New("VT05003: unknown database 'ks3' in vschema"), @@ -897,6 +898,282 @@ func TestVSchemaRoutingRules(t *testing.T) { assert.Equal(t, string(wantb), string(gotb), string(gotb)) } +func TestVSchemaMirrorRules(t *testing.T) { + input := vschemapb.SrvVSchema{ + MirrorRules: &vschemapb.MirrorRules{ + Rules: []*vschemapb.MirrorRule{ + // Empty FromTable not allowed. + { + FromTable: "", + ToTable: "ks2.ks2t1", + }, + // Invalid FromTable, needs to be .[@]. + { + FromTable: "ks1", + ToTable: "ks2.ks2t1", + }, + // Invalid ToTable, needs to be .
. + { + FromTable: "ks1.ks1t1", + ToTable: "ks2", + }, + // Invalid ToTable, needs to be .
. + { + FromTable: "ks1.ks1t2", + ToTable: "ks2.ks2t2.c", + }, + // OK, unsharded => unsharded. + { + FromTable: "ks1.ks1t3", + ToTable: "ks2.ks2t3", + Percent: 50, + }, + // Invalid ToTable, needs to be .
. + { + FromTable: "ks1.ks1t4", + ToTable: "ks2.ks2t4@replica", + }, + // OK, unsharded@tablet-type => unsharded. + { + FromTable: "ks1.ks1t5@replica", + ToTable: "ks2.ks2t5", + }, + // Invalid FromTable tablet type.. + { + FromTable: "ks1.ks1t6@stone", + ToTable: "ks2.ks2t6", + }, + // OK, sharded => sharded. + { + FromTable: "ks3.ks3t1", + ToTable: "ks4.ks4t1", + Percent: 50, + }, + // OK, unsharded => sharded. + { + FromTable: "ks1.ks1t7", + ToTable: "ks4.ks4t1", + Percent: 50, + }, + // Destination sharded table must be defined in VSchema. + { + FromTable: "ks1.ks1t8", + ToTable: "ks4.ks4t2", + Percent: 50, + }, + // Source sharded table must be defined in VSchema. + { + FromTable: "ks3.ks3t2", + ToTable: "ks4.ks4t1", + Percent: 50, + }, + // Keyspaces that are the target of a rule may not be the + // source of another. + { + FromTable: "ks2.ks2t9", + ToTable: "ks4.ks4t1", + Percent: 50, + }, + }, + }, + RoutingRules: &vschemapb.RoutingRules{}, + Keyspaces: map[string]*vschemapb.Keyspace{ + "ks1": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Tables: map[string]*vschemapb.Table{}, + }, + "ks2": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Tables: map[string]*vschemapb.Table{}, + }, + "ks3": { + Sharded: true, + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Vindexes: map[string]*vschemapb.Vindex{ + "stfu1": { + Type: "stfu", + }, + }, + Tables: map[string]*vschemapb.Table{ + "ks3t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "id", + Name: "stfu1", + }, + }, + }, + }, + }, + "ks4": { + Sharded: true, + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Vindexes: map[string]*vschemapb.Vindex{ + "stfu1": { + Type: "stfu", + }, + }, + Tables: map[string]*vschemapb.Table{ + "ks4t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "id", + Name: "stfu1", + }, + }, + }, + }, + }, + }, + } + got := BuildVSchema(&input, sqlparser.NewTestParser()) + + ks1 := &Keyspace{ + Name: "ks1", + Sharded: false, + } + ks2 := &Keyspace{ + Name: "ks2", + Sharded: false, + } + ks3 := &Keyspace{ + Name: "ks3", + Sharded: true, + } + ks4 := &Keyspace{ + Name: "ks4", + Sharded: true, + } + + vindex1 := &stFU{ + name: "stfu1", + } + + ks3t1 := &Table{ + Name: sqlparser.NewIdentifierCS("ks3t1"), + Keyspace: ks3, + ColumnVindexes: []*ColumnVindex{{ + Columns: []sqlparser.IdentifierCI{sqlparser.NewIdentifierCI("id")}, + Type: "stfu", + Name: "stfu1", + Vindex: vindex1, + isUnique: vindex1.IsUnique(), + cost: vindex1.Cost(), + }}, + } + ks3t1.Ordered = []*ColumnVindex{ + ks3t1.ColumnVindexes[0], + } + + ks4t1 := &Table{ + Name: sqlparser.NewIdentifierCS("ks4t1"), + Keyspace: ks4, + ColumnVindexes: []*ColumnVindex{{ + Columns: []sqlparser.IdentifierCI{sqlparser.NewIdentifierCI("id")}, + Type: "stfu", + Name: "stfu1", + Vindex: vindex1, + isUnique: vindex1.IsUnique(), + cost: vindex1.Cost(), + }}, + } + ks4t1.Ordered = []*ColumnVindex{ + ks4t1.ColumnVindexes[0], + } + + want := &VSchema{ + MirrorRules: map[string]*MirrorRule{ + "": { + Error: errors.New("from table: invalid table name: '', it must be of the qualified form . (dots are not allowed in either name)"), + }, + "ks1": { + Error: errors.New("from table: invalid table name: 'ks1', it must be of the qualified form . (dots are not allowed in either name)"), + }, + "ks1.ks1t1": { + Error: errors.New("to table: invalid table name: 'ks2', it must be of the qualified form . (dots are not allowed in either name)"), + }, + "ks1.ks1t2": { + Error: errors.New("to table: invalid table name: 'ks2.ks2t2.c', it must be of the qualified form . (dots are not allowed in either name)"), + }, + "ks1.ks1t3": { + Table: &Table{ + Name: sqlparser.NewIdentifierCS("ks2t3"), + }, + Percent: 50, + }, + "ks1.ks1t4": { + Error: errors.New("to table: tablet type may not be specified: 'ks2.ks2t4@replica'"), + }, + "ks1.ks1t5@replica": { + Table: &Table{ + Name: sqlparser.NewIdentifierCS("ks2t5"), + }, + }, + "ks1.ks1t6@stone": { + Error: errors.New("from table: invalid tablet type: 'ks1.ks1t6@stone'"), + }, + "ks3.ks3t1": { + Table: ks4t1, + Percent: 50, + }, + "ks1.ks1t7": { + Table: ks4t1, + Percent: 50, + }, + "ks1.ks1t8": { + Error: errors.New("to table: table ks4t2 not found"), + }, + "ks3.ks3t2": { + Error: errors.New("from table: table ks3t2 not found"), + }, + "ks2.ks2t9": { + Error: errors.New("mirror chaining is not allowed"), + }, + }, + RoutingRules: map[string]*RoutingRule{}, + Keyspaces: map[string]*KeyspaceSchema{ + "ks1": { + Keyspace: ks1, + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Tables: map[string]*Table{}, + Vindexes: map[string]Vindex{}, + }, + "ks2": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Keyspace: ks2, + Tables: map[string]*Table{}, + Vindexes: map[string]Vindex{}, + }, + "ks3": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Keyspace: ks3, + Tables: map[string]*Table{ + "ks3t1": ks3t1, + }, + Vindexes: map[string]Vindex{ + "stfu1": vindex1, + }, + }, + "ks4": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Keyspace: ks4, + Tables: map[string]*Table{ + "ks4t1": ks4t1, + }, + Vindexes: map[string]Vindex{ + "stfu1": vindex1, + }, + }, + }, + } + + gotb, err := json.MarshalIndent(got, "", " ") + assert.NoError(t, err) + wantb, err := json.MarshalIndent(want, "", " ") + assert.NoError(t, err) + assert.Equal(t, string(wantb), string(gotb), string(gotb)) +} + func TestChooseVindexForType(t *testing.T) { testcases := []struct { in querypb.Type @@ -1247,6 +1524,7 @@ func TestShardedVSchemaMultiColumnVindex(t *testing.T) { t1.ColumnVindexes[0], } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": t1, @@ -1323,6 +1601,7 @@ func TestShardedVSchemaNotOwned(t *testing.T) { t1.ColumnVindexes[1], t1.ColumnVindexes[0]} want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": t1, @@ -1430,6 +1709,7 @@ func TestBuildVSchemaDupSeq(t *testing.T) { Keyspace: ksb, Type: "sequence"} want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": nil, @@ -1491,6 +1771,7 @@ func TestBuildVSchemaDupTable(t *testing.T) { Keyspace: ksb, } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": nil, @@ -1620,6 +1901,7 @@ func TestBuildVSchemaDupVindex(t *testing.T) { t2.ColumnVindexes[0], } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": nil, @@ -2206,6 +2488,7 @@ func TestSequence(t *testing.T) { t2.ColumnVindexes[0], } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "seq": seq, diff --git a/go/vt/vtgate/vschema_manager_test.go b/go/vt/vtgate/vschema_manager_test.go index 8dfb889df0d..8db0c2df05b 100644 --- a/go/vt/vtgate/vschema_manager_test.go +++ b/go/vt/vtgate/vschema_manager_test.go @@ -234,6 +234,7 @@ func TestVSchemaUpdate(t *testing.T) { }, }, expected: &vindexes.VSchema{ + MirrorRules: map[string]*vindexes.MirrorRule{}, RoutingRules: map[string]*vindexes.RoutingRule{}, Keyspaces: map[string]*vindexes.KeyspaceSchema{ "ks": { @@ -499,6 +500,7 @@ func TestVSchemaUDFsUpdate(t *testing.T) { }, nil) utils.MustMatchFn(".globalTables", ".uniqueVindexes")(t, &vindexes.VSchema{ + MirrorRules: map[string]*vindexes.MirrorRule{}, RoutingRules: map[string]*vindexes.RoutingRule{}, Keyspaces: map[string]*vindexes.KeyspaceSchema{ "ks": { @@ -821,6 +823,7 @@ func TestVSchemaUpdateWithFKReferenceToInternalTables(t *testing.T) { }, nil) utils.MustMatchFn(".globalTables", ".uniqueVindexes")(t, &vindexes.VSchema{ + MirrorRules: map[string]*vindexes.MirrorRule{}, RoutingRules: map[string]*vindexes.RoutingRule{}, Keyspaces: map[string]*vindexes.KeyspaceSchema{ "ks": { @@ -870,6 +873,7 @@ func makeTestVSchema(ks string, sharded bool, tbls map[string]*vindexes.Table) * func makeTestEmptyVSchema() *vindexes.VSchema { return &vindexes.VSchema{ + MirrorRules: map[string]*vindexes.MirrorRule{}, RoutingRules: map[string]*vindexes.RoutingRule{}, Keyspaces: map[string]*vindexes.KeyspaceSchema{}, } diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go index ba79d916c36..7cafcc6d485 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go @@ -106,6 +106,7 @@ func TestUpdateVSchema(t *testing.T) { expectUpdateCount(t, startCount+1) want := `{ + "mirror_rules": {}, "routing_rules": {}, "keyspaces": { "vttest": {