diff --git a/go/cmd/vtexplain/vtexplain.go b/go/cmd/vtexplain/vtexplain.go index d5f60a893ba..3fce2954069 100644 --- a/go/cmd/vtexplain/vtexplain.go +++ b/go/cmd/vtexplain/vtexplain.go @@ -24,6 +24,7 @@ import ( "vitess.io/vitess/go/exit" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vtexplain" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" @@ -147,7 +148,8 @@ func parseAndRun() error { Target: dbName, } - vte, err := vtexplain.Init(vschema, schema, ksShardMap, opts) + ts := memorytopo.NewServer(vtexplain.Cell) + vte, err := vtexplain.Init(ts, vschema, schema, ksShardMap, opts) if err != nil { return err } diff --git a/go/vt/srvtopo/watch.go b/go/vt/srvtopo/watch.go index 2d470327c4e..8c46c936ea5 100644 --- a/go/vt/srvtopo/watch.go +++ b/go/vt/srvtopo/watch.go @@ -200,8 +200,11 @@ func (entry *watchEntry) onErrorLocked(err error, init bool) { entry.value = nil } } else { - entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err) - log.Errorf("%v", entry.lastError) + if !topo.IsErrType(err, topo.Interrupted) { + // No need to log if we're explicitly interrupted. + entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err) + log.Errorf("%v", entry.lastError) + } // Even though we didn't get a new value, update the lastValueTime // here since the watch was successfully running before and we want diff --git a/go/vt/vtadmin/api.go b/go/vt/vtadmin/api.go index de973fd283f..ea6b9a9b988 100644 --- a/go/vt/vtadmin/api.go +++ b/go/vt/vtadmin/api.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtadmin/cluster" "vitess.io/vitess/go/vt/vtadmin/cluster/dynamic" @@ -2148,7 +2149,8 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest) return nil, er.Error() } - vte, err := vtexplain.Init(srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"}) + ts := memorytopo.NewServer(vtexplain.Cell) + vte, err := vtexplain.Init(ts, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"}) if err != nil { return nil, fmt.Errorf("error initilaizing vtexplain: %w", err) } diff --git a/go/vt/vtexplain/vtexplain.go b/go/vt/vtexplain/vtexplain.go index 74810dc618f..f92d793739c 100644 --- a/go/vt/vtexplain/vtexplain.go +++ b/go/vt/vtexplain/vtexplain.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate" "vitess.io/vitess/go/jsonutil" @@ -53,7 +54,7 @@ func init() { } const ( - vtexplainCell = "explainCell" + Cell = "explainCell" // ModeMulti is the default mode with autocommit implemented at vtgate ModeMulti = "multi" @@ -180,7 +181,7 @@ type TabletActions struct { } // Init sets up the fake execution environment -func Init(vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) { +func Init(ts *topo.Server, vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) { // Verify options if opts.ReplicationMode != "ROW" && opts.ReplicationMode != "STATEMENT" { return nil, fmt.Errorf("invalid replication mode \"%s\"", opts.ReplicationMode) @@ -200,7 +201,7 @@ func Init(vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplai Autocommit: true, }} vte.setGlobalTabletEnv(tabletEnv) - err = vte.initVtgateExecutor(vSchemaStr, ksShardMapStr, opts) + err = vte.initVtgateExecutor(ts, vSchemaStr, ksShardMapStr, opts) if err != nil { return nil, fmt.Errorf("initVtgateExecutor: %v", err.Error()) } diff --git a/go/vt/vtexplain/vtexplain_test.go b/go/vt/vtexplain/vtexplain_test.go index 30fd289b671..5edc036baad 100644 --- a/go/vt/vtexplain/vtexplain_test.go +++ b/go/vt/vtexplain/vtexplain_test.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv/tabletenvtest" querypb "vitess.io/vitess/go/vt/proto/query" @@ -48,7 +49,7 @@ type testopts struct { shardmap map[string]map[string]*topo.ShardInfo } -func initTest(mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain { +func initTest(ts *topo.Server, mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain { schema, err := os.ReadFile("testdata/test-schema.sql") require.NoError(t, err) @@ -64,7 +65,7 @@ func initTest(mode string, opts *Options, topts *testopts, t *testing.T) *VTExpl } opts.ExecutionMode = mode - vte, err := Init(string(vSchema), string(schema), shardmap, opts) + vte, err := Init(ts, string(vSchema), string(schema), shardmap, opts) require.NoError(t, err, "vtexplain Init error\n%s", string(schema)) return vte } @@ -85,7 +86,9 @@ func testExplain(testcase string, opts *Options, t *testing.T) { func runTestCase(testcase, mode string, opts *Options, topts *testopts, t *testing.T) { t.Run(testcase, func(t *testing.T) { - vte := initTest(mode, opts, topts, t) + ts := memorytopo.NewServer(Cell) + vte := initTest(ts, mode, opts, topts, t) + defer vte.Stop() sqlFile := fmt.Sprintf("testdata/%s-queries.sql", testcase) sql, err := os.ReadFile(sqlFile) @@ -171,7 +174,9 @@ func TestExplain(t *testing.T) { } func TestErrors(t *testing.T) { - vte := initTest(ModeMulti, defaultTestOpts(), &testopts{}, t) + ts := memorytopo.NewServer(Cell) + vte := initTest(ts, ModeMulti, defaultTestOpts(), &testopts{}, t) + defer vte.Stop() tests := []struct { SQL string @@ -208,7 +213,9 @@ func TestErrors(t *testing.T) { } func TestJSONOutput(t *testing.T) { - vte := initTest(ModeMulti, defaultTestOpts(), &testopts{}, t) + ts := memorytopo.NewServer(Cell) + vte := initTest(ts, ModeMulti, defaultTestOpts(), &testopts{}, t) + defer vte.Stop() sql := "select 1 from user where id = 1" explains, err := vte.Run(sql) require.NoError(t, err, "vtexplain error") @@ -353,7 +360,8 @@ func TestInit(t *testing.T) { } }` schema := "create table table_missing_primary_vindex (id int primary key)" - _, err := Init(vschema, schema, "", defaultTestOpts()) + ts := memorytopo.NewServer(Cell) + _, err := Init(ts, vschema, schema, "", defaultTestOpts()) require.Error(t, err) require.Contains(t, err.Error(), "missing primary col vindex") } diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index 676e9757266..f59ec832af4 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -22,26 +22,23 @@ package vtexplain import ( "context" "fmt" + "path" "sort" "strings" - "vitess.io/vitess/go/vt/vtgate/logstats" - "vitess.io/vitess/go/vt/vtgate/vindexes" - "vitess.io/vitess/go/cache" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/memorytopo" - - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/json2" "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate" "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vtgate/logstats" + "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/queryservice" querypb "vitess.io/vitess/go/vt/proto/query" @@ -50,14 +47,14 @@ import ( vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" ) -func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error { +func (vte *VTExplain) initVtgateExecutor(ts *topo.Server, vSchemaStr, ksShardMapStr string, opts *Options) error { vte.explainTopo = &ExplainTopo{NumShards: opts.NumShards} - vte.explainTopo.TopoServer = memorytopo.NewServer(vtexplainCell) + vte.explainTopo.TopoServer = ts vte.healthCheck = discovery.NewFakeHealthCheck(nil) - resolver := vte.newFakeResolver(opts, vte.explainTopo, vtexplainCell) + resolver := vte.newFakeResolver(opts, vte.explainTopo, Cell) - err := vte.buildTopology(opts, vSchemaStr, ksShardMapStr, opts.NumShards) + err := vte.buildTopology(ts, opts, vSchemaStr, ksShardMapStr, opts.NumShards) if err != nil { return err } @@ -73,7 +70,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts streamSize := 10 var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests - vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion) + vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion) queryLogBufferSize := 10 vtgate.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)) @@ -96,7 +93,7 @@ func (vte *VTExplain) newFakeResolver(opts *Options, serv srvtopo.Server, cell s return vtgate.NewResolver(srvResolver, serv, cell, sc) } -func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error { +func (vte *VTExplain) buildTopology(ts *topo.Server, opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error { vte.explainTopo.Lock.Lock() defer vte.explainTopo.Lock.Unlock() @@ -121,6 +118,10 @@ func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMap return err } + conn, err := ts.ConnForCell(context.Background(), Cell) + if err != nil { + return err + } vte.explainTopo.TabletConns = make(map[string]*explainTablet) vte.explainTopo.KeyspaceShards = make(map[string]map[string]*topodatapb.ShardReference) for ks, vschema := range vte.explainTopo.Keyspaces { @@ -131,6 +132,32 @@ func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMap vte.explainTopo.KeyspaceShards[ks] = make(map[string]*topodatapb.ShardReference) + srvPath := path.Join(topo.KeyspacesPath, ks, topo.SrvKeyspaceFile) + srvKeyspace := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ + { + ServedType: topodatapb.TabletType_PRIMARY, + ShardReferences: shards, + }, + { + ServedType: topodatapb.TabletType_REPLICA, + ShardReferences: shards, + }, + { + ServedType: topodatapb.TabletType_RDONLY, + ShardReferences: shards, + }, + }, + } + data, err := srvKeyspace.MarshalVT() + if err != nil { + return err + } + _, err = conn.Update(context.Background(), srvPath, data, nil) + if err != nil { + return err + } + for _, shard := range shards { // If the topology is in the middle of a reshard, there can be two shards covering the same key range (e.g. // both source shard 80- and target shard 80-c0 cover the keyrange 80-c0). For the purposes of explain, we @@ -143,14 +170,13 @@ func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMap hostname := fmt.Sprintf("%s/%s", ks, shard.Name) log.Infof("registering test tablet %s for keyspace %s shard %s", hostname, ks, shard.Name) - tablet := vte.healthCheck.AddFakeTablet(vtexplainCell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService { - return vte.newTablet(opts, t) + tablet := vte.healthCheck.AddFakeTablet(Cell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService { + return vte.newTablet(opts, t, ts) }) vte.explainTopo.TabletConns[hostname] = tablet.(*explainTablet) vte.explainTopo.KeyspaceShards[ks][shard.Name] = shard } } - return err } diff --git a/go/vt/vtexplain/vtexplain_vttablet.go b/go/vt/vtexplain/vtexplain_vttablet.go index 7135e3eb07d..24c17b8f969 100644 --- a/go/vt/vtexplain/vtexplain_vttablet.go +++ b/go/vt/vtexplain/vtexplain_vttablet.go @@ -23,8 +23,6 @@ import ( "strings" "sync" - "vitess.io/vitess/go/vt/sidecardb" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/fakesqldb" @@ -33,8 +31,9 @@ import ( "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/sidecardb" "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -102,7 +101,7 @@ type explainTablet struct { var _ queryservice.QueryService = (*explainTablet)(nil) -func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet) *explainTablet { +func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet, ts *topo.Server) *explainTablet { db := fakesqldb.New(nil) sidecardb.AddSchemaInitQueries(db, true) @@ -117,7 +116,7 @@ func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet) *explainTab config.EnableTableGC = false // XXX much of this is cloned from the tabletserver tests - tsv := tabletserver.NewTabletServer(topoproto.TabletAliasString(t.Alias), config, memorytopo.NewServer(""), t.Alias) + tsv := tabletserver.NewTabletServer(topoproto.TabletAliasString(t.Alias), config, ts, t.Alias) tablet := explainTablet{db: db, tsv: tsv, vte: vte} db.Handler = &tablet diff --git a/go/vt/vtexplain/vtexplain_vttablet_test.go b/go/vt/vtexplain/vtexplain_vttablet_test.go index 15e17a22580..6cc0498752a 100644 --- a/go/vt/vtexplain/vtexplain_vttablet_test.go +++ b/go/vt/vtexplain/vtexplain_vttablet_test.go @@ -19,10 +19,12 @@ package vtexplain import ( "encoding/json" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -67,7 +69,8 @@ create table t2 ( NumShards: 2, } - vte, err := Init(testVSchema, testSchema, "", opts) + ts := memorytopo.NewServer(Cell) + vte, err := Init(ts, testVSchema, testSchema, "", opts) require.NoError(t, err) defer vte.Stop() @@ -123,16 +126,21 @@ create table test_partitioned ( if err != nil { t.Fatalf("parseSchema: %v", err) } - vte := initTest(ModeMulti, defaultTestOpts(), &testopts{}, t) + ts := memorytopo.NewServer(Cell) + vte := initTest(ts, ModeMulti, defaultTestOpts(), &testopts{}, t) tabletEnv, _ := newTabletEnvironment(ddls, defaultTestOpts()) vte.setGlobalTabletEnv(tabletEnv) tablet := vte.newTablet(defaultTestOpts(), &topodatapb.Tablet{ - Keyspace: "test_keyspace", + Keyspace: "ks_sharded", Shard: "-80", - Alias: &topodatapb.TabletAlias{}, - }) + Alias: &topodatapb.TabletAlias{ + Cell: Cell, + }, + }, ts) + + time.Sleep(10 * time.Millisecond) se := tablet.tsv.SchemaEngine() tables := se.GetSchema() diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 3770375a35f..be984b7ea3f 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -316,7 +316,9 @@ func (throttler *Throttler) normalizeThrottlerConfig(thottlerConfig *topodatapb. func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspace, err error) bool { log.Infof("Throttler: WatchSrvKeyspaceCallback called with: %+v", srvks) if err != nil { - log.Errorf("WatchSrvKeyspaceCallback error: %v", err) + if !topo.IsErrType(err, topo.Interrupted) && !errors.Is(err, context.Canceled) { + log.Errorf("WatchSrvKeyspaceCallback error: %v", err) + } return false } throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig) @@ -477,7 +479,7 @@ func (throttler *Throttler) Open() error { throttlerConfig, err := throttler.readThrottlerConfig(ctx) if err == nil { - log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig) + log.Infof("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig) // It's possible that during a retry-sleep, the throttler is closed and opened again, leading // to two (or more) instances of this goroutine. That's not a big problem; it's fine if all // attempt to read the throttler config; but we just want to ensure they don't step on each other