Skip to content

Commit 2d8423f

Browse files
D3Hunterti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#59271
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 48a94ab commit 2d8423f

File tree

6 files changed

+87
-2
lines changed

6 files changed

+87
-2
lines changed

pkg/ddl/backfilling.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -897,7 +897,12 @@ func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPi
897897
case <-ctx.Done():
898898
return
899899
case <-ticker.C:
900+
<<<<<<< HEAD
900901
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault()
902+
=======
903+
failpoint.InjectCall("onUpdateJobParam")
904+
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeed()
905+
>>>>>>> 444c38fba07 (workerpool: fix block on Tune when all workers finished (#59271))
901906
if maxWriteSpeed != bcCtx.GetLocalBackend().GetWriteSpeedLimit() {
902907
bcCtx.GetLocalBackend().UpdateWriteSpeedLimit(maxWriteSpeed)
903908
logutil.DDLIngestLogger().Info("adjust ddl job config success",
@@ -938,7 +943,7 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job
938943
}
939944

940945
err = pipe.Close()
941-
946+
failpoint.InjectCall("afterPipeLineClose")
942947
cancel()
943948
wg.Wait() // wait for adjustWorkerCntAndMaxWriteSpeed to exit
944949
if opErr := ctx.OperatorErr(); opErr != nil {

pkg/ddl/job_worker.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -874,7 +874,12 @@ func (w *worker) runOneJobStep(
874874
case <-stopCheckingJobCancelled:
875875
return
876876
case <-ticker.C:
877+
<<<<<<< HEAD
877878
latestJob, err := jobCtx.sysTblMgr.GetJobByID(w.workCtx, job.ID)
879+
=======
880+
failpoint.InjectCall("checkJobCancelled", job)
881+
latestJob, err := sysTblMgr.GetJobByID(w.workCtx, job.ID)
882+
>>>>>>> 444c38fba07 (workerpool: fix block on Tune when all workers finished (#59271))
878883
if err == systable.ErrNotFound {
879884
logutil.DDLLogger().Info(
880885
"job not found, might already finished",

pkg/resourcemanager/pool/workerpool/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ go_library(
88
deps = [
99
"//pkg/resourcemanager/util",
1010
"//pkg/util",
11+
"//pkg/util/logutil",
1112
"//pkg/util/syncutil",
1213
"@org_uber_go_atomic//:atomic",
14+
"@org_uber_go_zap//:zap",
1315
],
1416
)
1517

pkg/resourcemanager/pool/workerpool/workerpool.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ import (
2020

2121
"github.com/pingcap/tidb/pkg/resourcemanager/util"
2222
tidbutil "github.com/pingcap/tidb/pkg/util"
23+
"github.com/pingcap/tidb/pkg/util/logutil"
2324
"github.com/pingcap/tidb/pkg/util/syncutil"
2425
atomicutil "go.uber.org/atomic"
26+
"go.uber.org/zap"
2527
)
2628

2729
// TaskMayPanic is a type to remind the developer that need to handle panic in
@@ -196,8 +198,15 @@ func (p *WorkerPool[T, R]) Tune(numWorkers int32) {
196198
}
197199
} else if diff < 0 {
198200
// Remove workers
201+
outer:
199202
for i := 0; i < int(-diff); i++ {
200-
p.quitChan <- struct{}{}
203+
select {
204+
case p.quitChan <- struct{}{}:
205+
case <-p.ctx.Done():
206+
logutil.BgLogger().Info("context done when tuning worker pool",
207+
zap.Int32("from", p.numWorkers), zap.Int32("to", numWorkers))
208+
break outer
209+
}
201210
}
202211
}
203212
p.numWorkers = numWorkers

tests/realtikvtest/addindextest2/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_test(
44
name = "addindextest2_test",
55
timeout = "long",
66
srcs = [
7+
"alter_job_test.go",
78
"global_sort_test.go",
89
"main_test.go",
910
],
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package addindextest
16+
17+
import (
18+
"fmt"
19+
"sync/atomic"
20+
"testing"
21+
"time"
22+
23+
"github.com/pingcap/tidb/pkg/meta/model"
24+
"github.com/pingcap/tidb/pkg/testkit"
25+
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
26+
"github.com/pingcap/tidb/tests/realtikvtest"
27+
)
28+
29+
func TestAlterThreadRightAfterJobFinish(t *testing.T) {
30+
store := realtikvtest.CreateMockStoreAndSetup(t)
31+
tk := testkit.NewTestKit(t, store)
32+
tk.MustExec("use test")
33+
tk.MustExec("set global tidb_enable_dist_task=0;")
34+
tk.MustExec("create table t (c1 int primary key, c2 int)")
35+
tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);")
36+
var updated bool
37+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/checkJobCancelled", func(job *model.Job) {
38+
if !updated && job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization {
39+
updated = true
40+
fmt.Println("TEST-LOG: set thread=1")
41+
tk2 := testkit.NewTestKit(t, store)
42+
tk2.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 1", job.ID))
43+
}
44+
})
45+
var pipeClosed atomic.Bool
46+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterPipeLineClose", func() {
47+
fmt.Println("TEST-LOG: start sleep")
48+
pipeClosed.Store(true)
49+
time.Sleep(5 * time.Second)
50+
fmt.Println("TEST-LOG: end sleep")
51+
})
52+
var onUpdateJobParam bool
53+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onUpdateJobParam", func() {
54+
if !onUpdateJobParam {
55+
onUpdateJobParam = true
56+
for !pipeClosed.Load() {
57+
time.Sleep(100 * time.Millisecond)
58+
}
59+
fmt.Println("TEST-LOG: proceed update param")
60+
}
61+
})
62+
tk.MustExec("alter table t add index idx(c2)")
63+
}

0 commit comments

Comments
 (0)