Skip to content

Commit

Permalink
fix: all executor test related to vcursor
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Dec 3, 2024
1 parent ce72f62 commit a5797ff
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 69 deletions.
59 changes: 44 additions & 15 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

"github.com/spf13/pflag"

vschemapb "vitess.io/vitess/go/vt/proto/vschema"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/cache/theine"
"vitess.io/vitess/go/mysql/capabilities"
Expand Down Expand Up @@ -135,6 +137,8 @@ type Executor struct {

warmingReadsPercent int
warmingReadsChannel chan bool

vConfig econtext.VCursorConfig
}

var executorOnce sync.Once
Expand Down Expand Up @@ -167,7 +171,6 @@ func NewExecutor(
pv plancontext.PlannerVersion,
warmingReadsPercent int,
) *Executor {
warnings.Add("WarnUnshardedOnly", 1)
e := &Executor{
env: env,
serv: serv,
Expand All @@ -185,6 +188,8 @@ func NewExecutor(
warmingReadsPercent: warmingReadsPercent,
warmingReadsChannel: make(chan bool, warmingReadsConcurrency),
}
// setting the vcursor config.
e.initVConfig()

vschemaacl.Init()
// we subscribe to update from the VSchemaManager
Expand Down Expand Up @@ -1132,7 +1137,7 @@ func (e *Executor) getPlan(
bindVars,
parameterize,
vcursor.GetKeyspace(),
vcursor.GetSelectLimit(),
vcursor.SafeSession.GetSelectLimit(),
setVarComment,
vcursor.GetSystemVariablesCopy(),
vcursor.GetForeignKeyChecksState(),
Expand Down Expand Up @@ -1404,29 +1409,40 @@ func (e *Executor) prepare(ctx context.Context, safeSession *econtext.SafeSessio
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unrecognized prepare statement: %s", sql)
}

func (e *Executor) getVCursorConfig() econtext.VCursorConfig {
func (e *Executor) initVConfig() {
connCollation := collations.Unknown
if gw, isTabletGw := e.resolver.resolver.GetGateway().(*TabletGateway); isTabletGw {
connCollation = gw.DefaultConnCollation()
}
return econtext.VCursorConfig{
WarmingReadsPercent: warmingReadsPercent,
Collation: connCollation,
MaxMemoryRows: 0,
EnableShardRouting: false,
DefaultTabletType: 0,
QueryTimeout: 0,
DBDDLPlugin: "",
ForeignKeyMode: 0,
SetVarEnabled: false,
EnableViews: false,
if connCollation == collations.Unknown {
connCollation = e.env.CollationEnv().DefaultConnectionCharset()
}

e.vConfig = econtext.VCursorConfig{
Collation: connCollation,
DefaultTabletType: defaultTabletType,

QueryTimeout: queryTimeout,
MaxMemoryRows: maxMemoryRows,

SetVarEnabled: sysVarSetEnabled,
EnableViews: enableViews,
ForeignKeyMode: fkMode(foreignKeyMode),
EnableShardRouting: enableShardRouting,

DBDDLPlugin: dbDDLPlugin,

WarmingReadsPercent: e.warmingReadsPercent,
WarmingReadsTimeout: warmingReadsQueryTimeout,
WarmingReadsChannel: e.warmingReadsChannel,
}
}

func (e *Executor) handlePrepare(ctx context.Context, safeSession *econtext.SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) {
query, comments := sqlparser.SplitMarginComments(sql)

vcursor, _ := econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, e.getVCursorConfig())
vcursor, _ := econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv,
nullResultsObserver{}, e.vConfig)

stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
Expand Down Expand Up @@ -1662,3 +1678,16 @@ type (
func (nullErrorTransformer) TransformError(err error) error {
return err
}

func fkMode(foreignkey string) vschemapb.Keyspace_ForeignKeyMode {
switch foreignkey {
case "disallow":
return vschemapb.Keyspace_disallow
case "managed":
return vschemapb.Keyspace_managed
case "unmanaged":
return vschemapb.Keyspace_unmanaged

}
return vschemapb.Keyspace_unspecified
}
7 changes: 2 additions & 5 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,7 @@ func TestSystemVariablesMySQLBelow80(t *testing.T) {
func TestSystemVariablesWithSetVarDisabled(t *testing.T) {
executor, sbc1, _, _, _ := createCustomExecutor(t, "{}", "8.0.0")
executor.normalize = true

setVarEnabled = false
defer func() {
setVarEnabled = true
}()
executor.vConfig.SetVarEnabled = false
session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: "TestExecutor"})

sbc1.SetResults([]*sqltypes.Result{{
Expand Down Expand Up @@ -4370,6 +4366,7 @@ func TestWarmingReads(t *testing.T) {

// waitUntilQueryCount waits until the number of queries run on the tablet reach the specified count.
func waitUntilQueryCount(t *testing.T, tab *sandboxconn.SandboxConn, count int) {
t.Helper()
timeout := time.After(1 * time.Second)
for {
select {
Expand Down
34 changes: 17 additions & 17 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,13 +1299,13 @@ func TestExecutorDDLFk(t *testing.T) {
}

for _, stmt := range stmts {
for _, fkMode := range []string{"allow", "disallow"} {
t.Run(stmt+fkMode, func(t *testing.T) {
for _, mode := range []string{"allow", "disallow"} {
t.Run(stmt+mode, func(t *testing.T) {
executor, _, _, sbc, ctx := createExecutorEnv(t)
sbc.ExecCount.Store(0)
foreignKeyMode = fkMode
executor.vConfig.ForeignKeyMode = fkMode(mode)
_, err := executor.Execute(ctx, nil, mName, econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded}), stmt, nil)
if fkMode == "allow" {
if mode == "allow" {
require.NoError(t, err)
require.EqualValues(t, 1, sbc.ExecCount.Load())
} else {
Expand Down Expand Up @@ -1561,8 +1561,8 @@ var pv = querypb.ExecuteOptions_Gen4

func TestGetPlanUnnormalized(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)
emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{})
unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{})

query1 := "select * from music_user_map where id = 1"
plan1, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -1645,7 +1645,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
t.Run("Cache", func(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)

emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{})
query1 := "select * from music_user_map where id = 1"

_, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true)
Expand All @@ -1669,7 +1669,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
// Skip cache using directive
r, _, _, _, ctx := createExecutorEnv(t)

unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig)

query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
Expand All @@ -1680,12 +1680,12 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
assertCacheSize(t, r.plans, 1)

// the target string will be resolved and become part of the plan cache key, which adds a new entry
ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig)
getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)

// the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above
ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig)
getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)
})
Expand All @@ -1695,7 +1695,7 @@ func TestGetPlanCacheNormalized(t *testing.T) {
t.Run("Cache", func(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)
r.normalize = true
emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig)

query1 := "select * from music_user_map where id = 1"
_, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */)
Expand All @@ -1712,7 +1712,7 @@ func TestGetPlanCacheNormalized(t *testing.T) {
// Skip cache using directive
r, _, _, _, ctx := createExecutorEnv(t)
r.normalize = true
unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig)

query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
Expand All @@ -1723,12 +1723,12 @@ func TestGetPlanCacheNormalized(t *testing.T) {
assertCacheSize(t, r.plans, 1)

// the target string will be resolved and become part of the plan cache key, which adds a new entry
ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig)
getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)

// the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above
ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig)
getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)
})
Expand All @@ -1738,8 +1738,8 @@ func TestGetPlanNormalized(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)

r.normalize = true
emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{})
unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{})

query1 := "select * from music_user_map where id = 1"
query2 := "select * from music_user_map where id = 2"
Expand Down Expand Up @@ -1796,7 +1796,7 @@ func TestGetPlanPriority(t *testing.T) {

r.normalize = true
logStats := logstats.NewLogStats(ctx, "Test", "", "", nil)
vCursor, err := econtext.NewVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{})
vCursor, err := econtext.NewVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{})
assert.NoError(t, err)

stmt, err := sqlparser.NewTestParser().Parse(testCase.sql)
Expand Down
43 changes: 23 additions & 20 deletions go/vt/vtgate/executorcontext/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,20 @@ type (
}

VCursorConfig struct {
WarmingReadsPercent int
warmingReadsChannel chan bool
Collation collations.ID
Collation collations.ID

MaxMemoryRows int
EnableShardRouting bool
DefaultTabletType topodatapb.TabletType
QueryTimeout int
DBDDLPlugin string
ForeignKeyMode vschemapb.Keyspace_ForeignKeyMode
SetVarEnabled bool
EnableViews bool

MaxMemoryRows int
EnableShardRouting bool
DefaultTabletType topodatapb.TabletType
QueryTimeout int
DBDDLPlugin string
ForeignKeyMode vschemapb.Keyspace_ForeignKeyMode
SetVarEnabled bool
EnableViews bool
warmingReadsTimeout time.Duration
WarmingReadsPercent int
WarmingReadsTimeout time.Duration
WarmingReadsChannel chan bool
}

// vcursor_impl needs these facilities to be able to be able to execute queries for vindexes
Expand Down Expand Up @@ -175,6 +176,7 @@ func NewVCursorImpl(
serv srvtopo.Server,
warnShardedOnly bool,
pv plancontext.PlannerVersion,
observer ResultsObserver,
cfg VCursorConfig,
) (*VCursorImpl, error) {
keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema, cfg.DefaultTabletType)
Expand Down Expand Up @@ -218,6 +220,7 @@ func NewVCursorImpl(
topoServer: ts,
warnShardedOnly: warnShardedOnly,
pv: pv,
observer: observer,
}, nil
}

Expand All @@ -244,7 +247,9 @@ func (vc *VCursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor {
semTable: vc.semTable,
warnShardedOnly: vc.warnShardedOnly,
warnings: vc.warnings,
pv: vc.pv}
pv: vc.pv,
observer: vc.observer,
}

v.marginComments.Trailing += "/* mirror query */"

Expand All @@ -255,7 +260,7 @@ func (vc *VCursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso
callerId := callerid.EffectiveCallerIDFromContext(ctx)
immediateCallerId := callerid.ImmediateCallerIDFromContext(ctx)

timedCtx, _ := context.WithTimeout(context.Background(), vc.config.warmingReadsTimeout) // nolint
timedCtx, _ := context.WithTimeout(context.Background(), vc.config.WarmingReadsTimeout) // nolint
clonedCtx := callerid.NewContext(timedCtx, callerId, immediateCallerId)

v := &VCursorImpl{
Expand All @@ -277,6 +282,8 @@ func (vc *VCursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso
warnShardedOnly: vc.warnShardedOnly,
warnings: vc.warnings,
pv: vc.pv,

observer: vc.observer,
}

v.marginComments.Trailing += "/* warming read */"
Expand All @@ -301,6 +308,7 @@ func (vc *VCursorImpl) cloneWithAutocommitSession() *VCursorImpl {
topoServer: vc.topoServer,
warnShardedOnly: vc.warnShardedOnly,
pv: vc.pv,
observer: vc.observer,
}
}

Expand Down Expand Up @@ -1046,11 +1054,6 @@ func (vc *VCursorImpl) SetSQLSelectLimit(limit int64) error {
return nil
}

// SetSQLSelectLimit implements the SessionActions interface
func (vc *VCursorImpl) GetSelectLimit() int {
return int(vc.SafeSession.GetOrCreateOptions().SqlSelectLimit)
}

// SetTransactionMode implements the SessionActions interface
func (vc *VCursorImpl) SetTransactionMode(mode vtgatepb.TransactionMode) {
vc.SafeSession.TransactionMode = mode
Expand Down Expand Up @@ -1536,7 +1539,7 @@ func (vc *VCursorImpl) GetWarmingReadsPercent() int {
}

func (vc *VCursorImpl) GetWarmingReadsChannel() chan bool {
return vc.config.warmingReadsChannel
return vc.config.WarmingReadsChannel
}

// UpdateForeignKeyChecksState updates the foreign key checks state of the vcursor.
Expand Down
Loading

0 comments on commit a5797ff

Please sign in to comment.