diff --git a/pkg/executor/join/base_join_probe.go b/pkg/executor/join/base_join_probe.go index 431ca68adaeb8..01bf4119edb02 100644 --- a/pkg/executor/join/base_join_probe.go +++ b/pkg/executor/join/base_join_probe.go @@ -185,6 +185,7 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) { return errors.New("Previous chunk is not probed yet") } } + j.currentChunk = chk logicalRows := chk.NumRows() // if chk.sel != nil, then physicalRows is different from logicalRows @@ -312,6 +313,20 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) { } func (j *baseJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error { + defer func() { + if j.ctx.spillHelper.areAllPartitionsSpilled() { + // We will not call `Probe` function when all partitions are spilled. + // So it's necessary to manually set `currentProbeRow` to avoid check fail. + j.currentProbeRow = j.chunkRows + } + }() + + if j.currentChunk != nil { + if j.currentProbeRow < j.chunkRows { + return errors.New("Previous chunk is not probed yet") + } + } + hashValueCol := chk.Column(0) serializedKeysCol := chk.Column(1) colNum := chk.NumCols() diff --git a/pkg/executor/join/hash_join_spill_helper.go b/pkg/executor/join/hash_join_spill_helper.go index 68bd84d54c8d6..e2612f0d4e11f 100644 --- a/pkg/executor/join/hash_join_spill_helper.go +++ b/pkg/executor/join/hash_join_spill_helper.go @@ -81,6 +81,7 @@ type hashJoinSpillHelper struct { spillTriggedInBuildingStageForTest bool spillTriggeredBeforeBuildingHashTableForTest bool allPartitionsSpilledForTest bool + skipProbeInRestoreForTest bool } func newHashJoinSpillHelper(hashJoinExec *HashJoinV2Exec, partitionNum int, probeFieldTypes []*types.FieldType) *hashJoinSpillHelper { @@ -373,6 +374,18 @@ func (h *hashJoinSpillHelper) init() { h.buildRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency) h.probeRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency) + + for _, worker := range h.hashJoinExec.BuildWorkers { + if worker.restoredChkBuf == nil { + worker.restoredChkBuf = chunk.NewEmptyChunk(h.buildSpillChkFieldTypes) + } + } + + for _, worker := range h.hashJoinExec.ProbeWorkers { + if worker.restoredChkBuf == nil { + worker.restoredChkBuf = chunk.NewEmptyChunk(h.probeSpillFieldTypes) + } + } } } @@ -540,6 +553,10 @@ func (h *hashJoinSpillHelper) initTmpSpillBuildSideChunks() { } } +func (h *hashJoinSpillHelper) isProbeSkippedInRestoreForTest() bool { + return h.skipProbeInRestoreForTest +} + func (h *hashJoinSpillHelper) isRespillTriggeredForTest() bool { return h.spillRoundForTest > 1 } diff --git a/pkg/executor/join/hash_join_v2.go b/pkg/executor/join/hash_join_v2.go index 6387f717c4b23..ca18d0d58eb63 100644 --- a/pkg/executor/join/hash_join_v2.go +++ b/pkg/executor/join/hash_join_v2.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/disk" "github.com/pingcap/tidb/pkg/util/execdetails" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/memory" ) @@ -359,6 +360,8 @@ type ProbeWorkerV2 struct { // We build individual joinProbe for each join worker when use chunk-based // execution, to avoid the concurrency of joiner.chk and joiner.selected. JoinProbe ProbeV2 + + restoredChkBuf *chunk.Chunk } func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) { @@ -392,8 +395,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) { } failpoint.Inject("ConsumeRandomPanic", nil) - // TODO reuse chunk - chk, err := inDisk.GetChunk(i) + err := inDisk.FillChunk(i, w.restoredChkBuf) if err != nil { joinResult.err = err break @@ -407,7 +409,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) { start := time.Now() waitTime := int64(0) - ok, waitTime, joinResult = w.processOneRestoredProbeChunk(chk, joinResult) + ok, waitTime, joinResult = w.processOneRestoredProbeChunk(joinResult) probeTime += int64(time.Since(start)) - waitTime if !ok { break @@ -434,6 +436,7 @@ type BuildWorkerV2 struct { HasNullableKey bool WorkerID uint builder *rowTableBuilder + restoredChkBuf *chunk.Chunk } func (b *BuildWorkerV2) getSegmentsInRowTable(partID int) []*rowTableSegment { @@ -449,9 +452,9 @@ func (b *BuildWorkerV2) updatePartitionData(cost int64) { setMaxValue(&b.HashJoinCtx.stats.maxPartitionData, cost) } -func (b *BuildWorkerV2) processOneRestoredChunk(chk *chunk.Chunk, cost *int64) error { +func (b *BuildWorkerV2) processOneRestoredChunk(cost *int64) error { start := time.Now() - err := b.builder.processOneRestoredChunk(chk, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber)) + err := b.builder.processOneRestoredChunk(b.restoredChkBuf, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber)) if err != nil { return err } @@ -476,10 +479,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i return nil } - var chk *chunk.Chunk - - // TODO reuse chunk - chk, err = inDisk.GetChunk(i) + err = inDisk.FillChunk(i, b.restoredChkBuf) if err != nil { return err } @@ -489,7 +489,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i return err } - err = b.processOneRestoredChunk(chk, cost) + err = b.processOneRestoredChunk(cost) if err != nil { return err } @@ -914,8 +914,8 @@ func (w *ProbeWorkerV2) scanRowTableAfterProbeDone() { } } -func (w *ProbeWorkerV2) processOneRestoredProbeChunk(probeChunk *chunk.Chunk, joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) { - joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(probeChunk) +func (w *ProbeWorkerV2) processOneRestoredProbeChunk(joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) { + joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(w.restoredChkBuf) if joinResult.err != nil { return false, 0, joinResult } @@ -932,6 +932,9 @@ func (w *ProbeWorkerV2) processOneProbeChunk(probeChunk *chunk.Chunk, joinResult func (w *ProbeWorkerV2) probeAndSendResult(joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) { if w.HashJoinCtx.spillHelper.areAllPartitionsSpilled() { + if intest.InTest && w.HashJoinCtx.spillHelper.hashJoinExec.inRestore { + w.HashJoinCtx.spillHelper.skipProbeInRestoreForTest = true + } return true, 0, joinResult } diff --git a/pkg/executor/join/inner_join_spill_test.go b/pkg/executor/join/inner_join_spill_test.go index 3451376e4e647..769e728749aa7 100644 --- a/pkg/executor/join/inner_join_spill_test.go +++ b/pkg/executor/join/inner_join_spill_test.go @@ -147,6 +147,7 @@ func testInnerJoinSpillCase4(t *testing.T, ctx *mock.Context, expectedResult []c require.True(t, hashJoinExec.spillHelper.isSpillTriggedInBuildingStageForTest()) require.True(t, hashJoinExec.spillHelper.areAllPartitionsSpilledForTest()) require.True(t, hashJoinExec.spillHelper.isRespillTriggeredForTest()) + require.True(t, hashJoinExec.spillHelper.isProbeSkippedInRestoreForTest()) checkResults(t, retTypes, result, expectedResult) } @@ -267,14 +268,14 @@ func TestInnerJoinSpillBasic(t *testing.T) { params := []spillTestParam{ // Normal case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, // rightUsed is empty - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, // leftUsed is empty - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, } err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) @@ -322,8 +323,8 @@ func TestInnerJoinSpillWithOtherCondition(t *testing.T) { otherCondition = append(otherCondition, sf) params := []spillTestParam{ - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, } err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) diff --git a/pkg/executor/join/left_outer_semi_join_probe_test.go b/pkg/executor/join/left_outer_semi_join_probe_test.go index 33b424810c1e6..08b5ec74aa3a8 100644 --- a/pkg/executor/join/left_outer_semi_join_probe_test.go +++ b/pkg/executor/join/left_outer_semi_join_probe_test.go @@ -556,9 +556,9 @@ func TestLeftOuterSemiJoinSpill(t *testing.T) { joinType := logicalop.LeftOuterSemiJoin params := []spillTestParam{ // basic case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 100000, 10000}}, // with other condition - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{3000000, 1700000, 3500000, 750000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{3000000, 1700000, 3500000, 100000, 10000}}, } for _, param := range params { diff --git a/pkg/executor/join/outer_join_spill_test.go b/pkg/executor/join/outer_join_spill_test.go index a81ad9a2dc6a6..63c08cb8533b5 100644 --- a/pkg/executor/join/outer_join_spill_test.go +++ b/pkg/executor/join/outer_join_spill_test.go @@ -93,14 +93,14 @@ func TestOuterJoinSpillBasic(t *testing.T) { params := []spillTestParam{ // Normal case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, // rightUsed is empty - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, // leftUsed is empty - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, } err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) @@ -146,8 +146,8 @@ func TestOuterJoinSpillWithSel(t *testing.T) { params := []spillTestParam{ // Normal case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}}, } err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) @@ -201,8 +201,8 @@ func TestOuterJoinSpillWithOtherCondition(t *testing.T) { otherCondition = append(otherCondition, sf) params := []spillTestParam{ - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, } err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) diff --git a/pkg/executor/join/semi_join_probe_test.go b/pkg/executor/join/semi_join_probe_test.go index f5f3c43711c28..d5d3eb908fb1c 100644 --- a/pkg/executor/join/semi_join_probe_test.go +++ b/pkg/executor/join/semi_join_probe_test.go @@ -403,11 +403,11 @@ func TestSemiSpill(t *testing.T) { joinTypes := []logicalop.JoinType{logicalop.SemiJoin} params := []spillTestParam{ // basic case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3000000, 600000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3500000, 600000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3000000, 100000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3500000, 100000, 10000}}, // with other condition - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 600000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 600000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 100000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 100000, 10000}}, } for _, joinType := range joinTypes { diff --git a/pkg/util/chunk/chunk_in_disk.go b/pkg/util/chunk/chunk_in_disk.go index 268f1e1eca25f..1e49e36d1bfd4 100644 --- a/pkg/util/chunk/chunk_in_disk.go +++ b/pkg/util/chunk/chunk_in_disk.go @@ -128,10 +128,9 @@ func (d *DataInDiskByChunks) getChunkSize(chkIdx int) int64 { return d.offsetOfEachChunk[chkIdx+1] - d.offsetOfEachChunk[chkIdx] } -// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx. -func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { +func (d *DataInDiskByChunks) readFromFisk(chkIdx int) error { if err := injectChunkInDiskRandomError(); err != nil { - return nil, err + return err } reader := d.dataFile.getSectionReader(d.offsetOfEachChunk[chkIdx]) @@ -145,19 +144,39 @@ func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { readByteNum, err := io.ReadFull(reader, d.buf) if err != nil { - return nil, err + return err } if int64(readByteNum) != chkSize { - return nil, errors2.New("Fail to restore the spilled chunk") + return errors2.New("Fail to restore the spilled chunk") + } + + return nil +} + +// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx. +func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { + err := d.readFromFisk(chkIdx) + if err != nil { + return nil, err } chk := NewEmptyChunk(d.fieldTypes) d.deserializeDataToChunk(chk) - return chk, nil } +// FillChunk fills a Chunk from the DataInDiskByChunks by chkIdx. +func (d *DataInDiskByChunks) FillChunk(srcChkIdx int, destChk *Chunk) error { + err := d.readFromFisk(srcChkIdx) + if err != nil { + return err + } + + d.deserializeDataToChunk(destChk) + return nil +} + // Close releases the disk resource. func (d *DataInDiskByChunks) Close() { if d.dataFile.file != nil { @@ -264,7 +283,11 @@ func (d *DataInDiskByChunks) deserializeColMeta(pos *int64) (length int64, nullM func (d *DataInDiskByChunks) deserializeSel(chk *Chunk, pos *int64, selSize int) { selLen := int64(selSize) / intLen - chk.sel = make([]int, selLen) + if int64(cap(chk.sel)) < selLen { + chk.sel = make([]int, selLen) + } else { + chk.sel = chk.sel[:selLen] + } for i := range selLen { chk.sel[i] = *(*int)(unsafe.Pointer(&d.buf[*pos])) *pos += intLen @@ -299,9 +322,25 @@ func (d *DataInDiskByChunks) deserializeOffsets(dst []int64, pos *int64) { func (d *DataInDiskByChunks) deserializeColumns(chk *Chunk, pos *int64) { for _, col := range chk.columns { length, nullMapSize, dataSize, offsetSize := d.deserializeColMeta(pos) - col.nullBitmap = make([]byte, nullMapSize) - col.data = make([]byte, dataSize) - col.offsets = make([]int64, offsetSize/int64Len) + + if int64(cap(col.nullBitmap)) < nullMapSize { + col.nullBitmap = make([]byte, nullMapSize) + } else { + col.nullBitmap = col.nullBitmap[:nullMapSize] + } + + if int64(cap(col.data)) < dataSize { + col.data = make([]byte, dataSize) + } else { + col.data = col.data[:dataSize] + } + + offsetsLen := offsetSize / int64Len + if int64(cap(col.offsets)) < offsetsLen { + col.offsets = make([]int64, offsetsLen) + } else { + col.offsets = col.offsets[:offsetsLen] + } col.length = int(length) copy(col.nullBitmap, d.buf[*pos:*pos+nullMapSize]) diff --git a/pkg/util/chunk/chunk_in_disk_test.go b/pkg/util/chunk/chunk_in_disk_test.go index 46dd5745c9ee8..c780c96bedd1a 100644 --- a/pkg/util/chunk/chunk_in_disk_test.go +++ b/pkg/util/chunk/chunk_in_disk_test.go @@ -66,7 +66,7 @@ func checkChunk(t *testing.T, chk1, chk2 *Chunk) { } } -func TestDataInDiskByChunks(t *testing.T) { +func testImpl(t *testing.T, isNewChunk bool) { numChk, numRow := 100, 1000 chks, fields := initChunks(numChk, numRow) addAuxDataForChunks(chks) @@ -78,9 +78,29 @@ func TestDataInDiskByChunks(t *testing.T) { require.NoError(t, err) } + chk := NewEmptyChunk(fields) + var err error for i := range numChk { - chk, err := dataInDiskByChunks.GetChunk(i) + if isNewChunk { + chk, err = dataInDiskByChunks.GetChunk(i) + } else { + chk.Reset() + err = dataInDiskByChunks.FillChunk(i, chk) + } require.NoError(t, err) checkChunk(t, chk, chks[i]) } } + +func testGetChunk(t *testing.T) { + testImpl(t, true) +} + +func testFillChunk(t *testing.T) { + testImpl(t, false) +} + +func TestDataInDiskByChunks(t *testing.T) { + testGetChunk(t) + testFillChunk(t) +}