@@ -130,8 +130,6 @@ func TestParallelSchedule(t *testing.T) {
130
130
sql := fmt .Sprintf ("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())" , table .Meta ().ID , i )
131
131
tk .MustExec (sql )
132
132
}
133
- isc := cache .NewInfoSchemaCache (time .Second )
134
- require .NoError (t , isc .Update (sessionFactory ()))
135
133
scheduleWg := sync.WaitGroup {}
136
134
finishTasks := make ([]func (), 0 , 4 )
137
135
for i := 0 ; i < 4 ; i ++ {
@@ -143,7 +141,7 @@ func TestParallelSchedule(t *testing.T) {
143
141
}
144
142
145
143
managerID := fmt .Sprintf ("task-manager-%d" , i )
146
- m := ttlworker .NewTaskManager (context .Background (), nil , isc , managerID , store )
144
+ m := ttlworker .NewTaskManager (context .Background (), nil , cache . NewInfoSchemaCache ( time . Second ) , managerID , store )
147
145
m .SetScanWorkers4Test (workers )
148
146
scheduleWg .Add (1 )
149
147
go func () {
@@ -187,14 +185,10 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
187
185
sql := fmt .Sprintf ("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())" , table .Meta ().ID , 1 )
188
186
tk .MustExec (sql )
189
187
190
- // update the infoschema cache
191
- isc := cache .NewInfoSchemaCache (time .Second )
192
- require .NoError (t , isc .Update (sessionFactory ()))
193
-
194
188
// schedule in a task manager
195
189
scanWorker := ttlworker .NewMockScanWorker (t )
196
190
scanWorker .Start ()
197
- m := ttlworker .NewTaskManager (context .Background (), nil , isc , "task-manager-1" , store )
191
+ m := ttlworker .NewTaskManager (context .Background (), nil , cache . NewInfoSchemaCache ( time . Second ) , "task-manager-1" , store )
198
192
m .SetScanWorkers4Test ([]ttlworker.Worker {scanWorker })
199
193
se := sessionFactory ()
200
194
now := se .Now ()
@@ -204,7 +198,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
204
198
// another task manager should fetch this task after heartbeat expire
205
199
scanWorker2 := ttlworker .NewMockScanWorker (t )
206
200
scanWorker2 .Start ()
207
- m2 := ttlworker .NewTaskManager (context .Background (), nil , isc , "task-manager-2" , store )
201
+ m2 := ttlworker .NewTaskManager (context .Background (), nil , cache . NewInfoSchemaCache ( time . Second ) , "task-manager-2" , store )
208
202
m2 .SetScanWorkers4Test ([]ttlworker.Worker {scanWorker2 })
209
203
m2 .RescheduleTasks (sessionFactory (), now .Add (time .Hour ))
210
204
tk .MustQuery ("select status,owner_id from mysql.tidb_ttl_task" ).Check (testkit .Rows ("running task-manager-2" ))
@@ -215,7 +209,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
215
209
m2 .CheckFinishedTask (sessionFactory (), now )
216
210
scanWorker3 := ttlworker .NewMockScanWorker (t )
217
211
scanWorker3 .Start ()
218
- m3 := ttlworker .NewTaskManager (context .Background (), nil , isc , "task-manager-3" , store )
212
+ m3 := ttlworker .NewTaskManager (context .Background (), nil , cache . NewInfoSchemaCache ( time . Second ) , "task-manager-3" , store )
219
213
m3 .SetScanWorkers4Test ([]ttlworker.Worker {scanWorker3 })
220
214
m3 .RescheduleTasks (sessionFactory (), now .Add (time .Hour ))
221
215
tk .MustQuery ("select status,owner_id from mysql.tidb_ttl_task" ).Check (testkit .Rows ("finished task-manager-2" ))
@@ -235,14 +229,10 @@ func TestTaskMetrics(t *testing.T) {
235
229
sql := fmt .Sprintf ("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())" , table .Meta ().ID , 1 )
236
230
tk .MustExec (sql )
237
231
238
- // update the infoschema cache
239
- isc := cache .NewInfoSchemaCache (time .Second )
240
- require .NoError (t , isc .Update (sessionFactory ()))
241
-
242
232
// schedule in a task manager
243
233
scanWorker := ttlworker .NewMockScanWorker (t )
244
234
scanWorker .Start ()
245
- m := ttlworker .NewTaskManager (context .Background (), nil , isc , "task-manager-1" , store )
235
+ m := ttlworker .NewTaskManager (context .Background (), nil , cache . NewInfoSchemaCache ( time . Minute ) , "task-manager-1" , store )
246
236
m .SetScanWorkers4Test ([]ttlworker.Worker {scanWorker })
247
237
se := sessionFactory ()
248
238
now := se .Now ()
@@ -268,13 +258,11 @@ func TestRescheduleWithError(t *testing.T) {
268
258
269
259
se := sessionFactory ()
270
260
now := se .Now ()
271
- isc := cache .NewInfoSchemaCache (time .Second )
272
- require .NoError (t , isc .Update (se ))
273
261
274
262
// schedule in a task manager
275
263
scanWorker := ttlworker .NewMockScanWorker (t )
276
264
scanWorker .Start ()
277
- m := ttlworker .NewTaskManager (context .Background (), nil , isc , "task-manager-1" , store )
265
+ m := ttlworker .NewTaskManager (context .Background (), nil , cache . NewInfoSchemaCache ( time . Minute ) , "task-manager-1" , store )
278
266
m .SetScanWorkers4Test ([]ttlworker.Worker {scanWorker })
279
267
notify := make (chan struct {})
280
268
go func () {
@@ -307,8 +295,7 @@ func TestTTLRunningTasksLimitation(t *testing.T) {
307
295
sql := fmt .Sprintf ("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())" , table .Meta ().ID , i )
308
296
tk .MustExec (sql )
309
297
}
310
- isc := cache .NewInfoSchemaCache (time .Second )
311
- require .NoError (t , isc .Update (sessionFactory ()))
298
+
312
299
scheduleWg := sync.WaitGroup {}
313
300
for i := 0 ; i < 16 ; i ++ {
314
301
workers := []ttlworker.Worker {}
@@ -319,7 +306,7 @@ func TestTTLRunningTasksLimitation(t *testing.T) {
319
306
}
320
307
321
308
ctx := logutil .WithKeyValue (context .Background (), "ttl-worker-test" , fmt .Sprintf ("task-manager-%d" , i ))
322
- m := ttlworker .NewTaskManager (ctx , nil , isc , fmt .Sprintf ("task-manager-%d" , i ), store )
309
+ m := ttlworker .NewTaskManager (ctx , nil , cache . NewInfoSchemaCache ( time . Minute ) , fmt .Sprintf ("task-manager-%d" , i ), store )
323
310
m .SetScanWorkers4Test (workers )
324
311
scheduleWg .Add (1 )
325
312
go func () {
@@ -384,9 +371,8 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) {
384
371
se := sessionFactory ()
385
372
now := se .Now ()
386
373
387
- isc := cache .NewInfoSchemaCache (time .Minute )
388
- require .NoError (t , isc .Update (se ))
389
- m := ttlworker .NewTaskManager (context .Background (), pool , isc , "scan-manager-1" , store )
374
+ m := ttlworker .NewTaskManager (context .Background (), pool , cache .NewInfoSchemaCache (time .Minute ), "scan-manager-1" , store )
375
+
390
376
startBlockNotifyCh := make (chan struct {})
391
377
blockCancelCh := make (chan struct {})
392
378
workers := make ([]ttlworker.Worker , 0 , taskCnt )
@@ -521,7 +507,7 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) {
521
507
))
522
508
523
509
// A resigned task can be obtained by other task managers
524
- m2 := ttlworker .NewTaskManager (context .Background (), pool , isc , "scan-manager-2" , store )
510
+ m2 := ttlworker .NewTaskManager (context .Background (), pool , cache . NewInfoSchemaCache ( time . Minute ) , "scan-manager-2" , store )
525
511
worker2 := ttlworker .NewMockScanWorker (t )
526
512
worker2 .Start ()
527
513
defer func () {
@@ -561,8 +547,6 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
561
547
sql := fmt .Sprintf ("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())" , table .Meta ().ID , i )
562
548
tk .MustExec (sql )
563
549
}
564
- isc := cache .NewInfoSchemaCache (time .Second )
565
- require .NoError (t , isc .Update (se ))
566
550
567
551
workers := []ttlworker.Worker {}
568
552
for j := 0 ; j < 8 ; j ++ {
@@ -572,10 +556,10 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
572
556
}
573
557
574
558
now := se .Now ()
575
- m1 := ttlworker .NewTaskManager (context .Background (), pool , isc , "task-manager-1" , store )
559
+ m1 := ttlworker .NewTaskManager (context .Background (), pool , cache . NewInfoSchemaCache ( time . Minute ) , "task-manager-1" , store )
576
560
m1 .SetScanWorkers4Test (workers [0 :4 ])
577
561
m1 .RescheduleTasks (se , now )
578
- m2 := ttlworker .NewTaskManager (context .Background (), pool , isc , "task-manager-2" , store )
562
+ m2 := ttlworker .NewTaskManager (context .Background (), pool , cache . NewInfoSchemaCache ( time . Minute ) , "task-manager-2" , store )
579
563
m2 .SetScanWorkers4Test (workers [4 :])
580
564
581
565
// All tasks should be scheduled to m1 and running
@@ -664,9 +648,7 @@ func TestHeartBeatErrorNotBlockOthers(t *testing.T) {
664
648
se := sessionFactory ()
665
649
now := se .Now ()
666
650
667
- isc := cache .NewInfoSchemaCache (time .Minute )
668
- require .NoError (t , isc .Update (se ))
669
- m := ttlworker .NewTaskManager (context .Background (), pool , isc , "task-manager-1" , store )
651
+ m := ttlworker .NewTaskManager (context .Background (), pool , cache .NewInfoSchemaCache (time .Minute ), "task-manager-1" , store )
670
652
workers := []ttlworker.Worker {}
671
653
for j := 0 ; j < 4 ; j ++ {
672
654
scanWorker := ttlworker .NewMockScanWorker (t )
0 commit comments