
Commit 7991b7f

lcwangchao authored and ti-chi-bot committed

This is an automated cherry-pick of pingcap#59358

Signed-off-by: ti-chi-bot <[email protected]>

1 parent 4d157f6 commit 7991b7f

6 files changed, +127 -4 lines changed

pkg/metrics/grafana/tidb.json (1 addition, 1 deletion)

@@ -20236,7 +20236,7 @@
           "targets": [
             {
               "exemplar": true,
-              "expr": "avg(tidb_server_ttl_watermark_delay{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", type=\"schedule\"}) by (type, name)",
+              "expr": "max(tidb_server_ttl_watermark_delay{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", type=\"schedule\"}) by (type, name)",
               "interval": "",
               "legendFormat": "{{ name }}",
               "queryType": "randomWalk",

pkg/ttl/metrics/metrics.go (5 additions, 0 deletions)

@@ -255,3 +255,8 @@ func UpdateDelayMetrics(records map[int64]*DelayMetricsRecord) {
 		metrics.TTLWatermarkDelay.With(prometheus.Labels{metrics.LblType: "schedule", metrics.LblName: delay}).Set(v)
 	}
 }
+
+// ClearDelayMetrics clears the metrics of TTL delay
+func ClearDelayMetrics() {
+	metrics.TTLWatermarkDelay.Reset()
+}
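Why a single Reset suffices here: TTLWatermarkDelay is a prometheus GaugeVec, and GaugeVec.Reset deletes every labeled child series, so stale per-bucket delays disappear from the scrape output instead of freezing at their last value. A minimal self-contained sketch (the metric name and label values are hypothetical, not TiDB's):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// countSeries drains Collect through a channel, the same way the
// integration test below counts metrics.
func countSeries(c prometheus.Collector) int {
	ch := make(chan prometheus.Metric)
	go func() {
		c.Collect(ch)
		close(ch)
	}()
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	delay := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "demo_ttl_watermark_delay"}, // hypothetical name
		[]string{"type", "name"},
	)
	delay.With(prometheus.Labels{"type": "schedule", "name": "01 hour"}).Set(3600)
	fmt.Println(countSeries(delay)) // 1

	delay.Reset() // what ClearDelayMetrics relies on: all child series removed
	fmt.Println(countSeries(delay)) // 0
}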

pkg/ttl/ttlworker/BUILD.bazel (1 addition, 0 deletions)

@@ -101,6 +101,7 @@ go_test(
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_prometheus_client_golang//prometheus",
         "@com_github_prometheus_client_model//go",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//mock",

pkg/ttl/ttlworker/job_manager.go (17 additions, 0 deletions)

@@ -490,8 +490,15 @@ func (m *JobManager) reportMetrics(se session.Session) {
 	metrics.RunningJobsCnt.Set(runningJobs)
 	metrics.CancellingJobsCnt.Set(cancellingJobs)

+	if !m.isLeader() {
+		// only the leader collects delay metrics, to reduce the performance overhead
+		metrics.ClearDelayMetrics()
+		return
+	}
+
 	if time.Since(m.lastReportDelayMetricsTime) > 10*time.Minute {
 		m.lastReportDelayMetricsTime = time.Now()
+		logutil.Logger(m.ctx).Info("TTL leader to collect delay metrics")
 		records, err := GetDelayMetricRecords(m.ctx, se, time.Now())
 		if err != nil {
 			logutil.Logger(m.ctx).Info("failed to get TTL delay metrics", zap.Error(err))

@@ -997,7 +1004,17 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) {
 }

 // DoGC deletes some old TTL job histories and redundant scan tasks
+<<<<<<< HEAD
 func (m *JobManager) DoGC(ctx context.Context, se session.Session) {
+=======
+func (m *JobManager) DoGC(ctx context.Context, se session.Session, now time.Time) {
+	if !m.isLeader() {
+		// only the leader does GC, to reduce the performance impact
+		return
+	}
+
+	logutil.Logger(m.ctx).Info("TTL leader to DoGC")
+>>>>>>> 75154399927 (ttl: only gc in leader to save performance (#59358))
 	// Remove the table not exist in info schema cache.
 	// Delete the table status before deleting the tasks. Therefore the related tasks
 	if err := m.updateInfoSchemaCache(se); err == nil {
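Both hunks apply the same pattern: followers skip the work entirely, and for metrics they additionally clear whatever they may have published while they were leader. Note that the second hunk still carries the cherry-pick's unresolved conflict markers; the side below the ======= is the intended leader-gated DoGC from pingcap#59358. A minimal sketch of the pattern with assumed names (not the TiDB implementation):

package main

import (
	"fmt"
	"time"
)

type manager struct {
	isLeader       func() bool // injected, like the callback NewJobManager receives
	lastReportTime time.Time
}

func (m *manager) reportDelayMetrics() {
	if !m.isLeader() {
		// A follower may still hold series from a previous leadership
		// term; clearing them avoids exporting stale delays.
		fmt.Println("follower: clear metrics, skip collection")
		return
	}
	if time.Since(m.lastReportTime) <= 10*time.Minute {
		return // throttle: collect at most once every 10 minutes
	}
	m.lastReportTime = time.Now()
	fmt.Println("leader: collect and report delay metrics")
}

func main() {
	leader := false
	m := &manager{isLeader: func() bool { return leader }}
	m.reportDelayMetrics() // follower path: nothing reported
	leader = true
	m.reportDelayMetrics() // leader path: collects and reports
}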

pkg/ttl/ttlworker/job_manager_integration_test.go (93 additions, 3 deletions)

@@ -30,6 +30,11 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/domain"
 	"github.com/pingcap/tidb/pkg/kv"
+<<<<<<< HEAD
+=======
+	"github.com/pingcap/tidb/pkg/meta/model"
+	metrics2 "github.com/pingcap/tidb/pkg/metrics"
+>>>>>>> 75154399927 (ttl: only gc in leader to save performance (#59358))
 	"github.com/pingcap/tidb/pkg/parser/ast"
 	"github.com/pingcap/tidb/pkg/parser/model"
 	dbsession "github.com/pingcap/tidb/pkg/session"

@@ -44,6 +49,11 @@ import (
 	"github.com/pingcap/tidb/pkg/ttl/session"
 	"github.com/pingcap/tidb/pkg/ttl/ttlworker"
 	"github.com/pingcap/tidb/pkg/util/logutil"
+<<<<<<< HEAD
+=======
+	"github.com/pingcap/tidb/pkg/util/skip"
+	"github.com/prometheus/client_golang/prometheus"
+>>>>>>> 75154399927 (ttl: only gc in leader to save performance (#59358))
 	dto "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"

@@ -807,11 +817,20 @@ func TestGCScanTasks(t *testing.T) {
 	addScanTaskRecord(3, 2, 1)
 	addScanTaskRecord(3, 2, 2)

+	isLeader := false
 	m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
-		return true
+		return isLeader
 	})
 	se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
+<<<<<<< HEAD
 	m.DoGC(context.TODO(), se)
+=======
+	// only the leader can do GC
+	m.DoGC(context.TODO(), se, se.Now())
+	tk.MustQuery("select count(1) from mysql.tidb_ttl_task").Check(testkit.Rows("6"))
+	isLeader = true
+	m.DoGC(context.TODO(), se, se.Now())
+>>>>>>> 75154399927 (ttl: only gc in leader to save performance (#59358))
 	tk.MustQuery("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc").Check(testkit.Rows("1 1", "1 2"))
 }

@@ -826,11 +845,20 @@ func TestGCTableStatus(t *testing.T) {
 	// insert table status without corresponding table
 	tk.MustExec("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)", 2024, 2024)

+	isLeader := false
 	m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
-		return true
+		return isLeader
 	})
 	se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
+<<<<<<< HEAD
 	m.DoGC(context.TODO(), se)
+=======
+	// only the leader can do GC
+	m.DoGC(context.TODO(), se, se.Now())
+	tk.MustQuery("select count(1) from mysql.tidb_ttl_table_status").Check(testkit.Rows("1"))
+	isLeader = true
+	m.DoGC(context.TODO(), se, se.Now())
+>>>>>>> 75154399927 (ttl: only gc in leader to save performance (#59358))
 	tk.MustQuery("select * from mysql.tidb_ttl_table_status").Check(nil)

 	// insert a running table status without corresponding table

@@ -887,11 +915,20 @@ func TestGCTTLHistory(t *testing.T) {
 	addHistory(6, 91)
 	addHistory(7, 100)

+	isLeader := false
 	m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
-		return true
+		return isLeader
 	})
 	se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
+<<<<<<< HEAD
 	m.DoGC(context.TODO(), se)
+=======
+	m.DoGC(context.TODO(), se, se.Now())
+	// only the leader can do GC
+	tk.MustQuery("select count(1) from mysql.tidb_ttl_job_history").Check(testkit.Rows("7"))
+	isLeader = true
+	m.DoGC(context.TODO(), se, se.Now())
+>>>>>>> 75154399927 (ttl: only gc in leader to save performance (#59358))
 	tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5"))
 }

@@ -1057,6 +1094,53 @@ func TestDelayMetrics(t *testing.T) {
 	checkRecord(records, "t3", now.Add(-3*time.Hour))
 	checkRecord(records, "t4", now.Add(-3*time.Hour))
 	checkRecord(records, "t5", emptyTime)
+
+	metrics.ClearDelayMetrics()
+	getMetricCnt := func() int {
+		ch := make(chan prometheus.Metric)
+		go func() {
+			metrics2.TTLWatermarkDelay.Collect(ch)
+			close(ch)
+		}()
+
+		cnt := 0
+		for range ch {
+			cnt++
+		}
+		return cnt
+	}
+
+	isLeader := false
+	m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, func() bool {
+		return isLeader
+	})
+	// If the manager is not the leader, the metrics will be empty.
+	m.ReportMetrics(se)
+	require.Zero(t, getMetricCnt())
+	// The leader will collect metrics.
+	isLeader = true
+	m.SetLastReportDelayMetricsTime(time.Now().Add(-11 * time.Minute))
+	m.ReportMetrics(se)
+	require.Equal(t, len(metrics.WaterMarkScheduleDelayNames), getMetricCnt())
+	require.InDelta(t, time.Now().Unix(), m.GetLastReportDelayMetricsTime().Unix(), 5)
+	// Metrics will not be collected again within 10 minutes.
+	lastReportTime := time.Now().Add(-9 * time.Minute)
+	m.SetLastReportDelayMetricsTime(lastReportTime)
+	m.ReportMetrics(se)
+	require.Equal(t, len(metrics.WaterMarkScheduleDelayNames), getMetricCnt())
+	require.Equal(t, lastReportTime.Unix(), m.GetLastReportDelayMetricsTime().Unix())
+	// When back to non-leader, the metrics will be empty and the last report time will not be updated.
+	isLeader = false
+	lastReportTime = time.Now().Add(-11 * time.Minute)
+	m.SetLastReportDelayMetricsTime(lastReportTime)
+	m.ReportMetrics(se)
+	require.Zero(t, getMetricCnt())
+	require.Equal(t, lastReportTime.Unix(), m.GetLastReportDelayMetricsTime().Unix())
+	// When back to leader again, the metrics will be collected.
+	isLeader = true
+	m.ReportMetrics(se)
+	require.Equal(t, len(metrics.WaterMarkScheduleDelayNames), getMetricCnt())
+	require.InDelta(t, time.Now().Unix(), m.GetLastReportDelayMetricsTime().Unix(), 5)
 }

 func TestManagerJobAdapterCanSubmitJob(t *testing.T) {

@@ -1444,6 +1528,7 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {

 	ctx := context.Background()
 	m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
+<<<<<<< HEAD
 	require.NoError(t, m1.InfoSchemaCache().Update(se))
 	require.NoError(t, m1.TableStatusCache().Update(ctx, se))

@@ -1464,6 +1549,11 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
 	require.NoError(t, m2.InfoSchemaCache().Update(se))
 	require.NoError(t, m2.TableStatusCache().Update(ctx, se))
 	m2.RescheduleJobs(se, now)
+=======
+	m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, func() bool {
+		return true
+	})
+>>>>>>> 75154399927 (ttl: only gc in leader to save performance (#59358))

 	// the job should have been cancelled
 	tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))

pkg/ttl/ttlworker/job_manager_test.go (10 additions, 0 deletions)

@@ -190,6 +190,16 @@ func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, no
 	return m.updateHeartBeat(ctx, se, now)
 }

+// SetLastReportDelayMetricsTime sets the lastReportDelayMetricsTime for test
+func (m *JobManager) SetLastReportDelayMetricsTime(t time.Time) {
+	m.lastReportDelayMetricsTime = t
+}
+
+// GetLastReportDelayMetricsTime returns the lastReportDelayMetricsTime for test
+func (m *JobManager) GetLastReportDelayMetricsTime() time.Time {
+	return m.lastReportDelayMetricsTime
+}
+
 // ReportMetrics is an exported version of reportMetrics
 func (m *JobManager) ReportMetrics(se session.Session) {
 	m.reportMetrics(se)
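These accessors sit in job_manager_test.go but appear to use the standard Go export-for-test idiom: a _test.go file that declares the same package as the code under test can reach unexported fields and republish them for black-box tests, without widening the production API, and it is compiled only into test binaries. A compilable sketch with illustrative names (not TiDB's actual layout):

// file: manager_export_test.go (illustrative name, same package as the code under test)
package ttlworker

import "time"

// jobManager stands in for the real JobManager; the field is unexported,
// so tests outside the package could not touch it directly.
type jobManager struct {
	lastReportDelayMetricsTime time.Time
}

// SetLastReportDelayMetricsTime exists only in test builds because it
// lives in a _test.go file; production binaries never ship it.
func (m *jobManager) SetLastReportDelayMetricsTime(t time.Time) {
	m.lastReportDelayMetricsTime = t
}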
