Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Jan 24, 2024
1 parent 3d554ee commit b83858e
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 76 deletions.
19 changes: 19 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sqlparser

import (
"fmt"
"strconv"
"strings"
"unicode"
Expand Down Expand Up @@ -418,3 +419,21 @@ func GetWorkloadNameFromStatement(statement Statement) string {

return workloadName
}

func AddMysqlOptimizerHintsComment(query string, hints map[string]any) string {
hintsSlice := make([]string, 0, len(hints))
for hint, val := range hints {
hintsSlice = append(hintsSlice, fmt.Sprintf("%s(%v)", hint, val))
}
if len(hintsSlice) > 0 {
// MySQL optimizer hints must come immediately after the 1st
// field/verb, which should always be "select" or "SELECT".
fields := strings.SplitN(query, " ", 2)
return strings.Join([]string{
fields[0],
"/*+ " + strings.Join(hintsSlice, " ") + " */",
fields[1],
}, " ")
}
return query
}
73 changes: 43 additions & 30 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -39,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
p "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
eschema "vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
Expand All @@ -65,8 +67,7 @@ type QueryExecutor struct {
}

const (
streamRowsSize = 256
queryTimeoutMysqlMaxWait = time.Second
streamRowsSize = 256
)

var (
Expand Down Expand Up @@ -99,6 +100,15 @@ func allocStreamResult() *sqltypes.Result {
return streamResultPool.Get().(*sqltypes.Result)
}

func (qre *QueryExecutor) isSelect() bool {
switch qre.plan.PlanID {
case planbuilder.PlanSelect, planbuilder.PlanSelectImpossible:
return true
default:
return false
}
}

func (qre *QueryExecutor) shouldConsolidate() bool {
co := qre.options.GetConsolidator()
switch co {
Expand Down Expand Up @@ -822,7 +832,6 @@ func (qre *QueryExecutor) generateFinalSQL(parsedQuery *sqlparser.ParsedQuery, b
if err != nil {
return "", "", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%s", err)
}
query = addMysqlOptimizerHintsToQuery(qre.tsv.config, qre.plan.PlanID, query)
if qre.tsv.config.AnnotateQueries {
username := callerid.GetPrincipal(callerid.EffectiveCallerIDFromContext(qre.ctx))
if username == "" {
Expand All @@ -844,42 +853,46 @@ func (qre *QueryExecutor) generateFinalSQL(parsedQuery *sqlparser.ParsedQuery, b
return query, query, nil
}

var mysqlOptimizerHints string
if qre.isSelect() {
mysqlOptimizerHints = buildMysqlOptimizerHints(qre.tsv)
}

var buf strings.Builder
buf.Grow(len(qre.marginComments.Leading) + len(query) + len(qre.marginComments.Trailing))
buf.WriteString(qre.marginComments.Leading)
buf.WriteString(query)
buf.WriteString(qre.marginComments.Trailing)
return buf.String(), query, nil
}
if mysqlOptimizerHints != "" {
fields := strings.SplitN(query, " ", 2)
queryPrefix := fields[0] + " "
queryNonPrefix := " " + fields[1]

func addMysqlOptimizerHintsToQuery(config *tabletenv.TabletConfig, planType p.PlanType, query string) string {
if planType != p.PlanSelect {
return query
buf.Grow(len(qre.marginComments.Leading) + len(queryPrefix) + len(mysqlOptimizerHints) + len(queryNonPrefix) + len(qre.marginComments.Trailing))
buf.WriteString(qre.marginComments.Leading)
buf.WriteString(queryPrefix)
buf.WriteString(mysqlOptimizerHints)
buf.WriteString(queryNonPrefix)
buf.WriteString(qre.marginComments.Trailing)
} else {
buf.Grow(len(qre.marginComments.Leading) + len(query) + len(qre.marginComments.Trailing))
buf.WriteString(qre.marginComments.Leading)
buf.WriteString(query)
buf.WriteString(qre.marginComments.Trailing)
}
return buf.String(), query, nil
}

hints := make([]string, 0)

switch config.Oltp.QueryTimeoutMethod.String() {
case tabletenv.QueryTimeoutMethodMysql:
func buildMysqlOptimizerHints(tsv *TabletServer) string {
var buf strings.Builder
if tsv.config.Oltp.QueryTimeoutPushdown {
// The MAX_EXECUTION_TIME(N) hint sets a statement execution timeout of N milliseconds.
// https://dev.mysql.com/doc/refman/8.0/en/optimizer-hints.html#optimizer-hints-execution-time
hints = append(hints,
fmt.Sprintf("MAX_EXECUTION_TIME(%d)", config.Oltp.QueryTimeoutSeconds.Get().Milliseconds()),
)
queryTimeoutStr := strconv.FormatInt(tsv.loadQueryTimeout(), 64)
buf.Grow(len(queryTimeoutStr))
buf.WriteString(queryTimeoutStr)
}

if len(hints) > 0 {
// MySQL optimizer hints must come immediately after the 1st
// field/verb, which should always be "select" or "SELECT".
fields := strings.SplitN(query, " ", 2)
return strings.Join([]string{
fields[0],
"/*+ " + strings.Join(hints, " ") + " */",
fields[1],
}, " ")
if len(optimizerHints) == 0 {
return ""
}

return query
return "/*+ " + strings.Join(optimizerHints, " ") + " */"
}

func rewriteOUTParamError(err error) error {
Expand Down
39 changes: 16 additions & 23 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
Expand All @@ -37,6 +36,7 @@ import (
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/callinfo/fakecallinfo"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/tableacl"
"vitess.io/vitess/go/vt/tableacl/simpleacl"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand Down Expand Up @@ -1795,28 +1795,21 @@ func TestQueryExecSchemaReloadCount(t *testing.T) {
}
}

func TestAddMysqlOptimizerHintsToQuery(t *testing.T) {
config := tabletenv.NewDefaultConfig()
{
assert.Equal(t,
`select * from something`,
addMysqlOptimizerHintsToQuery(config, planbuilder.PlanSelect, "select * from something"),
)
}
{
config.Oltp.QueryTimeoutMethod.Set(tabletenv.QueryTimeoutMethodMysql)
config.Oltp.QueryTimeoutSeconds = flagutil.NewDeprecatedFloat64Seconds(t.Name(), time.Second)
assert.Equal(t,
`select /*+ MAX_EXECUTION_TIME(1000) */ * from something`,
addMysqlOptimizerHintsToQuery(config, planbuilder.PlanSelect, "select * from something"),
)
}
{
assert.Equal(t,
`insert into something (id, value) values(1, 2)`,
addMysqlOptimizerHintsToQuery(config, planbuilder.PlanInsert, "insert into something (id, value) values(1, 2)"),
)
}
func TestGenerateFinalSQL(t *testing.T) {
// generateFinalSQL(parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable)
db := setUpQueryExecutorTest(t)
defer db.Close()
tsv := newTestTabletServer(context.Background(), noFlags, db)
defer tsv.StopService()

qre := newTestQueryExecutor(context.Background(), tsv, `select * from something`, 0)
query, noComments, err := qre.generateFinalSQL(
&sqlparser.ParsedQuery{Query: `select * from something`},
map[string]*querypb.BindVariable{},
)
assert.Nil(t, err)
assert.Equal(t, `select * from something`, query)
t.Logf("noComments: %s", noComments)
}

type mockTxThrottler struct {
Expand Down
29 changes: 14 additions & 15 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,12 @@ import (

// These constants represent values for various config parameters.
const (
Enable = "enable"
Disable = "disable"
Dryrun = "dryRun"
NotOnPrimary = "notOnPrimary"
Polling = "polling"
Heartbeat = "heartbeat"
QueryTimeoutMethodVttablet = "vttablet"
QueryTimeoutMethodMysql = "mysql"
Enable = "enable"
Disable = "disable"
Dryrun = "dryRun"
NotOnPrimary = "notOnPrimary"
Polling = "polling"
Heartbeat = "heartbeat"
)

var (
Expand Down Expand Up @@ -229,7 +227,8 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
fs.BoolVar(&currentConfig.EnableViews, "queryserver-enable-views", false, "Enable views support in vttablet.")

fs.BoolVar(&currentConfig.EnablePerWorkloadTableMetrics, "enable-per-workload-table-metrics", defaultConfig.EnablePerWorkloadTableMetrics, "If true, query counts and query error metrics include a label that identifies the workload")
fs.Var(currentConfig.Oltp.QueryTimeoutMethod, "query-timeout-method", "The preferred method to timeout/kill MySQL queries, options: 'vttablet' and 'mysql'. 'vttablet' issues a MySQL KILL operation, 'mysql' pushes the kill to MySQL with a fallback to a KILL.")
fs.BoolVar(&currentConfig.Oltp.QueryTimeoutPushdown, "query-timeout-pushdown", false, "Attempt to push-down timing-out of queries to MySQL with a fallback to a MySQL KILL operation.")
fs.DurationVar(&currentConfig.Oltp.QueryTimeoutPushdownWait, "query-timeout-pushdown-wait", time.Second, "Max time to wait for MySQL to kill a query before sending a fallback KILL operation. Requires --query-timeout-pushdown")
}

var (
Expand Down Expand Up @@ -465,11 +464,12 @@ func (cfg *OlapConfig) MarshalJSON() ([]byte, error) {

// OltpConfig contains the config for oltp settings.
type OltpConfig struct {
QueryTimeoutSeconds flagutil.DeprecatedFloat64Seconds `json:"queryTimeoutSeconds,omitempty"`
QueryTimeoutMethod *flagutil.StringEnum `json:"queryTimeoutMethod,omitempty"`
TxTimeoutSeconds flagutil.DeprecatedFloat64Seconds `json:"txTimeoutSeconds,omitempty"`
MaxRows int `json:"maxRows,omitempty"`
WarnRows int `json:"warnRows,omitempty"`
QueryTimeoutSeconds flagutil.DeprecatedFloat64Seconds `json:"queryTimeoutSeconds,omitempty"`
QueryTimeoutPushdown bool `json:"queryTimeoutPushdown,omitempty"`
QueryTimeoutPushdownWait time.Duration `json:"queryTimeoutPushdownWait,omitempty"`
TxTimeoutSeconds flagutil.DeprecatedFloat64Seconds `json:"txTimeoutSeconds,omitempty"`
MaxRows int `json:"maxRows,omitempty"`
WarnRows int `json:"warnRows,omitempty"`
}

func (cfg *OltpConfig) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -775,7 +775,6 @@ var defaultConfig = TabletConfig{
},
Oltp: OltpConfig{
QueryTimeoutSeconds: flagutil.NewDeprecatedFloat64Seconds("queryserver-config-query-timeout", 30*time.Second),
QueryTimeoutMethod: flagutil.NewStringEnum("query-timeout-method", QueryTimeoutMethodVttablet, []string{QueryTimeoutMethodVttablet, QueryTimeoutMethodMysql}),
TxTimeoutSeconds: flagutil.NewDeprecatedFloat64Seconds("queryserver-config-transaction-timeout", 30*time.Second),
MaxRows: 10000,
},
Expand Down
20 changes: 12 additions & 8 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,7 @@ func NewTabletServer(ctx context.Context, name string, config *tabletenv.TabletC
topoServer: topoServer,
alias: alias.CloneVT(),
}

queryTimeoutNanos := config.Oltp.QueryTimeoutSeconds.Get().Nanoseconds()
switch config.Oltp.QueryTimeoutMethod.String() {
case tabletenv.QueryTimeoutMethodMysql:
queryTimeoutNanos = queryTimeoutNanos + queryTimeoutMysqlMaxWait.Nanoseconds()
}
tsv.QueryTimeout.Store(queryTimeoutNanos)
tsv.QueryTimeout.Store(config.Oltp.QueryTimeoutSeconds.Get().Nanoseconds())

tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(ctx, topoServer, "TabletSrvTopo") })

Expand Down Expand Up @@ -242,6 +236,13 @@ func (tsv *TabletServer) loadQueryTimeout() time.Duration {
return time.Duration(tsv.QueryTimeout.Load())
}

func (tsv *TabletServer) loadQueryTimeoutWithPushdownWait() time.Duration {
if tsv.config.Oltp.QueryTimeoutPushdown {
return tsv.loadQueryTimeout() + tsv.config.Oltp.QueryTimeoutPushdownWait
}
return tsv.loadQueryTimeout()
}

// onlineDDLExecutorToggleTableBuffer is called by onlineDDLExecutor as a callback function. onlineDDLExecutor
// uses it to start/stop query buffering for a given table.
// It is onlineDDLExecutor's responsibility to make sure beffering is stopped after some definite amount of time.
Expand Down Expand Up @@ -494,7 +495,7 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti
func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, savepointQueries []string, reservedID int64, settings []string, options *querypb.ExecuteOptions) (state queryservice.TransactionState, err error) {
state.TabletAlias = tsv.alias
err = tsv.execRequest(
ctx, tsv.loadQueryTimeout(),
ctx, tsv.loadQueryTimeoutWithPushdownWait(),
"Begin", "begin", nil,
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
Expand Down Expand Up @@ -766,6 +767,9 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq
func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, reservedID int64, settings []string, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) {
allowOnShutdown := false
timeout := tsv.loadQueryTimeout()
if tsv.config.Oltp.QueryTimeoutPushdown {
return timeout + tsv.config.Oltp.QueryTimeoutPushdownWait
}
if transactionID != 0 {
allowOnShutdown = true
// Execute calls happen for OLTP only, so we can directly fetch the
Expand Down

0 comments on commit b83858e

Please sign in to comment.