
Commit 4fac6e1

lcwangchao authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#58306
Signed-off-by: ti-chi-bot <[email protected]>
1 parent f3072ea commit 4fac6e1

File tree

4 files changed: +68 -30 lines changed


pkg/ttl/ttlworker/job_manager_test.go

Lines changed: 32 additions & 0 deletions
@@ -33,6 +33,12 @@ import (
 	"github.com/pingcap/tidb/pkg/util/chunk"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+<<<<<<< HEAD
+=======
+	"github.com/tikv/client-go/v2/testutils"
+	"github.com/tikv/client-go/v2/tikv"
+	"github.com/tikv/client-go/v2/tikvrpc"
+>>>>>>> d92dce025a4 (ttl: reduce some warnings logs when locking TTL tasks (#58306))
 )
 
 func newTTLTableStatusRows(status ...*cache.TableStatus) []chunk.Row {
@@ -674,3 +680,29 @@ func TestLocalJobs(t *testing.T) {
 	assert.Len(t, m.localJobs(), 1)
 	assert.Equal(t, m.localJobs()[0].id, "1")
 }
+<<<<<<< HEAD
+=======
+
+func TestSplitCnt(t *testing.T) {
+	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
+	require.NoError(t, err)
+	defer func() {
+		pdClient.Close()
+		err = mockClient.Close()
+		require.NoError(t, err)
+	}()
+
+	require.Equal(t, 64, getScanSplitCnt(nil))
+	require.Equal(t, 64, getScanSplitCnt(&mockKVStore{}))
+
+	s := &mockTiKVStore{regionCache: tikv.NewRegionCache(pdClient)}
+	for i := uint64(1); i <= 128; i++ {
+		s.GetRegionCache().SetRegionCacheStore(i, "", "", tikvrpc.TiKV, 1, nil)
+		if i <= 64 {
+			require.Equal(t, 64, getScanSplitCnt(s))
+		} else {
+			require.Equal(t, int(i), getScanSplitCnt(s))
+		}
+	}
+}
+>>>>>>> d92dce025a4 (ttl: reduce some warnings logs when locking TTL tasks (#58306))
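
The TestSplitCnt added on the pingcap#58306 side of the conflict above pins down the contract of getScanSplitCnt: the split count never drops below 64, and once the region cache knows about more than 64 TiKV stores it grows with the store count. Below is a minimal standalone sketch of that contract, inferred only from the assertions above, not from TiDB's actual implementation:

// split_cnt_sketch.go — an illustration of the behavior the test asserts,
// with hypothetical names; it is not TiDB code.
package main

import "fmt"

const minScanSplitCnt = 64

// scanSplitCnt mirrors what the test expects from getScanSplitCnt: a nil or
// non-TiKV store falls back to the floor of 64, otherwise the result is
// max(64, number of TiKV stores known to the region cache).
func scanSplitCnt(tikvStoreCount int, backedByTiKV bool) int {
	if !backedByTiKV || tikvStoreCount < minScanSplitCnt {
		return minScanSplitCnt
	}
	return tikvStoreCount
}

func main() {
	fmt.Println(scanSplitCnt(0, false))  // 64: no TiKV store behind the interface
	fmt.Println(scanSplitCnt(10, true))  // 64: fewer stores than the floor
	fmt.Println(scanSplitCnt(128, true)) // 128: scales with the store count
}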

pkg/ttl/ttlworker/task_manager.go

Lines changed: 9 additions & 0 deletions
@@ -309,6 +309,15 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
 		return
 	}
 
+	if len(tasks) == 0 {
+		return
+	}
+
+	err = m.infoSchemaCache.Update(se)
+	if err != nil {
+		logutil.Logger(m.ctx).Warn("fail to update infoSchemaCache", zap.Error(err))
+		return
+	}
 loop:
 	for _, t := range tasks {
 		logger := logutil.Logger(m.ctx).With(
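
task_manager.go is the only non-test file touched by this commit, and the hunk above carries the behavioral change named in the commit title: rescheduleTasks now returns as soon as the fetched task list is empty and refreshes the info-schema cache only when there is actually a task to lock, so an idle task manager presumably stops emitting the "fail to update infoSchemaCache" warning on every pass. A self-contained sketch of that control flow, using hypothetical types rather than the real taskManager API:

// reschedule_sketch.go — hypothetical types, not the actual TiDB taskManager.
package main

import (
	"errors"
	"fmt"
	"log"
)

type infoSchemaCache struct{ reachable bool }

// update stands in for infoSchemaCache.Update(se); it may fail, for example
// when the schema cannot be read.
func (c *infoSchemaCache) update() error {
	if !c.reachable {
		return errors.New("schema not reachable")
	}
	return nil
}

type taskManager struct {
	cache *infoSchemaCache
}

// rescheduleTasks mirrors the new control flow: look at the candidate tasks
// first, return early when there is nothing to do, and only then pay (and
// possibly log) the cost of a cache refresh before locking tasks.
func (m *taskManager) rescheduleTasks(tasks []string) {
	if len(tasks) == 0 {
		return // nothing to lock: no cache refresh, no warning
	}
	if err := m.cache.update(); err != nil {
		log.Printf("fail to update infoSchemaCache: %v", err)
		return
	}
	for _, task := range tasks {
		fmt.Println("locking task", task)
	}
}

func main() {
	m := &taskManager{cache: &infoSchemaCache{reachable: false}}
	m.rescheduleTasks(nil)                // idle pass: stays silent
	m.rescheduleTasks([]string{"task-1"}) // real work: the warning still surfaces
}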

pkg/ttl/ttlworker/task_manager_integration_test.go

Lines changed: 17 additions & 29 deletions
@@ -130,8 +130,6 @@ func TestParallelSchedule(t *testing.T) {
 		sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
 		tk.MustExec(sql)
 	}
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(sessionFactory()))
 	scheduleWg := sync.WaitGroup{}
 	finishTasks := make([]func(), 0, 4)
 	for i := 0; i < 4; i++ {
@@ -143,7 +141,7 @@
 		}
 
 		managerID := fmt.Sprintf("task-manager-%d", i)
-		m := ttlworker.NewTaskManager(context.Background(), nil, isc, managerID, store)
+		m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), managerID, store)
 		m.SetScanWorkers4Test(workers)
 		scheduleWg.Add(1)
 		go func() {
@@ -187,14 +185,10 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1)
 	tk.MustExec(sql)
 
-	// update the infoschema cache
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(sessionFactory()))
-
 	// schedule in a task manager
 	scanWorker := ttlworker.NewMockScanWorker(t)
 	scanWorker.Start()
-	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
+	m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-1", store)
 	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
 	se := sessionFactory()
 	now := se.Now()
@@ -204,7 +198,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	// another task manager should fetch this task after heartbeat expire
 	scanWorker2 := ttlworker.NewMockScanWorker(t)
 	scanWorker2.Start()
-	m2 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-2", store)
+	m2 := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-2", store)
 	m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2})
 	m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
 	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2"))
@@ -215,7 +209,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	m2.CheckFinishedTask(sessionFactory(), now)
 	scanWorker3 := ttlworker.NewMockScanWorker(t)
 	scanWorker3.Start()
-	m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3", store)
+	m3 := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-3", store)
 	m3.SetScanWorkers4Test([]ttlworker.Worker{scanWorker3})
 	m3.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
 	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("finished task-manager-2"))
@@ -235,14 +229,10 @@ func TestTaskMetrics(t *testing.T) {
 	sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1)
 	tk.MustExec(sql)
 
-	// update the infoschema cache
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(sessionFactory()))
-
 	// schedule in a task manager
 	scanWorker := ttlworker.NewMockScanWorker(t)
 	scanWorker.Start()
-	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
+	m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
 	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
 	se := sessionFactory()
 	now := se.Now()
@@ -268,13 +258,11 @@ func TestRescheduleWithError(t *testing.T) {
 
 	se := sessionFactory()
 	now := se.Now()
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(se))
 
 	// schedule in a task manager
 	scanWorker := ttlworker.NewMockScanWorker(t)
 	scanWorker.Start()
-	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
+	m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
 	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
 	notify := make(chan struct{})
 	go func() {
@@ -307,8 +295,7 @@ func TestTTLRunningTasksLimitation(t *testing.T) {
 		sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
 		tk.MustExec(sql)
 	}
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(sessionFactory()))
+
 	scheduleWg := sync.WaitGroup{}
 	for i := 0; i < 16; i++ {
 		workers := []ttlworker.Worker{}
@@ -319,7 +306,7 @@
 		}
 
 		ctx := logutil.WithKeyValue(context.Background(), "ttl-worker-test", fmt.Sprintf("task-manager-%d", i))
-		m := ttlworker.NewTaskManager(ctx, nil, isc, fmt.Sprintf("task-manager-%d", i), store)
+		m := ttlworker.NewTaskManager(ctx, nil, cache.NewInfoSchemaCache(time.Minute), fmt.Sprintf("task-manager-%d", i), store)
 		m.SetScanWorkers4Test(workers)
 		scheduleWg.Add(1)
 		go func() {
@@ -384,9 +371,14 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) {
 	se := sessionFactory()
 	now := se.Now()
 
+<<<<<<< HEAD
 	isc := cache.NewInfoSchemaCache(time.Minute)
 	require.NoError(t, isc.Update(se))
 	m := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-1", store)
+=======
+	m := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "scan-manager-1", store)
+
+>>>>>>> d92dce025a4 (ttl: reduce some warnings logs when locking TTL tasks (#58306))
 	startBlockNotifyCh := make(chan struct{})
 	blockCancelCh := make(chan struct{})
 	workers := make([]ttlworker.Worker, 0, taskCnt)
@@ -521,7 +513,7 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) {
 	))
 
 	// A resigned task can be obtained by other task managers
-	m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-2", store)
+	m2 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "scan-manager-2", store)
 	worker2 := ttlworker.NewMockScanWorker(t)
 	worker2.Start()
 	defer func() {
@@ -561,8 +553,6 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
 		sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
 		tk.MustExec(sql)
 	}
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(se))
 
 	workers := []ttlworker.Worker{}
 	for j := 0; j < 8; j++ {
@@ -572,10 +562,10 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
 	}
 
 	now := se.Now()
-	m1 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
+	m1 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
 	m1.SetScanWorkers4Test(workers[0:4])
 	m1.RescheduleTasks(se, now)
-	m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-2", store)
+	m2 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-2", store)
 	m2.SetScanWorkers4Test(workers[4:])
 
 	// All tasks should be scheduled to m1 and running
@@ -664,9 +654,7 @@ func TestHeartBeatErrorNotBlockOthers(t *testing.T) {
 	se := sessionFactory()
 	now := se.Now()
 
-	isc := cache.NewInfoSchemaCache(time.Minute)
-	require.NoError(t, isc.Update(se))
-	m := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
+	m := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
 	workers := []ttlworker.Worker{}
 	for j := 0; j < 4; j++ {
 		scanWorker := ttlworker.NewMockScanWorker(t)

pkg/ttl/ttlworker/task_manager_test.go

Lines changed: 10 additions & 1 deletion
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
 )
@@ -283,6 +284,14 @@ func (s *mockTiKVStore) GetRegionCache() *tikv.RegionCache {
 }
 
 func TestGetMaxRunningTasksLimit(t *testing.T) {
+	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
+	require.NoError(t, err)
+	defer func() {
+		pdClient.Close()
+		err = mockClient.Close()
+		require.NoError(t, err)
+	}()
+
 	variable.TTLRunningTasks.Store(1)
 	require.Equal(t, 1, getMaxRunningTasksLimit(&mockTiKVStore{}))
 
@@ -294,7 +303,7 @@ func TestGetMaxRunningTasksLimit(t *testing.T) {
 	require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockKVStore{}))
 	require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockTiKVStore{}))
 
-	s := &mockTiKVStore{regionCache: tikv.NewRegionCache(nil)}
+	s := &mockTiKVStore{regionCache: tikv.NewRegionCache(pdClient)}
 	s.GetRegionCache().SetRegionCacheStore(1, "", "", tikvrpc.TiKV, 1, nil)
 	s.GetRegionCache().SetRegionCacheStore(2, "", "", tikvrpc.TiKV, 1, nil)
 	s.GetRegionCache().SetRegionCacheStore(3, "", "", tikvrpc.TiFlash, 1, nil)
