From a5797ff83f0c814b474106fd091ecfe92126e643 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 3 Dec 2024 17:36:36 +0530 Subject: [PATCH] fix: all executor test related to vcursor Signed-off-by: Harshit Gangal --- go/vt/vtgate/executor.go | 59 ++++++++++++++----- go/vt/vtgate/executor_select_test.go | 7 +-- go/vt/vtgate/executor_test.go | 34 +++++------ go/vt/vtgate/executorcontext/vcursor_impl.go | 43 +++++++------- .../executorcontext/vcursor_impl_test.go | 43 ++++++++++---- go/vt/vtgate/plan_execute.go | 2 +- 6 files changed, 119 insertions(+), 69 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index a47b0e9d746..062ae7b8ceb 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -30,6 +30,8 @@ import ( "github.com/spf13/pflag" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/acl" "vitess.io/vitess/go/cache/theine" "vitess.io/vitess/go/mysql/capabilities" @@ -135,6 +137,8 @@ type Executor struct { warmingReadsPercent int warmingReadsChannel chan bool + + vConfig econtext.VCursorConfig } var executorOnce sync.Once @@ -167,7 +171,6 @@ func NewExecutor( pv plancontext.PlannerVersion, warmingReadsPercent int, ) *Executor { - warnings.Add("WarnUnshardedOnly", 1) e := &Executor{ env: env, serv: serv, @@ -185,6 +188,8 @@ func NewExecutor( warmingReadsPercent: warmingReadsPercent, warmingReadsChannel: make(chan bool, warmingReadsConcurrency), } + // setting the vcursor config. + e.initVConfig() vschemaacl.Init() // we subscribe to update from the VSchemaManager @@ -1132,7 +1137,7 @@ func (e *Executor) getPlan( bindVars, parameterize, vcursor.GetKeyspace(), - vcursor.GetSelectLimit(), + vcursor.SafeSession.GetSelectLimit(), setVarComment, vcursor.GetSystemVariablesCopy(), vcursor.GetForeignKeyChecksState(), @@ -1404,29 +1409,40 @@ func (e *Executor) prepare(ctx context.Context, safeSession *econtext.SafeSessio return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unrecognized prepare statement: %s", sql) } -func (e *Executor) getVCursorConfig() econtext.VCursorConfig { +func (e *Executor) initVConfig() { connCollation := collations.Unknown if gw, isTabletGw := e.resolver.resolver.GetGateway().(*TabletGateway); isTabletGw { connCollation = gw.DefaultConnCollation() } - return econtext.VCursorConfig{ - WarmingReadsPercent: warmingReadsPercent, - Collation: connCollation, - MaxMemoryRows: 0, - EnableShardRouting: false, - DefaultTabletType: 0, - QueryTimeout: 0, - DBDDLPlugin: "", - ForeignKeyMode: 0, - SetVarEnabled: false, - EnableViews: false, + if connCollation == collations.Unknown { + connCollation = e.env.CollationEnv().DefaultConnectionCharset() + } + + e.vConfig = econtext.VCursorConfig{ + Collation: connCollation, + DefaultTabletType: defaultTabletType, + + QueryTimeout: queryTimeout, + MaxMemoryRows: maxMemoryRows, + + SetVarEnabled: sysVarSetEnabled, + EnableViews: enableViews, + ForeignKeyMode: fkMode(foreignKeyMode), + EnableShardRouting: enableShardRouting, + + DBDDLPlugin: dbDDLPlugin, + + WarmingReadsPercent: e.warmingReadsPercent, + WarmingReadsTimeout: warmingReadsQueryTimeout, + WarmingReadsChannel: e.warmingReadsChannel, } } func (e *Executor) handlePrepare(ctx context.Context, safeSession *econtext.SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) { query, comments := sqlparser.SplitMarginComments(sql) - vcursor, _ := econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, e.getVCursorConfig()) + vcursor, _ := econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, + nullResultsObserver{}, e.vConfig) stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser()) if err != nil { @@ -1662,3 +1678,16 @@ type ( func (nullErrorTransformer) TransformError(err error) error { return err } + +func fkMode(foreignkey string) vschemapb.Keyspace_ForeignKeyMode { + switch foreignkey { + case "disallow": + return vschemapb.Keyspace_disallow + case "managed": + return vschemapb.Keyspace_managed + case "unmanaged": + return vschemapb.Keyspace_unmanaged + + } + return vschemapb.Keyspace_unspecified +} diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 9fd112dbdb4..ff893d807c1 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -201,11 +201,7 @@ func TestSystemVariablesMySQLBelow80(t *testing.T) { func TestSystemVariablesWithSetVarDisabled(t *testing.T) { executor, sbc1, _, _, _ := createCustomExecutor(t, "{}", "8.0.0") executor.normalize = true - - setVarEnabled = false - defer func() { - setVarEnabled = true - }() + executor.vConfig.SetVarEnabled = false session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: "TestExecutor"}) sbc1.SetResults([]*sqltypes.Result{{ @@ -4370,6 +4366,7 @@ func TestWarmingReads(t *testing.T) { // waitUntilQueryCount waits until the number of queries run on the tablet reach the specified count. func waitUntilQueryCount(t *testing.T, tab *sandboxconn.SandboxConn, count int) { + t.Helper() timeout := time.After(1 * time.Second) for { select { diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index ddb0134a366..ba81acb354c 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1299,13 +1299,13 @@ func TestExecutorDDLFk(t *testing.T) { } for _, stmt := range stmts { - for _, fkMode := range []string{"allow", "disallow"} { - t.Run(stmt+fkMode, func(t *testing.T) { + for _, mode := range []string{"allow", "disallow"} { + t.Run(stmt+mode, func(t *testing.T) { executor, _, _, sbc, ctx := createExecutorEnv(t) sbc.ExecCount.Store(0) - foreignKeyMode = fkMode + executor.vConfig.ForeignKeyMode = fkMode(mode) _, err := executor.Execute(ctx, nil, mName, econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded}), stmt, nil) - if fkMode == "allow" { + if mode == "allow" { require.NoError(t, err) require.EqualValues(t, 1, sbc.ExecCount.Load()) } else { @@ -1561,8 +1561,8 @@ var pv = querypb.ExecuteOptions_Gen4 func TestGetPlanUnnormalized(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) - emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) - unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{}) + unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{}) query1 := "select * from music_user_map where id = 1" plan1, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1645,7 +1645,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) - emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{}) query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true) @@ -1669,7 +1669,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { // Skip cache using directive r, _, _, _, ctx := createExecutorEnv(t) - unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig) query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1680,12 +1680,12 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig) getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig) getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) }) @@ -1695,7 +1695,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig) query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */) @@ -1712,7 +1712,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { // Skip cache using directive r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig) query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1723,12 +1723,12 @@ func TestGetPlanCacheNormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + ksIDVc1, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig) getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + ksIDVc2, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, r.vConfig) getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) }) @@ -1738,8 +1738,8 @@ func TestGetPlanNormalized(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) - unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{}) + unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{}) query1 := "select * from music_user_map where id = 1" query2 := "select * from music_user_map where id = 2" @@ -1796,7 +1796,7 @@ func TestGetPlanPriority(t *testing.T) { r.normalize = true logStats := logstats.NewLogStats(ctx, "Test", "", "", nil) - vCursor, err := econtext.NewVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, econtext.VCursorConfig{}) + vCursor, err := econtext.NewVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, nullResultsObserver{}, econtext.VCursorConfig{}) assert.NoError(t, err) stmt, err := sqlparser.NewTestParser().Parse(testCase.sql) diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index e652930e51c..47061484ab5 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -73,19 +73,20 @@ type ( } VCursorConfig struct { - WarmingReadsPercent int - warmingReadsChannel chan bool - Collation collations.ID + Collation collations.ID + + MaxMemoryRows int + EnableShardRouting bool + DefaultTabletType topodatapb.TabletType + QueryTimeout int + DBDDLPlugin string + ForeignKeyMode vschemapb.Keyspace_ForeignKeyMode + SetVarEnabled bool + EnableViews bool - MaxMemoryRows int - EnableShardRouting bool - DefaultTabletType topodatapb.TabletType - QueryTimeout int - DBDDLPlugin string - ForeignKeyMode vschemapb.Keyspace_ForeignKeyMode - SetVarEnabled bool - EnableViews bool - warmingReadsTimeout time.Duration + WarmingReadsPercent int + WarmingReadsTimeout time.Duration + WarmingReadsChannel chan bool } // vcursor_impl needs these facilities to be able to be able to execute queries for vindexes @@ -175,6 +176,7 @@ func NewVCursorImpl( serv srvtopo.Server, warnShardedOnly bool, pv plancontext.PlannerVersion, + observer ResultsObserver, cfg VCursorConfig, ) (*VCursorImpl, error) { keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema, cfg.DefaultTabletType) @@ -218,6 +220,7 @@ func NewVCursorImpl( topoServer: ts, warnShardedOnly: warnShardedOnly, pv: pv, + observer: observer, }, nil } @@ -244,7 +247,9 @@ func (vc *VCursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { semTable: vc.semTable, warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, - pv: vc.pv} + pv: vc.pv, + observer: vc.observer, + } v.marginComments.Trailing += "/* mirror query */" @@ -255,7 +260,7 @@ func (vc *VCursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso callerId := callerid.EffectiveCallerIDFromContext(ctx) immediateCallerId := callerid.ImmediateCallerIDFromContext(ctx) - timedCtx, _ := context.WithTimeout(context.Background(), vc.config.warmingReadsTimeout) // nolint + timedCtx, _ := context.WithTimeout(context.Background(), vc.config.WarmingReadsTimeout) // nolint clonedCtx := callerid.NewContext(timedCtx, callerId, immediateCallerId) v := &VCursorImpl{ @@ -277,6 +282,8 @@ func (vc *VCursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, pv: vc.pv, + + observer: vc.observer, } v.marginComments.Trailing += "/* warming read */" @@ -301,6 +308,7 @@ func (vc *VCursorImpl) cloneWithAutocommitSession() *VCursorImpl { topoServer: vc.topoServer, warnShardedOnly: vc.warnShardedOnly, pv: vc.pv, + observer: vc.observer, } } @@ -1046,11 +1054,6 @@ func (vc *VCursorImpl) SetSQLSelectLimit(limit int64) error { return nil } -// SetSQLSelectLimit implements the SessionActions interface -func (vc *VCursorImpl) GetSelectLimit() int { - return int(vc.SafeSession.GetOrCreateOptions().SqlSelectLimit) -} - // SetTransactionMode implements the SessionActions interface func (vc *VCursorImpl) SetTransactionMode(mode vtgatepb.TransactionMode) { vc.SafeSession.TransactionMode = mode @@ -1536,7 +1539,7 @@ func (vc *VCursorImpl) GetWarmingReadsPercent() int { } func (vc *VCursorImpl) GetWarmingReadsChannel() chan bool { - return vc.config.warmingReadsChannel + return vc.config.WarmingReadsChannel } // UpdateForeignKeyChecksState updates the foreign key checks state of the vcursor. diff --git a/go/vt/vtgate/executorcontext/vcursor_impl_test.go b/go/vt/vtgate/executorcontext/vcursor_impl_test.go index 82cbcbbee86..f1c24e5c422 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl_test.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl_test.go @@ -169,9 +169,11 @@ func TestDestinationKeyspace(t *testing.T) { for i, tc := range tests { t.Run(strconv.Itoa(i)+tc.targetString, func(t *testing.T) { session := NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}) - impl, _ := NewVCursorImpl(session, sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, VCursorConfig{ - DefaultTabletType: topodatapb.TabletType_PRIMARY, - }) + impl, _ := NewVCursorImpl(session, sqlparser.MarginComments{}, nil, nil, + &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, + querypb.ExecuteOptions_Gen4, fakeObserver{}, VCursorConfig{ + DefaultTabletType: topodatapb.TabletType_PRIMARY, + }) impl.vschema = tc.vschema dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier) if tc.expectedError == "" { @@ -234,7 +236,9 @@ func TestSetTarget(t *testing.T) { for i, tc := range tests { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { cfg := VCursorConfig{DefaultTabletType: topodatapb.TabletType_PRIMARY} - vc, _ := NewVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, cfg) + vc, _ := NewVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, + nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, + false, querypb.ExecuteOptions_Gen4, fakeObserver{}, cfg) vc.vschema = tc.vschema err := vc.SetTarget(tc.targetString) if tc.expectedError == "" { @@ -288,7 +292,10 @@ func TestKeyForPlan(t *testing.T) { Collation: collations.CollationUtf8mb4ID, DefaultTabletType: topodatapb.TabletType_PRIMARY, } - vc, err := NewVCursorImpl(ss, sqlparser.MarginComments{}, &fakeExecutor{}, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&FakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, cfg) + vc, err := NewVCursorImpl(ss, sqlparser.MarginComments{}, &fakeExecutor{}, nil, + &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, + srvtopo.NewResolver(&FakeTopoServer{}, nil, ""), nil, false, + querypb.ExecuteOptions_Gen4, fakeObserver{}, cfg) require.NoError(t, err) vc.vschema = tc.vschema @@ -311,7 +318,10 @@ func TestFirstSortedKeyspace(t *testing.T) { }, } - vc, err := NewVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&FakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, VCursorConfig{}) + vc, err := NewVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, nil, nil, + &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, + srvtopo.NewResolver(&FakeTopoServer{}, nil, ""), nil, false, + querypb.ExecuteOptions_Gen4, fakeObserver{}, VCursorConfig{}) require.NoError(t, err) ks, err := vc.FirstSortedKeyspace() require.NoError(t, err) @@ -322,10 +332,12 @@ func TestFirstSortedKeyspace(t *testing.T) { // Validates the timeout value is set based on override rule. func TestSetExecQueryTimeout(t *testing.T) { safeSession := NewSafeSession(nil) - vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, nil, nil, &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4, VCursorConfig{ - // flag timeout - QueryTimeout: 20, - }) + vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, nil, nil, + &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4, + fakeObserver{}, VCursorConfig{ + // flag timeout + QueryTimeout: 20, + }) require.NoError(t, err) vc.SetExecQueryTimeout(nil) @@ -366,7 +378,9 @@ func TestSetExecQueryTimeout(t *testing.T) { func TestRecordMirrorStats(t *testing.T) { safeSession := NewSafeSession(nil) logStats := logstats.NewLogStats(context.Background(), t.Name(), "select 1", "", nil) - vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, logStats, nil, &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4, VCursorConfig{}) + vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, logStats, nil, + &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4, fakeObserver{}, + VCursorConfig{}) require.NoError(t, err) require.Zero(t, logStats.MirrorSourceExecuteTime) @@ -482,3 +496,10 @@ func (f fakeExecutor) AddWarningCount(name string, value int64) { } var _ iExecute = (*fakeExecutor)(nil) + +type fakeObserver struct{} + +func (f fakeObserver) Observe(*sqltypes.Result) { +} + +var _ ResultsObserver = (*fakeObserver)(nil) diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index f344f922be7..4d127759040 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -119,7 +119,7 @@ func (e *Executor) newExecute( } } - vcursor, err := econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, e.getVCursorConfig()) + vcursor, err := econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, nullResultsObserver{}, e.vConfig) if err != nil { return err }