Skip to content

Commit 1f03420

Browse files
authored
disttask: fix subtask cannot resume after upgrade (#51072)
close #50894
1 parent e3e0f7e commit 1f03420

File tree

3 files changed

+32
-2
lines changed

3 files changed

+32
-2
lines changed

pkg/disttask/framework/storage/task_table.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,3 +746,19 @@ func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.Subtask, e
746746
}
747747
return subtasks, nil
748748
}
749+
750+
// AdjustTaskOverflowConcurrency changes the task concurrency to the max value supported by the current cluster.
751+
// This is a workaround for an upgrade bug: in v7.5.x, the task concurrency is hard-coded to 16, resulting in
752+
// a stuck task if the new-version TiDB node has fewer than 16 CPU cores.
753+
// We don't adjust the concurrency in the subtask table because this field does not exist in v7.5.0.
754+
// For details, see https://github.com/pingcap/tidb/issues/50894.
755+
// In later versions, the concurrency is checked when submitting a new task, so this function should be a no-op.
756+
func (mgr *TaskManager) AdjustTaskOverflowConcurrency(ctx context.Context, se sessionctx.Context) error {
757+
cpuCount, err := mgr.getCPUCountOfManagedNode(ctx, se)
758+
if err != nil {
759+
return err
760+
}
761+
sql := "update mysql.tidb_global_task set concurrency = %? where concurrency > %?;"
762+
_, err = sqlexec.ExecSQL(ctx, se, sql, cpuCount, cpuCount)
763+
return err
764+
}

pkg/session/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ go_library(
2323
"//pkg/ddl/placement",
2424
"//pkg/ddl/schematracker",
2525
"//pkg/ddl/syncer",
26+
"//pkg/disttask/framework/storage",
2627
"//pkg/domain",
2728
"//pkg/domain/infosync",
2829
"//pkg/errno",
@@ -106,6 +107,7 @@ go_library(
106107
"@com_github_pingcap_errors//:errors",
107108
"@com_github_pingcap_failpoint//:failpoint",
108109
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
110+
"@com_github_pingcap_log//:log",
109111
"@com_github_pingcap_tipb//go-binlog",
110112
"@com_github_stretchr_testify//require",
111113
"@com_github_tikv_client_go_v2//error",

pkg/session/sync_upgrade.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ import (
1919
"time"
2020

2121
"github.com/pingcap/failpoint"
22+
"github.com/pingcap/log"
2223
"github.com/pingcap/tidb/pkg/ddl"
2324
"github.com/pingcap/tidb/pkg/ddl/syncer"
25+
dist_store "github.com/pingcap/tidb/pkg/disttask/framework/storage"
2426
"github.com/pingcap/tidb/pkg/domain"
27+
"github.com/pingcap/tidb/pkg/kv"
2528
"github.com/pingcap/tidb/pkg/owner"
2629
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
2730
"github.com/pingcap/tidb/pkg/sessionctx"
@@ -77,11 +80,12 @@ func SyncUpgradeState(s sessionctx.Context, timeout time.Duration) error {
7780

7881
// SyncNormalRunning syncs normal state to etcd.
7982
func SyncNormalRunning(s sessionctx.Context) error {
83+
bgCtx := context.Background()
8084
failpoint.Inject("mockResumeAllJobsFailed", func(val failpoint.Value) {
8185
if val.(bool) {
8286
dom := domain.GetDomain(s)
8387
//nolint: errcheck
84-
dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), syncer.NewStateInfo(syncer.StateNormalRunning))
88+
dom.DDL().StateSyncer().UpdateGlobalState(bgCtx, syncer.NewStateInfo(syncer.StateNormalRunning))
8589
failpoint.Return(nil)
8690
}
8791
})
@@ -95,7 +99,15 @@ func SyncNormalRunning(s sessionctx.Context) error {
9599
logger.Warn("resume the job failed", zap.Error(e))
96100
}
97101

98-
ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
102+
if mgr, _ := dist_store.GetTaskManager(); mgr != nil {
103+
ctx := kv.WithInternalSourceType(bgCtx, kv.InternalDistTask)
104+
err := mgr.AdjustTaskOverflowConcurrency(ctx, s)
105+
if err != nil {
106+
log.Warn("cannot adjust task overflow concurrency", zap.Error(err))
107+
}
108+
}
109+
110+
ctx, cancelFunc := context.WithTimeout(bgCtx, 3*time.Second)
99111
defer cancelFunc()
100112
dom := domain.GetDomain(s)
101113
err = dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateNormalRunning))

0 commit comments

Comments
 (0)