@@ -179,6 +179,22 @@ func TestTTLDelRetryBuffer(t *testing.T) {
179
179
require .Equal (t , 0 , buffer .Len ())
180
180
require .Equal (t , uint64 (0 ), statics6 .SuccessRows .Load ())
181
181
require .Equal (t , uint64 (7 ), statics6 .ErrorRows .Load ())
182
+
183
+ // each item should be retried at most once within a single DoRetry call.
184
+ buffer2 := newTTLDelRetryBuffer ()
185
+ buffer2 .SetRetryInterval (0 )
186
+ buffer2 .maxRetry = math .MaxInt
187
+ task7 , rows7 , statics7 := createTask ("t7" )
188
+ buffer2 .RecordTaskResult (task7 , rows7 [:8 ])
189
+ require .Equal (t , 1 , buffer2 .Len ())
190
+ currentRetryFn := doRetryFail
191
+ buffer2 .DoRetry (func (task * ttlDeleteTask ) [][]types.Datum {
192
+ fn := currentRetryFn
193
+ currentRetryFn = shouldNotDoRetry
194
+ return fn (task )
195
+ })
196
+ require .Equal (t , uint64 (1 ), statics7 .SuccessRows .Load ())
197
+ require .Equal (t , uint64 (0 ), statics7 .ErrorRows .Load ())
182
198
}
183
199
184
200
func TestTTLDeleteTaskDoDelete (t * testing.T ) {
@@ -269,7 +285,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
269
285
retryErrBatches : []int {1 , 2 , 4 },
270
286
},
271
287
{
272
- // some retries and some not
288
+ // some batches are retried, some are not, and some are executed when ctx is canceled
273
289
batchCnt : 10 ,
274
290
noRetryErrBatches : []int {3 , 8 , 9 },
275
291
retryErrBatches : []int {1 , 2 , 4 },
@@ -279,6 +295,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
279
295
}
280
296
281
297
for _ , c := range cases {
298
+ require .True (t , c .cancelCtxBatch >= 0 && c .cancelCtxBatch < c .batchCnt )
282
299
ctx , cancel := context .WithCancel (context .Background ())
283
300
if c .cancelCtx && c .cancelCtxBatch == 0 {
284
301
cancel ()
@@ -298,16 +315,14 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
298
315
retryErrBatches = c .retryErrBatches
299
316
nonRetryBatches = c .noRetryErrBatches
300
317
retryRows := task .doDelete (ctx , s )
301
- realBatchCnt := c .batchCnt
302
- if c .cancelCtx {
303
- realBatchCnt = c .cancelCtxBatch
304
- }
305
- require .LessOrEqual (t , realBatchCnt , c .batchCnt )
306
318
307
319
// check SQLs
308
- require .Equal (t , realBatchCnt , len (sqls ))
309
320
expectedSQLs := make ([]string , 0 , len (sqls ))
310
- for i := 0 ; i < realBatchCnt ; i ++ {
321
+ for i := 0 ; i < c .batchCnt ; i ++ {
322
+ if c .cancelCtx && i >= c .cancelCtxBatch {
323
+ break
324
+ }
325
+
311
326
batch := task .rows [i * delBatch : (i + 1 )* delBatch ]
312
327
idList := make ([]string , 0 , delBatch )
313
328
for _ , row := range batch {
@@ -324,8 +339,8 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
324
339
325
340
// check retry rows
326
341
var expectedRetryRows [][]types.Datum
327
- for i := 0 ; i < realBatchCnt ; i ++ {
328
- if slices .Contains (c .retryErrBatches , i ) {
342
+ for i := 0 ; i < c . batchCnt ; i ++ {
343
+ if slices .Contains (c .retryErrBatches , i ) || ( c . cancelCtx && i >= c . cancelCtxBatch ) {
329
344
expectedRetryRows = append (expectedRetryRows , task .rows [i * delBatch :(i + 1 )* delBatch ]... )
330
345
}
331
346
}
@@ -334,7 +349,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
334
349
// check statistics
335
350
var expectedErrRows uint64
336
351
for i := 0 ; i < c .batchCnt ; i ++ {
337
- if i >= realBatchCnt || slices .Contains (c .noRetryErrBatches , i ) {
352
+ if slices .Contains (c .noRetryErrBatches , i ) && ! ( c . cancelCtx && i >= c . cancelCtxBatch ) {
338
353
expectedErrRows += uint64 (delBatch )
339
354
}
340
355
}
@@ -384,6 +399,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
384
399
t2 := newMockTTLTbl (t , "t2" )
385
400
t3 := newMockTTLTbl (t , "t3" )
386
401
t4 := newMockTTLTbl (t , "t4" )
402
+ t5 := newMockTTLTbl (t , "t5" )
387
403
s := newMockSession (t )
388
404
pool := newMockSessionPool (t )
389
405
pool .se = s
@@ -392,8 +408,11 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
392
408
sqlMap := make (map [string ]int )
393
409
t3Retried := make (chan struct {})
394
410
t4Retried := make (chan struct {})
411
+ t5Executed := make (chan struct {})
395
412
s .executeSQL = func (ctx context.Context , sql string , args ... any ) ([]chunk.Row , error ) {
396
- pool .lastSession .sessionInfoSchema = newMockInfoSchema (t1 .TableInfo , t2 .TableInfo , t3 .TableInfo , t4 .TableInfo )
413
+ pool .lastSession .sessionInfoSchema = newMockInfoSchema (
414
+ t1 .TableInfo , t2 .TableInfo , t3 .TableInfo , t4 .TableInfo , t5 .TableInfo ,
415
+ )
397
416
if strings .Contains (sql , "`t1`" ) {
398
417
// success
399
418
return nil , nil
@@ -419,20 +438,35 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
419
438
// error and retry still error
420
439
// this tests that the retry buffer is drained after the delete worker has stopped
421
440
i := sqlMap [sql ]
422
- if i > = 2 {
423
- // i > = 2 means t4 has retried once and records in retry buffer
441
+ if i = = 2 {
442
+ // i == 2 means t4 has been retried once and is recorded in the retry buffer
424
443
close (t4Retried )
425
444
}
426
445
sqlMap [sql ] = i + 1
427
446
return nil , errors .New ("mockErr" )
428
447
}
429
448
449
+ if strings .Contains (sql , "`t5`" ) {
450
+ // error when the worker is running,
451
+ // success when flushing the retry buffer while the worker is stopping.
452
+ i := sqlMap [sql ]
453
+ sqlMap [sql ] = i + 1
454
+ if ctx .Value ("delWorker" ) != nil {
455
+ if i == 1 {
456
+ close (t5Executed )
457
+ }
458
+ return nil , errors .New ("mockErr" )
459
+ }
460
+ return nil , nil
461
+ }
462
+
430
463
require .FailNow (t , "" )
431
464
return nil , nil
432
465
}
433
466
434
467
delCh := make (chan * ttlDeleteTask )
435
468
w := newDeleteWorker (delCh , pool )
469
+ w .ctx = context .WithValue (w .ctx , "delWorker" , struct {}{})
436
470
w .retryBuffer .retryInterval = time .Millisecond
437
471
w .retryBuffer .maxRetry = math .MaxInt
438
472
require .Equal (t , workerStatusCreated , w .Status ())
@@ -444,7 +478,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
444
478
}()
445
479
446
480
tasks := make ([]* ttlDeleteTask , 0 )
447
- for _ , tbl := range []* cache.PhysicalTable {t1 , t2 , t3 , t4 } {
481
+ for _ , tbl := range []* cache.PhysicalTable {t1 , t2 , t3 , t4 , t5 } {
448
482
task := & ttlDeleteTask {
449
483
tbl : tbl ,
450
484
expire : time .UnixMilli (0 ),
@@ -476,8 +510,17 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
476
510
require .FailNow (t , "" )
477
511
}
478
512
479
- // before stop, t4 should always retry without any error rows
513
+ select {
514
+ case <- t5Executed :
515
+ case <- time .After (time .Second ):
516
+ require .FailNow (t , "" )
517
+ }
518
+
519
+ // before the worker stops, t4 and t5 should keep retrying without any error rows
520
+ require .Equal (t , uint64 (0 ), tasks [3 ].statistics .SuccessRows .Load ())
480
521
require .Equal (t , uint64 (0 ), tasks [3 ].statistics .ErrorRows .Load ())
522
+ require .Equal (t , uint64 (0 ), tasks [4 ].statistics .SuccessRows .Load ())
523
+ require .Equal (t , uint64 (0 ), tasks [4 ].statistics .ErrorRows .Load ())
481
524
w .Stop ()
482
525
require .NoError (t , w .WaitStopped (context .Background (), 10 * time .Second ))
483
526
@@ -490,6 +533,11 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
490
533
require .Equal (t , uint64 (0 ), tasks [2 ].statistics .SuccessRows .Load ())
491
534
require .Equal (t , uint64 (3 ), tasks [2 ].statistics .ErrorRows .Load ())
492
535
536
+ // t4 should end in error because flushing the retry buffer fails while the worker is stopping.
493
537
require .Equal (t , uint64 (0 ), tasks [3 ].statistics .SuccessRows .Load ())
494
538
require .Equal (t , uint64 (3 ), tasks [3 ].statistics .ErrorRows .Load ())
539
+
540
+ // t5 should succeed because flushing the retry buffer succeeds while the worker is stopping.
541
+ require .Equal (t , uint64 (3 ), tasks [4 ].statistics .SuccessRows .Load ())
542
+ require .Equal (t , uint64 (0 ), tasks [4 ].statistics .ErrorRows .Load ())
495
543
}
0 commit comments