Skip to content

Reducing the add-index worker count might get stuck forever if add-index finishes right before `admin alter ddl jobs` is applied #59267

@D3Hunter

Description

@D3Hunter

Bug Report

Please answer these questions before submitting your issue. Thanks!

1. Minimal reproduce step (Required)

  • apply this diff
diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go
index 9f3a720414..f91ced1a9b 100644
--- a/pkg/ddl/backfilling.go
+++ b/pkg/ddl/backfilling.go
@@ -837,6 +837,7 @@ func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPi
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
+			failpoint.InjectCall("onUpdateJobParam")
 			maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeed()
 			if maxWriteSpeed != bcCtx.GetLocalBackend().GetWriteSpeedLimit() {
 				bcCtx.GetLocalBackend().UpdateWriteSpeedLimit(maxWriteSpeed)
@@ -876,7 +877,7 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job
 	}
 
 	err = pipe.Close()
-
+	failpoint.InjectCall("afterPipeLineClose")
 	cancel()
 	wg.Wait() // wait for adjustWorkerCntAndMaxWriteSpeed to exit
 	if opErr := ctx.OperatorErr(); opErr != nil {
diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go
index 45e135d99c..3ec69b9d05 100644
--- a/pkg/ddl/job_worker.go
+++ b/pkg/ddl/job_worker.go
@@ -854,6 +854,7 @@ func (w *worker) runOneJobStep(
 					case <-stopCheckingJobCancelled:
 						return
 					case <-ticker.C:
+						failpoint.InjectCall("checkJobCancelled", job)
 						latestJob, err := sysTblMgr.GetJobByID(w.workCtx, job.ID)
 						if err == systable.ErrNotFound {
 							logutil.DDLLogger().Info(
diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go
index edd8a420b6..2a6e993795 100644
--- a/tests/realtikvtest/addindextest1/disttask_test.go
+++ b/tests/realtikvtest/addindextest1/disttask_test.go
@@ -464,3 +464,35 @@ func TestAddIndexScheduleAway(t *testing.T) {
 	tk.MustExec("alter table t add index idx(b);")
 	require.NotEqual(t, int64(0), jobID.Load())
 }
+
+func TestSetThread(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("set global tidb_enable_dist_task=0;")
+	tk.MustExec("create table t (c1 int primary key, c2 int)")
+	tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);")
+	var updated bool
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/checkJobCancelled", func(job *model.Job) {
+		if !updated && job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization {
+			updated = true
+			fmt.Println("TEST-LOG: set thread=1")
+			tk2 := testkit.NewTestKit(t, store)
+			tk2.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 1", job.ID))
+		}
+	})
+	pipeClosed := atomic.Bool{}
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterPipeLineClose", func() {
+		fmt.Println("TEST-LOG: start sleep")
+		pipeClosed.Store(true)
+		time.Sleep(5 * time.Second)
+		fmt.Println("TEST-LOG: end sleep")
+	})
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onUpdateJobParam", func() {
+		for !pipeClosed.Load() {
+			time.Sleep(100 * time.Millisecond)
+		}
+		fmt.Println("TEST-LOG: proceed update param")
+	})
+	tk.MustExec("alter table t add index idx(c2)")
+}
  • run TestSetThread

The cause is that we adjust the worker count by sending to a channel, which might not have a receiver once all workers have finished:

// Remove workers
for i := 0; i < int(-diff); i++ {
p.quitChan <- struct{}{}
}

2. What did you expect to see? (Required)

The `alter table t add index idx(c2)` statement succeeds.

3. What did you see instead (Required)

The `alter table t add index idx(c2)` statement gets stuck and never returns.

4. What is your TiDB version? (Required)

master

Metadata

Metadata

Assignees

No one assigned

    Labels

    affects-8.5: This bug affects the 8.5.x (LTS) versions.
    component/ddl: This issue is related to DDL of TiDB.
    severity/moderate
    type/bug: The issue is confirmed as a bug.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions