Skip to content

Commit f0070f5

Browse files
executor: reuse chunk in hash join v2 during restoring (#56936)
close #56828
1 parent 5a25eea commit f0070f5

9 files changed

+143
-48
lines changed

pkg/executor/join/base_join_probe.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
185185
return errors.New("Previous chunk is not probed yet")
186186
}
187187
}
188+
188189
j.currentChunk = chk
189190
logicalRows := chk.NumRows()
190191
// if chk.sel != nil, then physicalRows is different from logicalRows
@@ -312,6 +313,20 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
312313
}
313314

314315
func (j *baseJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error {
316+
defer func() {
317+
if j.ctx.spillHelper.areAllPartitionsSpilled() {
318+
// We will not call `Probe` function when all partitions are spilled.
319+
// So we must manually set `currentProbeRow` to `chunkRows`; otherwise the next call fails the "Previous chunk is not probed yet" check.
320+
j.currentProbeRow = j.chunkRows
321+
}
322+
}()
323+
324+
if j.currentChunk != nil {
325+
if j.currentProbeRow < j.chunkRows {
326+
return errors.New("Previous chunk is not probed yet")
327+
}
328+
}
329+
315330
hashValueCol := chk.Column(0)
316331
serializedKeysCol := chk.Column(1)
317332
colNum := chk.NumCols()

pkg/executor/join/hash_join_spill_helper.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ type hashJoinSpillHelper struct {
8181
spillTriggedInBuildingStageForTest bool
8282
spillTriggeredBeforeBuildingHashTableForTest bool
8383
allPartitionsSpilledForTest bool
84+
skipProbeInRestoreForTest bool
8485
}
8586

8687
func newHashJoinSpillHelper(hashJoinExec *HashJoinV2Exec, partitionNum int, probeFieldTypes []*types.FieldType) *hashJoinSpillHelper {
@@ -373,6 +374,18 @@ func (h *hashJoinSpillHelper) init() {
373374

374375
h.buildRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)
375376
h.probeRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)
377+
378+
for _, worker := range h.hashJoinExec.BuildWorkers {
379+
if worker.restoredChkBuf == nil {
380+
worker.restoredChkBuf = chunk.NewEmptyChunk(h.buildSpillChkFieldTypes)
381+
}
382+
}
383+
384+
for _, worker := range h.hashJoinExec.ProbeWorkers {
385+
if worker.restoredChkBuf == nil {
386+
worker.restoredChkBuf = chunk.NewEmptyChunk(h.probeSpillFieldTypes)
387+
}
388+
}
376389
}
377390
}
378391

@@ -540,6 +553,10 @@ func (h *hashJoinSpillHelper) initTmpSpillBuildSideChunks() {
540553
}
541554
}
542555

556+
func (h *hashJoinSpillHelper) isProbeSkippedInRestoreForTest() bool {
557+
return h.skipProbeInRestoreForTest
558+
}
559+
543560
func (h *hashJoinSpillHelper) isRespillTriggeredForTest() bool {
544561
return h.spillRoundForTest > 1
545562
}

pkg/executor/join/hash_join_v2.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/pingcap/tidb/pkg/util/chunk"
4343
"github.com/pingcap/tidb/pkg/util/disk"
4444
"github.com/pingcap/tidb/pkg/util/execdetails"
45+
"github.com/pingcap/tidb/pkg/util/intest"
4546
"github.com/pingcap/tidb/pkg/util/memory"
4647
)
4748

@@ -359,6 +360,8 @@ type ProbeWorkerV2 struct {
359360
// We build individual joinProbe for each join worker when use chunk-based
360361
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
361362
JoinProbe ProbeV2
363+
364+
restoredChkBuf *chunk.Chunk
362365
}
363366

364367
func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) {
@@ -392,8 +395,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) {
392395
}
393396
failpoint.Inject("ConsumeRandomPanic", nil)
394397

395-
// TODO reuse chunk
396-
chk, err := inDisk.GetChunk(i)
398+
err := inDisk.FillChunk(i, w.restoredChkBuf)
397399
if err != nil {
398400
joinResult.err = err
399401
break
@@ -407,7 +409,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) {
407409

408410
start := time.Now()
409411
waitTime := int64(0)
410-
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(chk, joinResult)
412+
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(joinResult)
411413
probeTime += int64(time.Since(start)) - waitTime
412414
if !ok {
413415
break
@@ -434,6 +436,7 @@ type BuildWorkerV2 struct {
434436
HasNullableKey bool
435437
WorkerID uint
436438
builder *rowTableBuilder
439+
restoredChkBuf *chunk.Chunk
437440
}
438441

439442
func (b *BuildWorkerV2) getSegmentsInRowTable(partID int) []*rowTableSegment {
@@ -449,9 +452,9 @@ func (b *BuildWorkerV2) updatePartitionData(cost int64) {
449452
setMaxValue(&b.HashJoinCtx.stats.maxPartitionData, cost)
450453
}
451454

452-
func (b *BuildWorkerV2) processOneRestoredChunk(chk *chunk.Chunk, cost *int64) error {
455+
func (b *BuildWorkerV2) processOneRestoredChunk(cost *int64) error {
453456
start := time.Now()
454-
err := b.builder.processOneRestoredChunk(chk, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
457+
err := b.builder.processOneRestoredChunk(b.restoredChkBuf, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
455458
if err != nil {
456459
return err
457460
}
@@ -476,10 +479,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i
476479
return nil
477480
}
478481

479-
var chk *chunk.Chunk
480-
481-
// TODO reuse chunk
482-
chk, err = inDisk.GetChunk(i)
482+
err = inDisk.FillChunk(i, b.restoredChkBuf)
483483
if err != nil {
484484
return err
485485
}
@@ -489,7 +489,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i
489489
return err
490490
}
491491

492-
err = b.processOneRestoredChunk(chk, cost)
492+
err = b.processOneRestoredChunk(cost)
493493
if err != nil {
494494
return err
495495
}
@@ -914,8 +914,8 @@ func (w *ProbeWorkerV2) scanRowTableAfterProbeDone() {
914914
}
915915
}
916916

917-
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(probeChunk *chunk.Chunk, joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
918-
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(probeChunk)
917+
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
918+
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(w.restoredChkBuf)
919919
if joinResult.err != nil {
920920
return false, 0, joinResult
921921
}
@@ -932,6 +932,9 @@ func (w *ProbeWorkerV2) processOneProbeChunk(probeChunk *chunk.Chunk, joinResult
932932

933933
func (w *ProbeWorkerV2) probeAndSendResult(joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) {
934934
if w.HashJoinCtx.spillHelper.areAllPartitionsSpilled() {
935+
if intest.InTest && w.HashJoinCtx.spillHelper.hashJoinExec.inRestore {
936+
w.HashJoinCtx.spillHelper.skipProbeInRestoreForTest = true
937+
}
935938
return true, 0, joinResult
936939
}
937940

pkg/executor/join/inner_join_spill_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func testInnerJoinSpillCase4(t *testing.T, ctx *mock.Context, expectedResult []c
147147
require.True(t, hashJoinExec.spillHelper.isSpillTriggedInBuildingStageForTest())
148148
require.True(t, hashJoinExec.spillHelper.areAllPartitionsSpilledForTest())
149149
require.True(t, hashJoinExec.spillHelper.isRespillTriggeredForTest())
150+
require.True(t, hashJoinExec.spillHelper.isProbeSkippedInRestoreForTest())
150151
checkResults(t, retTypes, result, expectedResult)
151152
}
152153

@@ -267,14 +268,14 @@ func TestInnerJoinSpillBasic(t *testing.T) {
267268

268269
params := []spillTestParam{
269270
// Normal case
270-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
271-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
271+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
272+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
272273
// rightUsed is empty
273-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
274-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
274+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
275+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
275276
// leftUsed is empty
276-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
277-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
277+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
278+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
278279
}
279280

280281
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -322,8 +323,8 @@ func TestInnerJoinSpillWithOtherCondition(t *testing.T) {
322323
otherCondition = append(otherCondition, sf)
323324

324325
params := []spillTestParam{
325-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
326-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
326+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
327+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
327328
}
328329

329330
err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)

pkg/executor/join/left_outer_semi_join_probe_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -556,9 +556,9 @@ func TestLeftOuterSemiJoinSpill(t *testing.T) {
556556
joinType := logicalop.LeftOuterSemiJoin
557557
params := []spillTestParam{
558558
// basic case
559-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
559+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 100000, 10000}},
560560
// with other condition
561-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{3000000, 1700000, 3500000, 750000, 10000}},
561+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{3000000, 1700000, 3500000, 100000, 10000}},
562562
}
563563

564564
for _, param := range params {

pkg/executor/join/outer_join_spill_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,14 @@ func TestOuterJoinSpillBasic(t *testing.T) {
9393

9494
params := []spillTestParam{
9595
// Normal case
96-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
97-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
96+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
97+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
9898
// rightUsed is empty
99-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
100-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
99+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
100+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
101101
// leftUsed is empty
102-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
103-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
102+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
103+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
104104
}
105105

106106
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -146,8 +146,8 @@ func TestOuterJoinSpillWithSel(t *testing.T) {
146146

147147
params := []spillTestParam{
148148
// Normal case
149-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}},
150-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}},
149+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}},
150+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}},
151151
}
152152

153153
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -201,8 +201,8 @@ func TestOuterJoinSpillWithOtherCondition(t *testing.T) {
201201
otherCondition = append(otherCondition, sf)
202202

203203
params := []spillTestParam{
204-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
205-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
204+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
205+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
206206
}
207207

208208
err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)

pkg/executor/join/semi_join_probe_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -403,11 +403,11 @@ func TestSemiSpill(t *testing.T) {
403403
joinTypes := []logicalop.JoinType{logicalop.SemiJoin}
404404
params := []spillTestParam{
405405
// basic case
406-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3000000, 600000, 10000}},
407-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3500000, 600000, 10000}},
406+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3000000, 100000, 10000}},
407+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3500000, 100000, 10000}},
408408
// with other condition
409-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 600000, 10000}},
410-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 600000, 10000}},
409+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 100000, 10000}},
410+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 100000, 10000}},
411411
}
412412

413413
for _, joinType := range joinTypes {

pkg/util/chunk/chunk_in_disk.go

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,9 @@ func (d *DataInDiskByChunks) getChunkSize(chkIdx int) int64 {
128128
return d.offsetOfEachChunk[chkIdx+1] - d.offsetOfEachChunk[chkIdx]
129129
}
130130

131-
// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx.
132-
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
131+
func (d *DataInDiskByChunks) readFromFisk(chkIdx int) error {
133132
if err := injectChunkInDiskRandomError(); err != nil {
134-
return nil, err
133+
return err
135134
}
136135

137136
reader := d.dataFile.getSectionReader(d.offsetOfEachChunk[chkIdx])
@@ -145,19 +144,39 @@ func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
145144

146145
readByteNum, err := io.ReadFull(reader, d.buf)
147146
if err != nil {
148-
return nil, err
147+
return err
149148
}
150149

151150
if int64(readByteNum) != chkSize {
152-
return nil, errors2.New("Fail to restore the spilled chunk")
151+
return errors2.New("Fail to restore the spilled chunk")
152+
}
153+
154+
return nil
155+
}
156+
157+
// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx.
158+
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
159+
err := d.readFromFisk(chkIdx)
160+
if err != nil {
161+
return nil, err
153162
}
154163

155164
chk := NewEmptyChunk(d.fieldTypes)
156165
d.deserializeDataToChunk(chk)
157-
158166
return chk, nil
159167
}
160168

169+
// FillChunk reads the chunk at index srcChkIdx from the DataInDiskByChunks and deserializes it into destChk.
170+
func (d *DataInDiskByChunks) FillChunk(srcChkIdx int, destChk *Chunk) error {
171+
err := d.readFromFisk(srcChkIdx)
172+
if err != nil {
173+
return err
174+
}
175+
176+
d.deserializeDataToChunk(destChk)
177+
return nil
178+
}
179+
161180
// Close releases the disk resource.
162181
func (d *DataInDiskByChunks) Close() {
163182
if d.dataFile.file != nil {
@@ -264,7 +283,11 @@ func (d *DataInDiskByChunks) deserializeColMeta(pos *int64) (length int64, nullM
264283

265284
func (d *DataInDiskByChunks) deserializeSel(chk *Chunk, pos *int64, selSize int) {
266285
selLen := int64(selSize) / intLen
267-
chk.sel = make([]int, selLen)
286+
if int64(cap(chk.sel)) < selLen {
287+
chk.sel = make([]int, selLen)
288+
} else {
289+
chk.sel = chk.sel[:selLen]
290+
}
268291
for i := range selLen {
269292
chk.sel[i] = *(*int)(unsafe.Pointer(&d.buf[*pos]))
270293
*pos += intLen
@@ -299,9 +322,25 @@ func (d *DataInDiskByChunks) deserializeOffsets(dst []int64, pos *int64) {
299322
func (d *DataInDiskByChunks) deserializeColumns(chk *Chunk, pos *int64) {
300323
for _, col := range chk.columns {
301324
length, nullMapSize, dataSize, offsetSize := d.deserializeColMeta(pos)
302-
col.nullBitmap = make([]byte, nullMapSize)
303-
col.data = make([]byte, dataSize)
304-
col.offsets = make([]int64, offsetSize/int64Len)
325+
326+
if int64(cap(col.nullBitmap)) < nullMapSize {
327+
col.nullBitmap = make([]byte, nullMapSize)
328+
} else {
329+
col.nullBitmap = col.nullBitmap[:nullMapSize]
330+
}
331+
332+
if int64(cap(col.data)) < dataSize {
333+
col.data = make([]byte, dataSize)
334+
} else {
335+
col.data = col.data[:dataSize]
336+
}
337+
338+
offsetsLen := offsetSize / int64Len
339+
if int64(cap(col.offsets)) < offsetsLen {
340+
col.offsets = make([]int64, offsetsLen)
341+
} else {
342+
col.offsets = col.offsets[:offsetsLen]
343+
}
305344

306345
col.length = int(length)
307346
copy(col.nullBitmap, d.buf[*pos:*pos+nullMapSize])

0 commit comments

Comments
 (0)