Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor vtgate.Executor #16993

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
queryLogBufferSize := 10
plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false)
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0)
config := &vtgate.Config{
DefaultTabletType: topodatapb.TabletType_PRIMARY,
PlannerVersion: opts.PlannerVersion,
ErrorTransform: vtgate.NullErrorTransformer{},
}
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, 0, config)
vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))

return nil
Expand Down
30 changes: 18 additions & 12 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ var (
queriesRoutedByTable = stats.NewCountersWithMultiLabels("QueriesRoutedByTable", "Queries routed from vtgate to vttablet by plan type, keyspace and table", []string{"Plan", "Keyspace", "Table"})

exceedMemoryRowsLogger = logutil.NewThrottledLogger("ExceedMemoryRows", 1*time.Minute)

errorTransform errorTransformer = nullErrorTransformer{}
)

const (
Expand All @@ -97,6 +95,13 @@ func init() {
servenv.OnParseFor("vtexplain", registerTabletTypeFlag)
}

// Config contains the shared global configuration of the vtgate server.
type Config struct {
DefaultTabletType topodatapb.TabletType
PlannerVersion plancontext.PlannerVersion
ErrorTransform errorTransformer
}

// Executor is the engine that executes queries by utilizing
// the abilities of the underlying vttablets.
type Executor struct {
Expand All @@ -106,7 +111,6 @@ type Executor struct {
resolver *Resolver
scatterConn *ScatterConn
txConn *TxConn
pv plancontext.PlannerVersion

mu sync.Mutex
vschema *vindexes.VSchema
Expand All @@ -130,6 +134,8 @@ type Executor struct {

warmingReadsPercent int
warmingReadsChannel chan bool

config *Config
}

var executorOnce sync.Once
Expand Down Expand Up @@ -159,8 +165,8 @@ func NewExecutor(
plans *PlanCache,
schemaTracker SchemaInfo,
noScatter bool,
pv plancontext.PlannerVersion,
warmingReadsPercent int,
config *Config,
) *Executor {
e := &Executor{
env: env,
Expand All @@ -174,10 +180,10 @@ func NewExecutor(
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
pv: pv,
plans: plans,
warmingReadsPercent: warmingReadsPercent,
warmingReadsChannel: make(chan bool, warmingReadsConcurrency),
config: config,
}

vschemaacl.Init()
Expand Down Expand Up @@ -249,7 +255,7 @@ func (e *Executor) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
logStats.SaveEndTime()
e.queryLogger.Send(logStats)

err = errorTransform.TransformError(err)
err = e.config.ErrorTransform.TransformError(err)
err = vterrors.TruncateError(err, truncateErrorLen)

return result, err
Expand Down Expand Up @@ -391,7 +397,7 @@ func (e *Executor) StreamExecute(
logStats.SaveEndTime()
e.queryLogger.Send(logStats)

err = errorTransform.TransformError(err)
err = e.config.ErrorTransform.TransformError(err)
err = vterrors.TruncateError(err, truncateErrorLen)

return err
Expand Down Expand Up @@ -1073,7 +1079,7 @@ func (e *Executor) SaveVSchema(vschema *vindexes.VSchema, stats *VSchemaStats) {

// ParseDestinationTarget parses destination target string and sets default keyspace if possible.
func (e *Executor) ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error) {
destKeyspace, destTabletType, dest, err := topoproto.ParseDestination(targetString, defaultTabletType)
destKeyspace, destTabletType, dest, err := topoproto.ParseDestination(targetString, e.config.DefaultTabletType)
// Set default keyspace
if destKeyspace == "" && len(e.VSchema().Keyspaces) == 1 {
for k := range e.VSchema().Keyspaces {
Expand Down Expand Up @@ -1362,7 +1368,7 @@ func (e *Executor) Prepare(ctx context.Context, method string, safeSession *Safe
e.queryLogger.Send(logStats)
}

err = errorTransform.TransformError(err)
err = e.config.ErrorTransform.TransformError(err)
err = vterrors.TruncateError(err, truncateErrorLen)

return fld, err
Expand Down Expand Up @@ -1406,7 +1412,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st

func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) {
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv)
vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.config)

stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
Expand Down Expand Up @@ -1632,9 +1638,9 @@ type (
errorTransformer interface {
TransformError(err error) error
}
nullErrorTransformer struct{}
NullErrorTransformer struct{}
)

func (nullErrorTransformer) TransformError(err error) error {
func (NullErrorTransformer) TransformError(err error) error {
return err
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestUpdateEqual(t *testing.T) {
func TestUpdateFromSubQuery(t *testing.T) {
executor, sbc1, sbc2, _, ctx := createExecutorEnv(t)

executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4
logChan := executor.queryLogger.Subscribe("Test")
defer executor.queryLogger.Unsubscribe(logChan)

Expand Down
17 changes: 12 additions & 5 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta
// that sometimes can cause a plan to not be cached the very first time it's seen, to prevent
// one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness.
plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false)

executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, 0, defaultConfig())
executor.SetQueryLogger(queryLogger)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
Expand All @@ -195,6 +194,14 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta
return executor, ctx
}

func defaultConfig() *Config {
return &Config{
DefaultTabletType: defaultTabletType,
PlannerVersion: querypb.ExecuteOptions_Gen4,
ErrorTransform: NullErrorTransformer{},
}
}

func createExecutorEnv(t testing.TB) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) {
executor, ctx = createExecutorEnvCallback(t, func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) {
switch {
Expand Down Expand Up @@ -230,7 +237,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex
plans := DefaultPlanCache()
env, err := vtenv.New(vtenv.Options{MySQLServerVersion: mysqlVersion})
require.NoError(t, err)
executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, 0, defaultConfig())
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand Down Expand Up @@ -267,7 +274,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, 0, defaultConfig())
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand All @@ -292,7 +299,7 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context,
replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil)

queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, warmingReadsPercent, defaultConfig())
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand Down
40 changes: 20 additions & 20 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestCreateTableValidTimestamp(t *testing.T) {
func TestGen4SelectDBA(t *testing.T) {
executor, sbc1, _, _, _ := createExecutorEnv(t)
executor.normalize = true
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

query := "select * from INFORMATION_SCHEMA.TABLE_CONSTRAINTS"
_, err := executor.Execute(context.Background(), nil, "TestSelectDBA",
Expand Down Expand Up @@ -1283,7 +1283,7 @@ func TestSelectEqual(t *testing.T) {

func TestSelectINFromOR(t *testing.T) {
executor, sbc1, _, _, ctx := createExecutorEnv(t)
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

session := &vtgatepb.Session{
TargetString: "@primary",
Expand Down Expand Up @@ -1648,7 +1648,7 @@ func TestSelectListArg(t *testing.T) {
func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor {
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, 0, defaultConfig())
ex.SetQueryLogger(queryLogger)
return ex
}
Expand Down Expand Up @@ -3269,7 +3269,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
}
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, 0, defaultConfig())
executor.SetQueryLogger(queryLogger)
defer executor.Close()
// some sleep for all goroutines to start
Expand Down Expand Up @@ -3349,7 +3349,7 @@ func TestSelectScatterFails(t *testing.T) {
func TestGen4SelectStraightJoin(t *testing.T) {
executor, sbc1, _, _, _ := createExecutorEnv(t)
executor.normalize = true
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4
session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"})
query := "select u.id from user u straight_join user2 u2 on u.id = u2.id"
_, err := executor.Execute(context.Background(), nil,
Expand All @@ -3371,7 +3371,7 @@ func TestGen4SelectStraightJoin(t *testing.T) {
func TestGen4MultiColumnVindexEqual(t *testing.T) {
executor, sbc1, sbc2, _, _ := createExecutorEnv(t)
executor.normalize = true
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"})
query := "select * from user_region where cola = 1 and colb = 2"
Expand Down Expand Up @@ -3410,7 +3410,7 @@ func TestGen4MultiColumnVindexEqual(t *testing.T) {
func TestGen4MultiColumnVindexIn(t *testing.T) {
executor, sbc1, sbc2, _, _ := createExecutorEnv(t)
executor.normalize = true
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"})
query := "select * from user_region where cola IN (1,17984) and colb IN (2,3,4)"
Expand Down Expand Up @@ -3449,7 +3449,7 @@ func TestGen4MultiColumnVindexIn(t *testing.T) {
func TestGen4MultiColMixedColComparision(t *testing.T) {
executor, sbc1, sbc2, _, _ := createExecutorEnv(t)
executor.normalize = true
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"})
query := "select * from user_region where colb = 2 and cola IN (1,17984)"
Expand Down Expand Up @@ -3486,7 +3486,7 @@ func TestGen4MultiColMixedColComparision(t *testing.T) {
func TestGen4MultiColBestVindexSel(t *testing.T) {
executor, sbc1, sbc2, _, _ := createExecutorEnv(t)
executor.normalize = true
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"})
query := "select * from user_region where colb = 2 and cola IN (1,17984) and cola = 1"
Expand Down Expand Up @@ -3532,7 +3532,7 @@ func TestGen4MultiColBestVindexSel(t *testing.T) {
func TestGen4MultiColMultiEqual(t *testing.T) {
executor, sbc1, sbc2, _, _ := createExecutorEnv(t)
executor.normalize = true
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"})
query := "select * from user_region where (cola,colb) in ((17984,2),(17984,3))"
Expand All @@ -3554,7 +3554,7 @@ func TestGen4MultiColMultiEqual(t *testing.T) {

func TestGen4SelectUnqualifiedReferenceTable(t *testing.T) {
executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnv(t)
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

query := "select * from zip_detail"
session := &vtgatepb.Session{
Expand All @@ -3575,7 +3575,7 @@ func TestGen4SelectUnqualifiedReferenceTable(t *testing.T) {

func TestGen4SelectQualifiedReferenceTable(t *testing.T) {
executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnv(t)
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

query := fmt.Sprintf("select * from %s.zip_detail", KsTestSharded)
session := &vtgatepb.Session{
Expand All @@ -3596,7 +3596,7 @@ func TestGen4SelectQualifiedReferenceTable(t *testing.T) {

func TestGen4JoinUnqualifiedReferenceTable(t *testing.T) {
executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnv(t)
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

query := "select * from user join zip_detail on user.zip_detail_id = zip_detail.id"
session := &vtgatepb.Session{
Expand Down Expand Up @@ -3633,7 +3633,7 @@ func TestGen4JoinUnqualifiedReferenceTable(t *testing.T) {

func TestGen4CrossShardJoinQualifiedReferenceTable(t *testing.T) {
executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnv(t)
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

query := "select user.id from user join TestUnsharded.zip_detail on user.zip_detail_id = TestUnsharded.zip_detail.id"
session := &vtgatepb.Session{
Expand Down Expand Up @@ -3690,7 +3690,7 @@ func TestRegionRange(t *testing.T) {
}
executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

tcases := []struct {
regionID int
Expand Down Expand Up @@ -3740,7 +3740,7 @@ func TestMultiCol(t *testing.T) {
}
executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

tcases := []struct {
cola, colb, colc int
Expand Down Expand Up @@ -3821,7 +3821,7 @@ func TestMultiColPartial(t *testing.T) {
}
executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

tcases := []struct {
where string
Expand Down Expand Up @@ -3885,7 +3885,7 @@ func TestSelectAggregationNoData(t *testing.T) {
}
executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

tcases := []struct {
sql string
Expand Down Expand Up @@ -3977,7 +3977,7 @@ func TestSelectAggregationData(t *testing.T) {
}
executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4

tcases := []struct {
sql string
Expand Down Expand Up @@ -4135,7 +4135,7 @@ func TestSelectAggregationRandom(t *testing.T) {

executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()
executor.pv = querypb.ExecuteOptions_Gen4
executor.config.PlannerVersion = querypb.ExecuteOptions_Gen4
session := NewAutocommitSession(&vtgatepb.Session{})

rs, err := executor.Execute(context.Background(), nil, "TestSelectCFC", session, "select /*vt+ PLANNER=gen4 */ A.a, A.b, (A.a / A.b) as c from (select sum(a) as a, sum(b) as b from user) A", nil)
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"vitess.io/vitess/go/vt/vtenv"
Expand Down Expand Up @@ -67,7 +66,7 @@ func TestStreamSQLSharded(t *testing.T) {
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()

executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, 0, defaultConfig())
executor.SetQueryLogger(queryLogger)

defer executor.Close()
Expand Down
Loading
Loading