From 76ed9c0ea36095b7053d7dc2336c13ddd03ade16 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 22 Nov 2024 20:38:52 +0800 Subject: [PATCH 1/5] fix --- pkg/executor/aggregate/agg_hash_partial_worker.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/executor/aggregate/agg_hash_partial_worker.go b/pkg/executor/aggregate/agg_hash_partial_worker.go index dd9898691bb49..7a981051762d3 100644 --- a/pkg/executor/aggregate/agg_hash_partial_worker.go +++ b/pkg/executor/aggregate/agg_hash_partial_worker.go @@ -72,6 +72,17 @@ type HashAggPartialWorker struct { } func (w *HashAggPartialWorker) getChildInput() bool { + needDone := false + + defer func() { + if err := recover(); err != nil { + if needDone { + w.inflightChunkSync.Done() + } + panic(err) + } + }() + select { case <-w.finishCh: return false @@ -80,6 +91,10 @@ func (w *HashAggPartialWorker) getChildInput() bool { return false } + needDone = true + + w.intestDuringPartialWorkerRun() + sizeBefore := w.chk.MemoryUsage() w.chk.SwapColumns(chk) w.memTracker.Consume(w.chk.MemoryUsage() - sizeBefore) From 476ed5d777cddc884f0d87a5af973b5710fb5775 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 25 Nov 2024 11:15:58 +0800 Subject: [PATCH 2/5] move --- .../aggregate/agg_hash_partial_worker.go | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/pkg/executor/aggregate/agg_hash_partial_worker.go b/pkg/executor/aggregate/agg_hash_partial_worker.go index 7a981051762d3..3d3dd3470c264 100644 --- a/pkg/executor/aggregate/agg_hash_partial_worker.go +++ b/pkg/executor/aggregate/agg_hash_partial_worker.go @@ -71,18 +71,7 @@ type HashAggPartialWorker struct { inflightChunkSync *sync.WaitGroup } -func (w *HashAggPartialWorker) getChildInput() bool { - needDone := false - - defer func() { - if err := recover(); err != nil { - if needDone { - w.inflightChunkSync.Done() - } - panic(err) - } - }() - +func (w *HashAggPartialWorker) getChildInput(needDone *bool) bool { select { case <-w.finishCh: return false @@ -91,7 +80,7 @@ func (w *HashAggPartialWorker) getChildInput() bool { return false } - needDone = true + *needDone = true w.intestDuringPartialWorkerRun() @@ -113,16 +102,21 @@ func (w *HashAggPartialWorker) fetchChunkAndProcess(ctx sessionctx.Context, hasE return false } + needDone := false + defer func() { + if needDone { + w.inflightChunkSync.Done() + } + }() + waitStart := time.Now() - ok := w.getChildInput() + ok := w.getChildInput(&needDone) updateWaitTime(w.stats, waitStart) if !ok { return false } - defer w.inflightChunkSync.Done() - execStart := time.Now() if err := w.updatePartialResult(ctx, w.chk, len(w.partialResultsMap)); err != nil { *hasError = true From 17cebd91048a267f9912933d7d83f502ce7a0eff Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 25 Nov 2024 11:33:29 +0800 Subject: [PATCH 3/5] tweaking --- .../aggregate/agg_hash_partial_worker.go | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/executor/aggregate/agg_hash_partial_worker.go b/pkg/executor/aggregate/agg_hash_partial_worker.go index 3d3dd3470c264..656e075a45602 100644 --- a/pkg/executor/aggregate/agg_hash_partial_worker.go +++ b/pkg/executor/aggregate/agg_hash_partial_worker.go @@ -71,29 +71,16 @@ type HashAggPartialWorker struct { inflightChunkSync *sync.WaitGroup } -func (w *HashAggPartialWorker) getChildInput(needDone *bool) bool { +func (w *HashAggPartialWorker) getChildInput() (*chunk.Chunk, bool) { select { case <-w.finishCh: - return false + return nil, false case chk, ok := <-w.inputCh: if !ok { - return false - } - - *needDone = true - - w.intestDuringPartialWorkerRun() - - sizeBefore := w.chk.MemoryUsage() - w.chk.SwapColumns(chk) - w.memTracker.Consume(w.chk.MemoryUsage() - sizeBefore) - - w.giveBackCh <- &HashAggInput{ - chk: chk, - giveBackCh: w.inputCh, + return nil, false } + return chk, true } - return true } func (w *HashAggPartialWorker) fetchChunkAndProcess(ctx sessionctx.Context, hasError *bool, needShuffle *bool) bool { @@ -110,13 +97,26 @@ func (w *HashAggPartialWorker) fetchChunkAndProcess(ctx sessionctx.Context, hasE }() waitStart := time.Now() - ok := w.getChildInput(&needDone) + chk, ok := w.getChildInput() updateWaitTime(w.stats, waitStart) if !ok { return false } + defer w.inflightChunkSync.Done() + + w.intestDuringPartialWorkerRun() + + sizeBefore := w.chk.MemoryUsage() + w.chk.SwapColumns(chk) + w.memTracker.Consume(w.chk.MemoryUsage() - sizeBefore) + + w.giveBackCh <- &HashAggInput{ + chk: chk, + giveBackCh: w.inputCh, + } + execStart := time.Now() if err := w.updatePartialResult(ctx, w.chk, len(w.partialResultsMap)); err != nil { *hasError = true From a2b8dd7e990a300d35dba11c3d02af6f2fdf1661 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 25 Nov 2024 13:26:51 +0800 Subject: [PATCH 4/5] fix --- pkg/executor/aggregate/agg_hash_partial_worker.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/executor/aggregate/agg_hash_partial_worker.go b/pkg/executor/aggregate/agg_hash_partial_worker.go index 656e075a45602..02786f2b48397 100644 --- a/pkg/executor/aggregate/agg_hash_partial_worker.go +++ b/pkg/executor/aggregate/agg_hash_partial_worker.go @@ -89,13 +89,6 @@ func (w *HashAggPartialWorker) fetchChunkAndProcess(ctx sessionctx.Context, hasE return false } - needDone := false - defer func() { - if needDone { - w.inflightChunkSync.Done() - } - }() - waitStart := time.Now() chk, ok := w.getChildInput() updateWaitTime(w.stats, waitStart) From 9256bbd688ee1ffcc547f741a755b7e42ff298b7 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 25 Nov 2024 16:40:30 +0800 Subject: [PATCH 5/5] tweaking --- pkg/executor/aggregate/agg_hash_partial_worker.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/executor/aggregate/agg_hash_partial_worker.go b/pkg/executor/aggregate/agg_hash_partial_worker.go index 02786f2b48397..284f4275001db 100644 --- a/pkg/executor/aggregate/agg_hash_partial_worker.go +++ b/pkg/executor/aggregate/agg_hash_partial_worker.go @@ -91,13 +91,12 @@ func (w *HashAggPartialWorker) fetchChunkAndProcess(ctx sessionctx.Context, hasE waitStart := time.Now() chk, ok := w.getChildInput() - updateWaitTime(w.stats, waitStart) - if !ok { return false } defer w.inflightChunkSync.Done() + updateWaitTime(w.stats, waitStart) w.intestDuringPartialWorkerRun()