Skip to content

Commit

Permalink
Account for the amount of shards we have hit from the routes
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui authored and systay committed Sep 19, 2024
1 parent 56c39b2 commit 6f6d33e
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 26 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (t *noopVCursor) UnresolvedTransactions(ctx context.Context, keyspace strin
panic("implement me")
}

func (t *noopVCursor) StartPrimitiveTrace() func() map[Primitive]RowsReceived {
func (t *noopVCursor) StartPrimitiveTrace() func() Stats {
panic("implement me")
}

Expand Down
26 changes: 19 additions & 7 deletions go/vt/vtgate/engine/plan_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type PrimitiveDescription struct {
InputName string
Inputs []PrimitiveDescription

Stats RowsReceived
RowsReceived RowsReceived
ShardsQueried *ShardsQueried
}

// MarshalJSON serializes the PlanDescription into a JSON representation.
Expand Down Expand Up @@ -92,15 +93,20 @@ func (pd PrimitiveDescription) MarshalJSON() ([]byte, error) {
return nil, err
}
}
if len(pd.Stats) > 0 {
if err := marshalAdd(prepend, buf, "NoOfCalls", len(pd.Stats)); err != nil {
if len(pd.RowsReceived) > 0 {
if err := marshalAdd(prepend, buf, "NoOfCalls", len(pd.RowsReceived)); err != nil {
return nil, err
}

if err := marshalAdd(prepend, buf, "AvgNumberOfRows", average(pd.Stats)); err != nil {
if err := marshalAdd(prepend, buf, "AvgNumberOfRows", average(pd.RowsReceived)); err != nil {
return nil, err
}
if err := marshalAdd(prepend, buf, "MedianNumberOfRows", median(pd.Stats)); err != nil {
if err := marshalAdd(prepend, buf, "MedianNumberOfRows", median(pd.RowsReceived)); err != nil {
return nil, err
}
}
if pd.ShardsQueried != nil {
if err := marshalAdd(prepend, buf, "ShardsQueried", pd.ShardsQueried); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -219,10 +225,16 @@ func marshalAdd(prepend string, buf *bytes.Buffer, name string, obj any) error {

// PrimitiveToPlanDescription transforms a primitive tree into a corresponding PlanDescription tree
// If stats is not nil, it will be used to populate the stats field of the PlanDescription
func PrimitiveToPlanDescription(in Primitive, stats map[Primitive]RowsReceived) PrimitiveDescription {
func PrimitiveToPlanDescription(in Primitive, stats *Stats) PrimitiveDescription {
this := in.description()
if stats != nil {
this.Stats = stats[in]
this.RowsReceived = stats.InterOpStats[in]

// Only applies to Route primitive
v, ok := stats.ShardsStats[in]
if ok {
this.ShardsQueried = &v
}
}

inputs, infos := in.Inputs()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type (

// StartPrimitiveTrace starts a trace for the given primitive,
// and returns a function to get the trace logs after the primitive execution.
StartPrimitiveTrace() func() map[Primitive]RowsReceived
StartPrimitiveTrace() func() Stats
}

// SessionActions gives primitives ability to interact with the session state
Expand Down
19 changes: 13 additions & 6 deletions go/vt/vtgate/engine/vexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ type (
Type sqlparser.VExplainType
}

RowsReceived []int
ShardsQueried int
RowsReceived []int

Stats struct {
InterOpStats map[Primitive]RowsReceived
ShardsStats map[Primitive]ShardsQueried
}
)

var _ Primitive = (*VExplain)(nil)
Expand Down Expand Up @@ -111,7 +117,7 @@ func (v *VExplain) NeedsTransaction() bool {

// TryExecute implements the Primitive interface
func (v *VExplain) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
var stats func() map[Primitive]RowsReceived
var stats func() Stats
if v.Type == sqlparser.TraceVExplainType {
stats = vcursor.StartPrimitiveTrace()
} else {
Expand All @@ -130,7 +136,7 @@ func noOpCallback(*sqltypes.Result) error {

// TryStreamExecute implements the Primitive interface
func (v *VExplain) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
var stats func() map[Primitive]RowsReceived
var stats func() Stats
if v.Type == sqlparser.TraceVExplainType {
stats = vcursor.StartPrimitiveTrace()
} else {
Expand All @@ -148,7 +154,7 @@ func (v *VExplain) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVa
return callback(result)
}

func (v *VExplain) convertToResult(ctx context.Context, vcursor VCursor, stats func() map[Primitive]RowsReceived) (*sqltypes.Result, error) {
func (v *VExplain) convertToResult(ctx context.Context, vcursor VCursor, stats func() Stats) (*sqltypes.Result, error) {
switch v.Type {
case sqlparser.QueriesVExplainType:
result := convertToVExplainQueriesResult(vcursor.Session().GetVExplainLogs())
Expand All @@ -163,8 +169,9 @@ func (v *VExplain) convertToResult(ctx context.Context, vcursor VCursor, stats f
}
}

func (v *VExplain) getExplainTraceOutput(getOpStats func() map[Primitive]RowsReceived) (*sqltypes.Result, error) {
description := PrimitiveToPlanDescription(v.Input, getOpStats())
func (v *VExplain) getExplainTraceOutput(getOpStats func() Stats) (*sqltypes.Result, error) {
stats := getOpStats()
description := PrimitiveToPlanDescription(v.Input, &stats)

output, err := json.MarshalIndent(description, "", "\t")
if err != nil {
Expand Down
33 changes: 22 additions & 11 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type (

// this is a map of the number of rows that every primitive has returned
// if this field is nil, it means that we are not logging operator traffic
primitiveStats map[engine.Primitive]engine.RowsReceived
interOpStats map[engine.Primitive]engine.RowsReceived
shardsStats map[engine.Primitive]engine.ShardsQueried
}
)

Expand Down Expand Up @@ -285,10 +286,14 @@ func (vc *vcursorImpl) UnresolvedTransactions(ctx context.Context, keyspace stri
return vc.executor.UnresolvedTransactions(ctx, targets)
}

func (vc *vcursorImpl) StartPrimitiveTrace() func() map[engine.Primitive]engine.RowsReceived {
vc.primitiveStats = make(map[engine.Primitive]engine.RowsReceived)
return func() map[engine.Primitive]engine.RowsReceived {
return vc.primitiveStats
func (vc *vcursorImpl) StartPrimitiveTrace() func() engine.Stats {
vc.interOpStats = make(map[engine.Primitive]engine.RowsReceived)
vc.shardsStats = make(map[engine.Primitive]engine.ShardsQueried)
return func() engine.Stats {
return engine.Stats{
InterOpStats: vc.interOpStats,
ShardsStats: vc.shardsStats,
}
}
}

Expand Down Expand Up @@ -532,14 +537,20 @@ func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr
}

func (vc *vcursorImpl) logOpTraffic(primitive engine.Primitive, res *sqltypes.Result) {
if vc.primitiveStats != nil {
rows := vc.primitiveStats[primitive]
if vc.interOpStats != nil {
rows := vc.interOpStats[primitive]
if res == nil {
rows = append(rows, 0)
} else {
rows = append(rows, len(res.Rows))
}
vc.primitiveStats[primitive] = rows
vc.interOpStats[primitive] = rows
}
}

func (vc *vcursorImpl) logShardsQueried(primitive engine.Primitive, shardsNb int) {
if vc.shardsStats != nil {
vc.shardsStats[primitive] += engine.ShardsQueried(shardsNb)
}
}

Expand All @@ -558,7 +569,7 @@ func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive
}

func (vc *vcursorImpl) wrapCallback(callback func(*sqltypes.Result) error, primitive engine.Primitive) func(*sqltypes.Result) error {
if vc.primitiveStats == nil {
if vc.interOpStats == nil {
return callback
}

Expand Down Expand Up @@ -650,7 +661,7 @@ func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P

qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.resultsObserver)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)
vc.logOpTraffic(primitive, qr)
vc.logShardsQueried(primitive, len(rss))
return qr, errs
}

Expand Down Expand Up @@ -689,7 +700,7 @@ func (vc *vcursorImpl) ExecuteStandalone(ctx context.Context, primitive engine.P
// The autocommit flag is always set to false because we currently don't
// execute DMLs through ExecuteStandalone.
qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.resultsObserver)
vc.logOpTraffic(primitive, qr)
vc.logShardsQueried(primitive, len(rss))
return qr, vterrors.Aggregate(errs)
}

Expand Down

0 comments on commit 6f6d33e

Please sign in to comment.