From b3d21ea1d65580a061793ac7951669063d0255f9 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Wed, 2 Oct 2024 11:19:12 -0400 Subject: [PATCH] fix bug, add tests Signed-off-by: Max Englander --- go/vt/vtgate/engine/fake_vcursor_test.go | 17 ++- go/vt/vtgate/engine/mirror.go | 70 ++++++--- go/vt/vtgate/engine/mirror_test.go | 185 ++++++++++++++++++++++- go/vt/vtgate/engine/primitive.go | 3 + go/vt/vtgate/logstats/logstats.go | 55 ++++--- go/vt/vtgate/logstats/logstats_test.go | 36 +++-- go/vt/vtgate/vcursor_impl.go | 7 + 7 files changed, 310 insertions(+), 63 deletions(-) diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index f0b11b7a3ac..9ba4fdc6a6b 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -445,9 +445,10 @@ type loggingVCursor struct { parser *sqlparser.Parser - handleMirrorClonesFn func(context.Context) VCursor + onMirrorClonesFn func(context.Context) VCursor onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool) onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error) + onRecordMirrorStatsFn func(time.Duration, time.Duration, error) } func (f *loggingVCursor) HasCreatedTempTable() { @@ -564,8 +565,8 @@ func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor { } func (f *loggingVCursor) CloneForMirroring(ctx context.Context) VCursor { - if f.handleMirrorClonesFn != nil { - return f.handleMirrorClonesFn(ctx) + if f.onMirrorClonesFn != nil { + return f.onMirrorClonesFn(ctx) } panic("no mirror clones available") } @@ -886,6 +887,12 @@ func (t *loggingVCursor) SQLParser() *sqlparser.Parser { return t.parser } +func (t *loggingVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) { + if t.onRecordMirrorStatsFn != nil { + t.onRecordMirrorStatsFn(sourceExecTime, targetExecTime, targetErr) + } +} + func (t *noopVCursor) VExplainLogging() {} func (t *noopVCursor) DisableLogging() {} func (t *noopVCursor) GetVExplainLogs() []ExecuteEntry { @@ -896,6 +903,10 @@ func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) { return nil, nil } +// RecordMirrorStats implements VCursor. +func (t *noopVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) { +} + func expectResult(t *testing.T, result, want *sqltypes.Result) { t.Helper() fieldsResult := fmt.Sprintf("%v", result.Fields) diff --git a/go/vt/vtgate/engine/mirror.go b/go/vt/vtgate/engine/mirror.go index 89e70ebd695..78004639c19 100644 --- a/go/vt/vtgate/engine/mirror.go +++ b/go/vt/vtgate/engine/mirror.go @@ -74,26 +74,40 @@ func (m *percentBasedMirror) TryExecute(ctx context.Context, vcursor VCursor, bi return vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) } - mirrorCh := make(chan any) - mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag) + mirrorCh := make(chan time.Duration) + mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx) defer mirrorCtxCancel() + var ( + sourceExecTime time.Duration + targetExecTime time.Duration + targetErr error + ) + go func() { - defer close(mirrorCh) mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) - // TODO(maxeng) handle error. - _, _ = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields) + targetStartTime := time.Now() + _, targetErr = mirrorVCursor.ExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields) + mirrorCh <- time.Since(targetStartTime) }() + sourceStartTime := time.Now() r, err := vcursor.ExecutePrimitive(ctx, m.primitive, bindVars, wantfields) + sourceExecTime = time.Since(sourceStartTime) + // Cancel the mirror context if it continues executing too long. select { - case <-mirrorCh: - // Mirroring completed within the allowed time. - case <-mirrorCtx.Done(): - // Mirroring took too long and was canceled. + case d := <-mirrorCh: + // Mirror target finished on time. + targetExecTime = d + case <-time.After(maxMirrorTargetLag): + // Mirror target took too long. + mirrorCtxCancel() + targetExecTime = sourceExecTime + maxMirrorTargetLag } + vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr) + return r, err } @@ -102,30 +116,42 @@ func (m *percentBasedMirror) TryStreamExecute(ctx context.Context, vcursor VCurs return vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) } - mirrorCh := make(chan any) - mirrorCtx, mirrorCtxCancel := context.WithTimeout(ctx, maxMirrorTargetLag) + mirrorCh := make(chan time.Duration) + mirrorCtx, mirrorCtxCancel := context.WithCancel(ctx) defer mirrorCtxCancel() + var ( + sourceExecTime time.Duration + targetExecTime time.Duration + targetErr error + ) + go func() { - defer close(mirrorCh) mirrorVCursor := vcursor.CloneForMirroring(mirrorCtx) - // TODO(maxeng) handle error. - _ = mirrorVCursor.StreamExecutePrimitive( - mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result, - ) error { - return nil - }) + mirrorStartTime := time.Now() + targetErr = mirrorVCursor.StreamExecutePrimitive(mirrorCtx, m.target, bindVars, wantfields, func(_ *sqltypes.Result) error { + return nil + }) + mirrorCh <- time.Since(mirrorStartTime) }() + sourceStartTime := time.Now() err := vcursor.StreamExecutePrimitive(ctx, m.primitive, bindVars, wantfields, callback) + sourceExecTime = time.Since(sourceStartTime) + // Cancel the mirror context if it continues executing too long. select { - case <-mirrorCh: - // Mirroring completed within the allowed time. - case <-mirrorCtx.Done(): - // Mirroring took too long and was canceled. + case d := <-mirrorCh: + // Mirror target finished on time. + targetExecTime = d + case <-time.After(maxMirrorTargetLag): + // Mirror target took too long. + mirrorCtxCancel() + targetExecTime = sourceExecTime + maxMirrorTargetLag } + vcursor.RecordMirrorStats(sourceExecTime, targetExecTime, targetErr) + return err } diff --git a/go/vt/vtgate/engine/mirror_test.go b/go/vt/vtgate/engine/mirror_test.go index b9e442df32d..db9329fa703 100644 --- a/go/vt/vtgate/engine/mirror_test.go +++ b/go/vt/vtgate/engine/mirror_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -76,6 +77,10 @@ func TestMirror(t *testing.T) { }, } + sourceExecTime := atomic.Pointer[time.Duration]{} + targetExecTime := atomic.Pointer[time.Duration]{} + targetErr := atomic.Pointer[error]{} + vc := &loggingVCursor{ shards: []string{"0"}, ksShardMap: map[string][]string{ @@ -90,15 +95,23 @@ func TestMirror(t *testing.T) { "hello", ), }, - handleMirrorClonesFn: func(ctx context.Context) VCursor { + onMirrorClonesFn: func(ctx context.Context) VCursor { return mirrorVC }, + onRecordMirrorStatsFn: func(sourceTime time.Duration, targetTime time.Duration, err error) { + sourceExecTime.Store(&sourceTime) + targetExecTime.Store(&targetTime) + targetErr.Store(&err) + }, } t.Run("TryExecute success", func(t *testing.T) { defer func() { vc.Rewind() mirrorVC.Rewind() + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() want := vc.results[0] @@ -114,6 +127,8 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", }) + require.NotNil(t, targetExecTime.Load()) + require.Nil(t, *targetErr.Load()) }) t.Run("TryExecute return primitive error", func(t *testing.T) { @@ -124,6 +139,9 @@ func TestMirror(t *testing.T) { vc.results = results vc.resultErr = nil mirrorVC.Rewind() + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() vc.results = nil @@ -143,6 +161,8 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", }) + require.NotNil(t, targetExecTime.Load()) + require.Nil(t, *targetErr.Load()) }) t.Run("TryExecute ignore mirror target error", func(t *testing.T) { @@ -153,6 +173,9 @@ func TestMirror(t *testing.T) { mirrorVC.Rewind() mirrorVC.results = results mirrorVC.resultErr = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() mirrorVC.results = nil @@ -171,6 +194,63 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", }) + + require.NotNil(t, targetExecTime.Load()) + mirrorErr := targetErr.Load() + require.ErrorContains(t, *mirrorErr, "ignore me") + }) + + t.Run("TryExecute fast mirror target", func(t *testing.T) { + defer func() { + vc.Rewind() + vc.onExecuteMultiShardFn = nil + mirrorVC.Rewind() + mirrorVC.onExecuteMultiShardFn = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) + }() + + primitiveLatency := 10 * time.Millisecond + vc.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + time.Sleep(primitiveLatency) + select { + case <-ctx.Done(): + require.Fail(t, "primitive context done") + default: + } + } + + var wg sync.WaitGroup + defer wg.Wait() + mirrorVC.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + wg.Add(1) + defer wg.Done() + time.Sleep(primitiveLatency / 2) + select { + case <-ctx.Done(): + require.Fail(t, "mirror target context done") + default: + } + } + + want := vc.results[0] + res, err := mirror.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.Equal(t, res, want) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + + wg.Wait() + + require.Greater(t, *sourceExecTime.Load(), *targetExecTime.Load()) }) t.Run("TryExecute slow mirror target", func(t *testing.T) { @@ -179,9 +259,12 @@ func TestMirror(t *testing.T) { vc.onExecuteMultiShardFn = nil mirrorVC.Rewind() mirrorVC.onExecuteMultiShardFn = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() - primitiveLatency := maxMirrorTargetLag * 2 + primitiveLatency := 10 * time.Millisecond vc.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { time.Sleep(primitiveLatency) select { @@ -193,12 +276,14 @@ func TestMirror(t *testing.T) { var wg sync.WaitGroup defer wg.Wait() - wg.Add(1) mirrorVC.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + wg.Add(1) defer wg.Done() - time.Sleep(primitiveLatency + (2 * maxMirrorTargetLag)) + time.Sleep(primitiveLatency + maxMirrorTargetLag + (5 * time.Millisecond)) select { case <-ctx.Done(): + require.NotNil(t, ctx.Err()) + require.ErrorContains(t, ctx.Err(), "context canceled") default: require.Fail(t, "mirror target context not done") } @@ -217,12 +302,19 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", }) + + wg.Wait() + + require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load()) }) t.Run("TryStreamExecute success", func(t *testing.T) { defer func() { vc.Rewind() mirrorVC.Rewind() + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() want := vc.results[0] @@ -246,6 +338,9 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", }) + + require.NotNil(t, targetExecTime.Load()) + require.Nil(t, *targetErr.Load()) }) t.Run("TryStreamExecute return primitive error", func(t *testing.T) { @@ -256,6 +351,9 @@ func TestMirror(t *testing.T) { vc.results = results vc.resultErr = nil mirrorVC.Rewind() + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() vc.results = nil @@ -282,6 +380,9 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", }) + + require.NotNil(t, targetExecTime.Load()) + require.Nil(t, *targetErr.Load()) }) t.Run("TryStreamExecute ignore mirror target error", func(t *testing.T) { @@ -292,6 +393,9 @@ func TestMirror(t *testing.T) { mirrorVC.Rewind() mirrorVC.results = results mirrorVC.resultErr = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() mirrorVC.results = nil @@ -318,6 +422,68 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", }) + + require.NotNil(t, targetExecTime.Load()) + require.ErrorContains(t, *targetErr.Load(), "ignore me") + }) + + t.Run("TryStreamExecute fast mirror target", func(t *testing.T) { + defer func() { + vc.Rewind() + vc.onStreamExecuteMultiFn = nil + mirrorVC.Rewind() + mirrorVC.onStreamExecuteMultiFn = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) + }() + + primitiveLatency := 10 * time.Millisecond + vc.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + time.Sleep(primitiveLatency) + select { + case <-ctx.Done(): + require.Fail(t, "primitive context done") + default: + } + } + + var wg sync.WaitGroup + defer wg.Wait() + mirrorVC.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + wg.Add(1) + defer wg.Done() + time.Sleep(primitiveLatency / 2) + select { + case <-ctx.Done(): + require.Fail(t, "mirror target context done") + default: + } + } + + want := vc.results[0] + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Equal(t, want, result) + return nil + }, + ) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + + require.Greater(t, *sourceExecTime.Load(), *targetExecTime.Load()) }) t.Run("TryStreamExecute slow mirror target", func(t *testing.T) { @@ -326,9 +492,12 @@ func TestMirror(t *testing.T) { vc.onStreamExecuteMultiFn = nil mirrorVC.Rewind() mirrorVC.onStreamExecuteMultiFn = nil + sourceExecTime.Store(nil) + targetExecTime.Store(nil) + targetErr.Store(nil) }() - primitiveLatency := maxMirrorTargetLag * 2 + primitiveLatency := 10 * time.Millisecond vc.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { time.Sleep(primitiveLatency) select { @@ -340,10 +509,10 @@ func TestMirror(t *testing.T) { var wg sync.WaitGroup defer wg.Wait() - wg.Add(1) mirrorVC.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + wg.Add(1) defer wg.Done() - time.Sleep(primitiveLatency + (2 * maxMirrorTargetLag)) + time.Sleep(primitiveLatency + maxMirrorTargetLag + (5 * time.Millisecond)) select { case <-ctx.Done(): default: @@ -372,5 +541,7 @@ func TestMirror(t *testing.T) { `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", }) + + require.Greater(t, *targetExecTime.Load(), *sourceExecTime.Load()) }) } diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 76d0a28e516..4f3a388d04f 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -144,6 +144,9 @@ type ( // StartPrimitiveTrace starts a trace for the given primitive, // and returns a function to get the trace logs after the primitive execution. StartPrimitiveTrace() func() Stats + + // RecordMirrorStats is used to record stats about a mirror query. + RecordMirrorStats(time.Duration, time.Duration, error) } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/logstats/logstats.go b/go/vt/vtgate/logstats/logstats.go index 8f8ba41e3cd..fdc0e69c8db 100644 --- a/go/vt/vtgate/logstats/logstats.go +++ b/go/vt/vtgate/logstats/logstats.go @@ -33,25 +33,28 @@ import ( // LogStats records the stats for a single vtgate query type LogStats struct { - Ctx context.Context - Method string - TabletType string - StmtType string - SQL string - BindVariables map[string]*querypb.BindVariable - StartTime time.Time - EndTime time.Time - ShardQueries uint64 - RowsAffected uint64 - RowsReturned uint64 - PlanTime time.Duration - ExecuteTime time.Duration - CommitTime time.Duration - Error error - TablesUsed []string - SessionUUID string - CachedPlan bool - ActiveKeyspace string // ActiveKeyspace is the selected keyspace `use ks` + Ctx context.Context + Method string + TabletType string + StmtType string + SQL string + BindVariables map[string]*querypb.BindVariable + StartTime time.Time + EndTime time.Time + ShardQueries uint64 + RowsAffected uint64 + RowsReturned uint64 + PlanTime time.Duration + ExecuteTime time.Duration + CommitTime time.Duration + Error error + TablesUsed []string + SessionUUID string + CachedPlan bool + ActiveKeyspace string // ActiveKeyspace is the selected keyspace `use ks` + MirrorSourceExecuteTime time.Duration + MirrorTargetExecuteTime time.Duration + MirrorTargetError error } // NewLogStats constructs a new LogStats with supplied Method and ctx @@ -116,6 +119,14 @@ func (stats *LogStats) RemoteAddrUsername() (string, string) { return ci.RemoteAddr(), ci.Username() } +// MirorTargetErrorStr returns the mirror target error string or "" +func (stats *LogStats) MirrorTargetErrorStr() string { + if stats.MirrorTargetError != nil { + return stats.MirrorTargetError.Error() + } + return "" +} + // Logf formats the log record to the given writer, either as // tab-separated list of logged fields or as JSON. func (stats *LogStats) Logf(w io.Writer, params url.Values) error { @@ -177,6 +188,12 @@ func (stats *LogStats) Logf(w io.Writer, params url.Values) error { log.Strings(stats.TablesUsed) log.Key("ActiveKeyspace") log.String(stats.ActiveKeyspace) + log.Key("MirrorSourceExecuteTime") + log.Duration(stats.MirrorSourceExecuteTime) + log.Key("MirrorTargetExecuteTime") + log.Duration(stats.MirrorTargetExecuteTime) + log.Key("MirrorTargetError") + log.String(stats.MirrorTargetErrorStr()) return log.Flush(w) } diff --git a/go/vt/vtgate/logstats/logstats_test.go b/go/vt/vtgate/logstats/logstats_test.go index ae3c01e0f0b..872b34c6964 100644 --- a/go/vt/vtgate/logstats/logstats_test.go +++ b/go/vt/vtgate/logstats/logstats_test.go @@ -79,42 +79,42 @@ func TestLogStatsFormat(t *testing.T) { { // 0 redact: false, format: "text", - expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\n", + expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\t0.000000\t0.000000\t\"\"\n", bindVars: intBindVar, }, { // 1 redact: true, format: "text", - expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\n", + expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\t0.000000\t0.000000\t\"\"\n", bindVars: intBindVar, }, { // 2 redact: false, format: "json", - expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":{\"intVal\":{\"type\":\"INT64\",\"value\":1}},\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", + expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":{\"intVal\":{\"type\":\"INT64\",\"value\":1}},\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"MirrorSourceExecuteTime\":0,\"MirrorTargetError\":\"\",\"MirrorTargetExecuteTime\":0,\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", bindVars: intBindVar, }, { // 3 redact: true, format: "json", - expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":\"[REDACTED]\",\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", + expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":\"[REDACTED]\",\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"MirrorSourceExecuteTime\":0,\"MirrorTargetError\":\"\",\"MirrorTargetExecuteTime\":0,\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", bindVars: intBindVar, }, { // 4 redact: false, format: "text", - expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t{\"strVal\": {\"type\": \"VARCHAR\", \"value\": \"abc\"}}\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\n", + expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t{\"strVal\": {\"type\": \"VARCHAR\", \"value\": \"abc\"}}\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\t0.000000\t0.000000\t\"\"\n", bindVars: stringBindVar, }, { // 5 redact: true, format: "text", - expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\n", + expected: "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1\"\t\"[REDACTED]\"\t0\t0\t\"\"\t\"PRIMARY\"\t\"suuid\"\tfalse\t[\"ks1.tbl1\",\"ks2.tbl2\"]\t\"db\"\t0.000000\t0.000000\t\"\"\n", bindVars: stringBindVar, }, { // 6 redact: false, format: "json", - expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":{\"strVal\":{\"type\":\"VARCHAR\",\"value\":\"abc\"}},\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", + expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":{\"strVal\":{\"type\":\"VARCHAR\",\"value\":\"abc\"}},\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"MirrorSourceExecuteTime\":0,\"MirrorTargetError\":\"\",\"MirrorTargetExecuteTime\":0,\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", bindVars: stringBindVar, }, { // 7 redact: true, format: "json", - expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":\"[REDACTED]\",\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", + expected: "{\"ActiveKeyspace\":\"db\",\"BindVars\":\"[REDACTED]\",\"Cached Plan\":false,\"CommitTime\":0,\"Effective Caller\":\"\",\"End\":\"2017-01-01 01:02:04.000001\",\"Error\":\"\",\"ExecuteTime\":0,\"ImmediateCaller\":\"\",\"Method\":\"test\",\"MirrorSourceExecuteTime\":0,\"MirrorTargetError\":\"\",\"MirrorTargetExecuteTime\":0,\"PlanTime\":0,\"RemoteAddr\":\"\",\"RowsAffected\":0,\"SQL\":\"sql1\",\"SessionUUID\":\"suuid\",\"ShardQueries\":0,\"Start\":\"2017-01-01 01:02:03.000000\",\"StmtType\":\"\",\"TablesUsed\":[\"ks1.tbl1\",\"ks2.tbl2\"],\"TabletType\":\"PRIMARY\",\"TotalTime\":1.000001,\"Username\":\"\"}", bindVars: stringBindVar, }, } @@ -156,12 +156,12 @@ func TestLogStatsFilter(t *testing.T) { params := map[string][]string{"full": {}} got := testFormat(t, logStats, params) - want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\n" + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) streamlog.SetQueryLogFilterTag("LOG_THIS_QUERY") got = testFormat(t, logStats, params) - want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\n" + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) streamlog.SetQueryLogFilterTag("NOT_THIS_QUERY") @@ -179,12 +179,12 @@ func TestLogStatsRowThreshold(t *testing.T) { params := map[string][]string{"full": {}} got := testFormat(t, logStats, params) - want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\n" + want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) streamlog.SetQueryLogRowThreshold(0) got = testFormat(t, logStats, params) - want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\n" + want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) streamlog.SetQueryLogRowThreshold(1) got = testFormat(t, logStats, params) @@ -215,6 +215,18 @@ func TestLogStatsErrorStr(t *testing.T) { } } +func TestLogStatsMirrorTargetErrorStr(t *testing.T) { + logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}) + if logStats.MirrorTargetErrorStr() != "" { + t.Fatalf("should not get error in stats, but got: %s", logStats.ErrorStr()) + } + errStr := "unknown error" + logStats.MirrorTargetError = errors.New(errStr) + if !strings.Contains(logStats.MirrorTargetErrorStr(), errStr) { + t.Fatalf("expect string '%s' in error message, but got: %s", errStr, logStats.ErrorStr()) + } +} + func TestLogStatsRemoteAddrUsername(t *testing.T) { logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}) addr, user := logStats.RemoteAddrUsername() diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 4f616f77fc8..e9b1d3d7712 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -1535,3 +1535,10 @@ func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) { func (vc *vcursorImpl) GetForeignKeyChecksState() *bool { return vc.fkChecksState } + +// RecordMirrorStats is used to record stats about a mirror query. +func (vc *vcursorImpl) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) { + vc.logStats.MirrorSourceExecuteTime = sourceExecTime + vc.logStats.MirrorTargetExecuteTime = targetExecTime + vc.logStats.MirrorTargetError = targetErr +}