From b3d21ea1d65580a061793ac7951669063d0255f9 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Wed, 2 Oct 2024 11:19:12 -0400 Subject: [PATCH 1/2] fix bug, add tests Signed-off-by: Max Englander --- go/vt/vtgate/engine/fake_vcursor_test.go | 17 ++- go/vt/vtgate/engine/mirror.go | 70 ++++++--- go/vt/vtgate/engine/mirror_test.go | 185 ++++++++++++++++++++++- go/vt/vtgate/engine/primitive.go | 3 + go/vt/vtgate/logstats/logstats.go | 55 ++++--- go/vt/vtgate/logstats/logstats_test.go | 36 +++-- go/vt/vtgate/vcursor_impl.go | 7 + 7 files changed, 310 insertions(+), 63 deletions(-) diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index f0b11b7a3ac..9ba4fdc6a6b 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -445,9 +445,10 @@ type loggingVCursor struct { parser *sqlparser.Parser - handleMirrorClonesFn func(context.Context) VCursor + onMirrorClonesFn func(context.Context) VCursor onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool) onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error) + onRecordMirrorStatsFn func(time.Duration, time.Duration, error) } func (f *loggingVCursor) HasCreatedTempTable() { @@ -564,8 +565,8 @@ func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor { } func (f *loggingVCursor) CloneForMirroring(ctx context.Context) VCursor { - if f.handleMirrorClonesFn != nil { - return f.handleMirrorClonesFn(ctx) + if f.onMirrorClonesFn != nil { + return f.onMirrorClonesFn(ctx) } panic("no mirror clones available") } @@ -886,6 +887,12 @@ func (t *loggingVCursor) SQLParser() *sqlparser.Parser { return t.parser } +func (t *loggingVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) { + if t.onRecordMirrorStatsFn != nil { + t.onRecordMirrorStatsFn(sourceExecTime, targetExecTime, targetErr) + } +} + func (t *noopVCursor) VExplainLogging() {} func (t *noopVCursor) DisableLogging() {} func (t *noopVCursor) GetVExplainLogs() []ExecuteEntry { @@ -896,6 +903,10 @@ func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) { return nil, nil } +// RecordMirrorStats implements VCursor. +func (t *noopVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) { +} + func expectResult(t *testing.T, result, want *sqltypes.Result) { t.Helper() fieldsResult := fmt.Sprintf("%v", result.Fields) diff --git a/go/vt/vtgate/engine/mirror.go b/go/vt/vtgate/engine/mirror.go index 89e70ebd695..78004639c19 100644 --- a/go/vt/vtgate/engine/mirror.go +++ b/go/vt/vtgate/engine/mirror.go @@ -74,26 +74,40 @@ func (m *percentBasedMirror) TryExecute(ctx context.Context, vcursor VCursor, bi return vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) } - mirrorCh := make(chan any) - mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag) + mirrorCh := make(chan time.Duration) + mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx) defer mirrorCtxCancel() + var ( + sourceExecTime time.Duration + targetExecTime time.Duration + targetErr error + ) + go func() { - defer close(mirrorCh) mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) - // TODO(maxeng) handle error. - _, _ = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields) + targetStartTime := time.Now() + _, targetErr = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields) + mirrorCh <- time.Since(targetStartTime) }() + sourceStartTime := time.Now() r, err := vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) + sourceExecTime = time.Since(sourceStartTime) + // Cancel the mirror context if it continues executing too long. select { - case <-mirrorCh: - // Mirroring completed within the allowed time. - case <-mirrorCtx.Done(): - // Mirroring took too long and was canceled. + case d := <-mirrorCh: + // Mirror target finished on time. + targetExecTime = d + case <-time.After(maxMirrorTargetLag): + // Mirror target took too long. + mirrorCtxCancel() + targetExecTime = sourceExecTime + maxMirrorTargetLag } + vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr) + return r, err } @@ -102,30 +116,42 @@ func (m *percentBasedMirror) TryStreamExecute(ctx context.Context, vcursor VCurs return vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) } - mirrorCh := make(chan any) - mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag) + mirrorCh := make(chan time.Duration) + mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx) defer mirrorCtxCancel() + var ( + sourceExecTime time.Duration + targetExecTime time.Duration + targetErr error + ) + go func() { - defer close(mirrorCh) mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) - // TODO(maxeng) handle error. - _ = mirrorVCursor.StreamExecutePrimitive( - mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result, - ) error { - return nil - }) + mirrorStartTime := time.Now() + targetErr = mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error { + return nil + }) + mirrorCh <- time.Since(mirrorStartTime) }() + sourceStartTime := time.Now() err := vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) + sourceExecTime = time.Since(sourceStartTime) + // Cancel the mirror context if it continues executing too long. select { - case <-mirrorCh: - // Mirroring completed within the allowed time. - case <-mirrorCtx.Done(): - // Mirroring took too long and was canceled. + case d := <-mirrorCh: + // Mirror target finished on time. + targetExecTime = d + case <-time.After(maxMirrorTargetLag): + // Mirror target took too long. + mirrorCtxCancel() + targetExecTime = sourceExecTime + maxMirrorTargetLag } + vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr) + return err } diff --git a/go/vt/vtgate/engine/mirror_test.go b/go/vt/vtgate/engine/mirror_test.go index b9e442df32d..db9329fa703 100644 --- a/go/vt/vtgate/engine/mirror_test.go +++ b/go/vt/vtgate/engine/mirror_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -76,6 +77,10 @@ func TestMirror(t *testing.T) { }, } + sourceExecTime := atomic.Pointer[time.Duration]{} + targetExecTime := atomic.Pointer[time.Duration]{} + targetErr := atomic.Pointer[error]{} + vc := &loggingVCursor{ shards: []string{"0"}, ksShardMap: map[string][]string{ @@ -90,15 +95,23 @@ func TestMirror(t *testing.T) { "hello", ), }, - handleMirrorClonesFn: func(ctx context.Context) VCursor { + onMirrorClonesFn: func(ctx context.Context) VCursor { return mirrorVC }, + onRecordMirrorStatsFn: func(sourceTime time.Duration, targetTime time.Duration, err error) { + sourceExecTime.Store(&sourceTime) + targetExecTime.Store(&targetTime) + targetErr.Store(&err) + }, } t.Run("TryExecute success", func(t *testing.T) { defer func() { vc.Rewind() mirrorVC.Rewind() + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() want := vc.results[0] @@ -114,6 +127,8 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", }) + require.NotNil(t, targetExecTime.Load()) + require.Nil(t, *targetErr.Load()) }) t.Run("TryExecute return primitive error", func(t *testing.T) { @@ -124,6 +139,9 @@ func TestMirror(t *testing.T) { vc.results = results vc.resultErr = nil mirrorVC.Rewind() + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() vc.results = nil @@ -143,6 +161,8 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", }) + require.NotNil(t, targetExecTime.Load()) + require.Nil(t, *targetErr.Load()) }) t.Run("TryExecute ignore mirror target error", func(t *testing.T) { @@ -153,6 +173,9 @@ func TestMirror(t *testing.T) { mirrorVC.Rewind() mirrorVC.results = results mirrorVC.resultErr = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() mirrorVC.results = nil @@ -171,6 +194,63 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", }) + + require.NotNil(t, targetExecTime.Load()) + mirrorErr := targetErr.Load() + require.ErrorContains(t, *mirrorErr, "ignore me") + }) + + t.Run("TryExecute fast mirror target", func(t *testing.T) { + defer func() { + vc.Rewind() + vc.onExecuteMultiShardFn = nil + mirrorVC.Rewind() + mirrorVC.onExecuteMultiShardFn = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) + }() + + primitiveLatency := 10 * time.Millisecond + vc.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + time.Sleep(primitiveLatency) + select { + case <-ctx.Done(): + require.Fail(t, "primitive context done") + default: + } + } + + var wg sync.WaitGroup + defer wg.Wait() + mirrorVC.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + wg.Add(1) + defer wg.Done() + time.Sleep(primitiveLatency / 2) + select { + case <-ctx.Done(): + require.Fail(t, "mirror target context done") + default: + } + } + + want := vc.results[0] + res, err := mirror.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.Equal(t, res, want) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + + wg.Wait() + + require.Greater(t, *sourceExecTime.Load(), *targetExecTime.Load()) }) t.Run("TryExecute slow mirror target", func(t *testing.T) { @@ -179,9 +259,12 @@ func TestMirror(t *testing.T) { vc.onExecuteMultiShardFn = nil mirrorVC.Rewind() mirrorVC.onExecuteMultiShardFn = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() - primitiveLatency := maxMirrorTargetLag * 2 + primitiveLatency := 10 * time.Millisecond vc.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { time.Sleep(primitiveLatency) select { @@ -193,12 +276,14 @@ func TestMirror(t *testing.T) { var wg sync.WaitGroup defer wg.Wait() - wg.Add(1) mirrorVC.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + wg.Add(1) defer wg.Done() - time.Sleep(primitiveLatency + (2 * maxMirrorTargetLag)) + time.Sleep(primitiveLatency + maxMirrorTargetLag + (5 * time.Millisecond)) select { case <-ctx.Done(): + require.NotNil(t, ctx.Err()) + require.ErrorContains(t, ctx.Err(), "context canceled") default: require.Fail(t, "mirror target context not done") } @@ -217,12 +302,19 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", }) + + wg.Wait() + + require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load()) }) t.Run("TryStreamExecute success", func(t *testing.T) { defer func() { vc.Rewind() mirrorVC.Rewind() + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() want := vc.results[0] @@ -246,6 +338,9 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", }) + + require.NotNil(t, targetExecTime.Load()) + require.Nil(t, *targetErr.Load()) }) t.Run("TryStreamExecute return primitive error", func(t *testing.T) { @@ -256,6 +351,9 @@ func TestMirror(t *testing.T) { vc.results = results vc.resultErr = nil mirrorVC.Rewind() + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() vc.results = nil @@ -282,6 +380,9 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", }) + + require.NotNil(t, targetExecTime.Load()) + require.Nil(t, *targetErr.Load()) }) t.Run("TryStreamExecute ignore mirror target error", func(t *testing.T) { @@ -292,6 +393,9 @@ func TestMirror(t *testing.T) { mirrorVC.Rewind() mirrorVC.results = results mirrorVC.resultErr = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() mirrorVC.results = nil @@ -318,6 +422,68 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", }) + + require.NotNil(t, targetExecTime.Load()) + require.ErrorContains(t, *targetErr.Load(), "ignore me") + }) + + t.Run("TryStreamExecute fast mirror target", func(t *testing.T) { + defer func() { + vc.Rewind() + vc.onStreamExecuteMultiFn = nil + mirrorVC.Rewind() + mirrorVC.onStreamExecuteMultiFn = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) + }() + + primitiveLatency := 10 * time.Millisecond + vc.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + time.Sleep(primitiveLatency) + select { + case <-ctx.Done(): + require.Fail(t, "primitive context done") + default: + } + } + + var wg sync.WaitGroup + defer wg.Wait() + mirrorVC.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + wg.Add(1) + defer wg.Done() + time.Sleep(primitiveLatency / 2) + select { + case <-ctx.Done(): + require.Fail(t, "mirror target context done") + default: + } + } + + want := vc.results[0] + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Equal(t, want, result) + return nil + }, + ) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + + require.Greater(t, *sourceExecTime.Load(), *targetExecTime.Load()) }) t.Run("TryStreamExecute slow mirror target", func(t *testing.T) { @@ -326,9 +492,12 @@ func TestMirror(t *testing.T) { vc.onStreamExecuteMultiFn = nil mirrorVC.Rewind() mirrorVC.onStreamExecuteMultiFn = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() - primitiveLatency := maxMirrorTargetLag * 2 + primitiveLatency := 10 * time.Millisecond vc.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { time.Sleep(primitiveLatency) select { @@ -340,10 +509,10 @@ func TestMirror(t *testing.T) { var wg sync.WaitGroup defer wg.Wait() - wg.Add(1) mirrorVC.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + wg.Add(1) defer wg.Done() - time.Sleep(primitiveLatency + (2 * maxMirrorTargetLag)) + time.Sleep(primitiveLatency + maxMirrorTargetLag + (5 * time.Millisecond)) select { case <-ctx.Done(): default: @@ -372,5 +541,7 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", }) + + require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load()) }) } diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 76d0a28e516..4f3a388d04f 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -144,6 +144,9 @@ type ( // StartPrimitiveTrace starts a trace for the given primitive, // and returns a function to get the trace logs after the primitive execution. StartPrimitiveTrace() func() Stats + + // RecordMirrorStats is used to record stats about a mirror query. + RecordMirrorStats(time.Duration, time.Duration, error) } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/logstats/logstats.go b/go/vt/vtgate/logstats/logstats.go index 8f8ba41e3cd..fdc0e69c8db 100644 --- a/go/vt/vtgate/logstats/logstats.go +++ b/go/vt/vtgate/logstats/logstats.go @@ -33,25 +33,28 @@ import ( // LogStats records the stats for a single vtgate query type LogStats struct { - Ctx context.Context - Method string - TabletType string - StmtType string - SQL string - BindVariables map[string]*querypb.BindVariable - StartTime time.Time - EndTime time.Time - ShardQueries uint64 - RowsAffected uint64 - RowsReturned uint64 - PlanTime time.Duration - ExecuteTime time.Duration - CommitTime time.Duration - Error error - TablesUsed []string - SessionUUID string - CachedPlan bool - ActiveKeyspace string // ActiveKeyspace is the selected keyspace `use ks` + Ctx context.Context + Method string + TabletType string + StmtType string + SQL string + BindVariables map[string]*querypb.BindVariable + StartTime time.Time + EndTime time.Time + ShardQueries uint64 + RowsAffected uint64 + RowsReturned uint64 + PlanTime time.Duration + ExecuteTime time.Duration + CommitTime time.Duration + Error error + TablesUsed []string + SessionUUID string + CachedPlan bool + ActiveKeyspace string // ActiveKeyspace is the selected keyspace `use ks` + MirrorSourceExecuteTime time.Duration + MirrorTargetExecuteTime time.Duration + MirrorTargetError error } // NewLogStats constructs a new LogStats with supplied Method and ctx @@ -116,6 +119,14 @@ func (stats *LogStats) RemoteAddrUsername() (string, string) { return ci.RemoteAddr(), ci.Username() } +// MirorTargetErrorStr returns the mirror target error string or "" +func (stats *LogStats) MirrorTargetErrorStr() string { + if stats.MirrorTargetError != nil { + return stats.MirrorTargetError.Error() + } + return "" +} + // Logf formats the log record to the given writer, either as // tab-separated list of logged fields or as JSON. func (stats *LogStats) Logf(w io.Writer, params url.Values) error { @@ -177,6 +188,12 @@ func (stats *LogStats) Logf(w io.Writer, params url.Values) error { log.Strings(stats.TablesUsed) log.Key("ActiveKeyspace") log.String(stats.ActiveKeyspace) + log.Key("MirrorSourceExecuteTime") + log.Duration(stats.MirrorSourceExecuteTime) + log.Key("MirrorTargetExecuteTime") + log.Duration(stats.MirrorTargetExecuteTime) + log.Key("MirrorTargetError") + log.String(stats.MirrorTargetErrorStr()) return log.Flush(w) } diff --git a/go/vt/vtgate/logstats/logstats_test.go b/go/vt/vtgate/logstats/logstats_test.go index ae3c01e0f0b..872b34c6964 100644 --- a/go/vt/vtgate/logstats/logstats_test.go +++ b/go/vt/vtgate/logstats/logstats_test.go @@ -79,42 +79,42 @@ func TestLogStatsFormat(t *testing.T) { { // 0 redact: false, format: "text", - expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\n", + expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\t0.000000\t0.000000\t\"\"\n", bindVars: intBindVar, }, { // 1 redact: true, format: "text", - expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\n", + expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\t0.000000\t0.000000\t\"\"\n", bindVars: intBindVar, }, { // 2 redact: false, format: "json", - expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":{\"intVal\":{\"type\":\"INT64\",\"value\":1}},\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", + expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":{\"intVal\":{\"type\":\"INT64\",\"value\":1}},\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"MirrorSourceExecuteTime\":0,\"MirrorTargetError\":\"\",\"MirrorTargetExecuteTime\":0,\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", bindVars: intBindVar, }, { // 3 redact: true, format: "json", - expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":\"[REDACTED]\",\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", + expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":\"[REDACTED]\",\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"MirrorSourceExecuteTime\":0,\"MirrorTargetError\":\"\",\"MirrorTargetExecuteTime\":0,\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", bindVars: intBindVar, }, { // 4 redact: false, format: "text", - expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t{\"strVal\": {\"type\": \"VARCHAR\", \"value\": \"abc\"}}\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\n", + expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t{\"strVal\": {\"type\": \"VARCHAR\", \"value\": \"abc\"}}\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\t0.000000\t0.000000\t\"\"\n", bindVars: stringBindVar, }, { // 5 redact: true, format: "text", - expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\n", + expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\t0.000000\t0.000000\t\"\"\n", bindVars: stringBindVar, }, { // 6 redact: false, format: "json", - expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":{\"strVal\":{\"type\":\"VARCHAR\",\"value\":\"abc\"}},\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", + expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":{\"strVal\":{\"type\":\"VARCHAR\",\"value\":\"abc\"}},\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"MirrorSourceExecuteTime\":0,\"MirrorTargetError\":\"\",\"MirrorTargetExecuteTime\":0,\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", bindVars: stringBindVar, }, { // 7 redact: true, format: "json", - expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":\"[REDACTED]\",\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", + expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":\"[REDACTED]\",\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"MirrorSourceExecuteTime\":0,\"MirrorTargetError\":\"\",\"MirrorTargetExecuteTime\":0,\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", bindVars: stringBindVar, }, } @@ -156,12 +156,12 @@ func TestLogStatsFilter(t *testing.T) { params := map[string][]string{"full": {}} got := testFormat(t, logStats, params) - want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\n" + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) streamlog.SetQueryLogFilterTag("LOG_THIS_QUERY") got = testFormat(t, logStats, params) - want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\n" + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) streamlog.SetQueryLogFilterTag("NOT_THIS_QUERY") @@ -179,12 +179,12 @@ func TestLogStatsRowThreshold(t *testing.T) { params := map[string][]string{"full": {}} got := testFormat(t, logStats, params) - want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\n" + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) streamlog.SetQueryLogRowThreshold(0) got = testFormat(t, logStats, params) - want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\n" + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) streamlog.SetQueryLogRowThreshold(1) got = testFormat(t, logStats, params) @@ -215,6 +215,18 @@ func TestLogStatsErrorStr(t *testing.T) { } } +func TestLogStatsMirrorTargetErrorStr(t *testing.T) { + logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}) + if logStats.MirrorTargetErrorStr() != "" { + t.Fatalf("should not get error in stats, but got: %s", logStats.ErrorStr()) + } + errStr := "unknown error" + logStats.MirrorTargetError = errors.New(errStr) + if !strings.Contains(logStats.MirrorTargetErrorStr(), errStr) { + t.Fatalf("expect string '%s' in error message, but got: %s", errStr, logStats.ErrorStr()) + } +} + func TestLogStatsRemoteAddrUsername(t *testing.T) { logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}) addr, user := logStats.RemoteAddrUsername() diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 4f616f77fc8..e9b1d3d7712 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -1535,3 +1535,10 @@ func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) { func (vc *vcursorImpl) GetForeignKeyChecksState() *bool { return vc.fkChecksState } + +// RecordMirrorStats is used to record stats about a mirror query. +func (vc *vcursorImpl) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) { + vc.logStats.MirrorSourceExecuteTime = sourceExecTime + vc.logStats.MirrorTargetExecuteTime = targetExecTime + vc.logStats.MirrorTargetError = targetErr +} From aff7830793c984917bdcf53a7be8eeae75df3808 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Wed, 2 Oct 2024 13:27:01 -0400 Subject: [PATCH 2/2] fix race, add logstats RecordMirrorStats test Signed-off-by: Max Englander --- go/vt/vtgate/engine/mirror.go | 61 +++++++++++++++++++----------- go/vt/vtgate/engine/mirror_test.go | 2 + go/vt/vtgate/vcursor_impl_test.go | 57 ++++++++++++++++++++-------- 3 files changed, 82 insertions(+), 38 deletions(-) diff --git a/go/vt/vtgate/engine/mirror.go b/go/vt/vtgate/engine/mirror.go index 78004639c19..6396e4b33ec 100644 --- a/go/vt/vtgate/engine/mirror.go +++ b/go/vt/vtgate/engine/mirror.go @@ -23,8 +23,12 @@ import ( "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) +var errMirrorTargetQueryTookTooLong = vterrors.Errorf(vtrpc.Code_ABORTED, "Mirror target query took too long") + type ( // percentBasedMirror represents the instructions to execute an // authoritative primitive and, based on whether a die-roll exceeds a @@ -34,6 +38,11 @@ type ( primitive Primitive target Primitive } + + mirrorResult struct { + execTime time.Duration + err error + } ) const ( @@ -74,36 +83,40 @@ func (m *percentBasedMirror) TryExecute(ctx context.Context, vcursor VCursor, bi return vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) } - mirrorCh := make(chan time.Duration) + mirrorCh := make(chan mirrorResult, 1) mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx) defer mirrorCtxCancel() - var ( - sourceExecTime time.Duration - targetExecTime time.Duration - targetErr error - ) - go func() { mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) targetStartTime := time.Now() - _, targetErr = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields) - mirrorCh <- time.Since(targetStartTime) + _, targetErr := mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields) + mirrorCh <- mirrorResult{ + execTime: time.Since(targetStartTime), + err: targetErr, + } }() + var ( + sourceExecTime, targetExecTime time.Duration + targetErr error + ) + sourceStartTime := time.Now() r, err := vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) sourceExecTime = time.Since(sourceStartTime) // Cancel the mirror context if it continues executing too long. select { - case d := <-mirrorCh: + case r := <-mirrorCh: // Mirror target finished on time. - targetExecTime = d + targetExecTime = r.execTime + targetErr = r.err case <-time.After(maxMirrorTargetLag): // Mirror target took too long. mirrorCtxCancel() targetExecTime = sourceExecTime + maxMirrorTargetLag + targetErr = errMirrorTargetQueryTookTooLong } vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr) @@ -116,38 +129,42 @@ func (m *percentBasedMirror) TryStreamExecute(ctx context.Context, vcursor VCurs return vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) } - mirrorCh := make(chan time.Duration) + mirrorCh := make(chan mirrorResult, 1) mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx) defer mirrorCtxCancel() - var ( - sourceExecTime time.Duration - targetExecTime time.Duration - targetErr error - ) - go func() { mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) mirrorStartTime := time.Now() - targetErr = mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error { + targetErr := mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error { return nil }) - mirrorCh <- time.Since(mirrorStartTime) + mirrorCh <- mirrorResult{ + execTime: time.Since(mirrorStartTime), + err: targetErr, + } }() + var ( + sourceExecTime, targetExecTime time.Duration + targetErr error + ) + sourceStartTime := time.Now() err := vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) sourceExecTime = time.Since(sourceStartTime) // Cancel the mirror context if it continues executing too long. select { - case d := <-mirrorCh: + case r := <-mirrorCh: // Mirror target finished on time. - targetExecTime = d + targetExecTime = r.execTime + targetErr = r.err case <-time.After(maxMirrorTargetLag): // Mirror target took too long. mirrorCtxCancel() targetExecTime = sourceExecTime + maxMirrorTargetLag + targetErr = errMirrorTargetQueryTookTooLong } vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr) diff --git a/go/vt/vtgate/engine/mirror_test.go b/go/vt/vtgate/engine/mirror_test.go index db9329fa703..753b1a26429 100644 --- a/go/vt/vtgate/engine/mirror_test.go +++ b/go/vt/vtgate/engine/mirror_test.go @@ -306,6 +306,7 @@ func TestMirror(t *testing.T) { wg.Wait() require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load()) + require.ErrorContains(t, *targetErr.Load(), "Mirror target query took too long") }) t.Run("TryStreamExecute success", func(t *testing.T) { @@ -543,5 +544,6 @@ func TestMirror(t *testing.T) { }) require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load()) + require.ErrorContains(t, *targetErr.Load(), "Mirror target query took too long") }) } diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index 5d2dc2cb44a..95d9a18078d 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -3,6 +3,7 @@ package vtgate import ( "context" "encoding/hex" + "errors" "fmt" "strconv" "strings" @@ -15,6 +16,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtgate/logstats" "vitess.io/vitess/go/vt/vtgate/vindexes" querypb "vitess.io/vitess/go/vt/proto/query" @@ -37,8 +39,7 @@ func (f fakeVSchemaOperator) UpdateVSchema(ctx context.Context, ksName string, v panic("implement me") } -type fakeTopoServer struct { -} +type fakeTopoServer struct{} // GetTopoServer returns the full topo.Server instance. func (f *fakeTopoServer) GetTopoServer() (*topo.Server, error) { @@ -78,7 +79,6 @@ func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace st // the provided cell. It will call the callback when // a new value or an error occurs. func (f *fakeTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) { - } func TestDestinationKeyspace(t *testing.T) { @@ -106,12 +106,14 @@ func TestDestinationKeyspace(t *testing.T) { Keyspaces: map[string]*vindexes.KeyspaceSchema{ ks1.Name: ks1Schema, ks2.Name: ks2Schema, - }} + }, + } vschemaWith1KS := &vindexes.VSchema{ Keyspaces: map[string]*vindexes.KeyspaceSchema{ ks1.Name: ks1Schema, - }} + }, + } type testCase struct { vschema *vindexes.VSchema @@ -203,20 +205,24 @@ func TestDestinationKeyspace(t *testing.T) { } } -var ks1 = &vindexes.Keyspace{Name: "ks1"} -var ks1Schema = &vindexes.KeyspaceSchema{Keyspace: ks1} -var ks2 = &vindexes.Keyspace{Name: "ks2"} -var ks2Schema = &vindexes.KeyspaceSchema{Keyspace: ks2} -var vschemaWith1KS = &vindexes.VSchema{ - Keyspaces: map[string]*vindexes.KeyspaceSchema{ - ks1.Name: ks1Schema, - }, -} +var ( + ks1 = &vindexes.Keyspace{Name: "ks1"} + ks1Schema = &vindexes.KeyspaceSchema{Keyspace: ks1} + ks2 = &vindexes.Keyspace{Name: "ks2"} + ks2Schema = &vindexes.KeyspaceSchema{Keyspace: ks2} + vschemaWith1KS = &vindexes.VSchema{ + Keyspaces: map[string]*vindexes.KeyspaceSchema{ + ks1.Name: ks1Schema, + }, + } +) + var vschemaWith2KS = &vindexes.VSchema{ Keyspaces: map[string]*vindexes.KeyspaceSchema{ ks1.Name: ks1Schema, ks2.Name: ks2Schema, - }} + }, +} func TestSetTarget(t *testing.T) { type testCase struct { @@ -318,7 +324,8 @@ func TestFirstSortedKeyspace(t *testing.T) { ks1Schema.Keyspace.Name: ks1Schema, ks2Schema.Keyspace.Name: ks2Schema, ks3Schema.Keyspace.Name: ks3Schema, - }} + }, + } r, _, _, _, _ := createExecutorEnv(t) vc, err := newVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4) @@ -372,3 +379,21 @@ func TestSetExecQueryTimeout(t *testing.T) { // this should be reset. require.Nil(t, safeSession.Options.Timeout) } + +func TestRecordMirrorStats(t *testing.T) { + executor, _, _, _, _ := createExecutorEnv(t) + safeSession := NewSafeSession(nil) + logStats := logstats.NewLogStats(context.Background(), t.Name(), "select 1", "", nil) + vc, err := newVCursorImpl(safeSession, sqlparser.MarginComments{}, executor, logStats, nil, &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4) + require.NoError(t, err) + + require.Zero(t, logStats.MirrorSourceExecuteTime) + require.Zero(t, logStats.MirrorTargetExecuteTime) + require.Nil(t, logStats.MirrorTargetError) + + vc.RecordMirrorStats(10*time.Millisecond, 20*time.Millisecond, errors.New("test error")) + + require.Equal(t, 10*time.Millisecond, logStats.MirrorSourceExecuteTime) + require.Equal(t, 20*time.Millisecond, logStats.MirrorTargetExecuteTime) + require.ErrorContains(t, logStats.MirrorTargetError, "test error") +}