@@ -34,8 +34,10 @@ import (
34
34
"github.com/pingcap/tidb/pkg/util/chunk"
35
35
"github.com/pingcap/tidb/pkg/util/disk"
36
36
"github.com/pingcap/tidb/pkg/util/hack"
37
+ "github.com/pingcap/tidb/pkg/util/logutil"
37
38
"github.com/pingcap/tidb/pkg/util/memory"
38
39
"github.com/pingcap/tidb/pkg/util/set"
40
+ "go.uber.org/zap"
39
41
)
40
42
41
43
// HashAggInput indicates the input of hash agg exec.
@@ -154,6 +156,8 @@ type HashAggExec struct {
154
156
spillHelper * parallelHashAggSpillHelper
155
157
// isChildDrained indicates whether the all data from child has been taken out.
156
158
isChildDrained bool
159
+
160
+ invalidMemoryUsageForTrackingTest bool
157
161
}
158
162
159
163
// Close implements the Executor Close interface.
@@ -204,6 +208,10 @@ func (e *HashAggExec) Close() error {
204
208
channel .Clear (e .finalOutputCh )
205
209
e .executed .Store (false )
206
210
if e .memTracker != nil {
211
+ if e .memTracker .BytesConsumed () < 0 {
212
+ logutil .BgLogger ().Warn ("Memory tracker's counter is invalid" , zap .Int64 ("counter" , e .memTracker .BytesConsumed ()))
213
+ e .invalidMemoryUsageForTrackingTest = true
214
+ }
207
215
e .memTracker .ReplaceBytesUsed (0 )
208
216
}
209
217
e .parallelExecValid = false
@@ -285,6 +293,8 @@ func (e *HashAggExec) initForUnparallelExec() {
285
293
}
286
294
287
295
func (e * HashAggExec ) initPartialWorkers (partialConcurrency int , finalConcurrency int , ctx sessionctx.Context ) {
296
+ memUsage := int64 (0 )
297
+
288
298
for i := 0 ; i < partialConcurrency ; i ++ {
289
299
partialResultsMap := make ([]aggfuncs.AggPartialResultMapper , finalConcurrency )
290
300
for i := 0 ; i < finalConcurrency ; i ++ {
@@ -311,6 +321,8 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
311
321
inflightChunkSync : e .inflightChunkSync ,
312
322
}
313
323
324
+ memUsage += e .partialWorkers [i ].chk .MemoryUsage ()
325
+
314
326
e .partialWorkers [i ].partialResultNumInRow = e .partialWorkers [i ].getPartialResultSliceLenConsiderByteAlign ()
315
327
for j := 0 ; j < finalConcurrency ; j ++ {
316
328
e .partialWorkers [i ].BInMaps [j ] = 0
@@ -328,9 +340,15 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
328
340
chk : exec .NewFirstChunk (e .Children (0 )),
329
341
giveBackCh : e .partialWorkers [i ].inputCh ,
330
342
}
343
+ << << << < HEAD
331
344
e .memTracker .Consume (input .chk .MemoryUsage ())
345
+ == == == =
346
+ memUsage += input .chk .MemoryUsage ()
347
+ >> >> >> > 985609 a5a21 (executor : fix goroutine leak when exceed quota in hash agg (#58078 ))
332
348
e .inputCh <- input
333
349
}
350
+
351
+ e .memTracker .Consume (memUsage )
334
352
}
335
353
336
354
func (e * HashAggExec ) initFinalWorkers (finalConcurrency int ) {
@@ -442,6 +460,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context, waitGroup *sync.WaitGr
442
460
ok bool
443
461
err error
444
462
)
463
+
445
464
defer func () {
446
465
if r := recover (); r != nil {
447
466
recoveryHashAgg (e .finalOutputCh , r )
@@ -494,6 +513,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context, waitGroup *sync.WaitGr
494
513
input .giveBackCh <- chk
495
514
496
515
if hasError := e .spillIfNeed (); hasError {
516
+ e .memTracker .Consume (- mSize )
497
517
return
498
518
}
499
519
}
@@ -857,3 +877,8 @@ func (e *HashAggExec) IsSpillTriggeredForTest() bool {
857
877
}
858
878
return false
859
879
}
880
+
881
+ // IsInvalidMemoryUsageTrackingForTest is for test
882
+ func (e * HashAggExec ) IsInvalidMemoryUsageTrackingForTest () bool {
883
+ return e .invalidMemoryUsageForTrackingTest
884
+ }
0 commit comments