diff --git a/go/test/endtoend/vtgate/queries/timeout/timeout_test.go b/go/test/endtoend/vtgate/queries/timeout/timeout_test.go index d5e116e155b..c03ffe25998 100644 --- a/go/test/endtoend/vtgate/queries/timeout/timeout_test.go +++ b/go/test/endtoend/vtgate/queries/timeout/timeout_test.go @@ -94,10 +94,41 @@ func TestQueryTimeoutWithTables(t *testing.T) { utils.Exec(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=500 */ sleep(0.1) from t1 where id1 = 1") _, err = utils.ExecAllowError(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=20 */ sleep(0.1) from t1 where id1 = 1") require.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") + if utils.BinaryIsAtLeastAtVersion(21, "vtgate") { + assert.ErrorContains(t, err, "Query execution was interrupted, maximum statement execution time exceeded") + } else { + assert.Contains(t, err.Error(), "context deadline exceeded") + } assert.Contains(t, err.Error(), "(errno 1317) (sqlstate 70100)") } +// TestOverallQueryTimeout tests that the query timeout is applied to the overall execution of a query +// and not just individual routes. +func TestOverallQueryTimeout(t *testing.T) { + utils.SkipIfBinaryIsBelowVersion(t, 21, "vtgate") + mcmp, closer := start(t) + defer closer() + + mcmp.Exec("insert into t1(id1, id2) values (2,2),(3,3)") + // After inserting the rows above, if we run the following query, we will end up doing join on vtgate + // that issues one select query on the left side and 2 on the right side. The queries on the right side + // take 2 and 3 seconds each to run. If we have an overall timeout for 4 seconds, then it should fail. + + _, err := utils.ExecAllowError(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=4000 */ sleep(u2.id2), u1.id2 from t1 u1 join t1 u2 where u1.id2 = u2.id1") + assert.Error(t, err) + assert.ErrorContains(t, err, "Query execution was interrupted, maximum statement execution time exceeded") + + // Let's also check that setting the session variable also works. + utils.Exec(t, mcmp.VtConn, "set query_timeout=4000") + _, err = utils.ExecAllowError(t, mcmp.VtConn, "select sleep(u2.id2), u1.id2 from t1 u1 join t1 u2 where u1.id2 = u2.id1") + assert.Error(t, err) + assert.ErrorContains(t, err, "Query execution was interrupted, maximum statement execution time exceeded") + + // Increasing the timeout should pass the query. + utils.Exec(t, mcmp.VtConn, "set query_timeout=10000") + _ = utils.Exec(t, mcmp.VtConn, "select sleep(u2.id2), u1.id2 from t1 u1 join t1 u2 where u1.id2 = u2.id1") +} + // TestQueryTimeoutWithShardTargeting tests the query timeout with shard targeting. func TestQueryTimeoutWithShardTargeting(t *testing.T) { mcmp, closer := start(t) diff --git a/go/test/vschemawrapper/vschema_wrapper.go b/go/test/vschemawrapper/vschema_wrapper.go index 4d1c424dda8..36d533130f2 100644 --- a/go/test/vschemawrapper/vschema_wrapper.go +++ b/go/test/vschemawrapper/vschema_wrapper.go @@ -47,6 +47,7 @@ type VSchemaWrapper struct { Dest key.Destination SysVarEnabled bool ForeignKeyChecksState *bool + SessionQueryTimeout int Version plancontext.PlannerVersion EnableViews bool TestBuilder func(query string, vschema plancontext.VSchema, keyspace string) (*engine.Plan, error) @@ -132,6 +133,13 @@ func (vw *VSchemaWrapper) Environment() *vtenv.Environment { return vw.Env } +func (vw *VSchemaWrapper) GetQueryTimeout(queryTimeoutFromComments int) int { + if queryTimeoutFromComments != 0 { + return queryTimeoutFromComments + } + return vw.SessionQueryTimeout +} + func (vw *VSchemaWrapper) PlannerWarning(_ string) { } diff --git a/go/vt/vterrors/code.go b/go/vt/vterrors/code.go index 31c98cef280..5158fe2a1a3 100644 --- a/go/vt/vterrors/code.go +++ b/go/vt/vterrors/code.go @@ -119,6 +119,8 @@ var ( VT14004 = errorWithoutState("VT14004", vtrpcpb.Code_UNAVAILABLE, "cannot find keyspace for: %s", "The specified keyspace could not be found.") VT14005 = errorWithoutState("VT14005", vtrpcpb.Code_UNAVAILABLE, "cannot lookup sidecar database for keyspace: %s", "Failed to read sidecar database identifier.") + VT15001 = errorWithoutState("VT15001", vtrpcpb.Code_DEADLINE_EXCEEDED, "Query execution was interrupted, maximum statement execution time exceeded", "Query execution was interrupted, maximum statement execution time exceeded") + // Errors is a list of errors that must match all the variables // defined above to enable auto-documentation of error codes. Errors = []func(args ...any) *VitessError{ diff --git a/go/vt/vtgate/engine/fake_primitive_test.go b/go/vt/vtgate/engine/fake_primitive_test.go index 6ab54fe9e7b..20585be7c88 100644 --- a/go/vt/vtgate/engine/fake_primitive_test.go +++ b/go/vt/vtgate/engine/fake_primitive_test.go @@ -22,6 +22,7 @@ import ( "reflect" "strings" "testing" + "time" "golang.org/x/sync/errgroup" @@ -41,6 +42,9 @@ type fakePrimitive struct { log []string + // sleepTime is the time for which the fake primitive sleeps before returning the results. + sleepTime time.Duration + allResultsInOneCall bool async bool @@ -71,6 +75,9 @@ func (f *fakePrimitive) GetTableName() string { func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields)) + if f.sleepTime != 0 { + time.Sleep(f.sleepTime) + } if f.results == nil { return nil, f.sendErr } @@ -85,6 +92,9 @@ func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVar func (f *fakePrimitive) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", printBindVars(bindVars), wantfields)) + if f.sleepTime != 0 { + time.Sleep(f.sleepTime) + } if f.results == nil { return f.sendErr } diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 5458a384490..3d0bc9e35f1 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -54,7 +54,8 @@ var _ SessionActions = (*noopVCursor)(nil) // noopVCursor is used to build other vcursors. type noopVCursor struct { - inTx bool + inTx bool + queryTimeout int } // MySQLVersion implements VCursor. @@ -298,7 +299,10 @@ func (t *noopVCursor) SetQueryTimeout(maxExecutionTime int64) { } func (t *noopVCursor) GetQueryTimeout(queryTimeoutFromComments int) int { - return queryTimeoutFromComments + if queryTimeoutFromComments != 0 { + return queryTimeoutFromComments + } + return t.queryTimeout } func (t *noopVCursor) SetSkipQueryPlanCache(context.Context, bool) error { diff --git a/go/vt/vtgate/engine/timeout_handler.go b/go/vt/vtgate/engine/timeout_handler.go new file mode 100644 index 00000000000..f24ffaec130 --- /dev/null +++ b/go/vt/vtgate/engine/timeout_handler.go @@ -0,0 +1,104 @@ +package engine + +import ( + "context" + "time" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vterrors" +) + +// TimeoutHandler is a primitive that adds a timeout to the execution of a query. +type TimeoutHandler struct { + Timeout int + Input Primitive +} + +var _ Primitive = (*TimeoutHandler)(nil) + +// NewTimeoutHandler creates a new timeout handler. +func NewTimeoutHandler(input Primitive, timeout int) *TimeoutHandler { + return &TimeoutHandler{ + Timeout: timeout, + Input: input, + } +} + +// RouteType is part of the Primitive interface +func (t *TimeoutHandler) RouteType() string { + return t.Input.RouteType() +} + +// GetKeyspaceName is part of the Primitive interface +func (t *TimeoutHandler) GetKeyspaceName() string { + return t.Input.GetKeyspaceName() +} + +// GetTableName is part of the Primitive interface +func (t *TimeoutHandler) GetTableName() string { + return t.Input.GetTableName() +} + +// GetFields is part of the Primitive interface +func (t *TimeoutHandler) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return t.Input.GetFields(ctx, vcursor, bindVars) +} + +// NeedsTransaction is part of the Primitive interface +func (t *TimeoutHandler) NeedsTransaction() bool { + return t.Input.NeedsTransaction() +} + +// TryExecute is part of the Primitive interface +func (t *TimeoutHandler) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (res *sqltypes.Result, err error) { + ctx, cancel := context.WithTimeout(ctx, time.Duration(t.Timeout)*time.Millisecond) + defer cancel() + + complete := make(chan any) + go func() { + res, err = t.Input.TryExecute(ctx, vcursor, bindVars, wantfields) + close(complete) + }() + + select { + case <-ctx.Done(): + return nil, vterrors.VT15001() + case <-complete: + return res, err + } +} + +// TryStreamExecute is part of the Primitive interface +func (t *TimeoutHandler) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) (err error) { + ctx, cancel := context.WithTimeout(ctx, time.Duration(t.Timeout)*time.Millisecond) + defer cancel() + + complete := make(chan any) + go func() { + err = t.Input.TryStreamExecute(ctx, vcursor, bindVars, wantfields, callback) + close(complete) + }() + + select { + case <-ctx.Done(): + return vterrors.VT15001() + case <-complete: + return err + } +} + +// Inputs is part of the Primitive interface +func (t *TimeoutHandler) Inputs() ([]Primitive, []map[string]any) { + return []Primitive{t.Input}, nil +} + +// description is part of the Primitive interface +func (t *TimeoutHandler) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "TimeoutHandler", + Other: map[string]any{ + "Timeout": t.Timeout, + }, + } +} diff --git a/go/vt/vtgate/engine/timeout_handler_test.go b/go/vt/vtgate/engine/timeout_handler_test.go new file mode 100644 index 00000000000..7cc4861954e --- /dev/null +++ b/go/vt/vtgate/engine/timeout_handler_test.go @@ -0,0 +1,59 @@ +package engine + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" +) + +// TestTimeoutHandler tests timeout handler primitive. +func TestTimeoutHandler(t *testing.T) { + tests := []struct { + name string + sleepTime time.Duration + timeout int + wantErr string + }{ + { + name: "Timeout without failure", + sleepTime: 100 * time.Millisecond, + timeout: 1000, + wantErr: "", + }, { + name: "Timeout with failure", + sleepTime: 2 * time.Second, + timeout: 100, + wantErr: "VT15001: Query execution was interrupted, maximum statement execution time exceeded", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := createTimeoutHandlerForTesting(tt.timeout, tt.sleepTime).TryExecute(context.Background(), &noopVCursor{}, nil, false) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + } + err = createTimeoutHandlerForTesting(tt.timeout, tt.sleepTime).TryStreamExecute(context.Background(), &noopVCursor{}, nil, false, func(result *sqltypes.Result) error { + return nil + }) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + } + }) + } +} + +// createTimeoutHandlerForTesting creates a TimeoutHandler for testing that has a fakePrimitive as an input. +func createTimeoutHandlerForTesting(timeout int, sleepTime time.Duration) *TimeoutHandler { + return NewTimeoutHandler(&fakePrimitive{ + results: nil, + sleepTime: sleepTime, + }, timeout) +} diff --git a/go/vt/vtgate/planbuilder/builder.go b/go/vt/vtgate/planbuilder/builder.go index 5d1d4ecd622..6bd965d4040 100644 --- a/go/vt/vtgate/planbuilder/builder.go +++ b/go/vt/vtgate/planbuilder/builder.go @@ -153,13 +153,7 @@ func buildRoutePlan(stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVa func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) { switch stmt := stmt.(type) { - case *sqlparser.Select, *sqlparser.Insert, *sqlparser.Update, *sqlparser.Delete: - configuredPlanner, err := getConfiguredPlanner(vschema, stmt, query) - if err != nil { - return nil, err - } - return buildRoutePlan(stmt, reservedVars, vschema, configuredPlanner) - case *sqlparser.Union: + case *sqlparser.Select, *sqlparser.Insert, *sqlparser.Update, *sqlparser.Delete, *sqlparser.Union: configuredPlanner, err := getConfiguredPlanner(vschema, stmt, query) if err != nil { return nil, err diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index f49994d37b2..bfd7f189a66 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -260,6 +260,16 @@ func (s *planTestSuite) addPKsProvided(vschema *vindexes.VSchema, ks string, tbl } } +func (s *planTestSuite) TestQueryTimeout() { + vschemaWrapper := &vschemawrapper.VSchemaWrapper{ + V: loadSchema(s.T(), "vschemas/schema.json", true), + Env: vtenv.NewTestEnv(), + SessionQueryTimeout: 200, + } + + s.testFile("query_timeout_cases.json", vschemaWrapper, false) +} + func (s *planTestSuite) TestSystemTables57() { // first we move everything to use 5.7 logic env, err := vtenv.New(vtenv.Options{ @@ -683,7 +693,7 @@ func (s *planTestSuite) testFile(filename string, vschema *vschemawrapper.VSchem if tcase.Skip { t.Skip(message) } else { - t.Errorf(message) + t.Error(message) } } else if tcase.Skip { t.Errorf("query is correct even though it is skipped:\n %s", tcase.Query) diff --git a/go/vt/vtgate/planbuilder/plancontext/planning_context_test.go b/go/vt/vtgate/planbuilder/plancontext/planning_context_test.go index 3ab58cba724..fb6175d0e4b 100644 --- a/go/vt/vtgate/planbuilder/plancontext/planning_context_test.go +++ b/go/vt/vtgate/planbuilder/plancontext/planning_context_test.go @@ -280,6 +280,10 @@ func (v *vschema) Environment() *vtenv.Environment { return vtenv.NewTestEnv() } +func (v *vschema) GetQueryTimeout(queryTimeoutFromComments int) int { + return queryTimeoutFromComments +} + func (v *vschema) ErrorIfShardedF(keyspace *vindexes.Keyspace, warn, errFmt string, params ...any) error { // TODO implement me panic("implement me") diff --git a/go/vt/vtgate/planbuilder/plancontext/vschema.go b/go/vt/vtgate/planbuilder/plancontext/vschema.go index 8ac4c57bfd7..f2fa3623575 100644 --- a/go/vt/vtgate/planbuilder/plancontext/vschema.go +++ b/go/vt/vtgate/planbuilder/plancontext/vschema.go @@ -43,6 +43,7 @@ type VSchema interface { SetPlannerVersion(pv PlannerVersion) ConnCollation() collations.ID Environment() *vtenv.Environment + GetQueryTimeout(queryTimeoutFromComments int) int // ErrorIfShardedF will return an error if the keyspace is sharded, // and produce a warning if the vtgate if configured to do so diff --git a/go/vt/vtgate/planbuilder/select.go b/go/vt/vtgate/planbuilder/select.go index 6927c5315ac..6f14bb425ec 100644 --- a/go/vt/vtgate/planbuilder/select.go +++ b/go/vt/vtgate/planbuilder/select.go @@ -229,9 +229,23 @@ func newBuildSelectPlan( return nil, nil, err } + plan = handleTimeout(plan, queryTimeout(selStmt.GetParsedComments().Directives()), vschema) + return plan, operators.TablesUsed(op), nil } +// handleTimeout checks if there is a timeout that needs to be added to the query. If there is one +// then we wrap the plan in a TimeoutHandler primitive. If there is no timeout, we return the plan as is. +// Because we are accessing the query timeout stored in the session state, we have to also add this value to the plan key +// so that we don't end up using the same plan when the session variable changes. +func handleTimeout(plan engine.Primitive, queryTimeoutComment int, vschema plancontext.VSchema) engine.Primitive { + finalQueryTimeout := vschema.GetQueryTimeout(queryTimeoutComment) + if finalQueryTimeout == 0 { + return plan + } + return engine.NewTimeoutHandler(plan, finalQueryTimeout) +} + func createSelectOperator(ctx *plancontext.PlanningContext, selStmt sqlparser.SelectStatement, reservedVars *sqlparser.ReservedVars) (operators.Operator, error) { err := queryRewrite(ctx, selStmt) if err != nil { diff --git a/go/vt/vtgate/planbuilder/testdata/query_timeout_cases.json b/go/vt/vtgate/planbuilder/testdata/query_timeout_cases.json new file mode 100644 index 00000000000..a80e0d7b655 --- /dev/null +++ b/go/vt/vtgate/planbuilder/testdata/query_timeout_cases.json @@ -0,0 +1,59 @@ +[ + { + "comment": "select with timeout directive sets QueryTimeout", + "query": "select /*vt+ QUERY_TIMEOUT_MS=1004 */ * from user", + "plan": { + "QueryType": "SELECT", + "Original": "select /*vt+ QUERY_TIMEOUT_MS=1004 */ * from user", + "Instructions": { + "OperatorType": "TimeoutHandler", + "Timeout": 1004, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select * from `user` where 1 != 1", + "Query": "select /*vt+ QUERY_TIMEOUT_MS=1004 */ * from `user`", + "QueryTimeout": 1004, + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } + }, + { + "comment": "select with no timeout directive", + "query": "select * from user", + "plan": { + "QueryType": "SELECT", + "Original": "select * from user", + "Instructions": { + "OperatorType": "TimeoutHandler", + "Timeout": 200, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select * from `user` where 1 != 1", + "Query": "select * from `user`", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } + } +] \ No newline at end of file diff --git a/go/vt/vtgate/planbuilder/testdata/reference_cases.json b/go/vt/vtgate/planbuilder/testdata/reference_cases.json index 6aa01355934..be28b66274d 100644 --- a/go/vt/vtgate/planbuilder/testdata/reference_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/reference_cases.json @@ -700,19 +700,19 @@ "QueryType": "SELECT", "Original": "select * from user.ref_with_source ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2", "Instructions": { - "FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1", "OperatorType": "Route", "Variant": "EqualUnique", - "Vindex": "user_index", "Keyspace": { "Name": "user", "Sharded": true }, + "FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1", "Query": "select * from ref_with_source as ref, `user` as u where u.id = 2 and ref.id = u.ref_id", "Table": "`user`, ref_with_source", "Values": [ "2" - ] + ], + "Vindex": "user_index" }, "TablesUsed": [ "user.ref_with_source", @@ -727,19 +727,19 @@ "QueryType": "SELECT", "Original": "select * from source_of_ref ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2", "Instructions": { - "FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1", "OperatorType": "Route", "Variant": "EqualUnique", - "Vindex": "user_index", "Keyspace": { "Name": "user", "Sharded": true }, + "FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1", "Query": "select * from ref_with_source as ref, `user` as u where u.id = 2 and ref.id = u.ref_id", "Table": "`user`, ref_with_source", "Values": [ "2" - ] + ], + "Vindex": "user_index" }, "TablesUsed": [ "user.ref_with_source", diff --git a/go/vt/vtgate/planbuilder/testdata/select_cases.json b/go/vt/vtgate/planbuilder/testdata/select_cases.json index f06a6a50d45..a9611d878a8 100644 --- a/go/vt/vtgate/planbuilder/testdata/select_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/select_cases.json @@ -72,16 +72,22 @@ "QueryType": "SELECT", "Original": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from user", "Instructions": { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select * from `user` where 1 != 1", - "Query": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from `user`", - "QueryTimeout": 1000, - "Table": "`user`" + "OperatorType": "TimeoutHandler", + "Timeout": 1000, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select * from `user` where 1 != 1", + "Query": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from `user`", + "QueryTimeout": 1000, + "Table": "`user`" + } + ] }, "TablesUsed": [ "user.user" @@ -95,21 +101,27 @@ "QueryType": "SELECT", "Original": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ count(*) from user", "Instructions": { - "OperatorType": "Aggregate", - "Variant": "Scalar", - "Aggregates": "sum_count_star(0) AS count(*)", + "OperatorType": "TimeoutHandler", + "Timeout": 1000, "Inputs": [ { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select count(*) from `user` where 1 != 1", - "Query": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ count(*) from `user`", - "QueryTimeout": 1000, - "Table": "`user`" + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "sum_count_star(0) AS count(*)", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select count(*) from `user` where 1 != 1", + "Query": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ count(*) from `user`", + "QueryTimeout": 1000, + "Table": "`user`" + } + ] } ] }, @@ -125,20 +137,26 @@ "QueryType": "SELECT", "Original": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from user limit 10", "Instructions": { - "OperatorType": "Limit", - "Count": "10", + "OperatorType": "TimeoutHandler", + "Timeout": 1000, "Inputs": [ { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select * from `user` where 1 != 1", - "Query": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from `user` limit 10", - "QueryTimeout": 1000, - "Table": "`user`" + "OperatorType": "Limit", + "Count": "10", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select * from `user` where 1 != 1", + "Query": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from `user` limit 10", + "QueryTimeout": 1000, + "Table": "`user`" + } + ] } ] }, @@ -1750,16 +1768,22 @@ "QueryType": "SELECT", "Original": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from route2", "Instructions": { - "OperatorType": "Route", - "Variant": "Unsharded", - "Keyspace": { - "Name": "main", - "Sharded": false - }, - "FieldQuery": "select * from unsharded as route2 where 1 != 1", - "Query": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from unsharded as route2", - "QueryTimeout": 1000, - "Table": "unsharded" + "OperatorType": "TimeoutHandler", + "Timeout": 1000, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "FieldQuery": "select * from unsharded as route2 where 1 != 1", + "Query": "select /*vt+ QUERY_TIMEOUT_MS=1000 */ * from unsharded as route2", + "QueryTimeout": 1000, + "Table": "unsharded" + } + ] }, "TablesUsed": [ "main.unsharded" diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index a2055e57557..b3d816cbbbd 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -1119,6 +1119,10 @@ func (vc *vcursorImpl) keyForPlan(ctx context.Context, query string, buf io.Stri _, _ = buf.WriteString(vindexes.TabletTypeSuffix[vc.tabletType]) _, _ = buf.WriteString("+Collate:") _, _ = buf.WriteString(vc.Environment().CollationEnv().LookupName(vc.collation)) + sessionQueryTimeout := vc.GetQueryTimeout(0) + if sessionQueryTimeout != 0 { + _, _ = buf.WriteString(fmt.Sprintf("+QueryTimeout:%d", sessionQueryTimeout)) + } if vc.destination != nil { switch vc.destination.(type) { diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index b8e4a0d3a0a..dcbe32e1267 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -262,6 +262,7 @@ func TestSetTarget(t *testing.T) { func TestKeyForPlan(t *testing.T) { type testCase struct { vschema *vindexes.VSchema + sessionQueryTimeout int targetString string expectedPlanPrefixKey string } @@ -290,6 +291,11 @@ func TestKeyForPlan(t *testing.T) { vschema: vschemaWith1KS, targetString: "ks1@replica", expectedPlanPrefixKey: "ks1@replica+Collate:utf8mb4_0900_ai_ci+Query:SELECT 1", + }, { + vschema: vschemaWith1KS, + targetString: "", + sessionQueryTimeout: 100, + expectedPlanPrefixKey: "ks1@primary+Collate:utf8mb4_0900_ai_ci+QueryTimeout:100+Query:SELECT 1", }} r, _, _, _, _ := createExecutorEnv(t) @@ -297,6 +303,7 @@ func TestKeyForPlan(t *testing.T) { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { ss := NewSafeSession(&vtgatepb.Session{InTransaction: false}) ss.SetTargetString(tc.targetString) + ss.SetQueryTimeout(int64(tc.sessionQueryTimeout)) vc, err := newVCursorImpl(ss, sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4) require.NoError(t, err) vc.vschema = tc.vschema