15 changes: 15 additions & 0 deletions pkg/executor/join/base_join_probe.go
@@ -185,6 +185,7 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
return errors.New("Previous chunk is not probed yet")
}
}

j.currentChunk = chk
logicalRows := chk.NumRows()
// if chk.sel != nil, then physicalRows is different from logicalRows
@@ -312,6 +313,20 @@
}

func (j *baseJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error {
defer func() {
if j.ctx.spillHelper.areAllPartitionsSpilled() {
// `Probe` will not be called when all partitions are spilled,
// so `currentProbeRow` must be set manually here, otherwise the not-yet-probed check fails on the next call.
j.currentProbeRow = j.chunkRows
}
}()

if j.currentChunk != nil {
if j.currentProbeRow < j.chunkRows {
return errors.New("Previous chunk is not probed yet")
}
}

hashValueCol := chk.Column(0)
serializedKeysCol := chk.Column(1)
colNum := chk.NumCols()
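The defer added to `SetRestoredChunkForProbe` exists because the probe step is skipped entirely when every partition is spilled, so the "previous chunk is not probed yet" guard would otherwise trip on the next call. A minimal standalone sketch of that pattern — `prober`, `allSpilled`, and `setChunk` are illustrative stand-ins, not the executor's real types:

```go
package main

import (
	"errors"
	"fmt"
)

// prober mimics the bookkeeping in SetRestoredChunkForProbe: currentRow must
// reach totalRows before the next chunk may be installed.
type prober struct {
	currentRow, totalRows int
	allSpilled            bool // stands in for spillHelper.areAllPartitionsSpilled()
}

func (p *prober) setChunk(rows int) error {
	// When every partition is spilled, the probe step never runs for this
	// chunk, so mark it fully consumed on the way out to keep the guard happy.
	defer func() {
		if p.allSpilled {
			p.currentRow = p.totalRows
		}
	}()

	if p.currentRow < p.totalRows {
		return errors.New("previous chunk is not probed yet")
	}
	p.totalRows = rows
	p.currentRow = 0
	return nil
}

func main() {
	p := &prober{allSpilled: true}
	fmt.Println(p.setChunk(10)) // <nil>
	fmt.Println(p.setChunk(10)) // <nil>: the defer kept the invariant intact
}
```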
17 changes: 17 additions & 0 deletions pkg/executor/join/hash_join_spill_helper.go
@@ -81,6 +81,7 @@ type hashJoinSpillHelper struct {
spillTriggedInBuildingStageForTest bool
spillTriggeredBeforeBuildingHashTableForTest bool
allPartitionsSpilledForTest bool
skipProbeInRestoreForTest bool
}

func newHashJoinSpillHelper(hashJoinExec *HashJoinV2Exec, partitionNum int, probeFieldTypes []*types.FieldType) *hashJoinSpillHelper {
@@ -373,6 +374,18 @@ func (h *hashJoinSpillHelper) init() {

h.buildRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)
h.probeRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency)

for _, worker := range h.hashJoinExec.BuildWorkers {
if worker.restoredChkBuf == nil {
worker.restoredChkBuf = chunk.NewEmptyChunk(h.buildSpillChkFieldTypes)
}
}

for _, worker := range h.hashJoinExec.ProbeWorkers {
if worker.restoredChkBuf == nil {
worker.restoredChkBuf = chunk.NewEmptyChunk(h.probeSpillFieldTypes)
}
}
}
}

@@ -540,6 +553,10 @@ func (h *hashJoinSpillHelper) initTmpSpillBuildSideChunks() {
}
}

func (h *hashJoinSpillHelper) isProbeSkippedInRestoreForTest() bool {
return h.skipProbeInRestoreForTest
}

func (h *hashJoinSpillHelper) isRespillTriggeredForTest() bool {
return h.spillRoundForTest > 1
}
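The nil checks added to `init` make the per-worker restore buffer a one-time allocation: if `init` runs again (for example on a respill round), an existing buffer is kept rather than replaced. A condensed sketch of that guard, with a hypothetical `ensureRestoreBuf` helper; import paths assume the repository's current layout:

```go
package join

import (
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/chunk"
)

// ensureRestoreBuf returns an existing buffer untouched and allocates one only
// when the worker has none yet, mirroring the two loops in init() above.
func ensureRestoreBuf(buf *chunk.Chunk, fieldTypes []*types.FieldType) *chunk.Chunk {
	if buf == nil {
		return chunk.NewEmptyChunk(fieldTypes)
	}
	return buf
}
```

Usage would look like `worker.restoredChkBuf = ensureRestoreBuf(worker.restoredChkBuf, h.probeSpillFieldTypes)`.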
27 changes: 15 additions & 12 deletions pkg/executor/join/hash_join_v2.go
@@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
)

@@ -359,6 +360,8 @@ type ProbeWorkerV2 struct {
// We build individual joinProbe for each join worker when use chunk-based
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
JoinProbe ProbeV2

restoredChkBuf *chunk.Chunk
}

func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) {
@@ -392,8 +395,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) {
}
failpoint.Inject("ConsumeRandomPanic", nil)

// TODO reuse chunk
chk, err := inDisk.GetChunk(i)
err := inDisk.FillChunk(i, w.restoredChkBuf)
if err != nil {
joinResult.err = err
break
@@ -407,7 +409,7 @@

start := time.Now()
waitTime := int64(0)
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(chk, joinResult)
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(joinResult)
probeTime += int64(time.Since(start)) - waitTime
if !ok {
break
@@ -434,6 +436,7 @@ type BuildWorkerV2 struct {
HasNullableKey bool
WorkerID uint
builder *rowTableBuilder
restoredChkBuf *chunk.Chunk
}

func (b *BuildWorkerV2) getSegmentsInRowTable(partID int) []*rowTableSegment {
@@ -449,9 +452,9 @@ func (b *BuildWorkerV2) updatePartitionData(cost int64) {
setMaxValue(&b.HashJoinCtx.stats.maxPartitionData, cost)
}

func (b *BuildWorkerV2) processOneRestoredChunk(chk *chunk.Chunk, cost *int64) error {
func (b *BuildWorkerV2) processOneRestoredChunk(cost *int64) error {
start := time.Now()
err := b.builder.processOneRestoredChunk(chk, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
err := b.builder.processOneRestoredChunk(b.restoredChkBuf, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber))
if err != nil {
return err
}
@@ -476,10 +479,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i
return nil
}

var chk *chunk.Chunk

// TODO reuse chunk
chk, err = inDisk.GetChunk(i)
err = inDisk.FillChunk(i, b.restoredChkBuf)
if err != nil {
return err
}
@@ -489,7 +489,7 @@
return err
}

err = b.processOneRestoredChunk(chk, cost)
err = b.processOneRestoredChunk(cost)
if err != nil {
return err
}
@@ -914,8 +914,8 @@ func (w *ProbeWorkerV2) scanRowTableAfterProbeDone() {
}
}

func (w *ProbeWorkerV2) processOneRestoredProbeChunk(probeChunk *chunk.Chunk, joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(probeChunk)
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) {
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(w.restoredChkBuf)
if joinResult.err != nil {
return false, 0, joinResult
}
@@ -932,6 +932,9 @@ func (w *ProbeWorkerV2) processOneProbeChunk(probeChunk *chunk.Chunk, joinResult

func (w *ProbeWorkerV2) probeAndSendResult(joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) {
if w.HashJoinCtx.spillHelper.areAllPartitionsSpilled() {
if intest.InTest && w.HashJoinCtx.spillHelper.hashJoinExec.inRestore {
w.HashJoinCtx.spillHelper.skipProbeInRestoreForTest = true
}
return true, 0, joinResult
}

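The switch from `GetChunk` to `FillChunk` in `restoreAndProbe` and `splitPartitionAndAppendToRowTableForRestoreImpl` replaces one chunk allocation per restored chunk with a single worker-owned buffer. Below is a sketch of that usage pattern, not the executor's actual code path: `restoreAll`, `numChunks`, and `process` are hypothetical names, only `NewEmptyChunk`, `Reset`, and `FillChunk` come from this patch, and the import paths assume the repository's current layout.

```go
package join

import (
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/chunk"
)

// restoreAll refills one preallocated chunk for every spilled chunk instead of
// allocating a fresh chunk on each iteration.
func restoreAll(inDisk *chunk.DataInDiskByChunks, fts []*types.FieldType,
	numChunks int, process func(*chunk.Chunk) error) error {
	buf := chunk.NewEmptyChunk(fts) // allocated once, reused below
	for i := 0; i < numChunks; i++ {
		buf.Reset()
		// FillChunk deserializes spilled chunk i into buf, reusing buf's
		// column buffers whenever their capacity is already large enough.
		if err := inDisk.FillChunk(i, buf); err != nil {
			return err
		}
		if err := process(buf); err != nil {
			return err
		}
	}
	return nil
}
```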
17 changes: 9 additions & 8 deletions pkg/executor/join/inner_join_spill_test.go
@@ -147,6 +147,7 @@ func testInnerJoinSpillCase4(t *testing.T, ctx *mock.Context, expectedResult []c
require.True(t, hashJoinExec.spillHelper.isSpillTriggedInBuildingStageForTest())
require.True(t, hashJoinExec.spillHelper.areAllPartitionsSpilledForTest())
require.True(t, hashJoinExec.spillHelper.isRespillTriggeredForTest())
require.True(t, hashJoinExec.spillHelper.isProbeSkippedInRestoreForTest())
checkResults(t, retTypes, result, expectedResult)
}

@@ -264,14 +265,14 @@ func TestInnerJoinSpillBasic(t *testing.T) {

params := []spillTestParam{
// Normal case
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
// rightUsed is empty
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
// leftUsed is empty
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
}

err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -319,8 +320,8 @@ func TestInnerJoinSpillWithOtherCondition(t *testing.T) {
otherCondition = append(otherCondition, sf)

params := []spillTestParam{
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
}

err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
20 changes: 10 additions & 10 deletions pkg/executor/join/outer_join_spill_test.go
@@ -93,14 +93,14 @@ func TestOuterJoinSpillBasic(t *testing.T) {

params := []spillTestParam{
// Normal case
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
// rightUsed is empty
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
// leftUsed is empty
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}},
}

err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -146,8 +146,8 @@ func TestOuterJoinSpillWithSel(t *testing.T) {

params := []spillTestParam{
// Normal case
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}},
}

err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
@@ -201,8 +201,8 @@ func TestOuterJoinSpillWithOtherCondition(t *testing.T) {
otherCondition = append(otherCondition, sf)

params := []spillTestParam{
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}},
}

err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`)
59 changes: 49 additions & 10 deletions pkg/util/chunk/chunk_in_disk.go
@@ -128,10 +128,9 @@ func (d *DataInDiskByChunks) getChunkSize(chkIdx int) int64 {
return d.offsetOfEachChunk[chkIdx+1] - d.offsetOfEachChunk[chkIdx]
}

// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx.
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
func (d *DataInDiskByChunks) readFromFisk(chkIdx int) error {
if err := injectChunkInDiskRandomError(); err != nil {
return nil, err
return err
}

reader := d.dataFile.getSectionReader(d.offsetOfEachChunk[chkIdx])
@@ -145,19 +144,39 @@

readByteNum, err := io.ReadFull(reader, d.buf)
if err != nil {
return nil, err
return err
}

if int64(readByteNum) != chkSize {
return nil, errors2.New("Fail to restore the spilled chunk")
return errors2.New("Fail to restore the spilled chunk")
}

return nil
}

// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx.
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) {
err := d.readFromFisk(chkIdx)
if err != nil {
return nil, err
}

chk := NewEmptyChunk(d.fieldTypes)
d.deserializeDataToChunk(chk)

return chk, nil
}

// FillChunk reads the chunk at srcChkIdx from the DataInDiskByChunks and deserializes it into destChk.
func (d *DataInDiskByChunks) FillChunk(srcChkIdx int, destChk *Chunk) error {
err := d.readFromFisk(srcChkIdx)
if err != nil {
return err
}

d.deserializeDataToChunk(destChk)
return nil
}

// Close releases the disk resource.
func (d *DataInDiskByChunks) Close() {
if d.dataFile.file != nil {
@@ -264,7 +283,11 @@ func (d *DataInDiskByChunks) deserializeColMeta(pos *int64) (length int64, nullM

func (d *DataInDiskByChunks) deserializeSel(chk *Chunk, pos *int64, selSize int) {
selLen := int64(selSize) / intLen
chk.sel = make([]int, selLen)
if int64(cap(chk.sel)) < selLen {
chk.sel = make([]int, selLen)
} else {
chk.sel = chk.sel[:selLen]
}
for i := range selLen {
chk.sel[i] = *(*int)(unsafe.Pointer(&d.buf[*pos]))
*pos += intLen
@@ -299,9 +322,25 @@ func (d *DataInDiskByChunks) deserializeOffsets(dst []int64, pos *int64) {
func (d *DataInDiskByChunks) deserializeColumns(chk *Chunk, pos *int64) {
for _, col := range chk.columns {
length, nullMapSize, dataSize, offsetSize := d.deserializeColMeta(pos)
col.nullBitmap = make([]byte, nullMapSize)
col.data = make([]byte, dataSize)
col.offsets = make([]int64, offsetSize/int64Len)

if int64(cap(col.nullBitmap)) < nullMapSize {
col.nullBitmap = make([]byte, nullMapSize)
} else {
col.nullBitmap = col.nullBitmap[:nullMapSize]
}

if int64(cap(col.data)) < dataSize {
col.data = make([]byte, dataSize)
} else {
col.data = col.data[:dataSize]
}

offsetsLen := offsetSize / int64Len
if int64(cap(col.offsets)) < offsetsLen {
col.offsets = make([]int64, offsetsLen)
} else {
col.offsets = col.offsets[:offsetsLen]
}

col.length = int(length)
copy(col.nullBitmap, d.buf[*pos:*pos+nullMapSize])
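`FillChunk` and the deserialization changes above reuse the destination chunk's `nullBitmap`, `data`, `offsets`, and `sel` buffers whenever their capacity is already sufficient; the subsequent `copy` calls and element-wise loops overwrite the full new length, so stale contents are never observable. The same idiom, distilled into a generic helper (a sketch, not code from the patch):

```go
package chunk

// growOrReslice returns a slice of length n, reusing buf's backing array when
// its capacity suffices and allocating a new one otherwise.
func growOrReslice[T any](buf []T, n int) []T {
	if cap(buf) < n {
		return make([]T, n)
	}
	return buf[:n]
}
```

With such a helper, each of the explicit branches above would collapse to a call like `col.data = growOrReslice(col.data, int(dataSize))`.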
24 changes: 22 additions & 2 deletions pkg/util/chunk/chunk_in_disk_test.go
@@ -66,7 +66,7 @@ func checkChunk(t *testing.T, chk1, chk2 *Chunk) {
}
}

func TestDataInDiskByChunks(t *testing.T) {
func testImpl(t *testing.T, isNewChunk bool) {
numChk, numRow := 100, 1000
chks, fields := initChunks(numChk, numRow)
addAuxDataForChunks(chks)
Expand All @@ -78,9 +78,29 @@ func TestDataInDiskByChunks(t *testing.T) {
require.NoError(t, err)
}

chk := NewEmptyChunk(fields)
var err error
for i := range numChk {
chk, err := dataInDiskByChunks.GetChunk(i)
if isNewChunk {
chk, err = dataInDiskByChunks.GetChunk(i)
} else {
chk.Reset()
err = dataInDiskByChunks.FillChunk(i, chk)
}
require.NoError(t, err)
checkChunk(t, chk, chks[i])
}
}

func testGetChunk(t *testing.T) {
testImpl(t, true)
}

func testFillChunk(t *testing.T) {
testImpl(t, false)
}

func TestDataInDiskByChunks(t *testing.T) {
testGetChunk(t)
testFillChunk(t)
}
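The `GetChunk` and `FillChunk` paths now share `testImpl`; a functionally equivalent shape, if named subtests were preferred so a failure reports which path broke (a sketch, not the code above):

```go
func TestDataInDiskByChunks(t *testing.T) {
	t.Run("GetChunk", func(t *testing.T) { testImpl(t, true) })
	t.Run("FillChunk", func(t *testing.T) { testImpl(t, false) })
}
```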