@@ -1727,8 +1727,6 @@ func TestJobManagerWithFault(t *testing.T) {
1727
1727
}
1728
1728
1729
1729
stopTestCh := make (chan struct {})
1730
- wg := & sync.WaitGroup {}
1731
- wg .Add (1 )
1732
1730
1733
1731
fault := newFaultWithFilter (func (sql string ) bool {
1734
1732
// skip some local only sql, ref `getSession()` in `session.go`
@@ -1739,6 +1737,10 @@ func TestJobManagerWithFault(t *testing.T) {
1739
1737
1740
1738
return true
1741
1739
}, newFaultWithProbability (faultPercent ))
1740
+
1741
+ wg := & sync.WaitGroup {}
1742
+ // start the goroutine to inject fault to managers randomly
1743
+ wg .Add (1 )
1742
1744
go func () {
1743
1745
defer wg .Done ()
1744
1746
@@ -1775,6 +1777,38 @@ func TestJobManagerWithFault(t *testing.T) {
1775
1777
}
1776
1778
}()
1777
1779
1780
+ // start the goroutine to randomly scale the worker count
1781
+ wg .Add (1 )
1782
+ go func () {
1783
+ defer wg .Done ()
1784
+
1785
+ maxScanWorkerCount := variable .DefTiDBTTLScanWorkerCount * 2
1786
+ minScanWorkerCount := variable .DefTiDBTTLScanWorkerCount / 2
1787
+
1788
+ maxDelWorkerCount := variable .DefTiDBTTLDeleteWorkerCount * 2
1789
+ minDelWorkerCount := variable .DefTiDBTTLDeleteWorkerCount / 2
1790
+ faultTicker := time .NewTicker (time .Second )
1791
+
1792
+ tk := testkit .NewTestKit (t , store )
1793
+ for {
1794
+ select {
1795
+ case <- stopTestCh :
1796
+ // Recover to the default count
1797
+ tk .MustExec ("set @@global.tidb_ttl_scan_worker_count = ?" , variable .DefTiDBTTLScanWorkerCount )
1798
+ tk .MustExec ("set @@global.tidb_ttl_delete_worker_count = ?" , variable .DefTiDBTTLDeleteWorkerCount )
1799
+
1800
+ return
1801
+ case <- faultTicker .C :
1802
+ scanWorkerCount := rand .Int ()% (maxScanWorkerCount - minScanWorkerCount ) + minScanWorkerCount
1803
+ delWorkerCount := rand .Int ()% (maxDelWorkerCount - minDelWorkerCount ) + minDelWorkerCount
1804
+
1805
+ logutil .BgLogger ().Info ("scale worker count" , zap .Int ("scanWorkerCount" , scanWorkerCount ), zap .Int ("delWorkerCount" , delWorkerCount ))
1806
+ tk .MustExec ("set @@global.tidb_ttl_scan_worker_count = ?" , scanWorkerCount )
1807
+ tk .MustExec ("set @@global.tidb_ttl_delete_worker_count = ?" , delWorkerCount )
1808
+ }
1809
+ }
1810
+ }()
1811
+
1778
1812
// run the workload goroutine
1779
1813
testStart := time .Now ()
1780
1814
for time .Since (testStart ) < testDuration {
@@ -1825,7 +1859,6 @@ func TestJobManagerWithFault(t *testing.T) {
1825
1859
}
1826
1860
1827
1861
logutil .BgLogger ().Info ("test finished" )
1828
- stopTestCh <- struct {}{}
1829
1862
close (stopTestCh )
1830
1863
1831
1864
wg .Wait ()
0 commit comments