Skip to content
Merged
3 changes: 3 additions & 0 deletions pkg/disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ go_library(
"//pkg/config/kerneltype",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/keyspace",
"//pkg/kv",
"//pkg/metrics",
"//pkg/sessionctx",
"//pkg/sessionctx/vardef",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/logutil",
"@com_github_docker_go_units//:go-units",
Expand Down
46 changes: 21 additions & 25 deletions pkg/disttask/framework/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import (
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/atomic"
Expand Down Expand Up @@ -293,33 +296,26 @@ func GetCloudStorageURI(ctx context.Context, store kv.Storage) string {

// GetTaskMgrToAccessDXFService returns the task manager to access DXF service.
func GetTaskMgrToAccessDXFService() (*storage.TaskManager, error) {
// TODO currently DXF service is not fully implemented, so we always return
// task manager of current keyspace, replace it with below code when DXF service is ready.
return storage.GetTaskManager()
var (
err error
sysKSSessPool util.SessionPool
)
taskMgr, err := storage.GetTaskManager()
if err != nil {
return nil, err
}
if !keyspace.IsRunningOnUser() {
return taskMgr, nil
}
if err = taskMgr.WithNewSession(func(se sessionctx.Context) error {
sysKSSessPool, err = se.GetSQLServer().GetKSSessPool(keyspace.System)
return err
}); err != nil {
return nil, err
}
return storage.NewTaskManager(sysKSSessPool), nil
}

//// GetTaskMgrToAccessDXFService returns the task manager to access DXF service.
//func GetTaskMgrToAccessDXFService() (*storage.TaskManager, error) {
// var (
// err error
// sysKSSessPool util.SessionPool
// )
// taskMgr, err := storage.GetTaskManager()
// if err != nil {
// return nil, err
// }
// if !keyspace.IsRunningOnUser() {
// return taskMgr, nil
// }
// if err = taskMgr.WithNewSession(func(se sessionctx.Context) error {
// sysKSSessPool, err = se.GetSQLServer().GetKSSessPool(keyspace.System)
// return err
// }); err != nil {
// return nil, err
// }
// return storage.NewTaskManager(sysKSSessPool), nil
//}

func init() {
// domain will init this var at runtime, we store it here for test, as some
// test might not start domain.
Expand Down
27 changes: 23 additions & 4 deletions pkg/disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/log"
verify "github.com/pingcap/tidb/pkg/lightning/verification"
"github.com/pingcap/tidb/pkg/sessionctx"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
Expand Down Expand Up @@ -95,15 +98,15 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
}

// postProcess does the post-processing for the task.
func postProcess(ctx context.Context, store kv.Storage, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
failpoint.InjectCall("syncBeforePostProcess", taskMeta.JobID)
func (p *postProcessStepExecutor) postProcess(ctx context.Context, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
failpoint.InjectCall("syncBeforePostProcess", p.taskMeta.JobID)

callLog := log.BeginTask(logger, "post process")
defer func() {
callLog.End(zap.ErrorLevel, err)
}()

if err = importer.RebaseAllocatorBases(ctx, store, subtaskMeta.MaxIDs, &taskMeta.Plan, logger); err != nil {
if err = importer.RebaseAllocatorBases(ctx, p.store, subtaskMeta.MaxIDs, &p.taskMeta.Plan, logger); err != nil {
return err
}

Expand All @@ -120,11 +123,27 @@ func postProcess(ctx context.Context, store kv.Storage, taskMeta *TaskMeta, subt
}

taskManager, err := storage.GetTaskManager()
if err != nil {
return err
}
if keyspace.IsRunningOnSystem() && config.GetGlobalKeyspaceName() != p.taskKeyspace {
var sp tidbutil.SessionPool
err = taskManager.WithNewSession(func(se sessionctx.Context) error {
svr := se.GetSQLServer()
sp, err = svr.GetKSSessPool(p.taskKeyspace)
return err
})
if err != nil {
return err
}
taskManager = storage.NewTaskManager(sp)
}

ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
if err != nil {
return err
}
return taskManager.WithNewSession(func(se sessionctx.Context) error {
return importer.VerifyChecksum(ctx, &taskMeta.Plan, localChecksum.MergedChecksum(), se, logger)
return importer.VerifyChecksum(ctx, &p.taskMeta.Plan, localChecksum.MergedChecksum(), se, logger)
})
}
56 changes: 43 additions & 13 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
tidbconfig "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
disttaskStorage "github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
"github.com/pingcap/tidb/pkg/keyspace"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
Expand All @@ -44,6 +46,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/lightning/verification"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -72,6 +75,7 @@ type importStepExecutor struct {
wg sync.WaitGroup
}

// TODO(tangenta): come back later
func getTableImporter(
ctx context.Context,
taskID int64,
Expand Down Expand Up @@ -531,22 +535,30 @@ func (e *writeAndIngestStepExecutor) Cleanup(_ context.Context) (err error) {

type postProcessStepExecutor struct {
taskexecutor.BaseStepExecutor
taskID int64
store tidbkv.Storage
taskMeta *TaskMeta
logger *zap.Logger
taskID int64
store tidbkv.Storage
taskMeta *TaskMeta
taskKeyspace string
logger *zap.Logger
}

var _ execute.StepExecutor = &postProcessStepExecutor{}

// NewPostProcessStepExecutor creates a new post process step executor.
// exported for testing.
func NewPostProcessStepExecutor(taskID int64, store tidbkv.Storage, taskMeta *TaskMeta, logger *zap.Logger) execute.StepExecutor {
func NewPostProcessStepExecutor(
taskID int64,
store tidbkv.Storage,
taskMeta *TaskMeta,
taskKeyspace string,
logger *zap.Logger,
) execute.StepExecutor {
return &postProcessStepExecutor{
taskID: taskID,
store: store,
taskMeta: taskMeta,
logger: logger,
taskID: taskID,
store: store,
taskMeta: taskMeta,
taskKeyspace: taskKeyspace,
logger: logger,
}
}

Expand All @@ -563,7 +575,7 @@ func (p *postProcessStepExecutor) RunSubtask(ctx context.Context, subtask *proto
failpoint.Inject("waitBeforePostProcess", func() {
time.Sleep(5 * time.Second)
})
return postProcess(ctx, p.store, p.taskMeta, &stepMeta, logger)
return p.postProcess(ctx, &stepMeta, logger)
}

type importExecutor struct {
Expand All @@ -580,6 +592,7 @@ func NewImportExecutor(
) taskexecutor.TaskExecutor {
metrics := metricsManager.getOrCreateMetrics(task.ID)
subCtx := metric.WithCommonMetric(ctx, metrics)

s := &importExecutor{
BaseTaskExecutor: taskexecutor.NewBaseTaskExecutor(subCtx, task, param),
store: store,
Expand Down Expand Up @@ -608,14 +621,31 @@ func (e *importExecutor) GetStepExecutor(task *proto.Task) (execute.StepExecutor
zap.Int64("task-id", task.ID),
zap.String("step", proto.Step2Str(task.Type, task.Step)),
)
store := e.store
if keyspace.IsRunningOnSystem() && task.Keyspace != tidbconfig.GetGlobalKeyspaceName() {
taskMgr, err := disttaskStorage.GetTaskManager()
if err != nil {
return nil, errors.Trace(err)
}
err = taskMgr.WithNewSession(func(se sessionctx.Context) error {
store, err = se.GetSQLServer().GetKSStore(task.Keyspace)
if err != nil {
return err
}
return err
})
if err != nil {
return nil, err
}
}

switch task.Step {
case proto.ImportStepImport, proto.ImportStepEncodeAndSort:
return &importStepExecutor{
taskID: task.ID,
taskMeta: &taskMeta,
logger: logger,
store: e.store,
store: store,
}, nil
case proto.ImportStepMergeSort:
return &mergeSortStepExecutor{
Expand All @@ -628,10 +658,10 @@ func (e *importExecutor) GetStepExecutor(task *proto.Task) (execute.StepExecutor
taskID: task.ID,
taskMeta: &taskMeta,
logger: logger,
store: e.store,
store: store,
}, nil
case proto.ImportStepPostProcess:
return NewPostProcessStepExecutor(task.ID, e.store, &taskMeta, logger), nil
return NewPostProcessStepExecutor(task.ID, store, &taskMeta, task.Keyspace, logger), nil
default:
return nil, errors.Errorf("unknown step %d for import task %d", task.Step, task.ID)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/disttask/importinto/task_executor_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestPostProcessStepExecutor(t *testing.T) {

bytes, err := json.Marshal(stepMeta)
require.NoError(t, err)
executor := importinto.NewPostProcessStepExecutor(1, store, taskMeta, zap.NewExample())
executor := importinto.NewPostProcessStepExecutor(1, store, taskMeta, "", zap.NewExample())
err = executor.RunSubtask(context.Background(), &proto.Subtask{Meta: bytes})
require.NoError(t, err)

Expand All @@ -79,17 +79,17 @@ func TestPostProcessStepExecutor(t *testing.T) {
stepMeta.Checksum[-1] = tmp
bytes, err = json.Marshal(stepMeta)
require.NoError(t, err)
executor = importinto.NewPostProcessStepExecutor(1, store, taskMeta, zap.NewExample())
executor = importinto.NewPostProcessStepExecutor(1, store, taskMeta, "", zap.NewExample())
err = executor.RunSubtask(context.Background(), &proto.Subtask{Meta: bytes})
require.ErrorContains(t, err, "checksum mismatched remote vs local")

taskMeta.Plan.Checksum = config.OpLevelOptional
executor = importinto.NewPostProcessStepExecutor(1, store, taskMeta, zap.NewExample())
executor = importinto.NewPostProcessStepExecutor(1, store, taskMeta, "", zap.NewExample())
err = executor.RunSubtask(context.Background(), &proto.Subtask{Meta: bytes})
require.NoError(t, err)

taskMeta.Plan.Checksum = config.OpLevelOff
executor = importinto.NewPostProcessStepExecutor(1, store, taskMeta, zap.NewExample())
executor = importinto.NewPostProcessStepExecutor(1, store, taskMeta, "", zap.NewExample())
err = executor.RunSubtask(context.Background(), &proto.Subtask{Meta: bytes})
require.NoError(t, err)
}
4 changes: 2 additions & 2 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1497,7 +1497,7 @@ func (e *LoadDataController) CreateColAssignSimpleExprs(ctx expression.BuildCont
return res, allWarnings, nil
}

func (e *LoadDataController) getLocalBackendCfg(pdAddr, dataDir string) local.BackendConfig {
func (e *LoadDataController) getLocalBackendCfg(keyspace, pdAddr, dataDir string) local.BackendConfig {
backendConfig := local.BackendConfig{
PDAddr: pdAddr,
LocalStoreDir: dataDir,
Expand All @@ -1517,7 +1517,7 @@ func (e *LoadDataController) getLocalBackendCfg(pdAddr, dataDir string) local.Ba
TiKVWorkerURL: tidb.GetGlobalConfig().TiKVWorkerURL,
StoreWriteBWLimit: int(e.MaxWriteSpeed),
MaxOpenFiles: int(tidbutil.GenRLimit("table_import")),
KeyspaceName: tidb.GetGlobalKeyspaceName(),
KeyspaceName: keyspace,
PausePDSchedulerScope: config.PausePDSchedulerScopeTable,
DisableAutomaticCompactions: true,
BlockSize: config.DefaultBlockSize,
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,14 @@ func TestGetLocalBackendCfg(t *testing.T) {
c := &LoadDataController{
Plan: &Plan{},
}
cfg := c.getLocalBackendCfg("http://1.1.1.1:1234", "/tmp")
cfg := c.getLocalBackendCfg("", "http://1.1.1.1:1234", "/tmp")
require.Equal(t, "http://1.1.1.1:1234", cfg.PDAddr)
require.Equal(t, "/tmp", cfg.LocalStoreDir)
require.True(t, cfg.DisableAutomaticCompactions)
require.Zero(t, cfg.RaftKV2SwitchModeDuration)

c.Plan.IsRaftKV2 = true
cfg = c.getLocalBackendCfg("http://1.1.1.1:1234", "/tmp")
cfg = c.getLocalBackendCfg("", "http://1.1.1.1:1234", "/tmp")
require.Greater(t, cfg.RaftKV2SwitchModeDuration, time.Duration(0))
require.Equal(t, config.DefaultSwitchTiKVModeInterval, cfg.RaftKV2SwitchModeDuration)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func NewTableImporter(
return nil, err
}

backendConfig := e.getLocalBackendCfg(tidbCfg.Path, dir)
backendConfig := e.getLocalBackendCfg(kvStore.GetKeyspace(), tidbCfg.Path, dir)
d := kvStore.(tidbkv.StorageWithPD).GetPDClient().GetServiceDiscovery()
localBackend, err := local.NewBackend(ctx, tls, backendConfig, d)
if err != nil {
Expand Down Expand Up @@ -261,7 +261,7 @@ func NewTableImporterForTest(ctx context.Context, e *LoadDataController, id stri
return nil, err
}

backendConfig := e.getLocalBackendCfg(tidbCfg.Path, dir)
backendConfig := e.getLocalBackendCfg("", tidbCfg.Path, dir)
localBackend, err := local.NewBackendForTest(ctx, backendConfig, helper)
if err != nil {
return nil, err
Expand Down