Skip to content

Commit

Permalink
use mysql execution options
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Nov 5, 2024
1 parent ed6e267 commit 6313ead
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 26 deletions.
73 changes: 73 additions & 0 deletions go/mysql/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mysql

import (
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
)

// ParseResult converts the raw packets in a QueryResult to a sqltypes.Result.
func ParseResult(qr *querypb.QueryResult, wantfields bool) (*sqltypes.Result, error) {
if qr.RawPackets == nil {
return sqltypes.Proto3ToResult(qr), nil
}

var colcount int
for i, p := range qr.RawPackets {
if len(p) == 0 {
colcount = i
break
}
}

var err error
fieldArray := make([]querypb.Field, colcount)
fieldPackets := qr.RawPackets[:colcount]
rowPackets := qr.RawPackets[colcount+1:]

result := &sqltypes.Result{
RowsAffected: qr.RowsAffected,
InsertID: qr.InsertId,
SessionStateChanges: qr.SessionStateChanges,
Info: qr.Info,
Fields: make([]*querypb.Field, len(fieldPackets)),
Rows: make([]sqltypes.Row, 0, len(rowPackets)),
}

for i, fieldpkt := range fieldPackets {
result.Fields[i] = &fieldArray[i]
if wantfields {
err = parseColumnDefinition(fieldpkt, result.Fields[i], i)
} else {
err = parseColumnDefinitionType(fieldpkt, result.Fields[i], i)
}
if err != nil {
return nil, err
}
}

for _, rowpkt := range rowPackets {
r, err := parseRow(rowpkt, result.Fields, readLenEncStringAsBytes, nil)
if err != nil {
return nil, err
}
result.Rows = append(result.Rows, r)
}

return result, nil
}
3 changes: 3 additions & 0 deletions go/sqltypes/proto3.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func Proto3ToResult(qr *querypb.QueryResult) *Result {
if qr == nil {
return nil
}
if qr.RawPackets != nil {
panic("Proto3ToResult with raw mysql packets")
}
return &Result{
Fields: qr.Fields,
RowsAffected: qr.RowsAffected,
Expand Down
10 changes: 10 additions & 0 deletions go/vt/dbconnpool/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ func (dbc *DBConnection) ExecuteFetch(query string, maxrows int, wantfields bool
return mqr, nil
}

// ExecuteFetchOpt overwrites mysql.Conn.ExecuteFetchOpt.
func (dbc *DBConnection) ExecuteFetchOpt(query string, opt mysql.ExecuteOptions) (*sqltypes.Result, error) {
mqr, err := dbc.Conn.ExecuteFetchOpt(query, opt)
if err != nil {
dbc.handleError(err)
return nil, err
}
return mqr, nil
}

// ExecuteStreamFetch overwrites mysql.Conn.ExecuteStreamFetch.
func (dbc *DBConnection) ExecuteStreamFetch(query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int) error {

Expand Down
22 changes: 14 additions & 8 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ import (
"github.com/spf13/pflag"
"google.golang.org/grpc"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
queryservicepb "vitess.io/vitess/go/vt/proto/queryservice"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
)

const protocolName = "grpc"
Expand Down Expand Up @@ -117,6 +117,9 @@ func (conn *gRPCQueryClient) Execute(ctx context.Context, target *querypb.Target
if conn.cc == nil {
return nil, tabletconn.ConnClosed
}
if options != nil {
options.RawMysqlPackets = true
}

req := &querypb.ExecuteRequest{
EffectiveCallerId: callerid.EffectiveCallerIDFromContext(ctx),
Expand All @@ -134,7 +137,7 @@ func (conn *gRPCQueryClient) Execute(ctx context.Context, target *querypb.Target
if err != nil {
return nil, tabletconn.ErrorFromGRPC(err)
}
return sqltypes.Proto3ToResult(er.Result), nil
return mysql.ParseResult(er.Result, true)
}

// StreamExecute executes the query and streams results back through callback.
Expand Down Expand Up @@ -489,7 +492,8 @@ func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.T
if reply.Error != nil {
return state, nil, tabletconn.ErrorFromVTRPC(reply.Error)
}
return state, sqltypes.Proto3ToResult(reply.Result), nil
result, err = mysql.ParseResult(reply.Result, true)
return state, result, err
}

// BeginStreamExecute starts a transaction and runs an Execute.
Expand Down Expand Up @@ -889,7 +893,8 @@ func (conn *gRPCQueryClient) ReserveBeginExecute(ctx context.Context, target *qu
return state, nil, tabletconn.ErrorFromVTRPC(reply.Error)
}

return state, sqltypes.Proto3ToResult(reply.Result), nil
result, err = mysql.ParseResult(reply.Result, true)
return state, result, err
}

// ReserveBeginStreamExecute implements the queryservice interface
Expand Down Expand Up @@ -1003,7 +1008,8 @@ func (conn *gRPCQueryClient) ReserveExecute(ctx context.Context, target *querypb
return state, nil, tabletconn.ErrorFromVTRPC(reply.Error)
}

return state, sqltypes.Proto3ToResult(reply.Result), nil
result, err = mysql.ParseResult(reply.Result, true)
return state, result, err
}

// ReserveStreamExecute implements the queryservice interface
Expand Down
27 changes: 16 additions & 11 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/pools/smartconnpool"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -121,11 +122,15 @@ func (dbc *Conn) Err() error {
// Exec executes the specified query. If there is a connection error, it will reconnect
// and retry. A failed reconnect will trigger a CheckMySQL.
func (dbc *Conn) Exec(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
return dbc.ExecOpt(ctx, query, mysql.ExecuteOptions{MaxRows: maxrows, WantFields: wantfields})
}

func (dbc *Conn) ExecOpt(ctx context.Context, query string, opt mysql.ExecuteOptions) (*sqltypes.Result, error) {
span, ctx := trace.NewSpan(ctx, "DBConn.Exec")
defer span.Finish()

for attempt := 1; attempt <= 2; attempt++ {
r, err := dbc.execOnce(ctx, query, maxrows, wantfields, false)
r, err := dbc.execOnceOpt(ctx, query, opt, false)
switch {
case err == nil:
// Success.
Expand Down Expand Up @@ -159,7 +164,12 @@ func (dbc *Conn) Exec(ctx context.Context, query string, maxrows int, wantfields
panic("unreachable")
}

func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool, insideTxn bool) (*sqltypes.Result, error) {
// ExecOnce executes the specified query, but does not retry on connection errors.
func (dbc *Conn) ExecOnceOpt(ctx context.Context, query string, opt mysql.ExecuteOptions) (*sqltypes.Result, error) {
return dbc.execOnceOpt(ctx, query, opt, true /* Once means we are in a txn*/)
}

func (dbc *Conn) execOnceOpt(ctx context.Context, query string, opt mysql.ExecuteOptions, insideTxn bool) (*sqltypes.Result, error) {
dbc.current.Store(&query)
defer dbc.current.Store(nil)

Expand All @@ -179,7 +189,7 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi

ch := make(chan execResult)
go func() {
result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
result, err := dbc.conn.ExecuteFetchOpt(query, opt)
ch <- execResult{result, err}
close(ch)
}()
Expand Down Expand Up @@ -225,11 +235,6 @@ func (dbc *Conn) terminate(ctx context.Context, insideTxn bool, now time.Time) {
}
}

// ExecOnce executes the specified query, but does not retry on connection errors.
func (dbc *Conn) ExecOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
return dbc.execOnce(ctx, query, maxrows, wantfields, true /* Once means we are in a txn*/)
}

// FetchNext returns the next result set.
func (dbc *Conn) FetchNext(ctx context.Context, maxrows int, wantfields bool) (*sqltypes.Result, error) {
// Check if the context is already past its deadline before
Expand Down Expand Up @@ -412,7 +417,7 @@ func (dbc *Conn) Close() {

// ApplySetting implements the pools.Resource interface.
func (dbc *Conn) ApplySetting(ctx context.Context, setting *smartconnpool.Setting) error {
if _, err := dbc.execOnce(ctx, setting.ApplyQuery(), 1, false, false); err != nil {
if _, err := dbc.execOnceOpt(ctx, setting.ApplyQuery(), mysql.ExecuteOptions{MaxRows: 1}, false); err != nil {
return err
}
dbc.setting = setting
Expand All @@ -421,7 +426,7 @@ func (dbc *Conn) ApplySetting(ctx context.Context, setting *smartconnpool.Settin

// ResetSetting implements the pools.Resource interface.
func (dbc *Conn) ResetSetting(ctx context.Context) error {
if _, err := dbc.execOnce(ctx, dbc.setting.ResetQuery(), 1, false, false); err != nil {
if _, err := dbc.execOnceOpt(ctx, dbc.setting.ResetQuery(), mysql.ExecuteOptions{MaxRows: 1}, false); err != nil {
return err
}
dbc.setting = nil
Expand Down Expand Up @@ -601,7 +606,7 @@ func (dbc *Conn) CurrentForLogging() string {
}

func (dbc *Conn) applySameSetting(ctx context.Context) error {
_, err := dbc.execOnce(ctx, dbc.setting.ApplyQuery(), 1, false, false)
_, err := dbc.execOnceOpt(ctx, dbc.setting.ApplyQuery(), mysql.ExecuteOptions{MaxRows: 1}, false)
return err
}

Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/connpool/dbconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/pools/smartconnpool"
Expand Down Expand Up @@ -306,7 +307,7 @@ func TestDBConnCtxError(t *testing.T) {
}

execOnce := func(ctx context.Context, query string, dbconn *Conn) error {
_, err := dbconn.ExecOnce(ctx, query, 1, false)
_, err := dbconn.ExecOnceOpt(ctx, query, mysql.ExecuteOptions{MaxRows: 1})
return err
}

Expand Down Expand Up @@ -649,7 +650,7 @@ func TestDBConnReApplySetting(t *testing.T) {

func TestDBExecOnceKillTimeout(t *testing.T) {
executeWithTimeout(t, `kill \d+`, 150*time.Millisecond, func(ctx context.Context, dbConn *Conn) (*sqltypes.Result, error) {
return dbConn.ExecOnce(ctx, "select 1", 1, false)
return dbConn.ExecOnceOpt(ctx, "select 1", mysql.ExecuteOptions{MaxRows: 1})
})
}

Expand Down
19 changes: 17 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func allocStreamResult() *sqltypes.Result {
}

func (qre *QueryExecutor) shouldConsolidate() bool {
// TODO (harshit): This is a temporary implementation to test the feature.
// This should ideally work with consolidator.
if !qre.options.RawMysqlPackets {
return false
}
co := qre.options.GetConsolidator()
switch co {
case querypb.ExecuteOptions_CONSOLIDATOR_DISABLED:
Expand Down Expand Up @@ -1119,7 +1124,12 @@ func (qre *QueryExecutor) execDBConn(conn *connpool.Conn, sql string, wantfields
}
defer qre.tsv.statelessql.Remove(qd)

return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields)
opt := mysql.ExecuteOptions{
MaxRows: int(qre.tsv.qe.maxResultSize.Load()),
WantFields: wantfields,
RawPackets: qre.options.RawMysqlPackets,
}
return conn.ExecOpt(ctx, sql, opt)
}

func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string, wantfields bool) (*sqltypes.Result, error) {
Expand All @@ -1135,7 +1145,12 @@ func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string,
}
defer qre.tsv.statefulql.Remove(qd)

return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields)
opt := mysql.ExecuteOptions{
MaxRows: int(qre.tsv.qe.maxResultSize.Load()),
WantFields: wantfields,
RawPackets: qre.options.RawMysqlPackets,
}
return conn.ExecOpt(ctx, sql, opt)
}

func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction bool, sql string, callback func(*sqltypes.Result) error) error {
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletserver/stateful_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/pools/smartconnpool"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -87,15 +88,20 @@ func (sc *StatefulConnection) ElapsedTimeout() bool {
return sc.expiryTime.Before(time.Now())
}

// Exec executes the statement in the dedicated connection
// Exec executes the statement in the dedicated connection.
func (sc *StatefulConnection) Exec(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
return sc.ExecOpt(ctx, query, mysql.ExecuteOptions{MaxRows: maxrows, WantFields: wantfields})
}

// ExecOpt executes the statement in the dedicated connection with the given options.
func (sc *StatefulConnection) ExecOpt(ctx context.Context, query string, opt mysql.ExecuteOptions) (*sqltypes.Result, error) {
if sc.IsClosed() {
if sc.IsInTransaction() {
return nil, vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction was aborted: %v", sc.txProps.Conclusion)
}
return nil, vterrors.New(vtrpcpb.Code_ABORTED, "connection was aborted")
}
r, err := sc.dbConn.Conn.ExecOnce(ctx, query, maxrows, wantfields)
r, err := sc.dbConn.Conn.ExecOnceOpt(ctx, query, opt)
if err != nil {
if sqlerror.IsConnErr(err) {
select {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/pools/smartconnpool"
"vitess.io/vitess/go/timer"
Expand Down Expand Up @@ -733,7 +734,7 @@ func (te *TxEngine) beginNewDbaConnection(ctx context.Context, settingsQuery str

// If we have a settings query that we need to apply, we do that before starting the transaction.
if settingsQuery != "" {
if _, err = dbConn.ExecOnce(ctx, settingsQuery, 1, false); err != nil {
if _, err = dbConn.ExecOnceOpt(ctx, settingsQuery, mysql.ExecuteOptions{MaxRows: 1}); err != nil {
return nil, err
}
}
Expand Down

0 comments on commit 6313ead

Please sign in to comment.