@@ -33,6 +33,7 @@ import (
33
33
"github.com/pingcap/tidb/pkg/domain"
34
34
"github.com/pingcap/tidb/pkg/kv"
35
35
"github.com/pingcap/tidb/pkg/meta/model"
36
+ metrics2 "github.com/pingcap/tidb/pkg/metrics"
36
37
"github.com/pingcap/tidb/pkg/parser/ast"
37
38
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
38
39
"github.com/pingcap/tidb/pkg/statistics"
@@ -49,6 +50,7 @@ import (
49
50
"github.com/pingcap/tidb/pkg/util"
50
51
"github.com/pingcap/tidb/pkg/util/logutil"
51
52
"github.com/pingcap/tidb/pkg/util/skip"
53
+ "github.com/prometheus/client_golang/prometheus"
52
54
dto "github.com/prometheus/client_model/go"
53
55
"github.com/stretchr/testify/require"
54
56
"go.uber.org/atomic"
@@ -797,10 +799,15 @@ func TestGCScanTasks(t *testing.T) {
797
799
addScanTaskRecord (3 , 2 , 1 )
798
800
addScanTaskRecord (3 , 2 , 2 )
799
801
802
+ isLeader := false
800
803
m := ttlworker .NewJobManager ("manager-1" , nil , store , nil , func () bool {
801
- return true
804
+ return isLeader
802
805
})
803
806
se := session .NewSession (tk .Session (), tk .Session (), func (_ session.Session ) {})
807
+ // only leader can do GC
808
+ m .DoGC (context .TODO (), se , se .Now ())
809
+ tk .MustQuery ("select count(1) from mysql.tidb_ttl_task" ).Check (testkit .Rows ("6" ))
810
+ isLeader = true
804
811
m .DoGC (context .TODO (), se , se .Now ())
805
812
tk .MustQuery ("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc" ).Check (testkit .Rows ("1 1" , "1 2" ))
806
813
}
@@ -816,10 +823,15 @@ func TestGCTableStatus(t *testing.T) {
816
823
// insert table status without corresponding table
817
824
tk .MustExec ("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)" , 2024 , 2024 )
818
825
826
+ isLeader := false
819
827
m := ttlworker .NewJobManager ("manager-1" , nil , store , nil , func () bool {
820
- return true
828
+ return isLeader
821
829
})
822
830
se := session .NewSession (tk .Session (), tk .Session (), func (_ session.Session ) {})
831
+ // only leader can do GC
832
+ m .DoGC (context .TODO (), se , se .Now ())
833
+ tk .MustQuery ("select count(1) from mysql.tidb_ttl_table_status" ).Check (testkit .Rows ("1" ))
834
+ isLeader = true
823
835
m .DoGC (context .TODO (), se , se .Now ())
824
836
tk .MustQuery ("select * from mysql.tidb_ttl_table_status" ).Check (nil )
825
837
@@ -877,11 +889,16 @@ func TestGCTTLHistory(t *testing.T) {
877
889
addHistory (6 , 91 )
878
890
addHistory (7 , 100 )
879
891
892
+ isLeader := false
880
893
m := ttlworker .NewJobManager ("manager-1" , nil , store , nil , func () bool {
881
- return true
894
+ return isLeader
882
895
})
883
896
se := session .NewSession (tk .Session (), tk .Session (), func (_ session.Session ) {})
884
897
m .DoGC (context .TODO (), se , se .Now ())
898
+ // only leader can go GC
899
+ tk .MustQuery ("select count(1) from mysql.tidb_ttl_job_history" ).Check (testkit .Rows ("7" ))
900
+ isLeader = true
901
+ m .DoGC (context .TODO (), se , se .Now ())
885
902
tk .MustQuery ("select job_id from mysql.tidb_ttl_job_history order by job_id asc" ).Check (testkit .Rows ("1" , "2" , "3" , "4" , "5" ))
886
903
}
887
904
@@ -1047,6 +1064,37 @@ func TestDelayMetrics(t *testing.T) {
1047
1064
checkRecord (records , "t3" , now .Add (- 3 * time .Hour ))
1048
1065
checkRecord (records , "t4" , now .Add (- 3 * time .Hour ))
1049
1066
checkRecord (records , "t5" , emptyTime )
1067
+
1068
+ metrics .ClearDelayMetrics ()
1069
+ getMetricCnt := func () int {
1070
+ ch := make (chan prometheus.Metric )
1071
+ go func () {
1072
+ metrics2 .TTLWatermarkDelay .Collect (ch )
1073
+ close (ch )
1074
+ }()
1075
+
1076
+ cnt := 0
1077
+ for range ch {
1078
+ cnt ++
1079
+ }
1080
+ return cnt
1081
+ }
1082
+
1083
+ isLeader := false
1084
+ m := ttlworker .NewJobManager ("test-ttl-job-manager" , nil , store , nil , func () bool {
1085
+ return isLeader
1086
+ })
1087
+ // If the manager is not leader, the metrics will be empty.
1088
+ m .ReportMetrics (se )
1089
+ require .Zero (t , getMetricCnt ())
1090
+ // leader will collect metrics
1091
+ isLeader = true
1092
+ m .ReportMetrics (se )
1093
+ require .Equal (t , len (metrics .WaterMarkScheduleDelayNames ), getMetricCnt ())
1094
+ // when back to non-leader, the metrics will be empty.
1095
+ isLeader = false
1096
+ m .ReportMetrics (se )
1097
+ require .Zero (t , getMetricCnt ())
1050
1098
}
1051
1099
1052
1100
type poolTestWrapper struct {
@@ -1503,7 +1551,9 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
1503
1551
1504
1552
ctx := context .Background ()
1505
1553
m1 := ttlworker .NewJobManager ("test-ttl-job-manager-1" , nil , store , nil , nil )
1506
- m2 := ttlworker .NewJobManager ("test-ttl-job-manager-2" , nil , store , nil , nil )
1554
+ m2 := ttlworker .NewJobManager ("test-ttl-job-manager-2" , nil , store , nil , func () bool {
1555
+ return true
1556
+ })
1507
1557
1508
1558
now := se .Now ()
1509
1559
0 commit comments