Skip to content

Commit

Permalink
feat: only add timeout handler when required and use session query ti…
Browse files Browse the repository at this point in the history
…meout in cache key

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Aug 21, 2024
1 parent c4585d7 commit 38a6a10
Show file tree
Hide file tree
Showing 38 changed files with 20,932 additions and 25,281 deletions.
8 changes: 8 additions & 0 deletions go/test/vschemawrapper/vschema_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type VSchemaWrapper struct {
Dest key.Destination
SysVarEnabled bool
ForeignKeyChecksState *bool
SessionQueryTimeout int
Version plancontext.PlannerVersion
EnableViews bool
TestBuilder func(query string, vschema plancontext.VSchema, keyspace string) (*engine.Plan, error)
Expand Down Expand Up @@ -132,6 +133,13 @@ func (vw *VSchemaWrapper) Environment() *vtenv.Environment {
return vw.Env
}

func (vw *VSchemaWrapper) GetQueryTimeout(queryTimeoutFromComments int) int {
if queryTimeoutFromComments != 0 {
return queryTimeoutFromComments
}
return vw.SessionQueryTimeout
}

func (vw *VSchemaWrapper) PlannerWarning(_ string) {
}

Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtgate/engine/timeout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"context"
"time"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -51,7 +52,7 @@ func (t *TimeoutHandler) NeedsTransaction() bool {

// TryExecute is part of the Primitive interface
func (t *TimeoutHandler) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (res *sqltypes.Result, err error) {
ctx, cancel := addQueryTimeout(ctx, vcursor, t.Timeout)
ctx, cancel := context.WithTimeout(ctx, time.Duration(t.Timeout)*time.Millisecond)
defer cancel()

complete := make(chan any)
Expand All @@ -70,7 +71,7 @@ func (t *TimeoutHandler) TryExecute(ctx context.Context, vcursor VCursor, bindVa

// TryStreamExecute is part of the Primitive interface
func (t *TimeoutHandler) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) (err error) {
ctx, cancel := addQueryTimeout(ctx, vcursor, t.Timeout)
ctx, cancel := context.WithTimeout(ctx, time.Duration(t.Timeout)*time.Millisecond)
defer cancel()

complete := make(chan any)
Expand Down
37 changes: 3 additions & 34 deletions go/vt/vtgate/engine/timeout_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,64 +15,33 @@ func TestTimeoutHandler(t *testing.T) {
tests := []struct {
name string
input *TimeoutHandler
vc VCursor
wantErr string
}{
{
name: "No timeout",
input: NewTimeoutHandler(&fakePrimitive{
results: nil,
sleepTime: 100 * time.Millisecond,
}, 0),
vc: &noopVCursor{},
wantErr: "",
}, {
name: "Timeout without failure",
input: NewTimeoutHandler(&fakePrimitive{
results: nil,
sleepTime: 100 * time.Millisecond,
}, 1000),
vc: &noopVCursor{},
wantErr: "",
}, {
name: "Timeout in session",
input: NewTimeoutHandler(&fakePrimitive{
results: nil,
sleepTime: 2 * time.Second,
}, 0),
vc: &noopVCursor{
queryTimeout: 100,
},
wantErr: "VT15001: Query execution was interrupted, maximum statement execution time exceeded",
}, {
name: "Timeout in comments",
input: NewTimeoutHandler(&fakePrimitive{
results: nil,
sleepTime: 2 * time.Second,
}, 100),
vc: &noopVCursor{},
wantErr: "VT15001: Query execution was interrupted, maximum statement execution time exceeded",
}, {
name: "Timeout in both",
name: "Timeout with failure",
input: NewTimeoutHandler(&fakePrimitive{
results: nil,
sleepTime: 2 * time.Second,
}, 100),
vc: &noopVCursor{
queryTimeout: 4000,
},
wantErr: "VT15001: Query execution was interrupted, maximum statement execution time exceeded",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := tt.input.TryExecute(context.Background(), tt.vc, nil, false)
_, err := tt.input.TryExecute(context.Background(), &noopVCursor{}, nil, false)
if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
err = tt.input.TryStreamExecute(context.Background(), tt.vc, nil, false, func(result *sqltypes.Result) error {
err = tt.input.TryStreamExecute(context.Background(), &noopVCursor{}, nil, false, func(result *sqltypes.Result) error {
return nil
})
if tt.wantErr != "" {
Expand Down
12 changes: 11 additions & 1 deletion go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,16 @@ func (s *planTestSuite) addPKsProvided(vschema *vindexes.VSchema, ks string, tbl
}
}

func (s *planTestSuite) TestQueryTimeout() {
vschemaWrapper := &vschemawrapper.VSchemaWrapper{
V: loadSchema(s.T(), "vschemas/schema.json", true),
Env: vtenv.NewTestEnv(),
SessionQueryTimeout: 200,
}

s.testFile("query_timeout_cases.json", vschemaWrapper, false)
}

func (s *planTestSuite) TestSystemTables57() {
// first we move everything to use 5.7 logic
env, err := vtenv.New(vtenv.Options{
Expand Down Expand Up @@ -683,7 +693,7 @@ func (s *planTestSuite) testFile(filename string, vschema *vschemawrapper.VSchem
if tcase.Skip {
t.Skip(message)
} else {
t.Errorf(message)
t.Error(message)
}
} else if tcase.Skip {
t.Errorf("query is correct even though it is skipped:\n %s", tcase.Query)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/plancontext/planning_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ func (v *vschema) Environment() *vtenv.Environment {
return vtenv.NewTestEnv()
}

func (v *vschema) GetQueryTimeout(queryTimeoutFromComments int) int {
return queryTimeoutFromComments
}

func (v *vschema) ErrorIfShardedF(keyspace *vindexes.Keyspace, warn, errFmt string, params ...any) error {
// TODO implement me
panic("implement me")
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/planbuilder/plancontext/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type VSchema interface {
SetPlannerVersion(pv PlannerVersion)
ConnCollation() collations.ID
Environment() *vtenv.Environment
GetQueryTimeout(queryTimeoutFromComments int) int

// ErrorIfShardedF will return an error if the keyspace is sharded,
// and produce a warning if the vtgate if configured to do so
Expand Down
14 changes: 13 additions & 1 deletion go/vt/vtgate/planbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,23 @@ func newBuildSelectPlan(
return nil, nil, err
}

plan = engine.NewTimeoutHandler(plan, queryTimeout(selStmt.GetParsedComments().Directives()))
plan = handleTimeout(plan, queryTimeout(selStmt.GetParsedComments().Directives()), vschema)

return plan, operators.TablesUsed(op), nil
}

// handleTimeout checks if there is a timeout that needs to be added to the query. If there is one
// then we wrap the plan in a TimeoutHandler primitive. If there is no timeout, we return the plan as is.
// Because we are accessing the query timeout stored in the session state, we have to also add this value to the plan key
// so that we don't end up using the same plan when the session variable changes.
func handleTimeout(plan engine.Primitive, queryTimeoutComment int, vschema plancontext.VSchema) engine.Primitive {
finalQueryTimeout := vschema.GetQueryTimeout(queryTimeoutComment)
if finalQueryTimeout == 0 {
return plan
}
return engine.NewTimeoutHandler(plan, finalQueryTimeout)
}

func createSelectOperator(ctx *plancontext.PlanningContext, selStmt sqlparser.SelectStatement, reservedVars *sqlparser.ReservedVars) (operators.Operator, error) {
err := queryRewrite(ctx, selStmt)
if err != nil {
Expand Down
Loading

0 comments on commit 38a6a10

Please sign in to comment.