Skip to content

Commit

Permalink
Introduce ThrottlerMetricsPublisher, pass throttler so that self-metr…
Browse files Browse the repository at this point in the history
…ics can query it directly

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Jul 25, 2024
1 parent 292adbb commit 0266758
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 53 deletions.
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/throttle/base/self_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type SelfMetric interface {
DefaultScope() Scope
DefaultThreshold() float64
RequiresConn() bool
Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric
Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric
}

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,13 @@ package base

import (
"context"
"sync/atomic"

"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
)

var _ SelfMetric = registerSelfMetric(&CustomQuerySelfMetric{})

type CustomQuerySelfMetric struct {
customQueryFuncPtr atomic.Pointer[func() string]
}

func (m *CustomQuerySelfMetric) GetQuery() string {
customQueryFunc := m.customQueryFuncPtr.Load()
if customQueryFunc == nil {
return ""
}
query := (*customQueryFunc)()
return query
}

func (m *CustomQuerySelfMetric) SetQueryFunc(f func() string) {
if f == nil {
m.customQueryFuncPtr.Store(nil)
return
}
m.customQueryFuncPtr.Store(&f)
}

func (m *CustomQuerySelfMetric) Name() MetricName {
Expand All @@ -62,6 +43,6 @@ func (m *CustomQuerySelfMetric) RequiresConn() bool {
return true
}

func (m *CustomQuerySelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, conn, m.GetQuery())
func (m *CustomQuerySelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, conn, throttler.GetCustomMetricsQuery())
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *DefaultSelfMetric) RequiresConn() bool {
return false
}

func (m *DefaultSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric {
func (m *DefaultSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric {
return &ThrottleMetric{
Err: fmt.Errorf("unexpected direct call to DefaultSelfMetric.Read"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ func (m *LagSelfMetric) RequiresConn() bool {
return true
}

func (m *LagSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric {
func (m *LagSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, conn, m.GetQuery())
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (m *LoadAvgSelfMetric) RequiresConn() bool {
return false
}

func (m *LoadAvgSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric {
func (m *LoadAvgSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric {
metric := &ThrottleMetric{
Scope: SelfScope,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ func (m *ThreadsRunningSelfMetric) RequiresConn() bool {
return true
}

func (m *ThreadsRunningSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric {
func (m *ThreadsRunningSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, conn, threadsRunningMetricQuery)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package base

// ThrottlerMetricsPublisher is implemented by throttler.Throttler and is used by SelfMetric
// implementations to query the throttler.
type ThrottlerMetricsPublisher interface {
GetCustomMetricsQuery() string
}
33 changes: 9 additions & 24 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,6 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv
throttler.readSelfThrottleMetrics = func(ctx context.Context) base.ThrottleMetrics {
return throttler.readSelfThrottleMetricsInternal(ctx)
}
if customQuerySelfMetric, ok := base.RegisteredSelfMetrics[base.CustomMetricName].(*base.CustomQuerySelfMetric); ok {
customQuerySelfMetric.SetQueryFunc(throttler.GetCustomMetricsQuery)
}
return throttler
}

Expand Down Expand Up @@ -689,20 +686,6 @@ func (throttler *Throttler) stimulatePrimaryThrottler(ctx context.Context, tmCli
return nil
}

// readSelfMySQLThrottleMetric reads the metric from this very tablet or from its backend mysql.
func (throttler *Throttler) readSelfMySQLThrottleMetric(ctx context.Context, query string) *base.ThrottleMetric {
conn, err := throttler.pool.Get(ctx, nil)
if err != nil {
return &base.ThrottleMetric{Err: err}
}
defer conn.Recycle()

result := base.ReadSelfMySQLThrottleMetric(ctx, conn.Conn, query)
result.Alias = throttler.tabletAlias

return result
}

// throttledAppsSnapshot returns a snapshot (a copy) of current throttled apps
func (throttler *Throttler) throttledAppsSnapshot() map[string]cache.Item {
return throttler.throttledApps.Items()
Expand Down Expand Up @@ -946,10 +929,11 @@ func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, tmClie
}
}

// readSelfThrottleMetricsInternal rreads all registsred self metrics on this tablet (or backend MySQL server).
// This is the actual place where metrics are read, to be later aggregated and/or propagated to other tablets.
func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context) base.ThrottleMetrics {

writeMetric := func(metricName base.MetricName, metric *base.ThrottleMetric) {
metric.Name = metricName
writeMetric := func(metric *base.ThrottleMetric) {
select {
case <-ctx.Done():
return
Expand All @@ -958,23 +942,24 @@ func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context)
}
readMetric := func(selfMetric base.SelfMetric) *base.ThrottleMetric {
if !selfMetric.RequiresConn() {
return selfMetric.Read(ctx, nil)
return selfMetric.Read(ctx, throttler, nil)
}
conn, err := throttler.pool.Get(ctx, nil)
if err != nil {
return &base.ThrottleMetric{Err: err}
}
defer conn.Recycle()
return selfMetric.Read(ctx, conn.Conn)
return selfMetric.Read(ctx, throttler, conn.Conn)
}
for metricsName, selfMetric := range base.RegisteredSelfMetrics {
if metricsName == base.DefaultMetricName {
for metricName, selfMetric := range base.RegisteredSelfMetrics {
if metricName == base.DefaultMetricName {
continue
}
metric := readMetric(selfMetric)
metric.Name = metricName
metric.Alias = throttler.tabletAlias

go writeMetric(metricsName, metric)
go writeMetric(metric)
}
return nil
}
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,6 @@ func newTestThrottler() *Throttler {
return selfMetrics
}
throttler.ThrottleApp(throttlerapp.TestingAlwaysThrottlerName.String(), time.Now().Add(time.Hour*24*365*10), DefaultThrottleRatio, false)
if customQuerySelfMetric, ok := base.RegisteredSelfMetrics[base.CustomMetricName].(*base.CustomQuerySelfMetric); ok {
customQuerySelfMetric.SetQueryFunc(throttler.GetCustomMetricsQuery)
}

return throttler
}
Expand Down

0 comments on commit 0266758

Please sign in to comment.