Skip to content

Commit

Permalink
fix race, add logstats RecordMirrorStats test
Browse files Browse the repository at this point in the history
Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander committed Oct 2, 2024
1 parent b3d21ea commit aff7830
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 38 deletions.
61 changes: 39 additions & 22 deletions go/vt/vtgate/engine/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ import (

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)

var errMirrorTargetQueryTookTooLong = vterrors.Errorf(vtrpc.Code_ABORTED, "Mirror target query took too long")

type (
// percentBasedMirror represents the instructions to execute an
// authoritative primitive and, based on whether a die-roll exceeds a
Expand All @@ -34,6 +38,11 @@ type (
primitive Primitive
target Primitive
}

mirrorResult struct {
execTime time.Duration
err error
}
)

const (
Expand Down Expand Up @@ -74,36 +83,40 @@ func (m *percentBasedMirror) TryExecute(ctx context.Context, vcursor VCursor, bi
return vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields)
}

mirrorCh := make(chan time.Duration)
mirrorCh := make(chan mirrorResult, 1)
mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx)
defer mirrorCtxCancel()

var (
sourceExecTime time.Duration
targetExecTime time.Duration
targetErr error
)

go func() {
mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx)
targetStartTime := time.Now()
_, targetErr = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields)
mirrorCh <- time.Since(targetStartTime)
_, targetErr := mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields)
mirrorCh <- mirrorResult{
execTime: time.Since(targetStartTime),
err: targetErr,
}
}()

var (
sourceExecTime, targetExecTime time.Duration
targetErr error
)

sourceStartTime := time.Now()
r, err := vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields)
sourceExecTime = time.Since(sourceStartTime)

// Cancel the mirror context if it continues executing too long.
select {
case d := <-mirrorCh:
case r := <-mirrorCh:
// Mirror target finished on time.
targetExecTime = d
targetExecTime = r.execTime
targetErr = r.err
case <-time.After(maxMirrorTargetLag):
// Mirror target took too long.
mirrorCtxCancel()
targetExecTime = sourceExecTime + maxMirrorTargetLag
targetErr = errMirrorTargetQueryTookTooLong
}

vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr)
Expand All @@ -116,38 +129,42 @@ func (m *percentBasedMirror) TryStreamExecute(ctx context.Context, vcursor VCurs
return vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback)
}

mirrorCh := make(chan time.Duration)
mirrorCh := make(chan mirrorResult, 1)
mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx)
defer mirrorCtxCancel()

var (
sourceExecTime time.Duration
targetExecTime time.Duration
targetErr error
)

go func() {
mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx)
mirrorStartTime := time.Now()
targetErr = mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error {
targetErr := mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error {
return nil
})
mirrorCh <- time.Since(mirrorStartTime)
mirrorCh <- mirrorResult{
execTime: time.Since(mirrorStartTime),
err: targetErr,
}
}()

var (
sourceExecTime, targetExecTime time.Duration
targetErr error
)

sourceStartTime := time.Now()
err := vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback)
sourceExecTime = time.Since(sourceStartTime)

// Cancel the mirror context if it continues executing too long.
select {
case d := <-mirrorCh:
case r := <-mirrorCh:
// Mirror target finished on time.
targetExecTime = d
targetExecTime = r.execTime
targetErr = r.err
case <-time.After(maxMirrorTargetLag):
// Mirror target took too long.
mirrorCtxCancel()
targetExecTime = sourceExecTime + maxMirrorTargetLag
targetErr = errMirrorTargetQueryTookTooLong
}

vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/engine/mirror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func TestMirror(t *testing.T) {
wg.Wait()

require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load())
require.ErrorContains(t, *targetErr.Load(), "Mirror target query took too long")
})

t.Run("TryStreamExecute success", func(t *testing.T) {
Expand Down Expand Up @@ -543,5 +544,6 @@ func TestMirror(t *testing.T) {
})

require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load())
require.ErrorContains(t, *targetErr.Load(), "Mirror target query took too long")
})
}
57 changes: 41 additions & 16 deletions go/vt/vtgate/vcursor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package vtgate
import (
"context"
"encoding/hex"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -15,6 +16,7 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -37,8 +39,7 @@ func (f fakeVSchemaOperator) UpdateVSchema(ctx context.Context, ksName string, v
panic("implement me")
}

type fakeTopoServer struct {
}
type fakeTopoServer struct{}

// GetTopoServer returns the full topo.Server instance.
func (f *fakeTopoServer) GetTopoServer() (*topo.Server, error) {
Expand Down Expand Up @@ -78,7 +79,6 @@ func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace st
// the provided cell. It will call the callback when
// a new value or an error occurs.
func (f *fakeTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) {

}

func TestDestinationKeyspace(t *testing.T) {
Expand Down Expand Up @@ -106,12 +106,14 @@ func TestDestinationKeyspace(t *testing.T) {
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ks1.Name: ks1Schema,
ks2.Name: ks2Schema,
}}
},
}

vschemaWith1KS := &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ks1.Name: ks1Schema,
}}
},
}

type testCase struct {
vschema *vindexes.VSchema
Expand Down Expand Up @@ -203,20 +205,24 @@ func TestDestinationKeyspace(t *testing.T) {
}
}

var ks1 = &vindexes.Keyspace{Name: "ks1"}
var ks1Schema = &vindexes.KeyspaceSchema{Keyspace: ks1}
var ks2 = &vindexes.Keyspace{Name: "ks2"}
var ks2Schema = &vindexes.KeyspaceSchema{Keyspace: ks2}
var vschemaWith1KS = &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ks1.Name: ks1Schema,
},
}
var (
ks1 = &vindexes.Keyspace{Name: "ks1"}
ks1Schema = &vindexes.KeyspaceSchema{Keyspace: ks1}
ks2 = &vindexes.Keyspace{Name: "ks2"}
ks2Schema = &vindexes.KeyspaceSchema{Keyspace: ks2}
vschemaWith1KS = &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ks1.Name: ks1Schema,
},
}
)

var vschemaWith2KS = &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ks1.Name: ks1Schema,
ks2.Name: ks2Schema,
}}
},
}

func TestSetTarget(t *testing.T) {
type testCase struct {
Expand Down Expand Up @@ -318,7 +324,8 @@ func TestFirstSortedKeyspace(t *testing.T) {
ks1Schema.Keyspace.Name: ks1Schema,
ks2Schema.Keyspace.Name: ks2Schema,
ks3Schema.Keyspace.Name: ks3Schema,
}}
},
}

r, _, _, _, _ := createExecutorEnv(t)
vc, err := newVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4)
Expand Down Expand Up @@ -372,3 +379,21 @@ func TestSetExecQueryTimeout(t *testing.T) {
// this should be reset.
require.Nil(t, safeSession.Options.Timeout)
}

func TestRecordMirrorStats(t *testing.T) {
executor, _, _, _, _ := createExecutorEnv(t)
safeSession := NewSafeSession(nil)
logStats := logstats.NewLogStats(context.Background(), t.Name(), "select 1", "", nil)
vc, err := newVCursorImpl(safeSession, sqlparser.MarginComments{}, executor, logStats, nil, &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4)
require.NoError(t, err)

require.Zero(t, logStats.MirrorSourceExecuteTime)
require.Zero(t, logStats.MirrorTargetExecuteTime)
require.Nil(t, logStats.MirrorTargetError)

vc.RecordMirrorStats(10*time.Millisecond, 20*time.Millisecond, errors.New("test error"))

require.Equal(t, 10*time.Millisecond, logStats.MirrorSourceExecuteTime)
require.Equal(t, 20*time.Millisecond, logStats.MirrorTargetExecuteTime)
require.ErrorContains(t, logStats.MirrorTargetError, "test error")
}

0 comments on commit aff7830

Please sign in to comment.