
Commit e7ebb75

ddl: fix cancelling stuck issue during adding index (pingcap#61541)
close pingcap#61087
1 parent 17bb929 commit e7ebb75
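
As the diffs below show, the cancel hang lived in the chunk-reuse pool shared by the table-scan workers: getChunk received from, and recycleChunk sent to, a plain channel, so once a cancelled job stopped draining and recycling chunks, a worker could block on that channel forever and the cancellation never completed. The fix wraps both channel operations in a select that also watches the worker's context. A minimal, runnable sketch of that pattern, with toy worker/chunk types standing in for TiDB's actual ones:

package main

import (
	"context"
	"fmt"
)

type chunk struct{ buf []int }

type worker struct {
	ctx  context.Context
	pool chan *chunk
}

// getChunk blocks on the pool but also watches ctx.Done(), returning
// nil once the job is cancelled so the caller can bail out instead of
// waiting for a chunk that will never be recycled.
func (w *worker) getChunk() *chunk {
	select {
	case <-w.ctx.Done():
		return nil
	case c := <-w.pool:
		return c
	}
}

// recycleChunk returns a chunk to the pool unless the job is cancelled,
// in which case nothing will ever drain the pool and sending would hang.
func (w *worker) recycleChunk(c *chunk) {
	select {
	case <-w.ctx.Done():
	case w.pool <- c:
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{ctx: ctx, pool: make(chan *chunk, 1)}
	w.recycleChunk(&chunk{})

	fmt.Println(w.getChunk() != nil) // true: a chunk was available
	cancel()
	fmt.Println(w.getChunk() == nil) // true: cancelled, returns nil rather than blocking
}

Because getChunk can now return nil, both of its call sites in this commit gain a nil check that closes the coprocessor result set and returns instead of scanning on.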

File tree

6 files changed (+85 −23 lines)

pkg/ddl/backfilling.go

Lines changed: 7 additions & 6 deletions
@@ -338,7 +338,7 @@ func (w *backfillWorker) sendResult(result *backfillResult) {
 }
 
 func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
-	logger := ddlLogger.With(zap.Stringer("worker", w), zap.Int64("jobID", job.ID))
+	logger := logutil.BgLogger().With(zap.String("category", "ddl"), zap.Stringer("worker", w), zap.Int64("jobID", job.ID))
 	var (
 		curTaskID int
 		task      *reorgBackfillTask
@@ -634,6 +634,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 	bfWorkerType backfillerType,
 	reorgInfo *reorgInfo,
 ) error {
+	logger := logutil.BgLogger().With(zap.String("category", "ddl"))
 	startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
 
 	if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
@@ -674,7 +675,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 			return egCtx.Err()
 		case result, ok := <-scheduler.resultChan():
 			if !ok {
-				ddlLogger.Info("backfill workers successfully processed",
+				logger.Info("backfill workers successfully processed",
 					zap.Stringer("element", reorgInfo.currElement),
 					zap.Int64("total added count", totalAddedCount),
 					zap.String("start key", hex.EncodeToString(startKey)))
@@ -683,7 +684,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 			cnt++
 
 			if result.err != nil {
-				ddlLogger.Warn("backfill worker failed",
+				logger.Warn("backfill worker failed",
 					zap.Int64("job ID", reorgInfo.ID),
 					zap.Int64("total added count", totalAddedCount),
 					zap.String("start key", hex.EncodeToString(startKey)),
@@ -704,15 +705,15 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 			if cnt%(scheduler.currentWorkerSize()*4) == 0 {
 				err2 := reorgInfo.UpdateReorgMeta(keeper.nextKey, sessPool)
 				if err2 != nil {
-					ddlLogger.Warn("update reorg meta failed",
+					logger.Warn("update reorg meta failed",
 						zap.Int64("job ID", reorgInfo.ID),
 						zap.Error(err2))
 				}
 				// We try to adjust the worker size regularly to reduce
 				// the overhead of loading the DDL related global variables.
 				err2 = scheduler.adjustWorkerSize()
 				if err2 != nil {
-					ddlLogger.Warn("cannot adjust backfill worker size",
+					logger.Warn("cannot adjust backfill worker size",
 						zap.Int64("job ID", reorgInfo.ID),
 						zap.Error(err2))
 				}
@@ -735,7 +736,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 		if len(kvRanges) == 0 {
 			break
 		}
-		ddlLogger.Info("start backfill workers to reorg record",
+		logutil.BgLogger().Info("start backfill workers to reorg record",
 			zap.Stringer("type", bfWorkerType),
 			zap.Int("workerCnt", scheduler.currentWorkerSize()),
 			zap.Int("regionCnt", len(kvRanges)),

pkg/ddl/backfilling_operators.go

Lines changed: 19 additions & 7 deletions
@@ -451,6 +451,10 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecordChunk)) error {
 	var done bool
 	for !done {
 		srcChk := w.getChunk()
+		if srcChk == nil {
+			terror.Call(rs.Close)
+			return err
+		}
 		done, err = fetchTableScanResult(w.ctx, w.copCtx.GetBase(), rs, srcChk)
 		if err != nil || util2.IsContextDone(w.ctx) {
 			w.recycleChunk(srcChk)
@@ -468,17 +472,25 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecordChunk)) error {
 }
 
 func (w *tableScanWorker) getChunk() *chunk.Chunk {
-	chk := <-w.srcChkPool
-	newCap := copReadBatchSize()
-	if chk.Capacity() != newCap {
-		chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
+	select {
+	case <-w.ctx.Done():
+		return nil
+	case chk := <-w.srcChkPool:
+		newCap := copReadBatchSize()
+		if chk.Capacity() != newCap {
+			chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
+		}
+		chk.Reset()
+		return chk
 	}
-	chk.Reset()
-	return chk
 }
 
 func (w *tableScanWorker) recycleChunk(chk *chunk.Chunk) {
-	w.srcChkPool <- chk
+	select {
+	case <-w.ctx.Done():
+		return
+	case w.srcChkPool <- chk:
+	}
 }
 
 // WriteExternalStoreOperator writes index records to external storage.
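
For intuition on why the old unconditional receive from w.srcChkPool could wedge a cancelled job, here is a self-contained toy (illustrative only, not TiDB code): once cancellation stops anything from recycling chunks back into the pool, a plain receive parks the worker goroutine forever, which is exactly the stuck state the select above avoids.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	pool := make(chan int) // empty pool; a consumer would normally refill it
	cancel()               // job cancelled: the consumer exits and never recycles

	done := make(chan struct{})
	go func() {
		<-pool // old-style getChunk: parked here indefinitely
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("got a chunk")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("worker stuck on the pool; ctx.Err() =", ctx.Err())
	}
}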

pkg/ddl/ddl.go

Lines changed: 0 additions & 2 deletions
@@ -98,8 +98,6 @@ const (
 	recoverCheckFlagDisableGC
 )
 
-var ddlLogger = logutil.BgLogger().With(zap.String("category", "ddl"))
-
 // OnExist specifies what to do when a new object has a name collision.
 type OnExist uint8
 

pkg/ddl/index_cop.go

Lines changed: 20 additions & 7 deletions
@@ -155,7 +155,12 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
 	var done bool
 	startTime := time.Now()
 	for !done {
+		failpoint.InjectCall("beforeGetChunk")
 		srcChk := p.getChunk()
+		if srcChk == nil {
+			terror.Call(rs.Close)
+			return err
+		}
 		done, err = fetchTableScanResult(p.ctx, p.copCtx.GetBase(), rs, srcChk)
 		if err != nil {
 			p.recycleChunk(srcChk)
@@ -253,21 +258,29 @@ func (c *copReqSenderPool) close(force bool) {
 }
 
 func (c *copReqSenderPool) getChunk() *chunk.Chunk {
-	chk := <-c.srcChkPool
-	newCap := copReadBatchSize()
-	if chk.Capacity() != newCap {
-		chk = chunk.NewChunkWithCapacity(c.copCtx.GetBase().FieldTypes, newCap)
+	select {
+	case <-c.ctx.Done():
+		return nil
+	case chk := <-c.srcChkPool:
+		newCap := copReadBatchSize()
+		if chk.Capacity() != newCap {
+			chk = chunk.NewChunkWithCapacity(c.copCtx.GetBase().FieldTypes, newCap)
+		}
+		chk.Reset()
+		return chk
 	}
-	chk.Reset()
-	return chk
 }
 
 // recycleChunk puts the index record slice and the chunk back to the pool for reuse.
 func (c *copReqSenderPool) recycleChunk(chk *chunk.Chunk) {
 	if chk == nil {
 		return
 	}
-	c.srcChkPool <- chk
+	select {
+	case <-c.ctx.Done():
+		return
+	case c.srcChkPool <- chk:
+	}
 }
 
 func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) {
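
The failpoint.InjectCall("beforeGetChunk") hook added above is what makes the hang reproducible: failpoint.EnableCall (used by the new integration test below) registers a callback that fires each time execution reaches the injection point, letting the test issue admin cancel ddl jobs at the exact moment a worker is about to take a chunk from the pool.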

pkg/ddl/ingest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ go_test(
     embed = [":ingest"],
     flaky = True,
     race = "on",
-    shard_count = 18,
+    shard_count = 19,
     deps = [
         "//pkg/config",
         "//pkg/ddl",

pkg/ddl/ingest/integration_test.go

Lines changed: 38 additions & 0 deletions
@@ -402,3 +402,41 @@ func TestMultiSchemaAddIndexMerge(t *testing.T) {
 		tk.MustExec("admin check table t;")
 	}
 }
+
+func TestAddIndexGetChunkCancel(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test;")
+	defer ingesttestutil.InjectMockBackendMgr(t, store)()
+
+	tk.MustExec("create table t (a int primary key, b int);")
+	for i := 0; i < 100; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
+	}
+	tk.MustExec("split table t between (0) and (1000000) regions 10;")
+	jobID := int64(0)
+	hook := &callback.TestDDLCallback{Do: dom}
+	hook.OnJobRunBeforeExported = func(job *model.Job) {
+		if jobID != 0 {
+			return
+		}
+		if job.Type == model.ActionAddIndex {
+			jobID = job.ID
+		}
+	}
+	dom.DDL().SetHook(hook)
+
+	cancelled := false
+	err := failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/beforeGetChunk", func() {
+		if !cancelled {
+			tk2 := testkit.NewTestKit(t, store)
+			tk2.MustExec(fmt.Sprintf("admin cancel ddl jobs %d", jobID))
+			cancelled = true
+		}
+	})
+	require.NoError(t, err)
+	tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob)
+	require.True(t, cancelled)
+	tk.MustExec("admin check table t;")
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/beforeGetChunk"))
+}
