-
Notifications
You must be signed in to change notification settings - Fork 6k
executor: reuse chunk in hash join v2 during restoring #56936
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
01b8bb7
b703792
6ec9634
1f44417
31e5c03
db0d483
b627e78
6956b0b
fec5f84
bba8434
697b8dc
851f998
7c066a1
7ee28e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,7 @@ import ( | |
"github.com/pingcap/tidb/pkg/util/chunk" | ||
"github.com/pingcap/tidb/pkg/util/disk" | ||
"github.com/pingcap/tidb/pkg/util/execdetails" | ||
"github.com/pingcap/tidb/pkg/util/intest" | ||
"github.com/pingcap/tidb/pkg/util/memory" | ||
) | ||
|
||
|
@@ -359,6 +360,8 @@ type ProbeWorkerV2 struct { | |
// We build individual joinProbe for each join worker when use chunk-based | ||
// execution, to avoid the concurrency of joiner.chk and joiner.selected. | ||
JoinProbe ProbeV2 | ||
|
||
restoredChk *chunk.Chunk | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. restoredChkBuf |
||
} | ||
|
||
func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) { | ||
|
@@ -392,8 +395,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) { | |
} | ||
failpoint.Inject("ConsumeRandomPanic", nil) | ||
|
||
// TODO reuse chunk | ||
chk, err := inDisk.GetChunk(i) | ||
err := inDisk.FillChunk(i, w.restoredChk) | ||
if err != nil { | ||
joinResult.err = err | ||
break | ||
|
@@ -407,7 +409,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) { | |
|
||
start := time.Now() | ||
waitTime := int64(0) | ||
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(chk, joinResult) | ||
ok, waitTime, joinResult = w.processOneRestoredProbeChunk(joinResult) | ||
probeTime += int64(time.Since(start)) - waitTime | ||
if !ok { | ||
break | ||
|
@@ -434,6 +436,7 @@ type BuildWorkerV2 struct { | |
HasNullableKey bool | ||
WorkerID uint | ||
builder *rowTableBuilder | ||
restoredChk *chunk.Chunk | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. restoredChkBuf? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
done |
||
} | ||
|
||
func (b *BuildWorkerV2) getSegmentsInRowTable(partID int) []*rowTableSegment { | ||
|
@@ -449,9 +452,9 @@ func (b *BuildWorkerV2) updatePartitionData(cost int64) { | |
setMaxValue(&b.HashJoinCtx.stats.maxPartitionData, cost) | ||
} | ||
|
||
func (b *BuildWorkerV2) processOneRestoredChunk(chk *chunk.Chunk, cost *int64) error { | ||
func (b *BuildWorkerV2) processOneRestoredChunk(cost *int64) error { | ||
start := time.Now() | ||
err := b.builder.processOneRestoredChunk(chk, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber)) | ||
err := b.builder.processOneRestoredChunk(b.restoredChk, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber)) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -476,10 +479,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i | |
return nil | ||
} | ||
|
||
var chk *chunk.Chunk | ||
|
||
// TODO reuse chunk | ||
chk, err = inDisk.GetChunk(i) | ||
err = inDisk.FillChunk(i, b.restoredChk) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -489,7 +489,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i | |
return err | ||
} | ||
|
||
err = b.processOneRestoredChunk(chk, cost) | ||
err = b.processOneRestoredChunk(cost) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -923,8 +923,8 @@ func (w *ProbeWorkerV2) scanRowTableAfterProbeDone() { | |
} | ||
} | ||
|
||
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(probeChunk *chunk.Chunk, joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) { | ||
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(probeChunk) | ||
func (w *ProbeWorkerV2) processOneRestoredProbeChunk(joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) { | ||
joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(w.restoredChk) | ||
if joinResult.err != nil { | ||
return false, 0, joinResult | ||
} | ||
|
@@ -941,6 +941,9 @@ func (w *ProbeWorkerV2) processOneProbeChunk(probeChunk *chunk.Chunk, joinResult | |
|
||
func (w *ProbeWorkerV2) probeAndSendResult(joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) { | ||
if w.HashJoinCtx.spillHelper.areAllPartitionsSpilled() { | ||
if intest.InTest && w.HashJoinCtx.spillHelper.hashJoinExec.inRestore { | ||
w.HashJoinCtx.spillHelper.skipProbeInRestoreForTest = true | ||
} | ||
return true, 0, joinResult | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,14 +93,14 @@ func TestOuterJoinSpillBasic(t *testing.T) { | |
|
||
params := []spillTestParam{ | ||
// Normal case | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, | ||
// rightUsed is empty | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, | ||
// leftUsed is empty | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, | ||
} | ||
|
||
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) | ||
|
@@ -146,8 +146,8 @@ func TestOuterJoinSpillWithSel(t *testing.T) { | |
|
||
params := []spillTestParam{ | ||
// Normal case | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}}, | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}}, | ||
} | ||
|
||
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) | ||
|
@@ -201,8 +201,8 @@ func TestOuterJoinSpillWithOtherCondition(t *testing.T) { | |
otherCondition = append(otherCondition, sf) | ||
|
||
params := []spillTestParam{ | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, | ||
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, | ||
{false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why change these? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I found that one test case was not covered before, so I added it in this PR. |
||
} | ||
|
||
err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -128,10 +128,9 @@ func (d *DataInDiskByChunks) getChunkSize(chkIdx int) int64 { | |||||
return d.offsetOfEachChunk[chkIdx+1] - d.offsetOfEachChunk[chkIdx] | ||||||
} | ||||||
|
||||||
// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx. | ||||||
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { | ||||||
func (d *DataInDiskByChunks) readFromFisk(chkIdx int) error { | ||||||
if err := injectChunkInDiskRandomError(); err != nil { | ||||||
return nil, err | ||||||
return err | ||||||
} | ||||||
|
||||||
reader := d.dataFile.getSectionReader(d.offsetOfEachChunk[chkIdx]) | ||||||
|
@@ -145,19 +144,39 @@ func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { | |||||
|
||||||
readByteNum, err := io.ReadFull(reader, d.buf) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
return err | ||||||
} | ||||||
|
||||||
if int64(readByteNum) != chkSize { | ||||||
return nil, errors2.New("Fail to restore the spilled chunk") | ||||||
return errors2.New("Fail to restore the spilled chunk") | ||||||
} | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
||||||
// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx. | ||||||
func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { | ||||||
err := d.readFromFisk(chkIdx) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
} | ||||||
|
||||||
chk := NewEmptyChunk(d.fieldTypes) | ||||||
d.deserializeDataToChunk(chk) | ||||||
|
||||||
return chk, nil | ||||||
} | ||||||
|
||||||
// FillChunk fills a Chunk from the DataInDiskByChunks by chkIdx. | ||||||
func (d *DataInDiskByChunks) FillChunk(chkIdx int, chk *Chunk) error { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. done |
||||||
err := d.readFromFisk(chkIdx) | ||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
|
||||||
d.deserializeDataToChunk(chk) | ||||||
return nil | ||||||
} | ||||||
|
||||||
// Close releases the disk resource. | ||||||
func (d *DataInDiskByChunks) Close() { | ||||||
if d.dataFile.file != nil { | ||||||
|
@@ -264,7 +283,11 @@ func (d *DataInDiskByChunks) deserializeColMeta(pos *int64) (length int64, nullM | |||||
|
||||||
func (d *DataInDiskByChunks) deserializeSel(chk *Chunk, pos *int64, selSize int) { | ||||||
selLen := int64(selSize) / intLen | ||||||
chk.sel = make([]int, selLen) | ||||||
if int64(cap(chk.sel)) < selLen { | ||||||
chk.sel = make([]int, selLen) | ||||||
} else { | ||||||
chk.sel = chk.sel[:selLen] | ||||||
} | ||||||
for i := range selLen { | ||||||
chk.sel[i] = *(*int)(unsafe.Pointer(&d.buf[*pos])) | ||||||
*pos += intLen | ||||||
|
@@ -299,9 +322,25 @@ func (d *DataInDiskByChunks) deserializeOffsets(dst []int64, pos *int64) { | |||||
func (d *DataInDiskByChunks) deserializeColumns(chk *Chunk, pos *int64) { | ||||||
for _, col := range chk.columns { | ||||||
length, nullMapSize, dataSize, offsetSize := d.deserializeColMeta(pos) | ||||||
col.nullBitmap = make([]byte, nullMapSize) | ||||||
col.data = make([]byte, dataSize) | ||||||
col.offsets = make([]int64, offsetSize/int64Len) | ||||||
|
||||||
if int64(cap(col.nullBitmap)) < nullMapSize { | ||||||
col.nullBitmap = make([]byte, nullMapSize) | ||||||
} else { | ||||||
col.nullBitmap = col.nullBitmap[:nullMapSize] | ||||||
} | ||||||
|
||||||
if int64(cap(col.data)) < dataSize { | ||||||
col.data = make([]byte, dataSize) | ||||||
} else { | ||||||
col.data = col.data[:dataSize] | ||||||
} | ||||||
|
||||||
offsetsLen := offsetSize / int64Len | ||||||
if int64(cap(col.offsets)) < offsetsLen { | ||||||
col.offsets = make([]int64, offsetsLen) | ||||||
} else { | ||||||
col.offsets = col.offsets[:offsetsLen] | ||||||
} | ||||||
|
||||||
col.length = int(length) | ||||||
copy(col.nullBitmap, d.buf[*pos:*pos+nullMapSize]) | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found that one test case was not covered before, so I added it in this PR.