Skip to content

Commit e277439

Browse files
xzhangxian1008ti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#56936
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 2f9739a commit e277439

9 files changed

+1145
-42
lines changed

pkg/executor/join/base_join_probe.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
185185
return errors.New("Previous chunk is not probed yet")
186186
}
187187
}
188+
188189
j.currentChunk = chk
189190
logicalRows := chk.NumRows()
190191
// if chk.sel != nil, then physicalRows is different from logicalRows
@@ -312,6 +313,20 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
312313
}
313314

314315
func (j *baseJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error {
316+
defer func() {
317+
if j.ctx.spillHelper.areAllPartitionsSpilled() {
318+
// We will not call `Probe` function when all partitions are spilled.
319+
// So it's necessary to manually set `currentProbeRow` to avoid check fail.
320+
j.currentProbeRow = j.chunkRows
321+
}
322+
}()
323+
324+
if j.currentChunk != nil {
325+
if j.currentProbeRow < j.chunkRows {
326+
return errors.New("Previous chunk is not probed yet")
327+
}
328+
}
329+
315330
hashValueCol := chk.Column(0)
316331
serializedKeysCol := chk.Column(1)
317332
colNum := chk.NumCols()

pkg/executor/join/hash_join_spill_helper.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ type hashJoinSpillHelper struct {
8181
spillTriggedInBuildingStageForTest bool
8282
spillTriggeredBeforeBuildingHashTableForTest bool
8383
allPartitionsSpilledForTest bool
84+
skipProbeInRestoreForTest bool
8485
}
8586

8687
func newHashJoinSpillHelper(hashJoinExec *HashJoinV2Exec, partitionNum int, probeFieldTypes []*types.FieldType) *hashJoinSpillHelper {
@@ -373,6 +374,18 @@ func (h *hashJoinSpillHelper) init() {
373374

374375
h.buildRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)
375376
h.probeRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)
377+
378+
for _, worker := range h.hashJoinExec.BuildWorkers {
379+
if worker.restoredChkBuf == nil {
380+
worker.restoredChkBuf = chunk.NewEmptyChunk(h.buildSpillChkFieldTypes)
381+
}
382+
}
383+
384+
for _, worker := range h.hashJoinExec.ProbeWorkers {
385+
if worker.restoredChkBuf == nil {
386+
worker.restoredChkBuf = chunk.NewEmptyChunk(h.probeSpillFieldTypes)
387+
}
388+
}
376389
}
377390
}
378391

@@ -540,6 +553,10 @@ func (h *hashJoinSpillHelper) initTmpSpillBuildSideChunks() {
540553
}
541554
}
542555

556+
func (h *hashJoinSpillHelper) isProbeSkippedInRestoreForTest() bool {
557+
return h.skipProbeInRestoreForTest
558+
}
559+
543560
func (h *hashJoinSpillHelper) isRespillTriggeredForTest() bool {
544561
return h.spillRoundForTest > 1
545562
}

pkg/executor/join/hash_join_v2.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/pingcap/tidb/pkg/util/chunk"
4343
"github.com/pingcap/tidb/pkg/util/disk"
4444
"github.com/pingcap/tidb/pkg/util/execdetails"
45+
"github.com/pingcap/tidb/pkg/util/intest"
4546
"github.com/pingcap/tidb/pkg/util/memory"
4647
)
4748

@@ -359,6 +360,8 @@ type ProbeWorkerV2 struct {
359360
// We build individual joinProbe for each join worker when use chunk-based
360361
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
361362
JoinProbe ProbeV2
363+
364+
restoredChkBuf *chunk.Chunk
362365
}
363366

364367
func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) {
@@ -392,8 +395,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) {
392395
}
393396
failpoint.Inject("ConsumeRandomPanic", nil)
394397

395-
// TODO reuse chunk
396-
chk, err := inDisk.GetChunk(i)
398+
err := inDisk.FillChunk(i, w.restoredChkBuf)
397399
if err != nil {
398400
joinResult.err = err
399401
break
@@ -407,7 +409,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) {
407409

408410
start := time.Now()
409411
waitTime := int64(0)
410-
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(chk, joinResult)
412+
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(joinResult)
411413
probeTime += int64(time.Since(start)) - waitTime
412414
if !ok {
413415
break
@@ -434,6 +436,7 @@ type BuildWorkerV2 struct {
434436
HasNullableKey bool
435437
WorkerID uint
436438
builder *rowTableBuilder
439+
restoredChkBuf *chunk.Chunk
437440
}
438441

439442
func (b *BuildWorkerV2) getSegmentsInRowTable(partID int) []*rowTableSegment {
@@ -449,9 +452,9 @@ func (b *BuildWorkerV2) updatePartitionData(cost int64) {
449452
setMaxValue(&b.HashJoinCtx.stats.maxPartitionData, cost)
450453
}
451454

452-
func (b *BuildWorkerV2) processOneRestoredChunk(chk *chunk.Chunk, cost *int64) error {
455+
func (b *BuildWorkerV2) processOneRestoredChunk(cost *int64) error {
453456
start := time.Now()
454-
err := b.builder.processOneRestoredChunk(chk, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
457+
err := b.builder.processOneRestoredChunk(b.restoredChkBuf, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
455458
if err != nil {
456459
return err
457460
}
@@ -476,10 +479,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i
476479
return nil
477480
}
478481

479-
var chk *chunk.Chunk
480-
481-
// TODO reuse chunk
482-
chk, err = inDisk.GetChunk(i)
482+
err = inDisk.FillChunk(i, b.restoredChkBuf)
483483
if err != nil {
484484
return err
485485
}
@@ -489,7 +489,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i
489489
return err
490490
}
491491

492-
err = b.processOneRestoredChunk(chk, cost)
492+
err = b.processOneRestoredChunk(cost)
493493
if err != nil {
494494
return err
495495
}
@@ -914,8 +914,8 @@ func (w *ProbeWorkerV2) scanRowTableAfterProbeDone() {
914914
}
915915
}
916916

917-
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(probeChunk *chunk.Chunk, joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
918-
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(probeChunk)
917+
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
918+
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(w.restoredChkBuf)
919919
if joinResult.err != nil {
920920
return false, 0, joinResult
921921
}
@@ -932,6 +932,9 @@ func (w *ProbeWorkerV2) processOneProbeChunk(probeChunk *chunk.Chunk, joinResult
932932

933933
func (w *ProbeWorkerV2) probeAndSendResult(joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) {
934934
if w.HashJoinCtx.spillHelper.areAllPartitionsSpilled() {
935+
if intest.InTest && w.HashJoinCtx.spillHelper.hashJoinExec.inRestore {
936+
w.HashJoinCtx.spillHelper.skipProbeInRestoreForTest = true
937+
}
935938
return true, 0, joinResult
936939
}
937940

pkg/executor/join/inner_join_spill_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func testInnerJoinSpillCase4(t *testing.T, ctx *mock.Context, expectedResult []c
147147
require.True(t, hashJoinExec.spillHelper.isSpillTriggedInBuildingStageForTest())
148148
require.True(t, hashJoinExec.spillHelper.areAllPartitionsSpilledForTest())
149149
require.True(t, hashJoinExec.spillHelper.isRespillTriggeredForTest())
150+
require.True(t, hashJoinExec.spillHelper.isProbeSkippedInRestoreForTest())
150151
checkResults(t, retTypes, result, expectedResult)
151152
}
152153

@@ -264,14 +265,14 @@ func TestInnerJoinSpillBasic(t *testing.T) {
264265

265266
params := []spillTestParam{
266267
// Normal case
267-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
268-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
268+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
269+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
269270
// rightUsed is empty
270-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
271-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
271+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
272+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
272273
// leftUsed is empty
273-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
274-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
274+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
275+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
275276
}
276277

277278
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -319,8 +320,8 @@ func TestInnerJoinSpillWithOtherCondition(t *testing.T) {
319320
otherCondition = append(otherCondition, sf)
320321

321322
params := []spillTestParam{
322-
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
323-
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
323+
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
324+
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
324325
}
325326

326327
err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)

0 commit comments

Comments
 (0)