@@ -30,6 +30,7 @@ import (
30
30
"github.com/pingcap/failpoint"
31
31
"github.com/pingcap/tidb/pkg/domain"
32
32
"github.com/pingcap/tidb/pkg/kv"
33
+ metrics2 "github.com/pingcap/tidb/pkg/metrics"
33
34
"github.com/pingcap/tidb/pkg/parser/ast"
34
35
"github.com/pingcap/tidb/pkg/parser/model"
35
36
dbsession "github.com/pingcap/tidb/pkg/session"
@@ -44,6 +45,7 @@ import (
44
45
"github.com/pingcap/tidb/pkg/ttl/session"
45
46
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
46
47
"github.com/pingcap/tidb/pkg/util/logutil"
48
+ "github.com/prometheus/client_golang/prometheus"
47
49
dto "github.com/prometheus/client_model/go"
48
50
"github.com/stretchr/testify/require"
49
51
"go.uber.org/atomic"
@@ -807,10 +809,15 @@ func TestGCScanTasks(t *testing.T) {
807
809
addScanTaskRecord (3 , 2 , 1 )
808
810
addScanTaskRecord (3 , 2 , 2 )
809
811
812
+ isLeader := false
810
813
m := ttlworker .NewJobManager ("manager-1" , nil , store , nil , func () bool {
811
- return true
814
+ return isLeader
812
815
})
813
816
se := session .NewSession (tk .Session (), tk .Session (), func (_ session.Session ) {})
817
+ // only leader can do GC
818
+ m .DoGC (context .TODO (), se )
819
+ tk .MustQuery ("select count(1) from mysql.tidb_ttl_task" ).Check (testkit .Rows ("6" ))
820
+ isLeader = true
814
821
m .DoGC (context .TODO (), se )
815
822
tk .MustQuery ("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc" ).Check (testkit .Rows ("1 1" , "1 2" ))
816
823
}
@@ -826,10 +833,15 @@ func TestGCTableStatus(t *testing.T) {
826
833
// insert table status without corresponding table
827
834
tk .MustExec ("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)" , 2024 , 2024 )
828
835
836
+ isLeader := false
829
837
m := ttlworker .NewJobManager ("manager-1" , nil , store , nil , func () bool {
830
- return true
838
+ return isLeader
831
839
})
832
840
se := session .NewSession (tk .Session (), tk .Session (), func (_ session.Session ) {})
841
+ // only leader can do GC
842
+ m .DoGC (context .TODO (), se )
843
+ tk .MustQuery ("select count(1) from mysql.tidb_ttl_table_status" ).Check (testkit .Rows ("1" ))
844
+ isLeader = true
833
845
m .DoGC (context .TODO (), se )
834
846
tk .MustQuery ("select * from mysql.tidb_ttl_table_status" ).Check (nil )
835
847
@@ -887,11 +899,16 @@ func TestGCTTLHistory(t *testing.T) {
887
899
addHistory (6 , 91 )
888
900
addHistory (7 , 100 )
889
901
902
+ isLeader := false
890
903
m := ttlworker .NewJobManager ("manager-1" , nil , store , nil , func () bool {
891
- return true
904
+ return isLeader
892
905
})
893
906
se := session .NewSession (tk .Session (), tk .Session (), func (_ session.Session ) {})
894
907
m .DoGC (context .TODO (), se )
908
+ // only leader can go GC
909
+ tk .MustQuery ("select count(1) from mysql.tidb_ttl_job_history" ).Check (testkit .Rows ("7" ))
910
+ isLeader = true
911
+ m .DoGC (context .TODO (), se )
895
912
tk .MustQuery ("select job_id from mysql.tidb_ttl_job_history order by job_id asc" ).Check (testkit .Rows ("1" , "2" , "3" , "4" , "5" ))
896
913
}
897
914
@@ -1057,6 +1074,53 @@ func TestDelayMetrics(t *testing.T) {
1057
1074
checkRecord (records , "t3" , now .Add (- 3 * time .Hour ))
1058
1075
checkRecord (records , "t4" , now .Add (- 3 * time .Hour ))
1059
1076
checkRecord (records , "t5" , emptyTime )
1077
+
1078
+ metrics .ClearDelayMetrics ()
1079
+ getMetricCnt := func () int {
1080
+ ch := make (chan prometheus.Metric )
1081
+ go func () {
1082
+ metrics2 .TTLWatermarkDelay .Collect (ch )
1083
+ close (ch )
1084
+ }()
1085
+
1086
+ cnt := 0
1087
+ for range ch {
1088
+ cnt ++
1089
+ }
1090
+ return cnt
1091
+ }
1092
+
1093
+ isLeader := false
1094
+ m := ttlworker .NewJobManager ("test-ttl-job-manager" , nil , store , nil , func () bool {
1095
+ return isLeader
1096
+ })
1097
+ // If the manager is not leader, the metrics will be empty.
1098
+ m .ReportMetrics (se )
1099
+ require .Zero (t , getMetricCnt ())
1100
+ // leader will collect metrics
1101
+ isLeader = true
1102
+ m .SetLastReportDelayMetricsTime (time .Now ().Add (- 11 * time .Minute ))
1103
+ m .ReportMetrics (se )
1104
+ require .Equal (t , len (metrics .WaterMarkScheduleDelayNames ), getMetricCnt ())
1105
+ require .InDelta (t , time .Now ().Unix (), m .GetLastReportDelayMetricsTime ().Unix (), 5 )
1106
+ // will not collect metrics in 10 minutes
1107
+ lastReportTime := time .Now ().Add (- 9 * time .Minute )
1108
+ m .SetLastReportDelayMetricsTime (lastReportTime )
1109
+ m .ReportMetrics (se )
1110
+ require .Equal (t , len (metrics .WaterMarkScheduleDelayNames ), getMetricCnt ())
1111
+ require .Equal (t , lastReportTime .Unix (), m .GetLastReportDelayMetricsTime ().Unix (), 5 )
1112
+ // when back to non-leader, the metrics will be empty and last report time will not be updated.
1113
+ isLeader = false
1114
+ lastReportTime = time .Now ().Add (- 11 * time .Minute )
1115
+ m .SetLastReportDelayMetricsTime (lastReportTime )
1116
+ m .ReportMetrics (se )
1117
+ require .Zero (t , getMetricCnt ())
1118
+ require .Equal (t , lastReportTime .Unix (), m .GetLastReportDelayMetricsTime ().Unix ())
1119
+ // when back to leader again, the metrics will be collected.
1120
+ isLeader = true
1121
+ m .ReportMetrics (se )
1122
+ require .Equal (t , len (metrics .WaterMarkScheduleDelayNames ), getMetricCnt ())
1123
+ require .InDelta (t , time .Now ().Unix (), m .GetLastReportDelayMetricsTime ().Unix (), 5 )
1060
1124
}
1061
1125
1062
1126
func TestManagerJobAdapterCanSubmitJob (t * testing.T ) {
0 commit comments