From aff7830793c984917bdcf53a7be8eeae75df3808 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Wed, 2 Oct 2024 13:27:01 -0400 Subject: [PATCH] fix race, add logstats RecordMirrorStats test Signed-off-by: Max Englander --- go/vt/vtgate/engine/mirror.go | 61 +++++++++++++++++++----------- go/vt/vtgate/engine/mirror_test.go | 2 + go/vt/vtgate/vcursor_impl_test.go | 57 ++++++++++++++++++++-------- 3 files changed, 82 insertions(+), 38 deletions(-) diff --git a/go/vt/vtgate/engine/mirror.go b/go/vt/vtgate/engine/mirror.go index 78004639c19..6396e4b33ec 100644 --- a/go/vt/vtgate/engine/mirror.go +++ b/go/vt/vtgate/engine/mirror.go @@ -23,8 +23,12 @@ import ( "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) +var errMirrorTargetQueryTookTooLong = vterrors.Errorf(vtrpc.Code_ABORTED, "Mirror target query took too long") + type ( // percentBasedMirror represents the instructions to execute an // authoritative primitive and, based on whether a die-roll exceeds a @@ -34,6 +38,11 @@ type ( primitive Primitive target Primitive } + + mirrorResult struct { + execTime time.Duration + err error + } ) const ( @@ -74,36 +83,40 @@ func (m *percentBasedMirror) TryExecute(ctx context.Context, vcursor VCursor, bi return vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) } - mirrorCh := make(chan time.Duration) + mirrorCh := make(chan mirrorResult, 1) mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx) defer mirrorCtxCancel() - var ( - sourceExecTime time.Duration - targetExecTime time.Duration - targetErr error - ) - go func() { mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) targetStartTime := time.Now() - _, targetErr = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields) - mirrorCh <- time.Since(targetStartTime) + _, targetErr := mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields) + mirrorCh <- mirrorResult{ + execTime: time.Since(targetStartTime), + err: targetErr, + } }() + var ( + sourceExecTime, targetExecTime time.Duration + targetErr error + ) + sourceStartTime := time.Now() r, err := vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) sourceExecTime = time.Since(sourceStartTime) // Cancel the mirror context if it continues executing too long. select { - case d := <-mirrorCh: + case r := <-mirrorCh: // Mirror target finished on time. - targetExecTime = d + targetExecTime = r.execTime + targetErr = r.err case <-time.After(maxMirrorTargetLag): // Mirror target took too long. mirrorCtxCancel() targetExecTime = sourceExecTime + maxMirrorTargetLag + targetErr = errMirrorTargetQueryTookTooLong } vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr) @@ -116,38 +129,42 @@ func (m *percentBasedMirror) TryStreamExecute(ctx context.Context, vcursor VCurs return vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) } - mirrorCh := make(chan time.Duration) + mirrorCh := make(chan mirrorResult, 1) mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx) defer mirrorCtxCancel() - var ( - sourceExecTime time.Duration - targetExecTime time.Duration - targetErr error - ) - go func() { mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) mirrorStartTime := time.Now() - targetErr = mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error { + targetErr := mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error { return nil }) - mirrorCh <- time.Since(mirrorStartTime) + mirrorCh <- mirrorResult{ + execTime: time.Since(mirrorStartTime), + err: targetErr, + } }() + var ( + sourceExecTime, targetExecTime time.Duration + targetErr error + ) + sourceStartTime := time.Now() err := vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) sourceExecTime = time.Since(sourceStartTime) // Cancel the mirror context if it continues executing too long. select { - case d := <-mirrorCh: + case r := <-mirrorCh: // Mirror target finished on time. - targetExecTime = d + targetExecTime = r.execTime + targetErr = r.err case <-time.After(maxMirrorTargetLag): // Mirror target took too long. mirrorCtxCancel() targetExecTime = sourceExecTime + maxMirrorTargetLag + targetErr = errMirrorTargetQueryTookTooLong } vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr) diff --git a/go/vt/vtgate/engine/mirror_test.go b/go/vt/vtgate/engine/mirror_test.go index db9329fa703..753b1a26429 100644 --- a/go/vt/vtgate/engine/mirror_test.go +++ b/go/vt/vtgate/engine/mirror_test.go @@ -306,6 +306,7 @@ func TestMirror(t *testing.T) { wg.Wait() require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load()) + require.ErrorContains(t, *targetErr.Load(), "Mirror target query took too long") }) t.Run("TryStreamExecute success", func(t *testing.T) { @@ -543,5 +544,6 @@ func TestMirror(t *testing.T) { }) require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load()) + require.ErrorContains(t, *targetErr.Load(), "Mirror target query took too long") }) } diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index 5d2dc2cb44a..95d9a18078d 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -3,6 +3,7 @@ package vtgate import ( "context" "encoding/hex" + "errors" "fmt" "strconv" "strings" @@ -15,6 +16,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtgate/logstats" "vitess.io/vitess/go/vt/vtgate/vindexes" querypb "vitess.io/vitess/go/vt/proto/query" @@ -37,8 +39,7 @@ func (f fakeVSchemaOperator) UpdateVSchema(ctx context.Context, ksName string, v panic("implement me") } -type fakeTopoServer struct { -} +type fakeTopoServer struct{} // GetTopoServer returns the full topo.Server instance. func (f *fakeTopoServer) GetTopoServer() (*topo.Server, error) { @@ -78,7 +79,6 @@ func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace st // the provided cell. It will call the callback when // a new value or an error occurs. func (f *fakeTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) { - } func TestDestinationKeyspace(t *testing.T) { @@ -106,12 +106,14 @@ func TestDestinationKeyspace(t *testing.T) { Keyspaces: map[string]*vindexes.KeyspaceSchema{ ks1.Name: ks1Schema, ks2.Name: ks2Schema, - }} + }, + } vschemaWith1KS := &vindexes.VSchema{ Keyspaces: map[string]*vindexes.KeyspaceSchema{ ks1.Name: ks1Schema, - }} + }, + } type testCase struct { vschema *vindexes.VSchema @@ -203,20 +205,24 @@ func TestDestinationKeyspace(t *testing.T) { } } -var ks1 = &vindexes.Keyspace{Name: "ks1"} -var ks1Schema = &vindexes.KeyspaceSchema{Keyspace: ks1} -var ks2 = &vindexes.Keyspace{Name: "ks2"} -var ks2Schema = &vindexes.KeyspaceSchema{Keyspace: ks2} -var vschemaWith1KS = &vindexes.VSchema{ - Keyspaces: map[string]*vindexes.KeyspaceSchema{ - ks1.Name: ks1Schema, - }, -} +var ( + ks1 = &vindexes.Keyspace{Name: "ks1"} + ks1Schema = &vindexes.KeyspaceSchema{Keyspace: ks1} + ks2 = &vindexes.Keyspace{Name: "ks2"} + ks2Schema = &vindexes.KeyspaceSchema{Keyspace: ks2} + vschemaWith1KS = &vindexes.VSchema{ + Keyspaces: map[string]*vindexes.KeyspaceSchema{ + ks1.Name: ks1Schema, + }, + } +) + var vschemaWith2KS = &vindexes.VSchema{ Keyspaces: map[string]*vindexes.KeyspaceSchema{ ks1.Name: ks1Schema, ks2.Name: ks2Schema, - }} + }, +} func TestSetTarget(t *testing.T) { type testCase struct { @@ -318,7 +324,8 @@ func TestFirstSortedKeyspace(t *testing.T) { ks1Schema.Keyspace.Name: ks1Schema, ks2Schema.Keyspace.Name: ks2Schema, ks3Schema.Keyspace.Name: ks3Schema, - }} + }, + } r, _, _, _, _ := createExecutorEnv(t) vc, err := newVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4) @@ -372,3 +379,21 @@ func TestSetExecQueryTimeout(t *testing.T) { // this should be reset. require.Nil(t, safeSession.Options.Timeout) } + +func TestRecordMirrorStats(t *testing.T) { + executor, _, _, _, _ := createExecutorEnv(t) + safeSession := NewSafeSession(nil) + logStats := logstats.NewLogStats(context.Background(), t.Name(), "select 1", "", nil) + vc, err := newVCursorImpl(safeSession, sqlparser.MarginComments{}, executor, logStats, nil, &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4) + require.NoError(t, err) + + require.Zero(t, logStats.MirrorSourceExecuteTime) + require.Zero(t, logStats.MirrorTargetExecuteTime) + require.Nil(t, logStats.MirrorTargetError) + + vc.RecordMirrorStats(10*time.Millisecond, 20*time.Millisecond, errors.New("test error")) + + require.Equal(t, 10*time.Millisecond, logStats.MirrorSourceExecuteTime) + require.Equal(t, 20*time.Millisecond, logStats.MirrorTargetExecuteTime) + require.ErrorContains(t, logStats.MirrorTargetError, "test error") +}