@@ -17,6 +17,10 @@ package ttlworker
17
17
import (
18
18
"context"
19
19
"errors"
20
+ "fmt"
21
+ "math"
22
+ "slices"
23
+ "strconv"
20
24
"strings"
21
25
"testing"
22
26
"time"
@@ -163,48 +167,65 @@ func TestTTLDelRetryBuffer(t *testing.T) {
163
167
164
168
// test task should be immutable
165
169
require .Equal (t , 10 , len (task5 .rows ))
170
+
171
+ // test drain
172
+ require .Equal (t , 0 , buffer .Len ())
173
+ task6 , rows6 , statics6 := createTask ("t6" )
174
+ buffer .RecordTaskResult (task6 , rows6 [:7 ])
175
+ require .Equal (t , 1 , buffer .Len ())
176
+ require .Equal (t , uint64 (0 ), statics6 .SuccessRows .Load ())
177
+ require .Equal (t , uint64 (0 ), statics6 .ErrorRows .Load ())
178
+ buffer .Drain ()
179
+ require .Equal (t , 0 , buffer .Len ())
180
+ require .Equal (t , uint64 (0 ), statics6 .SuccessRows .Load ())
181
+ require .Equal (t , uint64 (7 ), statics6 .ErrorRows .Load ())
166
182
}
167
183
168
184
func TestTTLDeleteTaskDoDelete (t * testing.T ) {
169
185
origBatchSize := variable .TTLDeleteBatchSize .Load ()
170
- variable .TTLDeleteBatchSize .Store (3 )
186
+ delBatch := 3
187
+ variable .TTLDeleteBatchSize .Store (int64 (delBatch ))
171
188
defer variable .TTLDeleteBatchSize .Store (origBatchSize )
172
189
173
190
t1 := newMockTTLTbl (t , "t1" )
174
- t2 := newMockTTLTbl (t , "t2" )
175
- t3 := newMockTTLTbl (t , "t3" )
176
- t4 := newMockTTLTbl (t , "t4" )
177
191
s := newMockSession (t )
192
+ << << << < HEAD:ttl / ttlworker / del_test .go
178
193
invokes := 0
179
194
s .executeSQL = func (ctx context.Context , sql string , args ... interface {}) ([]chunk.Row , error ) {
180
195
invokes ++
181
196
s .sessionInfoSchema = newMockInfoSchema (t1 .TableInfo , t2 .TableInfo , t3 .TableInfo , t4 .TableInfo )
182
197
if strings .Contains (sql , "`t1`" ) {
183
198
return nil , nil
199
+ == == == =
200
+ var sqls []string
201
+ var retryErrBatches []int
202
+ var nonRetryBatches []int
203
+ var afterExecuteSQL func ()
204
+ s .executeSQL = func (ctx context.Context , sql string , args ... any ) ([]chunk.Row , error ) {
205
+ s .sessionInfoSchema = newMockInfoSchema (t1 .TableInfo )
206
+ sqls = append (sqls , sql )
207
+
208
+ if ! strings .Contains (sql , "`t1`" ) {
209
+ require .FailNow (t , "" )
210
+ >> >> >> > 1 bf01f41083 (ttl : fix the issue that TTL job may hang some time when shrink the delete worker count (#55572 )):pkg / ttl / ttlworker / del_test .go
184
211
}
185
212
186
- if strings .Contains (sql , "`t2`" ) {
213
+ defer func () {
214
+ if afterExecuteSQL != nil {
215
+ afterExecuteSQL ()
216
+ }
217
+ }()
218
+
219
+ if slices .Contains (retryErrBatches , len (sqls )- 1 ) {
187
220
return nil , errors .New ("mockErr" )
188
221
}
189
222
190
- if strings .Contains (sql , "`t3`" ) {
223
+ if slices .Contains (nonRetryBatches , len (sqls )- 1 ) {
224
+ // set an infoschema that contains no table to make an error that cannot retry
191
225
s .sessionInfoSchema = newMockInfoSchema ()
192
226
return nil , nil
193
227
}
194
228
195
- if strings .Contains (sql , "`t4`" ) {
196
- switch invokes {
197
- case 1 :
198
- return nil , nil
199
- case 2 , 4 :
200
- return nil , errors .New ("mockErr" )
201
- case 3 :
202
- s .sessionInfoSchema = newMockInfoSchema ()
203
- return nil , nil
204
- }
205
- }
206
-
207
- require .FailNow (t , "" )
208
229
return nil , nil
209
230
}
210
231
@@ -218,63 +239,117 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
218
239
return rows
219
240
}
220
241
221
- delTask := func (t * cache. PhysicalTable ) * ttlDeleteTask {
242
+ delTask := func (batchCnt int ) * ttlDeleteTask {
222
243
task := & ttlDeleteTask {
223
- tbl : t ,
244
+ tbl : t1 ,
224
245
expire : time .UnixMilli (0 ),
225
- rows : nRows (10 ),
246
+ rows : nRows (batchCnt * delBatch ),
226
247
statistics : & ttlStatistics {},
227
248
}
228
- task .statistics .TotalRows .Add (10 )
249
+ task .statistics .TotalRows .Add (uint64 ( batchCnt * delBatch ) )
229
250
return task
230
251
}
231
252
232
253
cases := []struct {
233
- task * ttlDeleteTask
234
- retryRows []int
235
- successRows int
236
- errorRows int
254
+ batchCnt int
255
+ retryErrBatches []int
256
+ noRetryErrBatches []int
257
+ cancelCtx bool
258
+ cancelCtxBatch int
237
259
}{
238
260
{
239
- task : delTask (t1 ),
240
- retryRows : nil ,
241
- successRows : 10 ,
242
- errorRows : 0 ,
261
+ // all success
262
+ batchCnt : 10 ,
263
+ },
264
+ {
265
+ // all retries
266
+ batchCnt : 10 ,
267
+ retryErrBatches : []int {0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 },
243
268
},
244
269
{
245
- task : delTask (t2 ),
246
- retryRows : []int {0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 },
247
- successRows : 0 ,
248
- errorRows : 0 ,
270
+ // all errors without retry
271
+ batchCnt : 10 ,
272
+ noRetryErrBatches : []int {0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 },
249
273
},
250
274
{
251
- task : delTask ( t3 ),
252
- retryRows : nil ,
253
- successRows : 0 ,
254
- errorRows : 10 ,
275
+ // some retries and some not
276
+ batchCnt : 10 ,
277
+ noRetryErrBatches : [] int { 3 , 8 , 9 } ,
278
+ retryErrBatches : [] int { 1 , 2 , 4 } ,
255
279
},
256
280
{
257
- task : delTask (t4 ),
258
- retryRows : []int {3 , 4 , 5 , 9 },
259
- successRows : 3 ,
260
- errorRows : 3 ,
281
+ // some retries and some not
282
+ batchCnt : 10 ,
283
+ noRetryErrBatches : []int {3 , 8 , 9 },
284
+ retryErrBatches : []int {1 , 2 , 4 },
285
+ cancelCtx : true ,
286
+ cancelCtxBatch : 6 ,
261
287
},
262
288
}
263
289
264
290
for _ , c := range cases {
265
- invokes = 0
266
- retryRows := c .task .doDelete (context .Background (), s )
267
- require .Equal (t , 4 , invokes )
268
- if c .retryRows == nil {
269
- require .Nil (t , retryRows )
291
+ ctx , cancel := context .WithCancel (context .Background ())
292
+ if c .cancelCtx && c .cancelCtxBatch == 0 {
293
+ cancel ()
270
294
}
271
- require .Equal (t , len (c .retryRows ), len (retryRows ))
272
- for i , row := range retryRows {
273
- require .Equal (t , int64 (c .retryRows [i ]), row [0 ].GetInt64 ())
295
+
296
+ afterExecuteSQL = func () {
297
+ if c .cancelCtx {
298
+ if len (sqls ) == c .cancelCtxBatch {
299
+ cancel ()
300
+ }
301
+ }
274
302
}
275
- require .Equal (t , uint64 (10 ), c .task .statistics .TotalRows .Load ())
276
- require .Equal (t , uint64 (c .successRows ), c .task .statistics .SuccessRows .Load ())
277
- require .Equal (t , uint64 (c .errorRows ), c .task .statistics .ErrorRows .Load ())
303
+
304
+ task := delTask (c .batchCnt )
305
+ require .Equal (t , len (task .rows ), c .batchCnt * delBatch )
306
+ sqls = make ([]string , 0 , c .batchCnt )
307
+ retryErrBatches = c .retryErrBatches
308
+ nonRetryBatches = c .noRetryErrBatches
309
+ retryRows := task .doDelete (ctx , s )
310
+ realBatchCnt := c .batchCnt
311
+ if c .cancelCtx {
312
+ realBatchCnt = c .cancelCtxBatch
313
+ }
314
+ require .LessOrEqual (t , realBatchCnt , c .batchCnt )
315
+
316
+ // check SQLs
317
+ require .Equal (t , realBatchCnt , len (sqls ))
318
+ expectedSQLs := make ([]string , 0 , len (sqls ))
319
+ for i := 0 ; i < realBatchCnt ; i ++ {
320
+ batch := task .rows [i * delBatch : (i + 1 )* delBatch ]
321
+ idList := make ([]string , 0 , delBatch )
322
+ for _ , row := range batch {
323
+ idList = append (idList , strconv .FormatInt (row [0 ].GetInt64 (), 10 ))
324
+ }
325
+ sql := fmt .Sprintf ("DELETE LOW_PRIORITY FROM `test`.`t1` " +
326
+ "WHERE `_tidb_rowid` IN (%s) AND `time` < FROM_UNIXTIME(0) LIMIT %d" ,
327
+ strings .Join (idList , ", " ),
328
+ delBatch ,
329
+ )
330
+ expectedSQLs = append (expectedSQLs , sql )
331
+ }
332
+ require .Equal (t , strings .Join (expectedSQLs , "\n " ), strings .Join (sqls , "\n " ))
333
+
334
+ // check retry rows
335
+ var expectedRetryRows [][]types.Datum
336
+ for i := 0 ; i < realBatchCnt ; i ++ {
337
+ if slices .Contains (c .retryErrBatches , i ) {
338
+ expectedRetryRows = append (expectedRetryRows , task .rows [i * delBatch :(i + 1 )* delBatch ]... )
339
+ }
340
+ }
341
+ require .Equal (t , expectedRetryRows , retryRows )
342
+
343
+ // check statistics
344
+ var expectedErrRows uint64
345
+ for i := 0 ; i < c .batchCnt ; i ++ {
346
+ if i >= realBatchCnt || slices .Contains (c .noRetryErrBatches , i ) {
347
+ expectedErrRows += uint64 (delBatch )
348
+ }
349
+ }
350
+ expectedSuccessRows := uint64 (len (task .rows )) - expectedErrRows - uint64 (len (expectedRetryRows ))
351
+ require .Equal (t , expectedSuccessRows , task .statistics .SuccessRows .Load ())
352
+ require .Equal (t , expectedErrRows , task .statistics .ErrorRows .Load ())
278
353
}
279
354
}
280
355
@@ -317,37 +392,63 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
317
392
t1 := newMockTTLTbl (t , "t1" )
318
393
t2 := newMockTTLTbl (t , "t2" )
319
394
t3 := newMockTTLTbl (t , "t3" )
395
+ t4 := newMockTTLTbl (t , "t4" )
320
396
s := newMockSession (t )
321
397
pool := newMockSessionPool (t )
322
398
pool .se = s
323
399
400
+ << << << < HEAD:ttl / ttlworker / del_test .go
324
401
sqlMap := make (map [string ]struct {})
325
402
s .executeSQL = func (ctx context.Context , sql string , args ... interface {}) ([]chunk.Row , error ) {
326
403
pool .lastSession .sessionInfoSchema = newMockInfoSchema (t1 .TableInfo , t2 .TableInfo , t3 .TableInfo )
404
+ == == == =
405
+ sqlMap := make (map [string ]int )
406
+ t3Retried := make (chan struct {})
407
+ t4Retried := make (chan struct {})
408
+ s .executeSQL = func (ctx context.Context , sql string , args ... any ) ([]chunk.Row , error ) {
409
+ pool .lastSession .sessionInfoSchema = newMockInfoSchema (t1 .TableInfo , t2 .TableInfo , t3 .TableInfo , t4 .TableInfo )
410
+ >> >> >> > 1 bf01f41083 (ttl : fix the issue that TTL job may hang some time when shrink the delete worker count (#55572 )):pkg / ttl / ttlworker / del_test .go
327
411
if strings .Contains (sql , "`t1`" ) {
412
+ // success
328
413
return nil , nil
329
414
}
330
415
331
416
if strings .Contains (sql , "`t2`" ) {
417
+ // first error, retry success
332
418
if _ , ok := sqlMap [sql ]; ok {
419
+ close (t3Retried )
333
420
return nil , nil
334
421
}
335
- sqlMap [sql ] = struct {}{}
422
+ sqlMap [sql ] = 1
336
423
return nil , errors .New ("mockErr" )
337
424
}
338
425
339
426
if strings .Contains (sql , "`t3`" ) {
427
+ // error no retry
340
428
pool .lastSession .sessionInfoSchema = newMockInfoSchema ()
341
429
return nil , nil
342
430
}
343
431
432
+ if strings .Contains (sql , "`t4`" ) {
433
+ // error and retry still error
434
+ // this is to test the retry buffer should be drained after the delete worker stopped
435
+ i := sqlMap [sql ]
436
+ if i >= 2 {
437
+ // i >= 2 means t4 has retried once and records in retry buffer
438
+ close (t4Retried )
439
+ }
440
+ sqlMap [sql ] = i + 1
441
+ return nil , errors .New ("mockErr" )
442
+ }
443
+
344
444
require .FailNow (t , "" )
345
445
return nil , nil
346
446
}
347
447
348
448
delCh := make (chan * ttlDeleteTask )
349
449
w := newDeleteWorker (delCh , pool )
350
450
w .retryBuffer .retryInterval = time .Millisecond
451
+ w .retryBuffer .maxRetry = math .MaxInt
351
452
require .Equal (t , workerStatusCreated , w .Status ())
352
453
w .Start ()
353
454
require .Equal (t , workerStatusRunning , w .Status ())
@@ -357,7 +458,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
357
458
}()
358
459
359
460
tasks := make ([]* ttlDeleteTask , 0 )
360
- for _ , tbl := range []* cache.PhysicalTable {t1 , t2 , t3 } {
461
+ for _ , tbl := range []* cache.PhysicalTable {t1 , t2 , t3 , t4 } {
361
462
task := & ttlDeleteTask {
362
463
tbl : tbl ,
363
464
expire : time .UnixMilli (0 ),
@@ -377,7 +478,23 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
377
478
}
378
479
}
379
480
380
- time .Sleep (time .Millisecond * 100 )
481
+ select {
482
+ case <- t3Retried :
483
+ case <- time .After (time .Second ):
484
+ require .FailNow (t , "" )
485
+ }
486
+
487
+ select {
488
+ case <- t4Retried :
489
+ case <- time .After (time .Second ):
490
+ require .FailNow (t , "" )
491
+ }
492
+
493
+ // before stop, t4 should always retry without any error rows
494
+ require .Equal (t , uint64 (0 ), tasks [3 ].statistics .ErrorRows .Load ())
495
+ w .Stop ()
496
+ require .NoError (t , w .WaitStopped (context .Background (), 10 * time .Second ))
497
+
381
498
require .Equal (t , uint64 (3 ), tasks [0 ].statistics .SuccessRows .Load ())
382
499
require .Equal (t , uint64 (0 ), tasks [0 ].statistics .ErrorRows .Load ())
383
500
@@ -386,4 +503,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
386
503
387
504
require .Equal (t , uint64 (0 ), tasks [2 ].statistics .SuccessRows .Load ())
388
505
require .Equal (t , uint64 (3 ), tasks [2 ].statistics .ErrorRows .Load ())
506
+
507
+ require .Equal (t , uint64 (0 ), tasks [3 ].statistics .SuccessRows .Load ())
508
+ require .Equal (t , uint64 (3 ), tasks [3 ].statistics .ErrorRows .Load ())
389
509
}
0 commit comments