Skip to content
Merged
5 changes: 3 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,10 @@ func (s *BaseScheduler) scheduleSubTask(
adjustedEligibleNodes := s.slotMgr.adjustEligibleNodes(eligibleNodes, task.Concurrency)
var size uint64
subTasks := make([]*proto.Subtask, 0, len(metas))
randPos := rand.Intn(len(adjustedEligibleNodes))
for i, meta := range metas {
// we assign the subtask to the instance in a round-robin way.
pos := i % len(adjustedEligibleNodes)
// we assign the subtask to the instance in a round-robin way at the random start position.
pos := (i + randPos) % len(adjustedEligibleNodes)
instanceID := adjustedEligibleNodes[pos]
s.logger.Debug("create subtasks", zap.String("instanceID", instanceID))
subTasks = append(subTasks, proto.NewSubtask(
Expand Down
6 changes: 3 additions & 3 deletions pkg/disttask/importinto/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,20 @@ import (
// SubmitStandaloneTask submits a task to the distribute framework that only runs on the current node.
// when import from server-disk, pass engine checkpoints too, as scheduler might run on another
// node where we can't access the data files.
func SubmitStandaloneTask(ctx context.Context, plan *importer.Plan, stmt string, ecp map[int32]*checkpoints.EngineCheckpoint) (int64, *proto.TaskBase, error) {
func SubmitStandaloneTask(ctx context.Context, plan *importer.Plan, stmt string, chunkMap map[int32][]checkpoints.Chunk) (int64, *proto.TaskBase, error) {
serverInfo, err := infosync.GetServerInfo()
if err != nil {
return 0, nil, err
}
return doSubmitTask(ctx, plan, stmt, serverInfo, toChunkMap(ecp))
return doSubmitTask(ctx, plan, stmt, serverInfo, chunkMap)
}

// SubmitTask submits a task to the distribute framework that runs on all managed nodes.
func SubmitTask(ctx context.Context, plan *importer.Plan, stmt string) (int64, *proto.TaskBase, error) {
return doSubmitTask(ctx, plan, stmt, nil, nil)
}

func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instance *infosync.ServerInfo, chunkMap map[int32][]Chunk) (int64, *proto.TaskBase, error) {
func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instance *infosync.ServerInfo, chunkMap map[int32][]checkpoints.Chunk) (int64, *proto.TaskBase, error) {
var instances []*infosync.ServerInfo
if instance != nil {
instances = append(instances, instance)
Expand Down
15 changes: 8 additions & 7 deletions pkg/disttask/importinto/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
verify "github.com/pingcap/tidb/pkg/lightning/verification"
Expand All @@ -54,7 +55,7 @@ type LogicalPlan struct {
Plan importer.Plan
Stmt string
EligibleInstances []*infosync.ServerInfo
ChunkMap map[int32][]Chunk
ChunkMap map[int32][]checkpoints.Chunk
}

// ToTaskMeta converts the logical plan to task meta.
Expand Down Expand Up @@ -156,7 +157,7 @@ func (p *LogicalPlan) ToPhysicalPlan(planCtx planner.PlanCtx) (*planner.Physical
type ImportSpec struct {
ID int32
Plan importer.Plan
Chunks []Chunk
Chunks []checkpoints.Chunk
}

// ToSubtaskMeta converts the import spec to subtask meta.
Expand Down Expand Up @@ -257,7 +258,7 @@ func buildController(plan *importer.Plan, stmt string) (*importer.LoadDataContro
}

func generateImportSpecs(pCtx planner.PlanCtx, p *LogicalPlan) ([]planner.PipelineSpec, error) {
var chunkMap map[int32][]Chunk
var chunkMap map[int32][]checkpoints.Chunk
if len(p.ChunkMap) > 0 {
chunkMap = p.ChunkMap
} else {
Expand All @@ -270,21 +271,21 @@ func generateImportSpecs(pCtx planner.PlanCtx, p *LogicalPlan) ([]planner.Pipeli
}

controller.SetExecuteNodeCnt(pCtx.ExecuteNodesCnt)
engineCheckpoints, err2 := controller.PopulateChunks(pCtx.Ctx)
chunkMap, err2 = controller.PopulateChunks(pCtx.Ctx)
if err2 != nil {
return nil, err2
}
chunkMap = toChunkMap(engineCheckpoints)
}

importSpecs := make([]planner.PipelineSpec, 0, len(chunkMap))
for id := range chunkMap {
for id, chunks := range chunkMap {
if id == common.IndexEngineID {
continue
}
importSpec := &ImportSpec{
ID: id,
Plan: p.Plan,
Chunks: chunkMap[id],
Chunks: chunks,
}
importSpecs = append(importSpecs, importSpec)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/disttask/importinto/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand All @@ -44,7 +45,7 @@ func TestLogicalPlan(t *testing.T) {
Plan: importer.Plan{},
Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`,
EligibleInstances: []*infosync.ServerInfo{{ID: "1"}},
ChunkMap: map[int32][]Chunk{1: {{Path: "gs://test-load/1.csv"}}},
ChunkMap: map[int32][]checkpoints.Chunk{1: {{Path: "gs://test-load/1.csv"}}},
}
bs, err := logicalPlan.ToTaskMeta()
require.NoError(t, err)
Expand All @@ -65,7 +66,7 @@ func TestToPhysicalPlan(t *testing.T) {
},
Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`,
EligibleInstances: []*infosync.ServerInfo{{ID: "1"}},
ChunkMap: map[int32][]Chunk{chunkID: {{Path: "gs://test-load/1.csv"}}},
ChunkMap: map[int32][]checkpoints.Chunk{chunkID: {{Path: "gs://test-load/1.csv"}}},
}
planCtx := planner.PlanCtx{
NextTaskStep: proto.ImportStepImport,
Expand Down
21 changes: 4 additions & 17 deletions pkg/disttask/importinto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/verification"
"github.com/pingcap/tidb/pkg/meta/autoid"
)
Expand All @@ -44,7 +44,7 @@ type TaskMeta struct {
// files to the framework scheduler which might run on another instance.
// we use a map from engine ID to chunks since we need support split_file for CSV,
// so need to split them into engines before passing to scheduler.
ChunkMap map[int32][]Chunk
ChunkMap map[int32][]checkpoints.Chunk
}

// ImportStepMeta is the meta of import step.
Expand All @@ -53,7 +53,7 @@ type TaskMeta struct {
type ImportStepMeta struct {
// this is the engine ID, not the id in tidb_background_subtask table.
ID int32
Chunks []Chunk
Chunks []checkpoints.Chunk
Checksum map[int64]Checksum // see KVGroupChecksum for definition of map key.
Result Result
// MaxIDs stores the max id that have been used during encoding for each allocator type.
Expand Down Expand Up @@ -142,7 +142,7 @@ func (sv *SharedVars) mergeIndexSummary(indexID int64, summary *external.WriterS
// TaskExecutor will split the subtask into minimal tasks(Chunks -> Chunk)
type importStepMinimalTask struct {
Plan importer.Plan
Chunk Chunk
Chunk checkpoints.Chunk
SharedVars *SharedVars
panicked *atomic.Bool
}
Expand All @@ -158,19 +158,6 @@ func (t *importStepMinimalTask) String() string {
return fmt.Sprintf("chunk:%s:%d", t.Chunk.Path, t.Chunk.Offset)
}

// Chunk records the chunk information.
type Chunk struct {
Path string
FileSize int64
Offset int64
EndOffset int64
PrevRowIDMax int64
RowIDMax int64
Type mydump.SourceType
Compression mydump.Compression
Timestamp int64
}

// Checksum records the checksum information.
type Checksum struct {
Sum uint64
Expand Down
13 changes: 0 additions & 13 deletions pkg/disttask/importinto/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/metric"
Expand Down Expand Up @@ -542,18 +541,6 @@ func updateMeta(task *proto.Task, taskMeta *TaskMeta) error {
return nil
}

// todo: converting back and forth, we should unify struct and remove this function later.
func toChunkMap(engineCheckpoints map[int32]*checkpoints.EngineCheckpoint) map[int32][]Chunk {
chunkMap := make(map[int32][]Chunk, len(engineCheckpoints))
for id, ecp := range engineCheckpoints {
chunkMap[id] = make([]Chunk, 0, len(ecp.Chunks))
for _, chunkCheckpoint := range ecp.Chunks {
chunkMap[id] = append(chunkMap[id], toChunk(*chunkCheckpoint))
}
}
return chunkMap
}

func getStepOfEncode(globalSort bool) proto.Step {
if globalSort {
return proto.ImportStepEncodeAndSort
Expand Down
5 changes: 3 additions & 2 deletions pkg/disttask/importinto/scheduler_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -69,7 +70,7 @@ func TestSchedulerExtLocalSort(t *testing.T) {
},
Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`,
EligibleInstances: []*infosync.ServerInfo{{ID: "1"}},
ChunkMap: map[int32][]importinto.Chunk{1: {{Path: "gs://test-load/1.csv"}}},
ChunkMap: map[int32][]checkpoints.Chunk{1: {{Path: "gs://test-load/1.csv"}}},
}
bs, err := logicalPlan.ToTaskMeta()
require.NoError(t, err)
Expand Down Expand Up @@ -209,7 +210,7 @@ func TestSchedulerExtGlobalSort(t *testing.T) {
},
Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`,
EligibleInstances: []*infosync.ServerInfo{{ID: "1"}},
ChunkMap: map[int32][]importinto.Chunk{
ChunkMap: map[int32][]checkpoints.Chunk{
1: {{Path: "gs://test-load/1.csv"}},
2: {{Path: "gs://test-load/2.csv"}},
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/disttask/importinto/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/mydump"
)

func toChunkCheckpoint(chunk Chunk) checkpoints.ChunkCheckpoint {
func toChunkCheckpoint(chunk checkpoints.Chunk) checkpoints.ChunkCheckpoint {
return checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{
Path: chunk.Path,
Expand All @@ -41,8 +41,8 @@ func toChunkCheckpoint(chunk Chunk) checkpoints.ChunkCheckpoint {
}
}

func toChunk(chunkCheckpoint checkpoints.ChunkCheckpoint) Chunk {
return Chunk{
func toChunk(chunkCheckpoint checkpoints.ChunkCheckpoint) checkpoints.Chunk {
return checkpoints.Chunk{
Path: chunkCheckpoint.FileMeta.Path,
FileSize: chunkCheckpoint.FileMeta.FileSize,
Offset: chunkCheckpoint.Chunk.Offset,
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/import_into.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,11 @@ func (e *ImportIntoExec) submitTask(ctx context.Context) (int64, *proto.TaskBase
logutil.Logger(ctx).Info("get job importer", zap.Stringer("param", e.controller.Parameters),
zap.Bool("dist-task-enabled", variable.EnableDistTask.Load()))
if importFromServer {
ecp, err2 := e.controller.PopulateChunks(ctx)
chunkMap, err2 := e.controller.PopulateChunks(ctx)
if err2 != nil {
return 0, nil, err2
}
return importinto.SubmitStandaloneTask(ctx, e.controller.Plan, e.stmt, ecp)
return importinto.SubmitStandaloneTask(ctx, e.controller.Plan, e.stmt, chunkMap)
}
// if tidb_enable_dist_task=true, we import distributively, otherwise we import on current node.
if variable.EnableDistTask.Load() {
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,7 @@ func (e *LoadDataController) InitDataStore(ctx context.Context) error {
}
return nil
}

func (*LoadDataController) initExternalStore(ctx context.Context, u *url.URL, target string) (storage.ExternalStorage, error) {
b, err2 := storage.ParseBackendFromURL(u, nil)
if err2 != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/importer/importer_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ func TestPopulateChunks(t *testing.T) {
engines, err := ti.PopulateChunks(ctx)
require.NoError(t, err)
require.Len(t, engines, 3)
require.Len(t, engines[0].Chunks, 2)
require.Len(t, engines[1].Chunks, 1)
require.Len(t, engines[0], 2)
require.Len(t, engines[1], 1)
require.Len(t, engines[common.IndexEngineID], 0)
}
7 changes: 1 addition & 6 deletions pkg/executor/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var GetEtcdClient = getEtcdClient
// - when import from file
// 1. there is no active job on the target table
// 2. the total file size > 0
// 3. if global sort, thread count >= 16 and have required privileges
// 3. if global sort, check required privileges
// - target table should be empty
// - no CDC or PiTR tasks running
//
Expand All @@ -63,11 +63,6 @@ func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec
if err := e.checkTotalFileSize(); err != nil {
return err
}
// run global sort with < 8 thread might OOM on ingest step
// TODO: remove this limit after control memory usage.
if e.IsGlobalSort() && e.ThreadCnt < 8 {
return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("global sort requires at least 8 threads")
}
}
if err := e.checkTableEmpty(ctx, conn); err != nil {
return err
Expand Down
13 changes: 3 additions & 10 deletions pkg/executor/importer/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,9 @@ func TestCheckRequirements(t *testing.T) {

// make checkTotalFileSize pass
c.TotalFileSize = 1
// global sort with thread count < 8
c.ThreadCnt = 7
c.CloudStorageURI = "s3://test"
err = c.CheckRequirements(ctx, conn)
require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
require.ErrorContains(t, err, "global sort requires at least 8 threads")

// reset fields, make global sort thread check pass
c.ThreadCnt = 1
c.CloudStorageURI = ""

// non-empty table
_, err = conn.Execute(ctx, "insert into test.t values(1)")
require.NoError(t, err)
Expand Down Expand Up @@ -178,8 +171,8 @@ func TestCheckRequirements(t *testing.T) {
require.NoError(t, err)
require.NoError(t, c.CheckRequirements(ctx, conn))

// with global sort
c.Plan.ThreadCnt = 8
// with global sort with threadCnt < 8
c.Plan.ThreadCnt = 2
c.Plan.CloudStorageURI = ":"
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataInvalidURI)
c.Plan.CloudStorageURI = "sdsdsdsd://sdsdsdsd"
Expand Down
Loading