From 30d6794e88e0e1d98529ca8b47698189b42b8227 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 23 Dec 2024 11:44:36 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #58078 Signed-off-by: ti-chi-bot --- pkg/executor/aggregate/agg_hash_executor.go | 25 +++++++++++++++++++++ pkg/executor/aggregate/agg_spill_test.go | 6 +++++ 2 files changed, 31 insertions(+) diff --git a/pkg/executor/aggregate/agg_hash_executor.go b/pkg/executor/aggregate/agg_hash_executor.go index 4ffe6e706335b..1a2f16aaa3c59 100644 --- a/pkg/executor/aggregate/agg_hash_executor.go +++ b/pkg/executor/aggregate/agg_hash_executor.go @@ -34,8 +34,10 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/disk" "github.com/pingcap/tidb/pkg/util/hack" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/set" + "go.uber.org/zap" ) // HashAggInput indicates the input of hash agg exec. @@ -154,6 +156,8 @@ type HashAggExec struct { spillHelper *parallelHashAggSpillHelper // isChildDrained indicates whether the all data from child has been taken out. isChildDrained bool + + invalidMemoryUsageForTrackingTest bool } // Close implements the Executor Close interface. @@ -204,6 +208,10 @@ func (e *HashAggExec) Close() error { channel.Clear(e.finalOutputCh) e.executed.Store(false) if e.memTracker != nil { + if e.memTracker.BytesConsumed() < 0 { + logutil.BgLogger().Warn("Memory tracker's counter is invalid", zap.Int64("counter", e.memTracker.BytesConsumed())) + e.invalidMemoryUsageForTrackingTest = true + } e.memTracker.ReplaceBytesUsed(0) } e.parallelExecValid = false @@ -285,6 +293,8 @@ func (e *HashAggExec) initForUnparallelExec() { } func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrency int, ctx sessionctx.Context) { + memUsage := int64(0) + for i := 0; i < partialConcurrency; i++ { partialResultsMap := make([]aggfuncs.AggPartialResultMapper, finalConcurrency) for i := 0; i < finalConcurrency; i++ { @@ -311,6 +321,8 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc inflightChunkSync: e.inflightChunkSync, } + memUsage += e.partialWorkers[i].chk.MemoryUsage() + e.partialWorkers[i].partialResultNumInRow = e.partialWorkers[i].getPartialResultSliceLenConsiderByteAlign() for j := 0; j < finalConcurrency; j++ { e.partialWorkers[i].BInMaps[j] = 0 @@ -328,9 +340,15 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc chk: exec.NewFirstChunk(e.Children(0)), giveBackCh: e.partialWorkers[i].inputCh, } +<<<<<<< HEAD e.memTracker.Consume(input.chk.MemoryUsage()) +======= + memUsage += input.chk.MemoryUsage() +>>>>>>> 985609a5a21 (executor: fix goroutine leak when exceed quota in hash agg (#58078)) e.inputCh <- input } + + e.memTracker.Consume(memUsage) } func (e *HashAggExec) initFinalWorkers(finalConcurrency int) { @@ -442,6 +460,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context, waitGroup *sync.WaitGr ok bool err error ) + defer func() { if r := recover(); r != nil { recoveryHashAgg(e.finalOutputCh, r) @@ -494,6 +513,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context, waitGroup *sync.WaitGr input.giveBackCh <- chk if hasError := e.spillIfNeed(); hasError { + e.memTracker.Consume(-mSize) return } } @@ -857,3 +877,8 @@ func (e *HashAggExec) IsSpillTriggeredForTest() bool { } return false } + +// IsInvalidMemoryUsageTrackingForTest is for test +func (e *HashAggExec) IsInvalidMemoryUsageTrackingForTest() bool { + return e.invalidMemoryUsageForTrackingTest +} diff --git a/pkg/executor/aggregate/agg_spill_test.go b/pkg/executor/aggregate/agg_spill_test.go index 253856249cc6d..600e735a758ce 100644 --- a/pkg/executor/aggregate/agg_spill_test.go +++ b/pkg/executor/aggregate/agg_spill_test.go @@ -150,6 +150,7 @@ func generateResult(t *testing.T, ctx *mock.Context, dataSource *testutil.MockDa resultRows = append(resultRows, chk.GetRow(i)) } } + require.False(t, aggExec.IsInvalidMemoryUsageTrackingForTest()) aggExec.Close() require.False(t, aggExec.IsSpillTriggeredForTest()) @@ -315,6 +316,7 @@ func executeCorrecResultTest(t *testing.T, ctx *mock.Context, aggExec *aggregate resultRows = append(resultRows, chk.GetRow(i)) } } + require.False(t, aggExec.IsInvalidMemoryUsageTrackingForTest()) aggExec.Close() require.True(t, aggExec.IsSpillTriggeredForTest()) @@ -351,6 +353,7 @@ func fallBackActionTest(t *testing.T) { } chk.Reset() } + require.False(t, aggExec.IsInvalidMemoryUsageTrackingForTest()) aggExec.Close() require.Less(t, 0, newRootExceedAction.GetTriggeredNum()) } @@ -373,6 +376,7 @@ func randomFailTest(t *testing.T, ctx *mock.Context, aggExec *aggregate.HashAggE go func() { time.Sleep(time.Duration(rand.Int31n(300)) * time.Millisecond) once.Do(func() { + require.False(t, aggExec.IsInvalidMemoryUsageTrackingForTest()) aggExec.Close() }) goRoutineWaiter.Done() @@ -382,6 +386,7 @@ func randomFailTest(t *testing.T, ctx *mock.Context, aggExec *aggregate.HashAggE err := aggExec.Next(tmpCtx, chk) if err != nil { once.Do(func() { + require.False(t, aggExec.IsInvalidMemoryUsageTrackingForTest()) err = aggExec.Close() require.Equal(t, nil, err) }) @@ -393,6 +398,7 @@ func randomFailTest(t *testing.T, ctx *mock.Context, aggExec *aggregate.HashAggE chk.Reset() } once.Do(func() { + require.False(t, aggExec.IsInvalidMemoryUsageTrackingForTest()) aggExec.Close() }) } From fb3941db029c89e96c3216dd2686f12e5a6f78af Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 24 Jun 2025 16:40:52 +0800 Subject: [PATCH 2/2] resolve conflict --- pkg/executor/aggregate/agg_hash_executor.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/pkg/executor/aggregate/agg_hash_executor.go b/pkg/executor/aggregate/agg_hash_executor.go index 1a2f16aaa3c59..f2b0925f299b8 100644 --- a/pkg/executor/aggregate/agg_hash_executor.go +++ b/pkg/executor/aggregate/agg_hash_executor.go @@ -34,10 +34,8 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/disk" "github.com/pingcap/tidb/pkg/util/hack" - "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/set" - "go.uber.org/zap" ) // HashAggInput indicates the input of hash agg exec. @@ -209,10 +207,10 @@ func (e *HashAggExec) Close() error { e.executed.Store(false) if e.memTracker != nil { if e.memTracker.BytesConsumed() < 0 { - logutil.BgLogger().Warn("Memory tracker's counter is invalid", zap.Int64("counter", e.memTracker.BytesConsumed())) e.invalidMemoryUsageForTrackingTest = true + } else { + e.memTracker.ReplaceBytesUsed(0) } - e.memTracker.ReplaceBytesUsed(0) } e.parallelExecValid = false if e.parallelAggSpillAction != nil { @@ -340,11 +338,7 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc chk: exec.NewFirstChunk(e.Children(0)), giveBackCh: e.partialWorkers[i].inputCh, } -<<<<<<< HEAD - e.memTracker.Consume(input.chk.MemoryUsage()) -======= memUsage += input.chk.MemoryUsage() ->>>>>>> 985609a5a21 (executor: fix goroutine leak when exceed quota in hash agg (#58078)) e.inputCh <- input }