Skip to content

Commit

Permalink
fix bug, add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander committed Oct 2, 2024
1 parent 978c59d commit b3d21ea
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 63 deletions.
17 changes: 14 additions & 3 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,10 @@ type loggingVCursor struct {

parser *sqlparser.Parser

handleMirrorClonesFn func(context.Context) VCursor
onMirrorClonesFn func(context.Context) VCursor
onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool)
onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error)
onRecordMirrorStatsFn func(time.Duration, time.Duration, error)
}

func (f *loggingVCursor) HasCreatedTempTable() {
Expand Down Expand Up @@ -564,8 +565,8 @@ func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor {
}

func (f *loggingVCursor) CloneForMirroring(ctx context.Context) VCursor {
if f.handleMirrorClonesFn != nil {
return f.handleMirrorClonesFn(ctx)
if f.onMirrorClonesFn != nil {
return f.onMirrorClonesFn(ctx)
}
panic("no mirror clones available")
}
Expand Down Expand Up @@ -886,6 +887,12 @@ func (t *loggingVCursor) SQLParser() *sqlparser.Parser {
return t.parser
}

func (t *loggingVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) {
if t.onRecordMirrorStatsFn != nil {
t.onRecordMirrorStatsFn(sourceExecTime, targetExecTime, targetErr)
}
}

func (t *noopVCursor) VExplainLogging() {}
func (t *noopVCursor) DisableLogging() {}
func (t *noopVCursor) GetVExplainLogs() []ExecuteEntry {
Expand All @@ -896,6 +903,10 @@ func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) {
return nil, nil
}

// RecordMirrorStats implements VCursor.
func (t *noopVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) {
}

func expectResult(t *testing.T, result, want *sqltypes.Result) {
t.Helper()
fieldsResult := fmt.Sprintf("%v", result.Fields)
Expand Down
70 changes: 48 additions & 22 deletions go/vt/vtgate/engine/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,40 @@ func (m *percentBasedMirror) TryExecute(ctx context.Context, vcursor VCursor, bi
return vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields)
}

mirrorCh := make(chan any)
mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag)
mirrorCh := make(chan time.Duration)
mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx)
defer mirrorCtxCancel()

var (
sourceExecTime time.Duration
targetExecTime time.Duration
targetErr error
)

go func() {
defer close(mirrorCh)
mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx)
// TODO(maxeng) handle error.
_, _ = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields)
targetStartTime := time.Now()
_, targetErr = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields)
mirrorCh <- time.Since(targetStartTime)
}()

sourceStartTime := time.Now()
r, err := vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields)
sourceExecTime = time.Since(sourceStartTime)

// Cancel the mirror context if it continues executing too long.
select {
case <-mirrorCh:
// Mirroring completed within the allowed time.
case <-mirrorCtx.Done():
// Mirroring took too long and was canceled.
case d := <-mirrorCh:
// Mirror target finished on time.
targetExecTime = d
case <-time.After(maxMirrorTargetLag):
// Mirror target took too long.
mirrorCtxCancel()
targetExecTime = sourceExecTime + maxMirrorTargetLag
}

vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr)

return r, err
}

Expand All @@ -102,30 +116,42 @@ func (m *percentBasedMirror) TryStreamExecute(ctx context.Context, vcursor VCurs
return vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback)
}

mirrorCh := make(chan any)
mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag)
mirrorCh := make(chan time.Duration)
mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx)
defer mirrorCtxCancel()

var (
sourceExecTime time.Duration
targetExecTime time.Duration
targetErr error
)

go func() {
defer close(mirrorCh)
mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx)
// TODO(maxeng) handle error.
_ = mirrorVCursor.StreamExecutePrimitive(
mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result,
) error {
return nil
})
mirrorStartTime := time.Now()
targetErr = mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error {
return nil
})
mirrorCh <- time.Since(mirrorStartTime)
}()

sourceStartTime := time.Now()
err := vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback)
sourceExecTime = time.Since(sourceStartTime)

// Cancel the mirror context if it continues executing too long.
select {
case <-mirrorCh:
// Mirroring completed within the allowed time.
case <-mirrorCtx.Done():
// Mirroring took too long and was canceled.
case d := <-mirrorCh:
// Mirror target finished on time.
targetExecTime = d
case <-time.After(maxMirrorTargetLag):
// Mirror target took too long.
mirrorCtxCancel()
targetExecTime = sourceExecTime + maxMirrorTargetLag
}

vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr)

return err
}

Expand Down
Loading

0 comments on commit b3d21ea

Please sign in to comment.