diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go
index e375c6fb06f4b..db76e85e757bd 100644
--- a/pkg/ddl/backfilling_dist_executor.go
+++ b/pkg/ddl/backfilling_dist_executor.go
@@ -20,13 +20,18 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/pkg/config"
 	"github.com/pingcap/tidb/pkg/ddl/logutil"
+	sess "github.com/pingcap/tidb/pkg/ddl/session"
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
+	disttaskStorage "github.com/pingcap/tidb/pkg/disttask/framework/storage"
 	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
 	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
+	"github.com/pingcap/tidb/pkg/keyspace"
 	"github.com/pingcap/tidb/pkg/lightning/backend/external"
 	"github.com/pingcap/tidb/pkg/lightning/common"
 	"github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/table"
 	"go.uber.org/zap"
 )
@@ -145,9 +150,33 @@ func (s *backfillDistExecutor) newBackfillStepExecutor(
 	jobMeta := &s.taskMeta.Job
 	ddlObj := s.d
 
+	store := ddlObj.store
+	sessPool := ddlObj.sessPool
+	taskKS := s.task.Keyspace
+	// Although taskKS != config.GetGlobalKeyspaceName() implies running on the system keyspace,
+	// we still check kernel type explicitly to avoid unexpected executions.
+	if keyspace.IsRunningOnSystem() && taskKS != config.GetGlobalKeyspaceName() {
+		taskMgr, err := disttaskStorage.GetTaskManager()
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		err = taskMgr.WithNewSession(func(se sessionctx.Context) error {
+			svr := se.GetSQLServer()
+			store, err = svr.GetKSStore(taskKS)
+			if err != nil {
+				return err
+			}
+			sp, err := svr.GetKSSessPool(taskKS)
+			sessPool = sess.NewSessionPool(sp)
+			return err
+		})
+		if err != nil {
+			return nil, err
+		}
+	}
 	// TODO getTableByTxn is using DDL ctx which is never cancelled except when shutdown.
 	// we should move this operation out of GetStepExecutor, and put into Init.
-	_, tblIface, err := ddlObj.getTableByTxn(ddlObj.ddlCtx.getAutoIDRequirement(), jobMeta.SchemaID, jobMeta.TableID)
+	_, tblIface, err := getTableByTxn(ddlObj.ctx, store, jobMeta.SchemaID, jobMeta.TableID)
 	if err != nil {
 		return nil, err
 	}
@@ -172,14 +201,14 @@ func (s *backfillDistExecutor) newBackfillStepExecutor(
 		jc := ddlObj.jobContext(jobMeta.ID, jobMeta.ReorgMeta)
 		ddlObj.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
 		ddlObj.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
-		return newReadIndexExecutor(ddlObj, jobMeta, indexInfos, tbl, jc, cloudStorageURI, estRowSize)
+		return newReadIndexExecutor(store, sessPool, ddlObj.etcdCli, jobMeta, indexInfos, tbl, jc, cloudStorageURI, estRowSize)
 	case proto.BackfillStepMergeSort:
 		return newMergeSortExecutor(jobMeta.ID, indexInfos, tbl, cloudStorageURI)
 	case proto.BackfillStepWriteAndIngest:
 		if len(cloudStorageURI) == 0 {
 			return nil, errors.Errorf("local import does not have write & ingest step")
 		}
-		return newCloudImportExecutor(jobMeta, ddlObj.store, indexInfos, tbl, cloudStorageURI, s.GetTaskBase().Concurrency)
+		return newCloudImportExecutor(jobMeta, store, indexInfos, tbl, cloudStorageURI, s.GetTaskBase().Concurrency)
 	default:
 		// should not happen, caller has checked the stage
 		return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
@@ -193,7 +222,11 @@ type backfillDistExecutor struct {
 	taskMeta *BackfillTaskMeta
 }
 
-func newBackfillDistExecutor(ctx context.Context, task *proto.Task, param taskexecutor.Param, d *ddl) taskexecutor.TaskExecutor {
+func newBackfillDistExecutor(
+	ctx context.Context,
+	task *proto.Task,
+	param taskexecutor.Param,
+	d *ddl) taskexecutor.TaskExecutor {
 	s := &backfillDistExecutor{
 		BaseTaskExecutor: taskexecutor.NewBaseTaskExecutor(ctx, task, param),
 		d:                d,
diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go
index 3b848d2586be4..474f25a7b2d90 100644
--- a/pkg/ddl/backfilling_read_index.go
+++ b/pkg/ddl/backfilling_read_index.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/tidb/pkg/config"
 	"github.com/pingcap/tidb/pkg/ddl/ingest"
 	"github.com/pingcap/tidb/pkg/ddl/logutil"
+	sess "github.com/pingcap/tidb/pkg/ddl/session"
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
 	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
 	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
@@ -41,16 +42,19 @@ import (
 	"github.com/pingcap/tidb/pkg/table"
 	tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/prometheus/client_golang/prometheus"
+	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.uber.org/zap"
 )
 
 type readIndexStepExecutor struct {
 	taskexecutor.BaseStepExecutor
-	d       *ddl
-	job     *model.Job
-	indexes []*model.IndexInfo
-	ptbl    table.PhysicalTable
-	jc      *ReorgContext
+	store    kv.Storage
+	etcdCli  *clientv3.Client
+	sessPool *sess.Pool
+	job      *model.Job
+	indexes  []*model.IndexInfo
+	ptbl     table.PhysicalTable
+	jc       *ReorgContext
 
 	avgRowSize      int
 	cloudStorageURI string
@@ -72,7 +76,9 @@ type readIndexSummary struct {
 }
 
 func newReadIndexExecutor(
-	d *ddl,
+	store kv.Storage,
+	sessPool *sess.Pool,
+	etcdCli *clientv3.Client,
 	job *model.Job,
 	indexes []*model.IndexInfo,
 	ptbl table.PhysicalTable,
@@ -81,7 +87,9 @@ func newReadIndexExecutor(
 	avgRowSize int,
 ) (*readIndexStepExecutor, error) {
 	return &readIndexStepExecutor{
-		d:        d,
+		store:    store,
+		etcdCli:  etcdCli,
+		sessPool: sessPool,
 		job:      job,
 		indexes:  indexes,
 		ptbl:     ptbl,
@@ -100,7 +108,7 @@ func (r *readIndexStepExecutor) Init(ctx context.Context) error {
 		r.metric = metrics.RegisterLightningCommonMetricsForDDL(r.job.ID)
 		ctx = lightningmetric.WithCommonMetric(ctx, r.metric)
 	}
-	cfg, bd, err := ingest.CreateLocalBackend(ctx, r.d.store, r.job, false, 0)
+	cfg, bd, err := ingest.CreateLocalBackend(ctx, r.store, r.job, false, 0)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -141,8 +149,8 @@ func (r *readIndexStepExecutor) runLocalPipeline(
 	concurrency int,
 ) error {
 	// TODO(tangenta): support checkpoint manager that interact with subtask table.
-	bCtx, err := ingest.NewBackendCtxBuilder(ctx, r.d.store, r.job).
-		WithImportDistributedLock(r.d.etcdCli, sm.TS).
+	bCtx, err := ingest.NewBackendCtxBuilder(ctx, r.store, r.job).
+		WithImportDistributedLock(r.etcdCli, sm.TS).
 		Build(r.backendCfg, r.backend)
 	if err != nil {
 		return err
 	}
@@ -306,11 +314,11 @@ func (r *readIndexStepExecutor) getTableStartEndKey(sm *BackfillSubTaskMeta) (
 		tbl = parTbl.GetPartition(pid)
 		if len(sm.RowStart) == 0 {
 			// Handle upgrade compatibility
-			currentVer, err1 := getValidCurrentVersion(r.d.store)
+			currentVer, err1 := getValidCurrentVersion(r.store)
 			if err1 != nil {
 				return nil, nil, nil, errors.Trace(err1)
 			}
-			start, end, err = getTableRange(r.jc, r.d.store, parTbl.GetPartition(pid), currentVer.Ver, r.job.Priority)
+			start, end, err = getTableRange(r.jc, r.store, parTbl.GetPartition(pid), currentVer.Ver, r.job.Priority)
 			if err != nil {
 				logutil.DDLLogger().Error("get table range error",
 					zap.Error(err))
@@ -334,7 +342,6 @@ func (r *readIndexStepExecutor) buildLocalStorePipeline(
 	if err != nil {
 		return nil, err
 	}
-	d := r.d
 	indexIDs := make([]int64, 0, len(r.indexes))
 	uniques := make([]bool, 0, len(r.indexes))
 	var idxNames strings.Builder
@@ -357,8 +364,8 @@ func (r *readIndexStepExecutor) buildLocalStorePipeline(
 	rowCntCollector := newDistTaskRowCntCollector(r.SubtaskSummary, r.job.SchemaName, tbl.Meta().Name.O, idxNames.String())
 	return NewAddIndexIngestPipeline(
 		opCtx,
-		d.store,
-		d.sessPool,
+		r.store,
+		r.sessPool,
 		backendCtx,
 		engines,
 		r.job.ID,
@@ -385,7 +392,6 @@ func (r *readIndexStepExecutor) buildExternalStorePipeline(
 		return nil, err
 	}
 
-	d := r.d
 	onClose := func(summary *external.WriterSummary) {
 		sum, _ := r.subtaskSummary.Load(subtaskID)
 		s := sum.(*readIndexSummary)
@@ -408,9 +414,9 @@ func (r *readIndexStepExecutor) buildExternalStorePipeline(
 	rowCntCollector := newDistTaskRowCntCollector(r.SubtaskSummary, r.job.SchemaName, tbl.Meta().Name.O, idxNames.String())
 	return NewWriteIndexToExternalStoragePipeline(
 		opCtx,
-		d.store,
+		r.store,
 		r.cloudStorageURI,
-		r.d.sessPool,
+		r.sessPool,
 		taskID,
 		subtaskID,
 		tbl,
diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go
index 34853793aab5a..430c0656ac89c 100644
--- a/pkg/ddl/index.go
+++ b/pkg/ddl/index.go
@@ -2648,7 +2648,7 @@ func (w *worker) executeDistTask(jobCtx *jobContext, t table.Table, reorgInfo *r
 	// When pausing the related ddl job, it is possible that the task with taskKey is succeed and in tidb_global_task_history.
 	// As a result, when resuming the related ddl job,
 	// it is necessary to check task exits in tidb_global_task and tidb_global_task_history tables.
-	taskManager, err := storage.GetTaskManager()
+	taskManager, err := handle.GetTaskMgrToAccessDXFService()
 	if err != nil {
 		return err
 	}
@@ -2991,7 +2991,7 @@ func estimateRowSizeFromRegion(ctx context.Context, store kv.Storage, tbl table.
 }
 
 func (w *worker) updateDistTaskRowCount(taskKey string, jobID int64) {
-	taskMgr, err := storage.GetTaskManager()
+	taskMgr, err := handle.GetTaskMgrToAccessDXFService()
 	if err != nil {
 		logutil.DDLLogger().Warn("cannot get task manager", zap.String("task_key", taskKey), zap.Error(err))
 		return
diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go
index 8ed824a9dbbfa..481262ff1021e 100644
--- a/pkg/ddl/ingest/backend_mgr.go
+++ b/pkg/ddl/ingest/backend_mgr.go
@@ -160,7 +160,7 @@ func CreateLocalBackend(ctx context.Context, store kv.Storage, job *model.Job, c
 	if err != nil {
 		return nil, nil, err
 	}
-	cfg := genConfig(ctx, jobSortPath, LitMemRoot, hasUnique, resGroupName, concurrency, maxWriteSpeed, job.ReorgMeta.UseCloudStorage)
+	cfg := genConfig(ctx, jobSortPath, LitMemRoot, hasUnique, resGroupName, store.GetKeyspace(), concurrency, maxWriteSpeed, job.ReorgMeta.UseCloudStorage)
 	if adjustedWorkerConcurrency > 0 {
 		cfg.WorkerConcurrency = adjustedWorkerConcurrency
 	}
diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go
index cd2119de97178..d8b4ead1e4504 100644
--- a/pkg/ddl/ingest/config.go
+++ b/pkg/ddl/ingest/config.go
@@ -43,6 +43,7 @@ func genConfig(
 	memRoot MemRoot,
 	unique bool,
 	resourceGroup string,
+	keyspace string,
 	concurrency int,
 	maxWriteSpeed int,
 	globalSort bool,
@@ -52,7 +53,7 @@ func genConfig(
 		ResourceGroupName:     resourceGroup,
 		MaxConnPerStore:       concurrency,
 		WorkerConcurrency:     concurrency * 2,
-		KeyspaceName:          tidb.GetGlobalKeyspaceName(),
+		KeyspaceName:          keyspace,
 		// We disable the switch TiKV mode feature for now, because the impact is not
 		// fully tested.
 		ShouldCheckWriteStall: true,
diff --git a/pkg/ddl/job_scheduler.go b/pkg/ddl/job_scheduler.go
index 154feb147cb46..8d404ac6e96d6 100644
--- a/pkg/ddl/job_scheduler.go
+++ b/pkg/ddl/job_scheduler.go
@@ -655,10 +655,10 @@ func (s *jobScheduler) cleanMDLInfo(job *model.Job, ownerID string) {
 	}
 }
 
-func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*model.DBInfo, table.Table, error) {
+func getTableByTxn(ctx context.Context, store kv.Storage, schemaID, tableID int64) (*model.DBInfo, table.Table, error) {
 	var tbl table.Table
 	var dbInfo *model.DBInfo
-	err := kv.RunInNewTxn(d.ctx, r.Store(), false, func(_ context.Context, txn kv.Transaction) error {
+	err := kv.RunInNewTxn(ctx, store, false, func(_ context.Context, txn kv.Transaction) error {
 		t := meta.NewMutator(txn)
 		var err1 error
 		dbInfo, err1 = t.GetDatabase(schemaID)
@@ -669,7 +669,10 @@ func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*mod
 		if err1 != nil {
 			return errors.Trace(err1)
 		}
-		tbl, err1 = getTable(r, schemaID, tblInfo)
+		// This tableInfo should never interact with the autoid allocator,
+		// so we can use the autoid.Allocators{} here.
+		// TODO(tangenta): Use model.TableInfo instead of tables.Table.
+		tbl, err1 = table.TableFromMeta(autoid.Allocators{}, tblInfo)
 		return errors.Trace(err1)
 	})
 	return dbInfo, tbl, err
diff --git a/pkg/ddl/session/BUILD.bazel b/pkg/ddl/session/BUILD.bazel
index 70365492a8c10..1b5719dec9a35 100644
--- a/pkg/ddl/session/BUILD.bazel
+++ b/pkg/ddl/session/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
         "//pkg/parser/terror",
         "//pkg/sessionctx",
        "//pkg/sessiontxn",
+        "//pkg/util",
         "//pkg/util/chunk",
         "//pkg/util/intest",
         "//pkg/util/sqlexec",
diff --git a/pkg/ddl/session/session_pool.go b/pkg/ddl/session/session_pool.go
index c09d787aa5a75..a6a117af36a6a 100644
--- a/pkg/ddl/session/session_pool.go
+++ b/pkg/ddl/session/session_pool.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/tidb/pkg/domain/infosync"
 	"github.com/pingcap/tidb/pkg/parser/mysql"
 	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/intest"
 )
 
@@ -34,11 +35,11 @@ type Pool struct {
 		sync.Mutex
 		closed bool
 	}
-	resPool *pools.ResourcePool
+	resPool util.SessionPool
 }
 
 // NewSessionPool creates a new Session pool.
-func NewSessionPool(resPool *pools.ResourcePool) *Pool {
+func NewSessionPool(resPool util.SessionPool) *Pool {
 	intest.AssertNotNil(resPool)
 	return &Pool{resPool: resPool}
 }
diff --git a/pkg/disttask/framework/handle/handle.go b/pkg/disttask/framework/handle/handle.go
index 4e25fd4d3d0b4..29f9b73812569 100644
--- a/pkg/disttask/framework/handle/handle.go
+++ b/pkg/disttask/framework/handle/handle.go
@@ -59,7 +59,7 @@ func NotifyTaskChange() {
 }
 
 // GetCPUCountOfNode gets the CPU count of the managed node.
 func GetCPUCountOfNode(ctx context.Context) (int, error) {
-	manager, err := storage.GetTaskManager()
+	manager, err := GetTaskMgrToAccessDXFService()
 	if err != nil {
 		return 0, err
 	}
@@ -68,7 +68,7 @@ func GetCPUCountOfNode(ctx context.Context) (int, error) {
 
 // SubmitTask submits a task.
 func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, targetScope string, maxNodeCnt int, taskMeta []byte) (*proto.Task, error) {
-	taskManager, err := storage.GetTaskManager()
+	taskManager, err := GetTaskMgrToAccessDXFService()
 	if err != nil {
 		return nil, err
 	}
@@ -174,7 +174,7 @@ func WaitTask(ctx context.Context, id int64, matchFn func(base *proto.TaskBase)
 
 // CancelTask cancels a task.
 func CancelTask(ctx context.Context, taskKey string) error {
-	taskManager, err := storage.GetTaskManager()
+	taskManager, err := GetTaskMgrToAccessDXFService()
 	if err != nil {
 		return err
 	}
@@ -191,7 +191,7 @@ func CancelTask(ctx context.Context, taskKey string) error {
 
 // PauseTask pauses a task.
 func PauseTask(ctx context.Context, taskKey string) error {
-	taskManager, err := storage.GetTaskManager()
+	taskManager, err := GetTaskMgrToAccessDXFService()
 	if err != nil {
 		return err
 	}
@@ -205,7 +205,7 @@
 
 // ResumeTask resumes a task.
 func ResumeTask(ctx context.Context, taskKey string) error {
-	taskManager, err := storage.GetTaskManager()
+	taskManager, err := GetTaskMgrToAccessDXFService()
 	if err != nil {
 		return err
 	}
@@ -281,8 +281,9 @@ func GetCloudStorageURI(ctx context.Context, store kv.Storage) string {
 			u.Path = path.Join(u.Path, strconv.FormatUint(s.GetPDClient().GetClusterID(ctx), 10))
 			return u.String()
 		}
+	} else {
+		logutil.BgLogger().Warn("Can't get cluster id from store, use default cloud storage uri")
 	}
-	logutil.BgLogger().Error("Can't get cluster id from store, use default cloud storage uri")
 	return cloudURI
 }
diff --git a/pkg/domain/sqlsvrapi/BUILD.bazel b/pkg/domain/sqlsvrapi/BUILD.bazel
index c4db2c4c8e88e..b38bb4a992dce 100644
--- a/pkg/domain/sqlsvrapi/BUILD.bazel
+++ b/pkg/domain/sqlsvrapi/BUILD.bazel
@@ -5,5 +5,8 @@ go_library(
     srcs = ["server.go"],
     importpath = "github.com/pingcap/tidb/pkg/domain/sqlsvrapi",
     visibility = ["//visibility:public"],
-    deps = ["//pkg/util"],
+    deps = [
+        "//pkg/kv",
+        "//pkg/util",
+    ],
 )
diff --git a/pkg/domain/sqlsvrapi/server.go b/pkg/domain/sqlsvrapi/server.go
index c80b92f54efad..3a2f9f205cb6f 100644
--- a/pkg/domain/sqlsvrapi/server.go
+++ b/pkg/domain/sqlsvrapi/server.go
@@ -14,10 +14,14 @@ package sqlsvrapi
 
-import "github.com/pingcap/tidb/pkg/util"
+import (
+	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/util"
+)
 
 // Server defines the interface for a SQL server.
 // The SQL server manages nearly everything related to SQL execution.
 type Server interface {
 	GetKSSessPool(targetKS string) (util.DestroyableSessionPool, error)
+	GetKSStore(targetKS string) (store kv.Storage, err error)
 }
diff --git a/pkg/keyspace/BUILD.bazel b/pkg/keyspace/BUILD.bazel
index 2299cdd512892..3f80ea65d17bf 100644
--- a/pkg/keyspace/BUILD.bazel
+++ b/pkg/keyspace/BUILD.bazel
@@ -24,7 +24,7 @@ go_test(
     srcs = ["keyspace_test.go"],
     embed = [":keyspace"],
     flaky = True,
-    shard_count = 3,
+    shard_count = 4,
     deps = [
         "//pkg/config",
         "//pkg/config/kerneltype",
diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go
index e893044bda097..1675c581aea2e 100644
--- a/pkg/keyspace/keyspace.go
+++ b/pkg/keyspace/keyspace.go
@@ -96,3 +96,9 @@ func WrapZapcoreWithKeyspace() zap.Option {
 func IsRunningOnUser() bool {
 	return kerneltype.IsNextGen() && config.GetGlobalKeyspaceName() != System
 }
+
+// IsRunningOnSystem return true if we are on nextgen, and keyspace of current
+// instance is the system keyspace.
+func IsRunningOnSystem() bool {
+	return kerneltype.IsNextGen() && config.GetGlobalKeyspaceName() == System
+}
diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go
index c6ebb14537835..52054318c2085 100644
--- a/pkg/keyspace/keyspace_test.go
+++ b/pkg/keyspace/keyspace_test.go
@@ -111,3 +111,23 @@ func TestIsRunningOnUser(t *testing.T) {
 		require.False(t, IsRunningOnUser())
 	}
 }
+
+func TestIsRunningOnSystem(t *testing.T) {
+	if kerneltype.IsClassic() {
+		require.False(t, IsRunningOnSystem())
+	} else {
+		bak := *config.GetGlobalConfig()
+		t.Cleanup(func() {
+			config.StoreGlobalConfig(&bak)
+		})
+		config.UpdateGlobal(func(conf *config.Config) {
+			conf.KeyspaceName = System
+		})
+		require.True(t, IsRunningOnSystem())
+
+		config.UpdateGlobal(func(conf *config.Config) {
+			conf.KeyspaceName = "aa"
+		})
+		require.False(t, IsRunningOnSystem())
+	}
+}