Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query serving: incorporate mirror query results in log stats, fix mirror query max lag bug #16879

Merged
merged 2 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,10 @@ type loggingVCursor struct {

parser *sqlparser.Parser

handleMirrorClonesFn func(context.Context) VCursor
onMirrorClonesFn func(context.Context) VCursor
onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool)
onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error)
onRecordMirrorStatsFn func(time.Duration, time.Duration, error)
}

func (f *loggingVCursor) HasCreatedTempTable() {
Expand Down Expand Up @@ -564,8 +565,8 @@ func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor {
}

func (f *loggingVCursor) CloneForMirroring(ctx context.Context) VCursor {
if f.handleMirrorClonesFn != nil {
return f.handleMirrorClonesFn(ctx)
if f.onMirrorClonesFn != nil {
return f.onMirrorClonesFn(ctx)
}
panic("no mirror clones available")
}
Expand Down Expand Up @@ -886,6 +887,12 @@ func (t *loggingVCursor) SQLParser() *sqlparser.Parser {
return t.parser
}

// RecordMirrorStats implements VCursor. It hands the source and target
// execution times, together with the target error, to the
// onRecordMirrorStatsFn hook. When no hook is installed the call does nothing.
func (t *loggingVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) {
	hook := t.onRecordMirrorStatsFn
	if hook == nil {
		// No hook installed: nothing to record.
		return
	}
	hook(sourceExecTime, targetExecTime, targetErr)
}

// VExplainLogging is a no-op on noopVCursor.
func (t *noopVCursor) VExplainLogging() {}
// DisableLogging is a no-op on noopVCursor.
func (t *noopVCursor) DisableLogging() {}
func (t *noopVCursor) GetVExplainLogs() []ExecuteEntry {
Expand All @@ -896,6 +903,10 @@ func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) {
return nil, nil
}

// RecordMirrorStats implements VCursor. It is a no-op: the execution times
// and the target error are ignored.
func (t *noopVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) {
}

func expectResult(t *testing.T, result, want *sqltypes.Result) {
t.Helper()
fieldsResult := fmt.Sprintf("%v", result.Fields)
Expand Down
87 changes: 65 additions & 22 deletions go/vt/vtgate/engine/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ import (

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)

// errMirrorTargetQueryTookTooLong is recorded as the mirror target error when
// the target has not delivered a result within maxMirrorTargetLag after the
// source primitive has finished executing.
var errMirrorTargetQueryTookTooLong = vterrors.Errorf(vtrpc.Code_ABORTED, "Mirror target query took too long")

type (
// percentBasedMirror represents the instructions to execute an
// authoritative primitive and, based on whether a die-roll exceeds a
Expand All @@ -34,6 +38,11 @@ type (
primitive Primitive
target Primitive
}

// mirrorResult is the value the mirroring goroutine sends back over its
// result channel once the mirror target primitive has finished executing.
mirrorResult struct {
// execTime is the elapsed time of the target execution.
execTime time.Duration
// err is the error returned by the target execution; nil on success.
err error
}
)

const (
Expand Down Expand Up @@ -74,26 +83,44 @@ func (m *percentBasedMirror) TryExecute(ctx context.Context, vcursor VCursor, bi
return vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields)
}

mirrorCh := make(chan any)
mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag)
mirrorCh := make(chan mirrorResult, 1)
mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx)
defer mirrorCtxCancel()

go func() {
defer close(mirrorCh)
mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx)
// TODO(maxeng) handle error.
_, _ = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields)
targetStartTime := time.Now()
_, targetErr := mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields)
mirrorCh <- mirrorResult{
execTime: time.Since(targetStartTime),
err: targetErr,
}
}()

var (
sourceExecTime, targetExecTime time.Duration
targetErr error
)

sourceStartTime := time.Now()
r, err := vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields)
sourceExecTime = time.Since(sourceStartTime)

// Cancel the mirror context if it continues executing too long.
select {
case <-mirrorCh:
// Mirroring completed within the allowed time.
case <-mirrorCtx.Done():
// Mirroring took too long and was canceled.
case r := <-mirrorCh:
// Mirror target finished on time.
targetExecTime = r.execTime
targetErr = r.err
case <-time.After(maxMirrorTargetLag):
// Mirror target took too long.
mirrorCtxCancel()
targetExecTime = sourceExecTime + maxMirrorTargetLag
targetErr = errMirrorTargetQueryTookTooLong
}

vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr)

return r, err
}

Expand All @@ -102,30 +129,46 @@ func (m *percentBasedMirror) TryStreamExecute(ctx context.Context, vcursor VCurs
return vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback)
}

mirrorCh := make(chan any)
mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag)
mirrorCh := make(chan mirrorResult, 1)
mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx)
defer mirrorCtxCancel()

go func() {
defer close(mirrorCh)
mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx)
// TODO(maxeng) handle error.
_ = mirrorVCursor.StreamExecutePrimitive(
mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result,
) error {
return nil
})
mirrorStartTime := time.Now()
targetErr := mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error {
return nil
})
mirrorCh <- mirrorResult{
execTime: time.Since(mirrorStartTime),
err: targetErr,
}
}()

var (
sourceExecTime, targetExecTime time.Duration
targetErr error
)

sourceStartTime := time.Now()
err := vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback)
sourceExecTime = time.Since(sourceStartTime)

// Cancel the mirror context if it continues executing too long.
select {
case <-mirrorCh:
// Mirroring completed within the allowed time.
case <-mirrorCtx.Done():
// Mirroring took too long and was canceled.
case r := <-mirrorCh:
// Mirror target finished on time.
targetExecTime = r.execTime
targetErr = r.err
case <-time.After(maxMirrorTargetLag):
// Mirror target took too long.
mirrorCtxCancel()
targetExecTime = sourceExecTime + maxMirrorTargetLag
targetErr = errMirrorTargetQueryTookTooLong
}

vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr)

return err
}

Expand Down
Loading
Loading