Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tracing spans #133

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions go/mysql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/trace"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttls"
Expand All @@ -47,6 +48,9 @@ type connectResult struct {
// FIXME(alainjobart) once we have more of a server side, add test cases
// to cover all failure scenarios.
func Connect(ctx context.Context, params *ConnParams) (*Conn, error) {
span, ctx := trace.NewSpan(ctx, "mysql.Connect")
defer span.Finish()

if params.ConnectTimeoutMs != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(params.ConnectTimeoutMs)*time.Millisecond)
Expand Down Expand Up @@ -115,7 +119,7 @@ func Connect(ctx context.Context, params *ConnParams) (*Conn, error) {
// make any read or write just return with an error
// right away.
status <- connectResult{
err: c.clientHandshake(params),
err: c.clientHandshake(ctx, params),
}
}()

Expand Down Expand Up @@ -197,19 +201,27 @@ func (c *Conn) Ping() error {
// clientHandshake handles the client side of the handshake.
// Note the connection can be closed while this is running.
// Returns a SQLError.
func (c *Conn) clientHandshake(params *ConnParams) error {
func (c *Conn) clientHandshake(ctx context.Context, params *ConnParams) error {
span, ctx := trace.NewSpan(ctx, "Conn.clientHandshake")
defer span.Finish()

// if EnableQueryInfo is set, make sure that all queries starting with the handshake
// will actually process the INFO fields in QUERY_OK packets
if params.EnableQueryInfo {
c.enableQueryInfo = true
}

// Wait for the server initial handshake packet, and parse it.
spanReadPacket, ctx := trace.NewSpan(ctx, "Conn.clientHandshake.readPackage")
data, err := c.readPacket()
spanReadPacket.Finish()
if err != nil {
return NewSQLError(CRServerLost, "", "initial packet read failed: %v", err)
}

spanParseHandshakePakcet, ctx := trace.NewSpan(ctx, "Conn.clientHandshake.parseInitialHandshakePacket")
capabilities, salt, err := c.parseInitialHandshakePacket(data)
spanParseHandshakePakcet.Finish()
if err != nil {
return err
}
Expand All @@ -228,13 +240,16 @@ func (c *Conn) clientHandshake(params *ConnParams) error {
c.Capabilities = capabilities & (CapabilityClientDeprecateEOF)
}

spanParseConnCharset, ctx := trace.NewSpan(ctx, "Conn.clientHandshake.ParseConnectionCharset")
charset, err := collations.Local().ParseConnectionCharset(params.Charset)
spanParseConnCharset.Finish()
if err != nil {
return err
}

// Handle switch to SSL if necessary.
if params.SslEnabled() {
spanSslEnabled, _ := trace.NewSpan(ctx, "Conn.clientHandshake.spanSslEnabled")
// If client asked for SSL, but server doesn't support it,
// stop right here.
if params.SslRequired() && capabilities&CapabilityClientSSL == 0 {
Expand Down Expand Up @@ -278,6 +293,8 @@ func (c *Conn) clientHandshake(params *ConnParams) error {
c.conn = conn
c.bufferedReader.Reset(conn)
c.Capabilities |= CapabilityClientSSL

spanSslEnabled.Finish()
}

// Password encryption.
Expand Down Expand Up @@ -313,6 +330,8 @@ func (c *Conn) clientHandshake(params *ConnParams) error {
// If the server didn't support DbName in its handshake, set
// it now. This is what the 'mysql' client does.
if capabilities&CapabilityClientConnectWithDB == 0 && params.DbName != "" {
spanWriteDbName, _ := trace.NewSpan(ctx, "Conn.clientHandshake.spanWriteDbName")
defer spanWriteDbName.Finish()
// Write the packet.
if err := c.writeComInitDB(params.DbName); err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions go/pools/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"

Expand Down Expand Up @@ -252,6 +253,9 @@ func (rp *ResourcePool) reopen() {
// it will wait till the next resource becomes available or a timeout.
// A timeout of 0 is an indefinite wait.
func (rp *ResourcePool) Get(ctx context.Context, setting *Setting) (resource Resource, err error) {
span, ctx := trace.NewSpan(ctx, "ResourcePool.Get")
defer span.Finish()

// If ctx has already expired, avoid racing with rp's resource channel.
if ctx.Err() != nil {
return nil, ErrCtxTimeout
Expand All @@ -263,6 +267,9 @@ func (rp *ResourcePool) Get(ctx context.Context, setting *Setting) (resource Res
}

func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error) {
span, ctx := trace.NewSpan(ctx, "ResourcePool.get")
defer span.Finish()

rp.getCount.Add(1)
// Fetch
var wrapper resourceWrapper
Expand Down Expand Up @@ -295,6 +302,7 @@ func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error)

// if the resource has setting applied, we will close it and return a new one
if wrapper.resource != nil && wrapper.resource.IsSettingApplied() {
spanClose, ctx := trace.NewSpan(ctx, "ResourcePool.settingAppliedClose")
rp.resetSettingCount.Add(1)
err = wrapper.resource.ResetSetting(ctx)
if err != nil {
Expand All @@ -303,16 +311,19 @@ func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error)
wrapper.resource = nil
rp.active.Add(-1)
}
spanClose.Finish()
}

// Unwrap
if wrapper.resource == nil {
spanFactory, ctx := trace.NewSpan(ctx, "ResourcePool.wrapperNil")
wrapper.resource, err = rp.factory(ctx)
if err != nil {
rp.resources <- resourceWrapper{}
return nil, err
}
rp.active.Add(1)
spanFactory.Finish()
}
if rp.available.Add(-1) <= 0 {
rp.exhausted.Add(1)
Expand All @@ -322,6 +333,9 @@ func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error)
}

func (rp *ResourcePool) getWithSettings(ctx context.Context, setting *Setting) (Resource, error) {
span, ctx := trace.NewSpan(ctx, "ResourcePool.getWithSettings")
defer span.Finish()

rp.getSettingCount.Add(1)
var wrapper resourceWrapper
var ok bool
Expand Down
4 changes: 4 additions & 0 deletions go/vt/dbconfigs/dbconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttls"

Expand Down Expand Up @@ -172,6 +173,9 @@ func New(mcp *mysql.ConnParams) Connector {

// Connect will invoke the mysql.connect method and return a connection
func (c *Connector) Connect(ctx context.Context) (*mysql.Conn, error) {
span, ctx := trace.NewSpan(ctx, "Connector.Connect")
defer span.Finish()

params, err := c.MysqlParams()
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions go/vt/dbconnpool/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/dbconfigs"
)

Expand All @@ -35,6 +36,9 @@ type DBConnection struct {
// NewDBConnection returns a new DBConnection based on the ConnParams
// and will use the provided stats to collect timing.
func NewDBConnection(ctx context.Context, info dbconfigs.Connector) (*DBConnection, error) {
span, ctx := trace.NewSpan(ctx, "connection.NewDBConnection")
defer span.Finish()

c, err := info.Connect(ctx)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -54,6 +55,8 @@ func (l *Limit) GetTableName() string {

// TryExecute satisfies the Primitive interface.
func (l *Limit) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
span, ctx := trace.NewSpan(ctx, "Limit.TryExecute")
defer span.Finish()
count, offset, err := l.getCountAndOffset(vcursor, bindVars)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -173,6 +174,9 @@ func (route *Route) SetTruncateColumnCount(count int) {

// TryExecute performs a non-streaming exec.
func (route *Route) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
span, ctx := trace.NewSpan(ctx, "Route.executePlan")
defer span.Finish()

if route.QueryTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(route.QueryTimeout)*time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgate/engine/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
querypb "vitess.io/vitess/go/vt/proto/query"
)

Expand Down Expand Up @@ -55,7 +56,9 @@ func (r *Rows) GetTableName() string {
}

// TryExecute implements the Primitive interface
func (r *Rows) TryExecute(context.Context, VCursor, map[string]*querypb.BindVariable, bool) (*sqltypes.Result, error) {
func (r *Rows) TryExecute(ctx context.Context, v VCursor, vars map[string]*querypb.BindVariable, wantFields bool) (*sqltypes.Result, error) {
span, _ := trace.NewSpan(ctx, "Rows.executePlan")
defer span.Finish()
return &sqltypes.Result{
Fields: r.fields,
InsertID: 0,
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/key"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -86,6 +87,8 @@ func (s *Send) GetTableName() string {

// TryExecute implements Primitive interface
func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
span, ctx := trace.NewSpan(ctx, "Send.TryExecute")
defer span.Finish()
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})
if err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ func saveSessionStats(safeSession *SafeSession, stmtType sqlparser.StatementType
}

func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) (sqlparser.StatementType, *sqltypes.Result, error) {
span, ctx := trace.NewSpan(ctx, "Executor.execute")
defer span.Finish()

var err error
var qr *sqltypes.Result
var stmtType sqlparser.StatementType
Expand Down Expand Up @@ -968,6 +971,9 @@ type iQueryOption interface {
// getPlan computes the plan for the given query. If one is in
// the cache, it reuses it.
func (e *Executor) getPlan(ctx context.Context, vcursor *vcursorImpl, sql string, comments sqlparser.MarginComments, bindVars map[string]*querypb.BindVariable, qo iQueryOption, logStats *logstats.LogStats) (*engine.Plan, error) {
span, ctx := trace.NewSpan(ctx, "Executor.getPlan")
defer span.Finish()

if e.VSchema() == nil {
return nil, errors.New("vschema not initialized")
}
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"time"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/vtgate/logstats"

"vitess.io/vitess/go/sqltypes"
Expand All @@ -42,6 +43,9 @@ func (e *Executor) newExecute(
execPlan planExec, // used when there is a plan to execute
recResult txResult, // used when it's something simple like begin/commit/rollback/savepoint
) error {
span, ctx := trace.NewSpan(ctx, "Executor.newExecute")
defer span.Finish()

// 1: Prepare before planning and execution

// Start an implicit transaction if necessary.
Expand Down Expand Up @@ -105,6 +109,8 @@ func (e *Executor) newExecute(

// handleTransactions deals with transactional queries: begin, commit, rollback and savepoint management
func (e *Executor) handleTransactions(ctx context.Context, safeSession *SafeSession, plan *engine.Plan, logStats *logstats.LogStats, vcursor *vcursorImpl) (*sqltypes.Result, error) {
span, ctx := trace.NewSpan(ctx, "Executor.handleTransactions")
defer span.Finish()
// We need to explicitly handle errors, and begin/commit/rollback, since these control transactions. Everything else
// will fall through and be handled through planning
switch plan.Type {
Expand Down Expand Up @@ -200,6 +206,9 @@ func (e *Executor) executePlan(
execStart time.Time,
) (*sqltypes.Result, error) {

span, ctx := trace.NewSpan(ctx, "Executor.executePlan")
defer span.Finish()

// 4: Execute!
qr, err := vcursor.ExecutePrimitive(ctx, plan.Instructions, bindVars, true)

Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"sync/atomic"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/vtgate/logstats"

"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
Expand Down Expand Up @@ -392,6 +393,8 @@ func (vc *vcursorImpl) TargetString() string {
const MaxBufferingRetries = 3

func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
span, ctx := trace.NewSpan(ctx, "vcursorImpl.ExecutePrimitive")
defer span.Finish()
for try := 0; try < MaxBufferingRetries; try++ {
res, err := primitive.TryExecute(ctx, vc, bindVars, wantfields)
if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/vterrors"
Expand All @@ -43,6 +44,9 @@ var _ queryservicepb.QueryServer = (*query)(nil)

// Execute is part of the queryservice.QueryServer interface
func (q *query) Execute(ctx context.Context, request *querypb.ExecuteRequest) (response *querypb.ExecuteResponse, err error) {
span, ctx := trace.NewSpan(ctx, "query.Execute")
defer span.Finish()

defer q.server.HandlePanic(&err)
ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx),
request.EffectiveCallerId,
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -177,6 +178,9 @@ func (ws *wrappedService) ReadTransaction(ctx context.Context, target *querypb.T
}

func (ws *wrappedService) Execute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (qr *sqltypes.Result, err error) {
span, ctx := trace.NewSpan(ctx, "wrappedService.Execute")
defer span.Finish()

inDedicatedConn := transactionID != 0 || reservedID != 0
err = ws.wrapper(ctx, target, ws.impl, "Execute", inDedicatedConn, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"

Expand Down Expand Up @@ -126,6 +127,9 @@ func (tm *TabletManager) ExecuteFetchAsApp(ctx context.Context, req *tabletmanag

// ExecuteQuery submits a new online DDL request
func (tm *TabletManager) ExecuteQuery(ctx context.Context, req *tabletmanagerdatapb.ExecuteQueryRequest) (*querypb.QueryResult, error) {
span, ctx := trace.NewSpan(ctx, "TabletManager.ExecuteQuery")
defer span.Finish()

// get the db name from the tablet
tablet := tm.Tablet()
target := &querypb.Target{Keyspace: tablet.Keyspace, Shard: tablet.Shard, TabletType: tablet.Type}
Expand Down
Loading
Loading