Skip to content

Commit

Permalink
feat: add timeout handler executor
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Aug 20, 2024
1 parent 687ac40 commit d6b9361
Show file tree
Hide file tree
Showing 31 changed files with 25,339 additions and 20,784 deletions.
2 changes: 2 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ var (
VT14004 = errorWithoutState("VT14004", vtrpcpb.Code_UNAVAILABLE, "cannot find keyspace for: %s", "The specified keyspace could not be found.")
VT14005 = errorWithoutState("VT14005", vtrpcpb.Code_UNAVAILABLE, "cannot lookup sidecar database for keyspace: %s", "Failed to read sidecar database identifier.")

VT15001 = errorWithoutState("VT15001", vtrpcpb.Code_DEADLINE_EXCEEDED, "Query execution was interrupted, maximum statement execution time exceeded", "Query execution was interrupted, maximum statement execution time exceeded")

// Errors is a list of errors that must match all the variables
// defined above to enable auto-documentation of error codes.
Errors = []func(args ...any) *VitessError{
Expand Down
103 changes: 103 additions & 0 deletions go/vt/vtgate/engine/timeout_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package engine

import (
"context"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vterrors"
)

// TimeoutHandler is a primitive that adds a timeout to the execution of a query.
type TimeoutHandler struct {
Timeout int
Input Primitive
}

var _ Primitive = (*TimeoutHandler)(nil)

// NewTimeoutHandler creates a new timeout handler.
func NewTimeoutHandler(input Primitive, timeout int) *TimeoutHandler {
return &TimeoutHandler{
Timeout: timeout,
Input: input,
}
}

// RouteType is part of the Primitive interface
func (t *TimeoutHandler) RouteType() string {
return t.Input.RouteType()
}

// GetKeyspaceName is part of the Primitive interface
func (t *TimeoutHandler) GetKeyspaceName() string {
return t.Input.GetKeyspaceName()
}

// GetTableName is part of the Primitive interface
func (t *TimeoutHandler) GetTableName() string {
return t.Input.GetTableName()
}

// GetFields is part of the Primitive interface
func (t *TimeoutHandler) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return t.Input.GetFields(ctx, vcursor, bindVars)
}

// NeedsTransaction is part of the Primitive interface
func (t *TimeoutHandler) NeedsTransaction() bool {
return t.Input.NeedsTransaction()
}

// TryExecute is part of the Primitive interface
func (t *TimeoutHandler) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (res *sqltypes.Result, err error) {
ctx, cancel := addQueryTimeout(ctx, vcursor, t.Timeout)
defer cancel()

var complete chan any
go func() {
res, err = t.Input.TryExecute(ctx, vcursor, bindVars, wantfields)
close(complete)
}()

select {
case <-ctx.Done():
return nil, vterrors.VT15001()
case <-complete:
return res, err
}
}

// TryStreamExecute is part of the Primitive interface
func (t *TimeoutHandler) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) (err error) {
ctx, cancel := addQueryTimeout(ctx, vcursor, t.Timeout)
defer cancel()

var complete chan any
go func() {
err = t.Input.TryStreamExecute(ctx, vcursor, bindVars, wantfields, callback)
close(complete)
}()

select {
case <-ctx.Done():
return vterrors.VT15001()
case <-complete:
return err
}
}

// Inputs is part of the Primitive interface
func (t *TimeoutHandler) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{t.Input}, nil
}

// description is part of the Primitive interface
func (t *TimeoutHandler) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "TimeoutHandler",
Other: map[string]any{
"Timeout": t.Timeout,
},
}
}
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func newBuildSelectPlan(
return nil, nil, err
}

plan = engine.NewTimeoutHandler(plan, queryTimeout(selStmt.GetParsedComments().Directives()))

return plan, operators.TablesUsed(op), nil
}

Expand Down
Loading

0 comments on commit d6b9361

Please sign in to comment.