Skip to content

Commit ef0a693

Browse files
authored
executor: reuse chunk in hash join v2 during restoring (pingcap#56936) (pingcap#58018)
close pingcap#56828
1 parent 4fc3123 commit ef0a693

File tree

7 files changed

+137
-42
lines changed

7 files changed

+137
-42
lines changed

pkg/executor/join/base_join_probe.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
185185
return errors.New("Previous chunk is not probed yet")
186186
}
187187
}
188+
188189
j.currentChunk = chk
189190
logicalRows := chk.NumRows()
190191
// if chk.sel != nil, then physicalRows is different from logicalRows
@@ -312,6 +313,20 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
312313
}
313314

314315
func (j *baseJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error {
316+
defer func() {
317+
if j.ctx.spillHelper.areAllPartitionsSpilled() {
318+
// We will not call `Probe` function when all partitions are spilled.
319+
// So it's necessary to manually set `currentProbeRow` to avoid check fail.
320+
j.currentProbeRow = j.chunkRows
321+
}
322+
}()
323+
324+
if j.currentChunk != nil {
325+
if j.currentProbeRow < j.chunkRows {
326+
return errors.New("Previous chunk is not probed yet")
327+
}
328+
}
329+
315330
hashValueCol := chk.Column(0)
316331
serializedKeysCol := chk.Column(1)
317332
colNum := chk.NumCols()

pkg/executor/join/hash_join_spill_helper.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ type hashJoinSpillHelper struct {
8181
spillTriggedInBuildingStageForTest bool
8282
spillTriggeredBeforeBuildingHashTableForTest bool
8383
allPartitionsSpilledForTest bool
84+
skipProbeInRestoreForTest bool
8485
}
8586

8687
func newHashJoinSpillHelper(hashJoinExec *HashJoinV2Exec, partitionNum int, probeFieldTypes []*types.FieldType) *hashJoinSpillHelper {
@@ -373,6 +374,18 @@ func (h *hashJoinSpillHelper) init() {
373374

374375
h.buildRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)
375376
h.probeRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)
377+
378+
for _, worker := range h.hashJoinExec.BuildWorkers {
379+
if worker.restoredChkBuf == nil {
380+
worker.restoredChkBuf = chunk.NewEmptyChunk(h.buildSpillChkFieldTypes)
381+
}
382+
}
383+
384+
for _, worker := range h.hashJoinExec.ProbeWorkers {
385+
if worker.restoredChkBuf == nil {
386+
worker.restoredChkBuf = chunk.NewEmptyChunk(h.probeSpillFieldTypes)
387+
}
388+
}
376389
}
377390
}
378391

@@ -540,6 +553,10 @@ func (h *hashJoinSpillHelper) initTmpSpillBuildSideChunks() {
540553
}
541554
}
542555

556+
func (h *hashJoinSpillHelper) isProbeSkippedInRestoreForTest() bool {
557+
return h.skipProbeInRestoreForTest
558+
}
559+
543560
func (h *hashJoinSpillHelper) isRespillTriggeredForTest() bool {
544561
return h.spillRoundForTest > 1
545562
}

pkg/executor/join/hash_join_v2.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/pingcap/tidb/pkg/util/chunk"
4343
"github.com/pingcap/tidb/pkg/util/disk"
4444
"github.com/pingcap/tidb/pkg/util/execdetails"
45+
"github.com/pingcap/tidb/pkg/util/intest"
4546
"github.com/pingcap/tidb/pkg/util/memory"
4647
)
4748

@@ -359,6 +360,8 @@ type ProbeWorkerV2 struct {
359360
// We build individual joinProbe for each join worker when use chunk-based
360361
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
361362
JoinProbe ProbeV2
363+
364+
restoredChkBuf *chunk.Chunk
362365
}
363366

364367
func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) {
@@ -392,8 +395,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) {
392395
}
393396
failpoint.Inject("ConsumeRandomPanic", nil)
394397

395-
// TODO reuse chunk
396-
chk, err := inDisk.GetChunk(i)
398+
err := inDisk.FillChunk(i, w.restoredChkBuf)
397399
if err != nil {
398400
joinResult.err = err
399401
break
@@ -407,7 +409,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) {
407409

408410
start := time.Now()
409411
waitTime := int64(0)
410-
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(chk, joinResult)
412+
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(joinResult)
411413
probeTime += int64(time.Since(start)) - waitTime
412414
if !ok {
413415
break
@@ -434,6 +436,7 @@ type BuildWorkerV2 struct {
434436
HasNullableKey bool
435437
WorkerID uint
436438
builder *rowTableBuilder
439+
restoredChkBuf *chunk.Chunk
437440
}
438441

439442
func (b *BuildWorkerV2) getSegmentsInRowTable(partID int) []*rowTableSegment {
@@ -449,9 +452,9 @@ func (b *BuildWorkerV2) updatePartitionData(cost int64) {
449452
setMaxValue(&b.HashJoinCtx.stats.maxPartitionData, cost)
450453
}
451454

452-
func (b *BuildWorkerV2) processOneRestoredChunk(chk *chunk.Chunk, cost *int64) error {
455+
func (b *BuildWorkerV2) processOneRestoredChunk(cost *int64) error {
453456
start := time.Now()
454-
err := b.builder.processOneRestoredChunk(chk, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
457+
err := b.builder.processOneRestoredChunk(b.restoredChkBuf, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
455458
if err != nil {
456459
return err
457460
}
@@ -476,10 +479,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i
476479
return nil
477480
}
478481

479-
var chk *chunk.Chunk
480-
481-
// TODO reuse chunk
482-
chk, err = inDisk.GetChunk(i)
482+
err = inDisk.FillChunk(i, b.restoredChkBuf)
483483
if err != nil {
484484
return err
485485
}
@@ -489,7 +489,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i
489489
return err
490490
}
491491

492-
err = b.processOneRestoredChunk(chk, cost)
492+
err = b.processOneRestoredChunk(cost)
493493
if err != nil {
494494
return err
495495
}
@@ -914,8 +914,8 @@ func (w *ProbeWorkerV2) scanRowTableAfterProbeDone() {
914914
}
915915
}
916916

917-
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(probeChunk *chunk.Chunk, joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
918-
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(probeChunk)
917+
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
918+
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(w.restoredChkBuf)
919919
if joinResult.err != nil {
920920
return false, 0, joinResult
921921
}
@@ -932,6 +932,9 @@ func (w *ProbeWorkerV2) processOneProbeChunk(probeChunk *chunk.Chunk, joinResult
932932

933933
func (w *ProbeWorkerV2) probeAndSendResult(joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) {
934934
if w.HashJoinCtx.spillHelper.areAllPartitionsSpilled() {
935+
if intest.InTest && w.HashJoinCtx.spillHelper.hashJoinExec.inRestore {
936+
w.HashJoinCtx.spillHelper.skipProbeInRestoreForTest = true
937+
}
935938
return true, 0, joinResult
936939
}
937940

pkg/executor/join/inner_join_spill_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func testInnerJoinSpillCase4(t *testing.T, ctx *mock.Context, expectedResult []c
147147
require.True(t, hashJoinExec.spillHelper.isSpillTriggedInBuildingStageForTest())
148148
require.True(t, hashJoinExec.spillHelper.areAllPartitionsSpilledForTest())
149149
require.True(t, hashJoinExec.spillHelper.isRespillTriggeredForTest())
150+
require.True(t, hashJoinExec.spillHelper.isProbeSkippedInRestoreForTest())
150151
checkResults(t, retTypes, result, expectedResult)
151152
}
152153

@@ -264,14 +265,14 @@ func TestInnerJoinSpillBasic(t *testing.T) {
264265

265266
params := []spillTestParam{
266267
// Normal case
267-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
268-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
268+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
269+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
269270
// rightUsed is empty
270-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
271-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
271+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
272+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
272273
// leftUsed is empty
273-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
274-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
274+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
275+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
275276
}
276277

277278
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -319,8 +320,8 @@ func TestInnerJoinSpillWithOtherCondition(t *testing.T) {
319320
otherCondition = append(otherCondition, sf)
320321

321322
params := []spillTestParam{
322-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
323-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
323+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
324+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
324325
}
325326

326327
err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)

pkg/executor/join/outer_join_spill_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,14 @@ func TestOuterJoinSpillBasic(t *testing.T) {
9393

9494
params := []spillTestParam{
9595
// Normal case
96-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
97-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
96+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
97+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
9898
// rightUsed is empty
99-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
100-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
99+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
100+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
101101
// leftUsed is empty
102-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
103-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
102+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
103+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
104104
}
105105

106106
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -146,8 +146,8 @@ func TestOuterJoinSpillWithSel(t *testing.T) {
146146

147147
params := []spillTestParam{
148148
// Normal case
149-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}},
150-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}},
149+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}},
150+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}},
151151
}
152152

153153
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -201,8 +201,8 @@ func TestOuterJoinSpillWithOtherCondition(t *testing.T) {
201201
otherCondition = append(otherCondition, sf)
202202

203203
params := []spillTestParam{
204-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
205-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
204+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
205+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
206206
}
207207

208208
err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)

pkg/util/chunk/chunk_in_disk.go

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,9 @@ func (d *DataInDiskByChunks) getChunkSize(chkIdx int) int64 {
128128
return d.offsetOfEachChunk[chkIdx+1] - d.offsetOfEachChunk[chkIdx]
129129
}
130130

131-
// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx.
132-
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
131+
func (d *DataInDiskByChunks) readFromFisk(chkIdx int) error {
133132
if err := injectChunkInDiskRandomError(); err != nil {
134-
return nil, err
133+
return err
135134
}
136135

137136
reader := d.dataFile.getSectionReader(d.offsetOfEachChunk[chkIdx])
@@ -145,19 +144,39 @@ func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
145144

146145
readByteNum, err := io.ReadFull(reader, d.buf)
147146
if err != nil {
148-
return nil, err
147+
return err
149148
}
150149

151150
if int64(readByteNum) != chkSize {
152-
return nil, errors2.New("Fail to restore the spilled chunk")
151+
return errors2.New("Fail to restore the spilled chunk")
152+
}
153+
154+
return nil
155+
}
156+
157+
// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx.
158+
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
159+
err := d.readFromFisk(chkIdx)
160+
if err != nil {
161+
return nil, err
153162
}
154163

155164
chk := NewEmptyChunk(d.fieldTypes)
156165
d.deserializeDataToChunk(chk)
157-
158166
return chk, nil
159167
}
160168

169+
// FillChunk fills a Chunk from the DataInDiskByChunks by chkIdx.
170+
func (d *DataInDiskByChunks) FillChunk(srcChkIdx int, destChk *Chunk) error {
171+
err := d.readFromFisk(srcChkIdx)
172+
if err != nil {
173+
return err
174+
}
175+
176+
d.deserializeDataToChunk(destChk)
177+
return nil
178+
}
179+
161180
// Close releases the disk resource.
162181
func (d *DataInDiskByChunks) Close() {
163182
if d.dataFile.file != nil {
@@ -264,7 +283,11 @@ func (d *DataInDiskByChunks) deserializeColMeta(pos *int64) (length int64, nullM
264283

265284
func (d *DataInDiskByChunks) deserializeSel(chk *Chunk, pos *int64, selSize int) {
266285
selLen := int64(selSize) / intLen
267-
chk.sel = make([]int, selLen)
286+
if int64(cap(chk.sel)) < selLen {
287+
chk.sel = make([]int, selLen)
288+
} else {
289+
chk.sel = chk.sel[:selLen]
290+
}
268291
for i := range selLen {
269292
chk.sel[i] = *(*int)(unsafe.Pointer(&d.buf[*pos]))
270293
*pos += intLen
@@ -299,9 +322,25 @@ func (d *DataInDiskByChunks) deserializeOffsets(dst []int64, pos *int64) {
299322
func (d *DataInDiskByChunks) deserializeColumns(chk *Chunk, pos *int64) {
300323
for _, col := range chk.columns {
301324
length, nullMapSize, dataSize, offsetSize := d.deserializeColMeta(pos)
302-
col.nullBitmap = make([]byte, nullMapSize)
303-
col.data = make([]byte, dataSize)
304-
col.offsets = make([]int64, offsetSize/int64Len)
325+
326+
if int64(cap(col.nullBitmap)) < nullMapSize {
327+
col.nullBitmap = make([]byte, nullMapSize)
328+
} else {
329+
col.nullBitmap = col.nullBitmap[:nullMapSize]
330+
}
331+
332+
if int64(cap(col.data)) < dataSize {
333+
col.data = make([]byte, dataSize)
334+
} else {
335+
col.data = col.data[:dataSize]
336+
}
337+
338+
offsetsLen := offsetSize / int64Len
339+
if int64(cap(col.offsets)) < offsetsLen {
340+
col.offsets = make([]int64, offsetsLen)
341+
} else {
342+
col.offsets = col.offsets[:offsetsLen]
343+
}
305344

306345
col.length = int(length)
307346
copy(col.nullBitmap, d.buf[*pos:*pos+nullMapSize])

pkg/util/chunk/chunk_in_disk_test.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func checkChunk(t *testing.T, chk1, chk2 *Chunk) {
6666
}
6767
}
6868

69-
func TestDataInDiskByChunks(t *testing.T) {
69+
func testImpl(t *testing.T, isNewChunk bool) {
7070
numChk, numRow := 100, 1000
7171
chks, fields := initChunks(numChk, numRow)
7272
addAuxDataForChunks(chks)
@@ -78,9 +78,29 @@ func TestDataInDiskByChunks(t *testing.T) {
7878
require.NoError(t, err)
7979
}
8080

81+
chk := NewEmptyChunk(fields)
82+
var err error
8183
for i := range numChk {
82-
chk, err := dataInDiskByChunks.GetChunk(i)
84+
if isNewChunk {
85+
chk, err = dataInDiskByChunks.GetChunk(i)
86+
} else {
87+
chk.Reset()
88+
err = dataInDiskByChunks.FillChunk(i, chk)
89+
}
8390
require.NoError(t, err)
8491
checkChunk(t, chk, chks[i])
8592
}
8693
}
94+
95+
func testGetChunk(t *testing.T) {
96+
testImpl(t, true)
97+
}
98+
99+
func testFillChunk(t *testing.T) {
100+
testImpl(t, false)
101+
}
102+
103+
func TestDataInDiskByChunks(t *testing.T) {
104+
testGetChunk(t)
105+
testFillChunk(t)
106+
}

0 commit comments

Comments
 (0)