Skip to content
4 changes: 2 additions & 2 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2648,7 +2648,7 @@ func (w *worker) executeDistTask(jobCtx *jobContext, t table.Table, reorgInfo *r
// When pausing the related ddl job, it is possible that the task with taskKey is succeed and in tidb_global_task_history.
// As a result, when resuming the related ddl job,
// it is necessary to check task exits in tidb_global_task and tidb_global_task_history tables.
taskManager, err := storage.GetTaskManager()
taskManager, err := handle.GetTaskMgrToAccessDXFService()
if err != nil {
return err
}
Expand Down Expand Up @@ -2991,7 +2991,7 @@ func estimateRowSizeFromRegion(ctx context.Context, store kv.Storage, tbl table.
}

func (w *worker) updateDistTaskRowCount(taskKey string, jobID int64) {
taskMgr, err := storage.GetTaskManager()
taskMgr, err := handle.GetTaskMgrToAccessDXFService()
if err != nil {
logutil.DDLLogger().Warn("cannot get task manager", zap.String("task_key", taskKey), zap.Error(err))
return
Expand Down
3 changes: 3 additions & 0 deletions pkg/disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ go_library(
"//pkg/config/kerneltype",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/keyspace",
"//pkg/kv",
"//pkg/metrics",
"//pkg/sessionctx",
"//pkg/sessionctx/vardef",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/logutil",
"@com_github_docker_go_units//:go-units",
Expand Down
56 changes: 26 additions & 30 deletions pkg/disttask/framework/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import (
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/atomic"
Expand Down Expand Up @@ -59,7 +62,7 @@ func NotifyTaskChange() {

// GetCPUCountOfNode gets the CPU count of the managed node.
func GetCPUCountOfNode(ctx context.Context) (int, error) {
manager, err := storage.GetTaskManager()
manager, err := GetTaskMgrToAccessDXFService()
if err != nil {
return 0, err
}
Expand All @@ -68,7 +71,7 @@ func GetCPUCountOfNode(ctx context.Context) (int, error) {

// SubmitTask submits a task.
func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, targetScope string, maxNodeCnt int, taskMeta []byte) (*proto.Task, error) {
taskManager, err := storage.GetTaskManager()
taskManager, err := GetTaskMgrToAccessDXFService()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -174,7 +177,7 @@ func WaitTask(ctx context.Context, id int64, matchFn func(base *proto.TaskBase)

// CancelTask cancels a task.
func CancelTask(ctx context.Context, taskKey string) error {
taskManager, err := storage.GetTaskManager()
taskManager, err := GetTaskMgrToAccessDXFService()
if err != nil {
return err
}
Expand All @@ -191,7 +194,7 @@ func CancelTask(ctx context.Context, taskKey string) error {

// PauseTask pauses a task.
func PauseTask(ctx context.Context, taskKey string) error {
taskManager, err := storage.GetTaskManager()
taskManager, err := GetTaskMgrToAccessDXFService()
if err != nil {
return err
}
Expand All @@ -205,7 +208,7 @@ func PauseTask(ctx context.Context, taskKey string) error {

// ResumeTask resumes a task.
func ResumeTask(ctx context.Context, taskKey string) error {
taskManager, err := storage.GetTaskManager()
taskManager, err := GetTaskMgrToAccessDXFService()
if err != nil {
return err
}
Expand Down Expand Up @@ -289,33 +292,26 @@ func GetCloudStorageURI(ctx context.Context, store kv.Storage) string {

// GetTaskMgrToAccessDXFService returns the task manager to access DXF service.
func GetTaskMgrToAccessDXFService() (*storage.TaskManager, error) {
// TODO currently DXF service is not fully implemented, so we always return
// task manager of current keyspace, replace it with below code when DXF service is ready.
return storage.GetTaskManager()
var (
err error
sysKSSessPool util.SessionPool
)
taskMgr, err := storage.GetTaskManager()
if err != nil {
return nil, err
}
if !keyspace.IsRunningOnUser() {
return taskMgr, nil
}
if err = taskMgr.WithNewSession(func(se sessionctx.Context) error {
sysKSSessPool, err = se.GetSQLServer().GetKSSessPool(keyspace.System)
return err
}); err != nil {
return nil, err
}
return storage.NewTaskManager(sysKSSessPool), nil
}

//// GetTaskMgrToAccessDXFService returns the task manager to access DXF service.
//func GetTaskMgrToAccessDXFService() (*storage.TaskManager, error) {
// var (
// err error
// sysKSSessPool util.SessionPool
// )
// taskMgr, err := storage.GetTaskManager()
// if err != nil {
// return nil, err
// }
// if !keyspace.IsRunningOnUser() {
// return taskMgr, nil
// }
// if err = taskMgr.WithNewSession(func(se sessionctx.Context) error {
// sysKSSessPool, err = se.GetSQLServer().GetKSSessPool(keyspace.System)
// return err
// }); err != nil {
// return nil, err
// }
// return storage.NewTaskManager(sysKSSessPool), nil
//}

func init() {
// domain will init this var at runtime, we store it here for test, as some
// test might not start domain.
Expand Down
10 changes: 1 addition & 9 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,15 +1065,7 @@ func (do *Domain) InitDistTaskLoop() error {
}
})

distTaskSessPool := do.sysSessionPool
if keyspace.IsRunningOnUser() {
sp, err := do.GetKSSessPool(keyspace.System)
if err != nil {
return err
}
distTaskSessPool = sp
}
taskManager := storage.NewTaskManager(distTaskSessPool)
taskManager := storage.NewTaskManager(do.sysSessionPool)
var serverID string
if intest.InTest {
do.InitInfo4Test()
Expand Down