Skip to content

Commit 579734d

Browse files
joccauzeminzhou
authored andcommitted
import-into: support running with 1 thread when using global sort (pingcap#58572)
close pingcap#58680
1 parent b267fe8 commit 579734d

File tree

14 files changed

+65
-96
lines changed

14 files changed

+65
-96
lines changed

pkg/disttask/importinto/job.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/pingcap/tidb/pkg/domain/infosync"
2929
"github.com/pingcap/tidb/pkg/executor/importer"
3030
"github.com/pingcap/tidb/pkg/kv"
31-
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
3231
"github.com/pingcap/tidb/pkg/metrics"
3332
"github.com/pingcap/tidb/pkg/sessionctx"
3433
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -39,20 +38,20 @@ import (
3938
// SubmitStandaloneTask submits a task to the distribute framework that only runs on the current node.
4039
// when import from server-disk, pass engine checkpoints too, as scheduler might run on another
4140
// node where we can't access the data files.
42-
func SubmitStandaloneTask(ctx context.Context, plan *importer.Plan, stmt string, ecp map[int32]*checkpoints.EngineCheckpoint) (int64, *proto.TaskBase, error) {
41+
func SubmitStandaloneTask(ctx context.Context, plan *importer.Plan, stmt string, chunkMap map[int32][]importer.Chunk) (int64, *proto.TaskBase, error) {
4342
serverInfo, err := infosync.GetServerInfo()
4443
if err != nil {
4544
return 0, nil, err
4645
}
47-
return doSubmitTask(ctx, plan, stmt, serverInfo, toChunkMap(ecp))
46+
return doSubmitTask(ctx, plan, stmt, serverInfo, chunkMap)
4847
}
4948

5049
// SubmitTask submits a task to the distribute framework that runs on all managed nodes.
5150
func SubmitTask(ctx context.Context, plan *importer.Plan, stmt string) (int64, *proto.TaskBase, error) {
5251
return doSubmitTask(ctx, plan, stmt, nil, nil)
5352
}
5453

55-
func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instance *infosync.ServerInfo, chunkMap map[int32][]Chunk) (int64, *proto.TaskBase, error) {
54+
func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instance *infosync.ServerInfo, chunkMap map[int32][]importer.Chunk) (int64, *proto.TaskBase, error) {
5655
var instances []*infosync.ServerInfo
5756
if instance != nil {
5857
instances = append(instances, instance)

pkg/disttask/importinto/planner.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type LogicalPlan struct {
5454
Plan importer.Plan
5555
Stmt string
5656
EligibleInstances []*infosync.ServerInfo
57-
ChunkMap map[int32][]Chunk
57+
ChunkMap map[int32][]importer.Chunk
5858
}
5959

6060
// ToTaskMeta converts the logical plan to task meta.
@@ -156,7 +156,7 @@ func (p *LogicalPlan) ToPhysicalPlan(planCtx planner.PlanCtx) (*planner.Physical
156156
type ImportSpec struct {
157157
ID int32
158158
Plan importer.Plan
159-
Chunks []Chunk
159+
Chunks []importer.Chunk
160160
}
161161

162162
// ToSubtaskMeta converts the import spec to subtask meta.
@@ -257,7 +257,7 @@ func buildController(plan *importer.Plan, stmt string) (*importer.LoadDataContro
257257
}
258258

259259
func generateImportSpecs(pCtx planner.PlanCtx, p *LogicalPlan) ([]planner.PipelineSpec, error) {
260-
var chunkMap map[int32][]Chunk
260+
var chunkMap map[int32][]importer.Chunk
261261
if len(p.ChunkMap) > 0 {
262262
chunkMap = p.ChunkMap
263263
} else {
@@ -270,21 +270,21 @@ func generateImportSpecs(pCtx planner.PlanCtx, p *LogicalPlan) ([]planner.Pipeli
270270
}
271271

272272
controller.SetExecuteNodeCnt(pCtx.ExecuteNodesCnt)
273-
engineCheckpoints, err2 := controller.PopulateChunks(pCtx.Ctx)
273+
chunkMap, err2 = controller.PopulateChunks(pCtx.Ctx)
274274
if err2 != nil {
275275
return nil, err2
276276
}
277-
chunkMap = toChunkMap(engineCheckpoints)
278277
}
278+
279279
importSpecs := make([]planner.PipelineSpec, 0, len(chunkMap))
280-
for id := range chunkMap {
280+
for id, chunks := range chunkMap {
281281
if id == common.IndexEngineID {
282282
continue
283283
}
284284
importSpec := &ImportSpec{
285285
ID: id,
286286
Plan: p.Plan,
287-
Chunks: chunkMap[id],
287+
Chunks: chunks,
288288
}
289289
importSpecs = append(importSpecs, importSpec)
290290
}

pkg/disttask/importinto/planner_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestLogicalPlan(t *testing.T) {
4444
Plan: importer.Plan{},
4545
Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`,
4646
EligibleInstances: []*infosync.ServerInfo{{ID: "1"}},
47-
ChunkMap: map[int32][]Chunk{1: {{Path: "gs://test-load/1.csv"}}},
47+
ChunkMap: map[int32][]importer.Chunk{1: {{Path: "gs://test-load/1.csv"}}},
4848
}
4949
bs, err := logicalPlan.ToTaskMeta()
5050
require.NoError(t, err)
@@ -65,7 +65,7 @@ func TestToPhysicalPlan(t *testing.T) {
6565
},
6666
Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`,
6767
EligibleInstances: []*infosync.ServerInfo{{ID: "1"}},
68-
ChunkMap: map[int32][]Chunk{chunkID: {{Path: "gs://test-load/1.csv"}}},
68+
ChunkMap: map[int32][]importer.Chunk{chunkID: {{Path: "gs://test-load/1.csv"}}},
6969
}
7070
planCtx := planner.PlanCtx{
7171
NextTaskStep: proto.ImportStepImport,

pkg/disttask/importinto/proto.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/pingcap/tidb/pkg/executor/importer"
2424
"github.com/pingcap/tidb/pkg/lightning/backend"
2525
"github.com/pingcap/tidb/pkg/lightning/backend/external"
26-
"github.com/pingcap/tidb/pkg/lightning/mydump"
2726
"github.com/pingcap/tidb/pkg/lightning/verification"
2827
"github.com/pingcap/tidb/pkg/meta/autoid"
2928
)
@@ -44,7 +43,7 @@ type TaskMeta struct {
4443
// files to the framework scheduler which might run on another instance.
4544
// we use a map from engine ID to chunks since we need support split_file for CSV,
4645
// so need to split them into engines before passing to scheduler.
47-
ChunkMap map[int32][]Chunk
46+
ChunkMap map[int32][]importer.Chunk
4847
}
4948

5049
// ImportStepMeta is the meta of import step.
@@ -53,7 +52,7 @@ type TaskMeta struct {
5352
type ImportStepMeta struct {
5453
// this is the engine ID, not the id in tidb_background_subtask table.
5554
ID int32
56-
Chunks []Chunk
55+
Chunks []importer.Chunk
5756
Checksum map[int64]Checksum // see KVGroupChecksum for definition of map key.
5857
Result Result
5958
// MaxIDs stores the max id that have been used during encoding for each allocator type.
@@ -142,7 +141,7 @@ func (sv *SharedVars) mergeIndexSummary(indexID int64, summary *external.WriterS
142141
// TaskExecutor will split the subtask into minimal tasks(Chunks -> Chunk)
143142
type importStepMinimalTask struct {
144143
Plan importer.Plan
145-
Chunk Chunk
144+
Chunk importer.Chunk
146145
SharedVars *SharedVars
147146
panicked *atomic.Bool
148147
}
@@ -158,19 +157,6 @@ func (t *importStepMinimalTask) String() string {
158157
return fmt.Sprintf("chunk:%s:%d", t.Chunk.Path, t.Chunk.Offset)
159158
}
160159

161-
// Chunk records the chunk information.
162-
type Chunk struct {
163-
Path string
164-
FileSize int64
165-
Offset int64
166-
EndOffset int64
167-
PrevRowIDMax int64
168-
RowIDMax int64
169-
Type mydump.SourceType
170-
Compression mydump.Compression
171-
Timestamp int64
172-
}
173-
174160
// Checksum records the checksum information.
175161
type Checksum struct {
176162
Sum uint64

pkg/disttask/importinto/scheduler.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/pingcap/tidb/pkg/errno"
3636
"github.com/pingcap/tidb/pkg/executor/importer"
3737
"github.com/pingcap/tidb/pkg/kv"
38-
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
3938
"github.com/pingcap/tidb/pkg/lightning/common"
4039
"github.com/pingcap/tidb/pkg/lightning/config"
4140
"github.com/pingcap/tidb/pkg/lightning/metric"
@@ -548,18 +547,6 @@ func updateMeta(task *proto.Task, taskMeta *TaskMeta) error {
548547
return nil
549548
}
550549

551-
// todo: converting back and forth, we should unify struct and remove this function later.
552-
func toChunkMap(engineCheckpoints map[int32]*checkpoints.EngineCheckpoint) map[int32][]Chunk {
553-
chunkMap := make(map[int32][]Chunk, len(engineCheckpoints))
554-
for id, ecp := range engineCheckpoints {
555-
chunkMap[id] = make([]Chunk, 0, len(ecp.Chunks))
556-
for _, chunkCheckpoint := range ecp.Chunks {
557-
chunkMap[id] = append(chunkMap[id], toChunk(*chunkCheckpoint))
558-
}
559-
}
560-
return chunkMap
561-
}
562-
563550
func getStepOfEncode(globalSort bool) proto.Step {
564551
if globalSort {
565552
return proto.ImportStepEncodeAndSort

pkg/disttask/importinto/scheduler_testkit_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func TestSchedulerExtLocalSort(t *testing.T) {
6969
},
7070
Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`,
7171
EligibleInstances: []*infosync.ServerInfo{{ID: "1"}},
72-
ChunkMap: map[int32][]importinto.Chunk{1: {{Path: "gs://test-load/1.csv"}}},
72+
ChunkMap: map[int32][]importer.Chunk{1: {{Path: "gs://test-load/1.csv"}}},
7373
}
7474
bs, err := logicalPlan.ToTaskMeta()
7575
require.NoError(t, err)
@@ -209,7 +209,7 @@ func TestSchedulerExtGlobalSort(t *testing.T) {
209209
},
210210
Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`,
211211
EligibleInstances: []*infosync.ServerInfo{{ID: "1"}},
212-
ChunkMap: map[int32][]importinto.Chunk{
212+
ChunkMap: map[int32][]importer.Chunk{
213213
1: {{Path: "gs://test-load/1.csv"}},
214214
2: {{Path: "gs://test-load/2.csv"}},
215215
},

pkg/disttask/importinto/wrapper.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
package importinto
1616

1717
import (
18+
"github.com/pingcap/tidb/pkg/executor/importer"
1819
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
1920
"github.com/pingcap/tidb/pkg/lightning/mydump"
2021
)
2122

22-
func toChunkCheckpoint(chunk Chunk) checkpoints.ChunkCheckpoint {
23+
func toChunkCheckpoint(chunk importer.Chunk) checkpoints.ChunkCheckpoint {
2324
return checkpoints.ChunkCheckpoint{
2425
Key: checkpoints.ChunkCheckpointKey{
2526
Path: chunk.Path,
@@ -41,8 +42,8 @@ func toChunkCheckpoint(chunk Chunk) checkpoints.ChunkCheckpoint {
4142
}
4243
}
4344

44-
func toChunk(chunkCheckpoint checkpoints.ChunkCheckpoint) Chunk {
45-
return Chunk{
45+
func toChunk(chunkCheckpoint checkpoints.ChunkCheckpoint) importer.Chunk {
46+
return importer.Chunk{
4647
Path: chunkCheckpoint.FileMeta.Path,
4748
FileSize: chunkCheckpoint.FileMeta.FileSize,
4849
Offset: chunkCheckpoint.Chunk.Offset,

pkg/executor/import_into.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,11 @@ func (e *ImportIntoExec) submitTask(ctx context.Context) (int64, *proto.TaskBase
214214
logutil.Logger(ctx).Info("get job importer", zap.Stringer("param", e.controller.Parameters),
215215
zap.Bool("dist-task-enabled", vardef.EnableDistTask.Load()))
216216
if importFromServer {
217-
ecp, err2 := e.controller.PopulateChunks(ctx)
217+
chunkMap, err2 := e.controller.PopulateChunks(ctx)
218218
if err2 != nil {
219219
return 0, nil, err2
220220
}
221-
return importinto.SubmitStandaloneTask(ctx, e.controller.Plan, e.stmt, ecp)
221+
return importinto.SubmitStandaloneTask(ctx, e.controller.Plan, e.stmt, chunkMap)
222222
}
223223
// if tidb_enable_dist_task=true, we import distributively, otherwise we import on current node.
224224
if vardef.EnableDistTask.Load() {

pkg/executor/importer/import.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,7 @@ func (e *LoadDataController) InitDataStore(ctx context.Context) error {
10141014
}
10151015
return nil
10161016
}
1017+
10171018
func (*LoadDataController) initExternalStore(ctx context.Context, u *url.URL, target string) (storage.ExternalStorage, error) {
10181019
b, err2 := storage.ParseBackendFromURL(u, nil)
10191020
if err2 != nil {

pkg/executor/importer/importer_testkit_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ func TestPopulateChunks(t *testing.T) {
370370
engines, err := ti.PopulateChunks(ctx)
371371
require.NoError(t, err)
372372
require.Len(t, engines, 3)
373-
require.Len(t, engines[0].Chunks, 2)
374-
require.Len(t, engines[1].Chunks, 1)
373+
require.Len(t, engines[0], 2)
374+
require.Len(t, engines[1], 1)
375+
require.Len(t, engines[common.IndexEngineID], 0)
375376
}

0 commit comments

Comments
 (0)