diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index ef86249996a..862d41b115d 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -62,8 +62,11 @@ type QueryExecutor struct { ctx context.Context logStats *tabletenv.LogStats tsv *TabletServer - tabletType topodatapb.TabletType - setting *smartconnpool.Setting + // targetTabletType stores the target tablet type that we got as part of the request. + // We have the tablet server object too, which stores the current tablet type, but this is different. + // The target type we requested might be different from tsv's tablet type, if we had a change to the tablet type recently. + targetTabletType topodatapb.TabletType + setting *smartconnpool.Setting } const ( @@ -108,10 +111,10 @@ func (qre *QueryExecutor) shouldConsolidate() bool { case querypb.ExecuteOptions_CONSOLIDATOR_ENABLED: return true case querypb.ExecuteOptions_CONSOLIDATOR_ENABLED_REPLICAS: - return qre.tabletType != topodatapb.TabletType_PRIMARY + return qre.targetTabletType != topodatapb.TabletType_PRIMARY default: cm := qre.tsv.qe.consolidatorMode.Load().(string) - return cm == tabletenv.Enable || (cm == tabletenv.NotOnPrimary && qre.tabletType != topodatapb.TabletType_PRIMARY) + return cm == tabletenv.Enable || (cm == tabletenv.NotOnPrimary && qre.targetTabletType != topodatapb.TabletType_PRIMARY) } } @@ -122,7 +125,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { defer func(start time.Time) { duration := time.Since(start) qre.tsv.stats.QueryTimings.Add(planName, duration) - qre.tsv.stats.QueryTimingsByTabletType.Add(qre.tabletType.String(), duration) + qre.tsv.stats.QueryTimingsByTabletType.Add(qre.targetTabletType.String(), duration) qre.recordUserQuery("Execute", int64(duration)) mysqlTime := qre.logStats.MysqlResponseTime @@ -136,12 +139,12 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { errCode = vtErrorCode.String() if reply == nil { - qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, qre.options.GetWorkloadName(), qre.tabletType, 1, duration, mysqlTime, 0, 0, 1, errCode) + qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, qre.options.GetWorkloadName(), qre.targetTabletType, 1, duration, mysqlTime, 0, 0, 1, errCode) qre.plan.AddStats(1, duration, mysqlTime, 0, 0, 1) return } - qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, qre.options.GetWorkloadName(), qre.tabletType, 1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0, errCode) + qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, qre.options.GetWorkloadName(), qre.targetTabletType, 1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0, errCode) qre.plan.AddStats(1, duration, mysqlTime, reply.RowsAffected, uint64(len(reply.Rows)), 0) qre.logStats.RowsAffected = int(reply.RowsAffected) qre.logStats.Rows = reply.Rows @@ -315,7 +318,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error { defer func(start time.Time) { qre.tsv.stats.QueryTimings.Record(qre.plan.PlanID.String(), start) - qre.tsv.stats.QueryTimingsByTabletType.Record(qre.tabletType.String(), start) + qre.tsv.stats.QueryTimingsByTabletType.Record(qre.targetTabletType.String(), start) qre.recordUserQuery("Stream", int64(time.Since(start))) }(time.Now()) @@ -405,7 +408,7 @@ func (qre *QueryExecutor) MessageStream(callback StreamCallback) error { defer func(start time.Time) { qre.tsv.stats.QueryTimings.Record(qre.plan.PlanID.String(), start) - qre.tsv.stats.QueryTimingsByTabletType.Record(qre.tabletType.String(), start) + qre.tsv.stats.QueryTimingsByTabletType.Record(qre.targetTabletType.String(), start) qre.recordUserQuery("MessageStream", int64(time.Since(start))) }(time.Now()) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 295a85c31fd..7f4d3a09513 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -871,17 +871,17 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq return err } qre := &QueryExecutor{ - query: query, - marginComments: comments, - bindVars: bindVariables, - connID: connID, - options: options, - plan: plan, - ctx: ctx, - logStats: logStats, - tsv: tsv, - tabletType: targetType, - setting: connSetting, + query: query, + marginComments: comments, + bindVars: bindVariables, + connID: connID, + options: options, + plan: plan, + ctx: ctx, + logStats: logStats, + tsv: tsv, + targetTabletType: targetType, + setting: connSetting, } result, err = qre.Execute() if err != nil { @@ -973,17 +973,17 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ } } qre := &QueryExecutor{ - query: query, - marginComments: comments, - bindVars: bindVariables, - connID: connID, - options: options, - plan: plan, - ctx: ctx, - logStats: logStats, - tsv: tsv, - tabletType: target.GetTabletType(), - setting: connSetting, + query: query, + marginComments: comments, + bindVars: bindVariables, + connID: connID, + options: options, + plan: plan, + ctx: ctx, + logStats: logStats, + tsv: tsv, + targetTabletType: target.GetTabletType(), + setting: connSetting, } return qre.Stream(callback) },