Skip to content

Commit

Permalink
feat: set query timeout on execute options and refactor query hints
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Sep 9, 2024
1 parent 5a693ff commit 4451119
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 73 deletions.
132 changes: 75 additions & 57 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,27 +554,6 @@ func AllowScatterDirective(stmt Statement) bool {
return checkDirective(stmt, DirectiveAllowScatter)
}

// ForeignKeyChecksState returns the state of foreign_key_checks variable if it is part of a SET_VAR optimizer hint in the comments.
func ForeignKeyChecksState(stmt Statement) *bool {
cmt, ok := stmt.(Commented)
if ok {
fkChecksVal := cmt.GetParsedComments().GetMySQLSetVarValue(sysvars.ForeignKeyChecks)
// If the value of the `foreign_key_checks` optimizer hint is something that doesn't make sense,
// then MySQL just ignores it and treats it like the case, where it is unspecified. We are choosing
// to have the same behaviour here. If the value doesn't match any of the acceptable values, we return nil,
// that signifies that no value was specified.
switch strings.ToLower(fkChecksVal) {
case "on", "1":
fkState := true
return &fkState
case "off", "0":
fkState := false
return &fkState
}
}
return nil
}

func checkDirective(stmt Statement, key string) bool {
cmt, ok := stmt.(Commented)
if ok {
Expand All @@ -583,42 +562,43 @@ func checkDirective(stmt Statement, key string) bool {
return false
}

// GetPriorityFromStatement gets the priority from the provided Statement, using DirectivePriority
func GetPriorityFromStatement(statement Statement) (string, error) {
commentedStatement, ok := statement.(Commented)
// This would mean that the statement lacks comments, so we can't obtain the workload from it. Hence default to
// empty priority
type QueryHints struct {
IgnoreMaxMemoryRows bool
Consolidator querypb.ExecuteOptions_Consolidator
Workload string
ForeignKeyChecks *bool
Priority string
Timeout *int
}

func BuildQueryHints(stmt Statement) (qh QueryHints, err error) {
qh = QueryHints{}

comment, ok := stmt.(Commented)
if !ok {
return "", nil
return qh, nil
}

directives := commentedStatement.GetParsedComments().Directives()
priority, ok := directives.GetString(DirectivePriority, "")
if !ok || priority == "" {
return "", nil
}
directives := comment.GetParsedComments().Directives()

intPriority, err := strconv.Atoi(priority)
if err != nil || intPriority < 0 || intPriority > MaxPriorityValue {
return "", ErrInvalidPriority
qh.Priority, err = getPriority(directives)
if err != nil {
return qh, err
}
qh.IgnoreMaxMemoryRows = directives.IsSet(DirectiveIgnoreMaxMemoryRows)
qh.Consolidator = getConsolidator(stmt, directives)
qh.Workload = getWorkload(directives)
qh.ForeignKeyChecks = getForeignKeyChecksState(comment)
qh.Timeout = getQueryTimeout(directives)

return priority, nil
return qh, nil
}

// Consolidator returns the consolidator option.
func Consolidator(stmt Statement) querypb.ExecuteOptions_Consolidator {
var comments *ParsedComments
switch stmt := stmt.(type) {
case *Select:
comments = stmt.Comments
default:
return querypb.ExecuteOptions_CONSOLIDATOR_UNSPECIFIED
}
if comments == nil {
// getConsolidator returns the consolidator option.
func getConsolidator(stmt Statement, directives *CommentDirectives) querypb.ExecuteOptions_Consolidator {
if _, isSelect := stmt.(SelectStatement); !isSelect {
return querypb.ExecuteOptions_CONSOLIDATOR_UNSPECIFIED
}
directives := comments.Directives()
strv, isSet := directives.GetString(DirectiveConsolidator, "")
if !isSet {
return querypb.ExecuteOptions_CONSOLIDATOR_UNSPECIFIED
Expand All @@ -629,18 +609,56 @@ func Consolidator(stmt Statement) querypb.ExecuteOptions_Consolidator {
return querypb.ExecuteOptions_CONSOLIDATOR_UNSPECIFIED
}

// GetWorkloadNameFromStatement gets the workload name from the provided Statement, using workloadLabel as the name of
// getWorkload gets the workload name from the provided Statement, using workloadLabel as the name of
// the query directive that specifies it.
func GetWorkloadNameFromStatement(statement Statement) string {
commentedStatement, ok := statement.(Commented)
// This would mean that the statement lacks comments, so we can't obtain the workload from it. Hence default to
// empty workload name
if !ok {
return ""
func getWorkload(directives *CommentDirectives) string {
workloadName, _ := directives.GetString(DirectiveWorkloadName, "")
return workloadName
}

// getForeignKeyChecksState returns the state of foreign_key_checks variable if it is part of a SET_VAR optimizer hint in the comments.
func getForeignKeyChecksState(cmt Commented) *bool {
fkChecksVal := cmt.GetParsedComments().GetMySQLSetVarValue(sysvars.ForeignKeyChecks)
// If the value of the `foreign_key_checks` optimizer hint is something that doesn't make sense,
// then MySQL just ignores it and treats it like the case, where it is unspecified. We are choosing
// to have the same behaviour here. If the value doesn't match any of the acceptable values, we return nil,
// that signifies that no value was specified.
switch strings.ToLower(fkChecksVal) {
case "on", "1":
fkState := true
return &fkState
case "off", "0":
fkState := false
return &fkState
}
return nil
}

directives := commentedStatement.GetParsedComments().Directives()
workloadName, _ := directives.GetString(DirectiveWorkloadName, "")
// getPriority gets the priority from the provided Statement, using DirectivePriority
func getPriority(directives *CommentDirectives) (string, error) {
priority, ok := directives.GetString(DirectivePriority, "")
if !ok || priority == "" {
return "", nil
}

return workloadName
intPriority, err := strconv.Atoi(priority)
if err != nil || intPriority < 0 || intPriority > MaxPriorityValue {
return "", ErrInvalidPriority
}

return priority, nil
}

// getQueryTimeout gets the query timeout from the provided Statement, using DirectiveQueryTimeout
func getQueryTimeout(directives *CommentDirectives) *int {
timeoutString, ok := directives.GetString(DirectiveQueryTimeout, "")
if !ok || timeoutString == "" {
return nil
}

timeout, err := strconv.Atoi(timeoutString)
if err != nil || timeout < 0 {
return nil
}
return &timeout
}
47 changes: 42 additions & 5 deletions go/vt/sqlparser/comments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,10 @@ func TestConsolidator(t *testing.T) {
for _, test := range testCases {
t.Run(test.query, func(t *testing.T) {
stmt, _ := parser.Parse(test.query)
got := Consolidator(stmt)
assert.Equalf(t, test.expected, got, fmt.Sprintf("Consolidator(stmt) returned %v but expected %v", got, test.expected))
qh, err := BuildQueryHints(stmt)
require.NoError(t, err)
assert.Equalf(t, test.expected, qh.Consolidator,
"Consolidator(stmt) returned %v but expected %v", qh.Consolidator, test.expected)
})
}
}
Expand Down Expand Up @@ -534,12 +536,12 @@ func TestGetPriorityFromStatement(t *testing.T) {
t.Parallel()
stmt, err := parser.Parse(testCase.query)
assert.NoError(t, err)
actualPriority, actualError := GetPriorityFromStatement(stmt)
qh, err := BuildQueryHints(stmt)
if testCase.expectedError != nil {
assert.ErrorIs(t, actualError, testCase.expectedError)
assert.ErrorIs(t, err, testCase.expectedError)
} else {
assert.NoError(t, err)
assert.Equal(t, testCase.expectedPriority, actualPriority)
assert.Equal(t, testCase.expectedPriority, qh.Priority)
}
})
}
Expand Down Expand Up @@ -661,3 +663,38 @@ func TestSetMySQLSetVarValue(t *testing.T) {
})
}
}

// TestQueryTimeout tests the extraction of Query_Timeout_MS from the comments.
func TestQueryTimeout(t *testing.T) {
testCases := []struct {
query string
expTimeout int
noTimeout bool
}{{
query: "select * from a_table",
noTimeout: true,
}, {
query: "select /*vt+ QUERY_TIMEOUT_MS=21 */ * from another_table",
expTimeout: 21,
}, {
query: "select /*vt+ QUERY_TIMEOUT_MS=0 */ * from another_table",
expTimeout: 0,
}, {
query: "select /*vt+ PRIORITY=-42 */ * from another_table",
noTimeout: true,
}}

parser := NewTestParser()
for _, tc := range testCases {
t.Run(tc.query, func(t *testing.T) {
stmt, err := parser.Parse(tc.query)
assert.NoError(t, err)
qh, _ := BuildQueryHints(stmt)
if tc.noTimeout {
assert.Nil(t, qh.Timeout)
} else {
assert.Equal(t, tc.expTimeout, *qh.Timeout)
}
})
}
}
4 changes: 4 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type noopVCursor struct {
inTx bool
}

func (t *noopVCursor) SetExecQueryTimeout(timeout *int) {
panic("implement me")
}

// MySQLVersion implements VCursor.
func (t *noopVCursor) Commit(ctx context.Context) error {
return nil
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ type (
SetConsolidator(querypb.ExecuteOptions_Consolidator)
SetWorkloadName(string)
SetPriority(string)
SetExecQueryTimeout(timeout *int)
SetFoundRows(uint64)

SetDDLStrategy(string)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (route *Route) TryExecute(ctx context.Context, vcursor VCursor, bindVars ma
// addQueryTimeout adds a query timeout to the context it receives and returns the modified context along with the cancel function.
func addQueryTimeout(ctx context.Context, vcursor VCursor, queryTimeout int) (context.Context, context.CancelFunc) {
timeout := vcursor.Session().GetQueryTimeout(queryTimeout)
if timeout != 0 {
if timeout > 0 {
return context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
}
return ctx, func() {}
Expand Down
13 changes: 7 additions & 6 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,15 +1105,16 @@ func (e *Executor) getPlan(
return nil, vterrors.VT13001("vschema not initialized")
}

vcursor.SetIgnoreMaxMemoryRows(sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt))
vcursor.SetConsolidator(sqlparser.Consolidator(stmt))
vcursor.SetWorkloadName(sqlparser.GetWorkloadNameFromStatement(stmt))
vcursor.UpdateForeignKeyChecksState(sqlparser.ForeignKeyChecksState(stmt))
priority, err := sqlparser.GetPriorityFromStatement(stmt)
qh, err := sqlparser.BuildQueryHints(stmt)
if err != nil {
return nil, err
}
vcursor.SetPriority(priority)
vcursor.SetIgnoreMaxMemoryRows(qh.IgnoreMaxMemoryRows)
vcursor.SetConsolidator(qh.Consolidator)
vcursor.SetWorkloadName(qh.Workload)
vcursor.UpdateForeignKeyChecksState(qh.ForeignKeyChecks)
vcursor.SetPriority(qh.Priority)
vcursor.SetExecQueryTimeout(qh.Timeout)

setVarComment, err := prepareSetVarComment(vcursor, stmt)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1787,7 +1787,8 @@ func TestGetPlanPriority(t *testing.T) {

stmt, err := sqlparser.NewTestParser().Parse(testCase.sql)
assert.NoError(t, err)
crticalityFromStatement, _ := sqlparser.GetPriorityFromStatement(stmt)
qh, _ := sqlparser.BuildQueryHints(stmt)
crticalityFromStatement := qh.Priority

_, err = r.getPlan(context.Background(), vCursor, testCase.sql, stmt, makeComments("/* some comment */"), map[string]*querypb.BindVariable{}, nil, true, logStats)
if testCase.expectedError != nil {
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,14 @@ func TestBuilder(query string, vschema plancontext.VSchema, keyspace string) (*e
// Store the foreign key mode like we do for vcursor.
vw, isVw := vschema.(*vschemawrapper.VSchemaWrapper)
if isVw {
fkState := sqlparser.ForeignKeyChecksState(stmt)
if fkState != nil {
qh, err := sqlparser.BuildQueryHints(stmt)
if err != nil {
return nil, err
}
if qh.ForeignKeyChecks != nil {
// Restore the old volue of ForeignKeyChecksState to not interfere with the next test cases.
oldVal := vw.ForeignKeyChecksState
vw.ForeignKeyChecksState = fkState
vw.ForeignKeyChecksState = qh.ForeignKeyChecks
defer func() {
vw.ForeignKeyChecksState = oldVal
}()
Expand Down
13 changes: 13 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,19 @@ func (vc *vcursorImpl) SetPriority(priority string) {
}
}

func (vc *vcursorImpl) SetExecQueryTimeout(timeout *int) {
if timeout == nil {
if vc.safeSession.GetOptions() == nil {
return
}
vc.safeSession.GetOrCreateOptions().Timeout = nil
return
}
vc.safeSession.GetOrCreateOptions().Timeout = &querypb.ExecuteOptions_AuthoritativeTimeout{
AuthoritativeTimeout: int64(*timeout),
}
}

// SetConsolidator implements the SessionActions interface
func (vc *vcursorImpl) SetConsolidator(consolidator querypb.ExecuteOptions_Consolidator) {
// Avoid creating session Options when they do not yet exist and the
Expand Down

0 comments on commit 4451119

Please sign in to comment.