
Commit c66caf7

ddl: fix cancelling stuck issue during adding index (pingcap#61544)
Signed-off-by: tangenta <[email protected]>
1 parent c647853 commit c66caf7

4 files changed: 79 additions & 15 deletions

pkg/ddl/backfilling_operators.go

Lines changed: 19 additions & 7 deletions
@@ -466,6 +466,10 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
 	var done bool
 	for !done {
 		srcChk := w.getChunk()
+		if srcChk == nil {
+			terror.Call(rs.Close)
+			return err
+		}
 		done, err = fetchTableScanResult(w.ctx, w.copCtx.GetBase(), rs, srcChk)
 		if err != nil || w.ctx.Err() != nil {
 			w.recycleChunk(srcChk)
@@ -483,17 +487,25 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
 }
 
 func (w *tableScanWorker) getChunk() *chunk.Chunk {
-	chk := <-w.srcChkPool
-	newCap := copReadBatchSize()
-	if chk.Capacity() != newCap {
-		chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
+	select {
+	case <-w.ctx.Done():
+		return nil
+	case chk := <-w.srcChkPool:
+		newCap := copReadBatchSize()
+		if chk.Capacity() != newCap {
+			chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
+		}
+		chk.Reset()
+		return chk
 	}
-	chk.Reset()
-	return chk
 }
 
 func (w *tableScanWorker) recycleChunk(chk *chunk.Chunk) {
-	w.srcChkPool <- chk
+	select {
+	case <-w.ctx.Done():
+		return
+	case w.srcChkPool <- chk:
+	}
 }
 
 // WriteExternalStoreOperator writes index records to external storage.
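
Why the select matters: before this change, getChunk blocked unconditionally on <-w.srcChkPool. Once a cancel request stops the rest of the pipeline, nothing recycles chunks back into the pool, so the scan worker can wait on that receive forever and "admin cancel ddl jobs" never completes. Selecting on w.ctx.Done() on both the receive side (getChunk) and the send side (recycleChunk) lets the worker observe cancellation whichever channel operation it is parked on; the caller then handles the nil result, which is what the new srcChk == nil branch in scanRecords does. A minimal, self-contained sketch of the pattern follows; this is not TiDB code, and the chunkPool type and all names in it are illustrative.

package main

import (
	"context"
	"fmt"
	"time"
)

// chunkPool stands in for the worker's srcChkPool; int stands in for
// *chunk.Chunk.
type chunkPool struct {
	ctx context.Context
	ch  chan int
}

// getChunk returns ok=false instead of blocking forever on an empty
// pool once the context is cancelled, mirroring the nil check added to
// scanRecords above.
func (p *chunkPool) getChunk() (v int, ok bool) {
	select {
	case <-p.ctx.Done():
		return 0, false
	case v = <-p.ch:
		return v, true
	}
}

// recycleChunk abandons the send once the context is cancelled, so a
// worker never blocks on a pool that nothing drains anymore.
func (p *chunkPool) recycleChunk(v int) {
	select {
	case <-p.ctx.Done():
	case p.ch <- v:
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	p := &chunkPool{ctx: ctx, ch: make(chan int)} // empty pool: a bare receive would hang
	go func() {
		time.Sleep(50 * time.Millisecond)
		cancel() // plays the role of "admin cancel ddl jobs"
	}()
	if _, ok := p.getChunk(); !ok {
		fmt.Println("cancelled while waiting for a chunk; worker exits cleanly")
	}
}

The same reasoning applies to copReqSenderPool in pkg/ddl/index_cop.go below.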

pkg/ddl/index_cop.go

Lines changed: 20 additions & 7 deletions
@@ -149,7 +149,12 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
 	var done bool
 	startTime := time.Now()
 	for !done {
+		failpoint.InjectCall("beforeGetChunk")
 		srcChk := p.getChunk()
+		if srcChk == nil {
+			terror.Call(rs.Close)
+			return err
+		}
 		done, err = fetchTableScanResult(p.ctx, p.copCtx.GetBase(), rs, srcChk)
 		if err != nil {
 			p.recycleChunk(srcChk)
@@ -245,21 +250,29 @@ func (c *copReqSenderPool) close(force bool) {
 }
 
 func (c *copReqSenderPool) getChunk() *chunk.Chunk {
-	chk := <-c.srcChkPool
-	newCap := copReadBatchSize()
-	if chk.Capacity() != newCap {
-		chk = chunk.NewChunkWithCapacity(c.copCtx.GetBase().FieldTypes, newCap)
+	select {
+	case <-c.ctx.Done():
+		return nil
+	case chk := <-c.srcChkPool:
+		newCap := copReadBatchSize()
+		if chk.Capacity() != newCap {
+			chk = chunk.NewChunkWithCapacity(c.copCtx.GetBase().FieldTypes, newCap)
+		}
+		chk.Reset()
+		return chk
 	}
-	chk.Reset()
-	return chk
 }
 
 // recycleChunk puts the index record slice and the chunk back to the pool for reuse.
 func (c *copReqSenderPool) recycleChunk(chk *chunk.Chunk) {
 	if chk == nil {
 		return
 	}
-	c.srcChkPool <- chk
+	select {
+	case <-c.ctx.Done():
+		return
+	case c.srcChkPool <- chk:
+	}
 }
 
 func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) {
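
The new failpoint.InjectCall("beforeGetChunk") is a call-style hook from the pingcap/failpoint package: with no callback registered it does nothing, and a test can attach one with failpoint.EnableCall under the full package path, after which the callback runs synchronously each time execution reaches the hook. A minimal sketch of the production side, assuming the same API this diff uses (the scanner package and ScanLoop function are illustrative, not TiDB code):

package scanner

import "github.com/pingcap/failpoint"

// ScanLoop marks a hook point before each fetch. In a normal run the
// hook is inert; a test that has registered a callback with
// failpoint.EnableCall sees it fire once per iteration.
func ScanLoop(fetch func() (done bool)) {
	for {
		failpoint.InjectCall("beforeGetChunk")
		if fetch() {
			return
		}
	}
}

The test side of the pairing appears verbatim in the integration test below, which keys the callback by "github.com/pingcap/tidb/pkg/ddl/beforeGetChunk" and tears it down with failpoint.Disable.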

pkg/ddl/ingest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ go_test(
     embed = [":ingest"],
     flaky = True,
     race = "on",
-    shard_count = 17,
+    shard_count = 18,
     deps = [
         "//pkg/config",
         "//pkg/ddl",

pkg/ddl/ingest/integration_test.go

Lines changed: 39 additions & 0 deletions
@@ -376,3 +376,42 @@ func TestMultiSchemaAddIndexMerge(t *testing.T) {
 		tk.MustExec("admin check table t;")
 	}
 }
+
+func TestAddIndexGetChunkCancel(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test;")
+	defer ingesttestutil.InjectMockBackendMgr(t, store)()
+
+	tk.MustExec("create table t (a int primary key, b int);")
+	for i := 0; i < 100; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
+	}
+	tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
+	tk.MustExec("split table t between (0) and (1000000) regions 10;")
+	jobID := int64(0)
+	hook := &callback.TestDDLCallback{Do: dom}
+	hook.OnJobRunBeforeExported = func(job *model.Job) {
+		if jobID != 0 {
+			return
+		}
+		if job.Type == model.ActionAddIndex {
+			jobID = job.ID
+		}
+	}
+	dom.DDL().SetHook(hook)
+
+	cancelled := false
+	err := failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/beforeGetChunk", func() {
+		if !cancelled {
+			tk2 := testkit.NewTestKit(t, store)
+			tk2.MustExec(fmt.Sprintf("admin cancel ddl jobs %d", jobID))
+			cancelled = true
+		}
+	})
+	require.NoError(t, err)
+	tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob)
+	require.True(t, cancelled)
+	tk.MustExec("admin check table t;")
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/beforeGetChunk"))
+}
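
Taken together, the test reproduces the exact hang this commit fixes: the DDL hook records the add-index job ID, the beforeGetChunk failpoint cancels that job from a second session just before the scan loop asks the pool for a chunk, and the ALTER statement is then expected to fail with ErrCancelledDDLJob instead of blocking. The final admin check table t verifies that the cancelled job left the table consistent.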
