From 84f6a9adbc50600b3175b0ad241019f488a87a95 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 13 Feb 2025 10:55:28 +0800 Subject: [PATCH] ttl: only gc in leader to save performance (#59358) (#59469) close pingcap/tidb#59357 --- pkg/metrics/grafana/tidb.json | 2 +- pkg/ttl/metrics/metrics.go | 5 ++ pkg/ttl/ttlworker/BUILD.bazel | 1 + pkg/ttl/ttlworker/job_manager.go | 13 ++++ .../ttlworker/job_manager_integration_test.go | 74 ++++++++++++++++++- pkg/ttl/ttlworker/job_manager_test.go | 10 +++ 6 files changed, 100 insertions(+), 5 deletions(-) diff --git a/pkg/metrics/grafana/tidb.json b/pkg/metrics/grafana/tidb.json index 02235b7c187b8..9979d6c3779c8 100644 --- a/pkg/metrics/grafana/tidb.json +++ b/pkg/metrics/grafana/tidb.json @@ -22581,7 +22581,7 @@ "targets": [ { "exemplar": true, - "expr": "avg(tidb_server_ttl_watermark_delay{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", type=\"schedule\"}) by (type, name)", + "expr": "max(tidb_server_ttl_watermark_delay{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", type=\"schedule\"}) by (type, name)", "interval": "", "legendFormat": "{{ name }}", "queryType": "randomWalk", diff --git a/pkg/ttl/metrics/metrics.go b/pkg/ttl/metrics/metrics.go index c308821a157f4..4e032b1fbb48c 100644 --- a/pkg/ttl/metrics/metrics.go +++ b/pkg/ttl/metrics/metrics.go @@ -255,3 +255,8 @@ func UpdateDelayMetrics(records map[int64]*DelayMetricsRecord) { metrics.TTLWatermarkDelay.With(prometheus.Labels{metrics.LblType: "schedule", metrics.LblName: delay}).Set(v) } } + +// ClearDelayMetrics clears the metrics of TTL delay +func ClearDelayMetrics() { + metrics.TTLWatermarkDelay.Reset() +} diff --git a/pkg/ttl/ttlworker/BUILD.bazel b/pkg/ttl/ttlworker/BUILD.bazel index d44ca296eb5ad..e86ca43997d70 100644 --- a/pkg/ttl/ttlworker/BUILD.bazel +++ b/pkg/ttl/ttlworker/BUILD.bazel @@ -107,6 +107,7 @@ go_test( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_prometheus_client_golang//prometheus", "@com_github_prometheus_client_model//go", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//mock", diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go index a0047f641b0c0..b3e8fb158c951 100644 --- a/pkg/ttl/ttlworker/job_manager.go +++ b/pkg/ttl/ttlworker/job_manager.go @@ -500,8 +500,15 @@ func (m *JobManager) reportMetrics(se session.Session) { metrics.RunningJobsCnt.Set(runningJobs) metrics.CancellingJobsCnt.Set(cancellingJobs) + if !m.isLeader() { + // only the leader can do collect delay metrics to reduce the performance overhead + metrics.ClearDelayMetrics() + return + } + if time.Since(m.lastReportDelayMetricsTime) > 10*time.Minute { m.lastReportDelayMetricsTime = time.Now() + logutil.Logger(m.ctx).Info("TTL leader to collect delay metrics") records, err := GetDelayMetricRecords(m.ctx, se, time.Now()) if err != nil { logutil.Logger(m.ctx).Info("failed to get TTL delay metrics", zap.Error(err)) @@ -1031,6 +1038,12 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) { // DoGC deletes some old TTL job histories and redundant scan tasks func (m *JobManager) DoGC(ctx context.Context, se session.Session, now time.Time) { + if !m.isLeader() { + // only the leader can do the GC to reduce the performance impact + return + } + + logutil.Logger(m.ctx).Info("TTL leader to DoGC") // Remove the table not exist in info schema cache. // Delete the table status before deleting the tasks. Therefore the related tasks if err := m.updateInfoSchemaCache(se); err == nil { diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go index 7920914ae036d..5454db9c58981 100644 --- a/pkg/ttl/ttlworker/job_manager_integration_test.go +++ b/pkg/ttl/ttlworker/job_manager_integration_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" + metrics2 "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/ast" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/variable" @@ -46,6 +47,7 @@ import ( "github.com/pingcap/tidb/pkg/ttl/ttlworker" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -777,10 +779,15 @@ func TestGCScanTasks(t *testing.T) { addScanTaskRecord(3, 2, 1) addScanTaskRecord(3, 2, 2) + isLeader := false m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { - return true + return isLeader }) se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) + // only leader can do GC + m.DoGC(context.TODO(), se, se.Now()) + tk.MustQuery("select count(1) from mysql.tidb_ttl_task").Check(testkit.Rows("6")) + isLeader = true m.DoGC(context.TODO(), se, se.Now()) tk.MustQuery("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc").Check(testkit.Rows("1 1", "1 2")) } @@ -796,10 +803,15 @@ func TestGCTableStatus(t *testing.T) { // insert table status without corresponding table tk.MustExec("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)", 2024, 2024) + isLeader := false m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { - return true + return isLeader }) se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) + // only leader can do GC + m.DoGC(context.TODO(), se, se.Now()) + tk.MustQuery("select count(1) from mysql.tidb_ttl_table_status").Check(testkit.Rows("1")) + isLeader = true m.DoGC(context.TODO(), se, se.Now()) tk.MustQuery("select * from mysql.tidb_ttl_table_status").Check(nil) @@ -857,11 +869,16 @@ func TestGCTTLHistory(t *testing.T) { addHistory(6, 91) addHistory(7, 100) + isLeader := false m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { - return true + return isLeader }) se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) m.DoGC(context.TODO(), se, se.Now()) + // only leader can go GC + tk.MustQuery("select count(1) from mysql.tidb_ttl_job_history").Check(testkit.Rows("7")) + isLeader = true + m.DoGC(context.TODO(), se, se.Now()) tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5")) } @@ -1027,6 +1044,53 @@ func TestDelayMetrics(t *testing.T) { checkRecord(records, "t3", now.Add(-3*time.Hour)) checkRecord(records, "t4", now.Add(-3*time.Hour)) checkRecord(records, "t5", emptyTime) + + metrics.ClearDelayMetrics() + getMetricCnt := func() int { + ch := make(chan prometheus.Metric) + go func() { + metrics2.TTLWatermarkDelay.Collect(ch) + close(ch) + }() + + cnt := 0 + for range ch { + cnt++ + } + return cnt + } + + isLeader := false + m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, func() bool { + return isLeader + }) + // If the manager is not leader, the metrics will be empty. + m.ReportMetrics(se) + require.Zero(t, getMetricCnt()) + // leader will collect metrics + isLeader = true + m.SetLastReportDelayMetricsTime(time.Now().Add(-11 * time.Minute)) + m.ReportMetrics(se) + require.Equal(t, len(metrics.WaterMarkScheduleDelayNames), getMetricCnt()) + require.InDelta(t, time.Now().Unix(), m.GetLastReportDelayMetricsTime().Unix(), 5) + // will not collect metrics in 10 minutes + lastReportTime := time.Now().Add(-9 * time.Minute) + m.SetLastReportDelayMetricsTime(lastReportTime) + m.ReportMetrics(se) + require.Equal(t, len(metrics.WaterMarkScheduleDelayNames), getMetricCnt()) + require.Equal(t, lastReportTime.Unix(), m.GetLastReportDelayMetricsTime().Unix(), 5) + // when back to non-leader, the metrics will be empty and last report time will not be updated. + isLeader = false + lastReportTime = time.Now().Add(-11 * time.Minute) + m.SetLastReportDelayMetricsTime(lastReportTime) + m.ReportMetrics(se) + require.Zero(t, getMetricCnt()) + require.Equal(t, lastReportTime.Unix(), m.GetLastReportDelayMetricsTime().Unix()) + // when back to leader again, the metrics will be collected. + isLeader = true + m.ReportMetrics(se) + require.Equal(t, len(metrics.WaterMarkScheduleDelayNames), getMetricCnt()) + require.InDelta(t, time.Now().Unix(), m.GetLastReportDelayMetricsTime().Unix(), 5) } type poolTestWrapper struct { @@ -1485,7 +1549,9 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) { ctx := context.Background() m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil) - m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil) + m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, func() bool { + return true + }) now := se.Now() diff --git a/pkg/ttl/ttlworker/job_manager_test.go b/pkg/ttl/ttlworker/job_manager_test.go index bb539f996ed9d..125264bc71d28 100644 --- a/pkg/ttl/ttlworker/job_manager_test.go +++ b/pkg/ttl/ttlworker/job_manager_test.go @@ -202,6 +202,16 @@ func (m *JobManager) UpdateHeartBeatForJob(ctx context.Context, se session.Sessi return m.updateHeartBeatForJob(ctx, se, now, job) } +// SetLastReportDelayMetricsTime sets the lastReportDelayMetricsTime for test +func (m *JobManager) SetLastReportDelayMetricsTime(t time.Time) { + m.lastReportDelayMetricsTime = t +} + +// GetLastReportDelayMetricsTime returns the lastReportDelayMetricsTime for test +func (m *JobManager) GetLastReportDelayMetricsTime() time.Time { + return m.lastReportDelayMetricsTime +} + // ReportMetrics is an exported version of reportMetrics func (m *JobManager) ReportMetrics(se session.Session) { m.reportMetrics(se)