From cee8bd33303d3e41d4c4132a001e202d972076cf Mon Sep 17 00:00:00 2001
From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com>
Date: Mon, 19 Feb 2024 09:38:18 -0600
Subject: [PATCH] [release-17.0] vtexplain: Ensure memory topo is set up for
 throttler (#15279) (#15283)

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
---
 go/cmd/vtexplain/vtexplain.go                 |  4 +-
 go/vt/srvtopo/watch.go                        |  7 ++-
 go/vt/vtadmin/api.go                          |  4 +-
 go/vt/vtexplain/vtexplain.go                  |  7 ++-
 go/vt/vtexplain/vtexplain_test.go             | 20 +++++--
 go/vt/vtexplain/vtexplain_vtgate.go           | 60 +++++++++++++------
 go/vt/vtexplain/vtexplain_vttablet.go         |  9 ++-
 go/vt/vtexplain/vtexplain_vttablet_test.go    | 18 ++++--
 .../tabletserver/throttle/throttler.go        |  6 +-
 9 files changed, 93 insertions(+), 42 deletions(-)

diff --git a/go/cmd/vtexplain/vtexplain.go b/go/cmd/vtexplain/vtexplain.go
index d5f60a893ba..3fce2954069 100644
--- a/go/cmd/vtexplain/vtexplain.go
+++ b/go/cmd/vtexplain/vtexplain.go
@@ -24,6 +24,7 @@ import (
 	"vitess.io/vitess/go/exit"
 	"vitess.io/vitess/go/vt/logutil"
 	"vitess.io/vitess/go/vt/servenv"
+	"vitess.io/vitess/go/vt/topo/memorytopo"
 	"vitess.io/vitess/go/vt/vtexplain"
 	"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
 
@@ -147,7 +148,8 @@ func parseAndRun() error {
 		Target:          dbName,
 	}
 
-	vte, err := vtexplain.Init(vschema, schema, ksShardMap, opts)
+	ts := memorytopo.NewServer(vtexplain.Cell)
+	vte, err := vtexplain.Init(ts, vschema, schema, ksShardMap, opts)
 	if err != nil {
 		return err
 	}
diff --git a/go/vt/srvtopo/watch.go b/go/vt/srvtopo/watch.go
index 2d470327c4e..8c46c936ea5 100644
--- a/go/vt/srvtopo/watch.go
+++ b/go/vt/srvtopo/watch.go
@@ -200,8 +200,11 @@ func (entry *watchEntry) onErrorLocked(err error, init bool) {
 			entry.value = nil
 		}
 	} else {
-		entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err)
-		log.Errorf("%v", entry.lastError)
+		if !topo.IsErrType(err, topo.Interrupted) {
+			// No need to log if we're explicitly interrupted.
+			entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err)
+			log.Errorf("%v", entry.lastError)
+		}
 
 		// Even though we didn't get a new value, update the lastValueTime
 		// here since the watch was successfully running before and we want
diff --git a/go/vt/vtadmin/api.go b/go/vt/vtadmin/api.go
index de973fd283f..ea6b9a9b988 100644
--- a/go/vt/vtadmin/api.go
+++ b/go/vt/vtadmin/api.go
@@ -37,6 +37,7 @@ import (
 	"vitess.io/vitess/go/vt/concurrency"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/topo"
+	"vitess.io/vitess/go/vt/topo/memorytopo"
 	"vitess.io/vitess/go/vt/topo/topoproto"
 	"vitess.io/vitess/go/vt/vtadmin/cluster"
 	"vitess.io/vitess/go/vt/vtadmin/cluster/dynamic"
@@ -2148,7 +2149,8 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest)
 		return nil, er.Error()
 	}
 
-	vte, err := vtexplain.Init(srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"})
+	ts := memorytopo.NewServer(vtexplain.Cell)
+	vte, err := vtexplain.Init(ts, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"})
 	if err != nil {
 		return nil, fmt.Errorf("error initilaizing vtexplain: %w", err)
 	}
diff --git a/go/vt/vtexplain/vtexplain.go b/go/vt/vtexplain/vtexplain.go
index 74810dc618f..f92d793739c 100644
--- a/go/vt/vtexplain/vtexplain.go
+++ b/go/vt/vtexplain/vtexplain.go
@@ -30,6 +30,7 @@ import (
 
 	"vitess.io/vitess/go/vt/discovery"
 	"vitess.io/vitess/go/vt/servenv"
+	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/vtgate"
 
 	"vitess.io/vitess/go/jsonutil"
@@ -53,7 +54,7 @@ func init() {
 }
 
 const (
-	vtexplainCell = "explainCell"
+	Cell = "explainCell"
 
 	// ModeMulti is the default mode with autocommit implemented at vtgate
 	ModeMulti = "multi"
@@ -180,7 +181,7 @@ type TabletActions struct {
 }
 
 // Init sets up the fake execution environment
-func Init(vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) {
+func Init(ts *topo.Server, vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) {
 	// Verify options
 	if opts.ReplicationMode != "ROW" && opts.ReplicationMode != "STATEMENT" {
 		return nil, fmt.Errorf("invalid replication mode \"%s\"", opts.ReplicationMode)
@@ -200,7 +201,7 @@ func Init(vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplai
 		Autocommit:   true,
 	}}
 	vte.setGlobalTabletEnv(tabletEnv)
-	err = vte.initVtgateExecutor(vSchemaStr, ksShardMapStr, opts)
+	err = vte.initVtgateExecutor(ts, vSchemaStr, ksShardMapStr, opts)
 	if err != nil {
 		return nil, fmt.Errorf("initVtgateExecutor: %v", err.Error())
 	}
diff --git a/go/vt/vtexplain/vtexplain_test.go b/go/vt/vtexplain/vtexplain_test.go
index 30fd289b671..5edc036baad 100644
--- a/go/vt/vtexplain/vtexplain_test.go
+++ b/go/vt/vtexplain/vtexplain_test.go
@@ -30,6 +30,7 @@ import (
 	"vitess.io/vitess/go/vt/key"
 	"vitess.io/vitess/go/vt/proto/topodata"
 	"vitess.io/vitess/go/vt/topo"
+	"vitess.io/vitess/go/vt/topo/memorytopo"
 	"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv/tabletenvtest"
 
 	querypb "vitess.io/vitess/go/vt/proto/query"
@@ -48,7 +49,7 @@ type testopts struct {
 	shardmap map[string]map[string]*topo.ShardInfo
 }
 
-func initTest(mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain {
+func initTest(ts *topo.Server, mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain {
 	schema, err := os.ReadFile("testdata/test-schema.sql")
 	require.NoError(t, err)
 
@@ -64,7 +65,7 @@ func initTest(mode string, opts *Options, topts *testopts, t *testing.T) *VTExpl
 	}
 
 	opts.ExecutionMode = mode
-	vte, err := Init(string(vSchema), string(schema), shardmap, opts)
+	vte, err := Init(ts, string(vSchema), string(schema), shardmap, opts)
 	require.NoError(t, err, "vtexplain Init error\n%s", string(schema))
 	return vte
 }
@@ -85,7 +86,9 @@ func testExplain(testcase string, opts *Options, t *testing.T) {
 
 func runTestCase(testcase, mode string, opts *Options, topts *testopts, t *testing.T) {
 	t.Run(testcase, func(t *testing.T) {
-		vte := initTest(mode, opts, topts, t)
+		ts := memorytopo.NewServer(Cell)
+		vte := initTest(ts, mode, opts, topts, t)
+		defer vte.Stop()
 
 		sqlFile := fmt.Sprintf("testdata/%s-queries.sql", testcase)
 		sql, err := os.ReadFile(sqlFile)
@@ -171,7 +174,9 @@ func TestExplain(t *testing.T) {
 }
 
 func TestErrors(t *testing.T) {
-	vte := initTest(ModeMulti, defaultTestOpts(), &testopts{}, t)
+	ts := memorytopo.NewServer(Cell)
+	vte := initTest(ts, ModeMulti, defaultTestOpts(), &testopts{}, t)
+	defer vte.Stop()
 
 	tests := []struct {
 		SQL string
@@ -208,7 +213,9 @@ func TestErrors(t *testing.T) {
 }
 
 func TestJSONOutput(t *testing.T) {
-	vte := initTest(ModeMulti, defaultTestOpts(), &testopts{}, t)
+	ts := memorytopo.NewServer(Cell)
+	vte := initTest(ts, ModeMulti, defaultTestOpts(), &testopts{}, t)
+	defer vte.Stop()
 	sql := "select 1 from user where id = 1"
 	explains, err := vte.Run(sql)
 	require.NoError(t, err, "vtexplain error")
@@ -353,7 +360,8 @@ func TestInit(t *testing.T) {
   }
 }`
 	schema := "create table table_missing_primary_vindex (id int primary key)"
-	_, err := Init(vschema, schema, "", defaultTestOpts())
+	ts := memorytopo.NewServer(Cell)
+	_, err := Init(ts, vschema, schema, "", defaultTestOpts())
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "missing primary col vindex")
 }
diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go
index 676e9757266..f59ec832af4 100644
--- a/go/vt/vtexplain/vtexplain_vtgate.go
+++ b/go/vt/vtexplain/vtexplain_vtgate.go
@@ -22,26 +22,23 @@ package vtexplain
 import (
 	"context"
 	"fmt"
+	"path"
 	"sort"
 	"strings"
 
-	"vitess.io/vitess/go/vt/vtgate/logstats"
-	"vitess.io/vitess/go/vt/vtgate/vindexes"
-
 	"vitess.io/vitess/go/cache"
-	"vitess.io/vitess/go/vt/topo"
-	"vitess.io/vitess/go/vt/topo/memorytopo"
-
-	"vitess.io/vitess/go/vt/vterrors"
-
 	"vitess.io/vitess/go/json2"
 	"vitess.io/vitess/go/streamlog"
 	"vitess.io/vitess/go/vt/discovery"
 	"vitess.io/vitess/go/vt/key"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/srvtopo"
+	"vitess.io/vitess/go/vt/topo"
+	"vitess.io/vitess/go/vt/vterrors"
 	"vitess.io/vitess/go/vt/vtgate"
 	"vitess.io/vitess/go/vt/vtgate/engine"
+	"vitess.io/vitess/go/vt/vtgate/logstats"
+	"vitess.io/vitess/go/vt/vtgate/vindexes"
 	"vitess.io/vitess/go/vt/vttablet/queryservice"
 
 	querypb "vitess.io/vitess/go/vt/proto/query"
@@ -50,14 +47,14 @@ import (
 	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
 )
 
-func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error {
+func (vte *VTExplain) initVtgateExecutor(ts *topo.Server, vSchemaStr, ksShardMapStr string, opts *Options) error {
 	vte.explainTopo = &ExplainTopo{NumShards: opts.NumShards}
-	vte.explainTopo.TopoServer = memorytopo.NewServer(vtexplainCell)
+	vte.explainTopo.TopoServer = ts
 	vte.healthCheck = discovery.NewFakeHealthCheck(nil)
 
-	resolver := vte.newFakeResolver(opts, vte.explainTopo, vtexplainCell)
+	resolver := vte.newFakeResolver(opts, vte.explainTopo, Cell)
 
-	err := vte.buildTopology(opts, vSchemaStr, ksShardMapStr, opts.NumShards)
+	err := vte.buildTopology(ts, opts, vSchemaStr, ksShardMapStr, opts.NumShards)
 	if err != nil {
 		return err
 	}
@@ -73,7 +70,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts
 
 	streamSize := 10
 	var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
-	vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)
+	vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)
 
 	queryLogBufferSize := 10
 	vtgate.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))
@@ -96,7 +93,7 @@ func (vte *VTExplain) newFakeResolver(opts *Options, serv srvtopo.Server, cell s
 	return vtgate.NewResolver(srvResolver, serv, cell, sc)
 }
 
-func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error {
+func (vte *VTExplain) buildTopology(ts *topo.Server, opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error {
 	vte.explainTopo.Lock.Lock()
 	defer vte.explainTopo.Lock.Unlock()
 
@@ -121,6 +118,10 @@ func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMap
 		return err
 	}
 
+	conn, err := ts.ConnForCell(context.Background(), Cell)
+	if err != nil {
+		return err
+	}
 	vte.explainTopo.TabletConns = make(map[string]*explainTablet)
 	vte.explainTopo.KeyspaceShards = make(map[string]map[string]*topodatapb.ShardReference)
 	for ks, vschema := range vte.explainTopo.Keyspaces {
@@ -131,6 +132,32 @@ func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMap
 
 		vte.explainTopo.KeyspaceShards[ks] = make(map[string]*topodatapb.ShardReference)
 
+		srvPath := path.Join(topo.KeyspacesPath, ks, topo.SrvKeyspaceFile)
+		srvKeyspace := &topodatapb.SrvKeyspace{
+			Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
+				{
+					ServedType:      topodatapb.TabletType_PRIMARY,
+					ShardReferences: shards,
+				},
+				{
+					ServedType:      topodatapb.TabletType_REPLICA,
+					ShardReferences: shards,
+				},
+				{
+					ServedType:      topodatapb.TabletType_RDONLY,
+					ShardReferences: shards,
+				},
+			},
+		}
+		data, err := srvKeyspace.MarshalVT()
+		if err != nil {
+			return err
+		}
+		_, err = conn.Update(context.Background(), srvPath, data, nil)
+		if err != nil {
+			return err
+		}
+
 		for _, shard := range shards {
 			// If the topology is in the middle of a reshard, there can be two shards covering the same key range (e.g.
 			// both source shard 80- and target shard 80-c0 cover the keyrange 80-c0). For the purposes of explain, we
@@ -143,14 +170,13 @@ func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMap
 			hostname := fmt.Sprintf("%s/%s", ks, shard.Name)
 			log.Infof("registering test tablet %s for keyspace %s shard %s", hostname, ks, shard.Name)
 
-			tablet := vte.healthCheck.AddFakeTablet(vtexplainCell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService {
-				return vte.newTablet(opts, t)
+			tablet := vte.healthCheck.AddFakeTablet(Cell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService {
+				return vte.newTablet(opts, t, ts)
 			})
 			vte.explainTopo.TabletConns[hostname] = tablet.(*explainTablet)
 			vte.explainTopo.KeyspaceShards[ks][shard.Name] = shard
 		}
 	}
-
 	return err
 }
 
diff --git a/go/vt/vtexplain/vtexplain_vttablet.go b/go/vt/vtexplain/vtexplain_vttablet.go
index 7135e3eb07d..24c17b8f969 100644
--- a/go/vt/vtexplain/vtexplain_vttablet.go
+++ b/go/vt/vtexplain/vtexplain_vttablet.go
@@ -23,8 +23,6 @@ import (
 	"strings"
 	"sync"
 
-	"vitess.io/vitess/go/vt/sidecardb"
-
 	"vitess.io/vitess/go/mysql"
 	"vitess.io/vitess/go/mysql/collations"
 	"vitess.io/vitess/go/mysql/fakesqldb"
@@ -33,8 +31,9 @@ import (
 	"vitess.io/vitess/go/vt/dbconfigs"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/mysqlctl"
+	"vitess.io/vitess/go/vt/sidecardb"
 	"vitess.io/vitess/go/vt/sqlparser"
-	"vitess.io/vitess/go/vt/topo/memorytopo"
+	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/topo/topoproto"
 	"vitess.io/vitess/go/vt/vtgate/evalengine"
 
@@ -102,7 +101,7 @@ type explainTablet struct {
 
 var _ queryservice.QueryService = (*explainTablet)(nil)
 
-func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet) *explainTablet {
+func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet, ts *topo.Server) *explainTablet {
 	db := fakesqldb.New(nil)
 	sidecardb.AddSchemaInitQueries(db, true)
 
@@ -117,7 +116,7 @@ func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet) *explainTab
 	config.EnableTableGC = false
 
 	// XXX much of this is cloned from the tabletserver tests
-	tsv := tabletserver.NewTabletServer(topoproto.TabletAliasString(t.Alias), config, memorytopo.NewServer(""), t.Alias)
+	tsv := tabletserver.NewTabletServer(topoproto.TabletAliasString(t.Alias), config, ts, t.Alias)
 
 	tablet := explainTablet{db: db, tsv: tsv, vte: vte}
 	db.Handler = &tablet
diff --git a/go/vt/vtexplain/vtexplain_vttablet_test.go b/go/vt/vtexplain/vtexplain_vttablet_test.go
index 15e17a22580..6cc0498752a 100644
--- a/go/vt/vtexplain/vtexplain_vttablet_test.go
+++ b/go/vt/vtexplain/vtexplain_vttablet_test.go
@@ -19,10 +19,12 @@ package vtexplain
 import (
 	"encoding/json"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"vitess.io/vitess/go/vt/topo/memorytopo"
 	"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
 
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -67,7 +69,8 @@ create table t2 (
 		NumShards:       2,
 	}
 
-	vte, err := Init(testVSchema, testSchema, "", opts)
+	ts := memorytopo.NewServer(Cell)
+	vte, err := Init(ts, testVSchema, testSchema, "", opts)
 	require.NoError(t, err)
 	defer vte.Stop()
 
@@ -123,16 +126,21 @@ create table test_partitioned (
 	if err != nil {
 		t.Fatalf("parseSchema: %v", err)
 	}
-	vte := initTest(ModeMulti, defaultTestOpts(), &testopts{}, t)
+	ts := memorytopo.NewServer(Cell)
+	vte := initTest(ts, ModeMulti, defaultTestOpts(), &testopts{}, t)
 
 	tabletEnv, _ := newTabletEnvironment(ddls, defaultTestOpts())
 	vte.setGlobalTabletEnv(tabletEnv)
 
 	tablet := vte.newTablet(defaultTestOpts(), &topodatapb.Tablet{
-		Keyspace: "test_keyspace",
+		Keyspace: "ks_sharded",
 		Shard:    "-80",
-		Alias:    &topodatapb.TabletAlias{},
-	})
+		Alias: &topodatapb.TabletAlias{
+			Cell: Cell,
+		},
+	}, ts)
+
+	time.Sleep(10 * time.Millisecond)
 	se := tablet.tsv.SchemaEngine()
 	tables := se.GetSchema()
 
diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go
index 3770375a35f..be984b7ea3f 100644
--- a/go/vt/vttablet/tabletserver/throttle/throttler.go
+++ b/go/vt/vttablet/tabletserver/throttle/throttler.go
@@ -316,7 +316,9 @@ func (throttler *Throttler) normalizeThrottlerConfig(thottlerConfig *topodatapb.
 func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspace, err error) bool {
 	log.Infof("Throttler: WatchSrvKeyspaceCallback called with: %+v", srvks)
 	if err != nil {
-		log.Errorf("WatchSrvKeyspaceCallback error: %v", err)
+		if !topo.IsErrType(err, topo.Interrupted) && !errors.Is(err, context.Canceled) {
+			log.Errorf("WatchSrvKeyspaceCallback error: %v", err)
+		}
 		return false
 	}
 	throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig)
@@ -477,7 +479,7 @@ func (throttler *Throttler) Open() error {
 
 				throttlerConfig, err := throttler.readThrottlerConfig(ctx)
 				if err == nil {
-					log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig)
+					log.Infof("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig)
 					// It's possible that during a retry-sleep, the throttler is closed and opened again, leading
 					// to two (or more) instances of this goroutine. That's not a big problem; it's fine if all
 					// attempt to read the throttler config; but we just want to ensure they don't step on each other