@@ -33,8 +33,10 @@ import (
 	"github.com/pingcap/tidb/pkg/util/chunk"
 	"github.com/pingcap/tidb/pkg/util/disk"
 	"github.com/pingcap/tidb/pkg/util/hack"
+	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/pingcap/tidb/pkg/util/memory"
 	"github.com/pingcap/tidb/pkg/util/set"
+	"go.uber.org/zap"
 )

 // HashAggInput indicates the input of hash agg exec.
@@ -144,6 +146,8 @@ type HashAggExec struct {
 	spillAction *AggSpillDiskAction
 	// isChildDrained indicates whether the all data from child has been taken out.
 	isChildDrained bool
+
+	invalidMemoryUsageForTrackingTest bool
 }

 // Close implements the Executor Close interface.
@@ -190,6 +194,10 @@ func (e *HashAggExec) Close() error {
 	channel.Clear(e.finalOutputCh)
 	e.executed = false
 	if e.memTracker != nil {
+		if e.memTracker.BytesConsumed() < 0 {
+			logutil.BgLogger().Warn("Memory tracker's counter is invalid", zap.Int64("counter", e.memTracker.BytesConsumed()))
+			e.invalidMemoryUsageForTrackingTest = true
+		}
 		e.memTracker.ReplaceBytesUsed(0)
 	}
 	e.parallelExecValid = false
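
The new branch in Close only fires when the tracker's running counter has gone negative, meaning some code path released more bytes than it ever charged. Below is a minimal standalone sketch of the same check against the pkg/util/memory tracker API; the helper name and the deliberately unbalanced Consume call are illustrative only, not taken from this patch.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/util/memory"
)

// checkAndResetTracker mirrors the pattern added to Close: detect a negative
// counter, report it, then zero the tracker so parent trackers stay balanced.
func checkAndResetTracker(tracker *memory.Tracker) bool {
	invalid := tracker.BytesConsumed() < 0
	if invalid {
		fmt.Println("memory tracker's counter is invalid:", tracker.BytesConsumed())
	}
	tracker.ReplaceBytesUsed(0)
	return invalid
}

func main() {
	tracker := memory.NewTracker(1, -1) // label 1, no byte limit
	tracker.Consume(-64)                // an unmatched release drives the counter negative
	fmt.Println(checkAndResetTracker(tracker)) // true
}

Zeroing the tracker afterwards keeps any parent tracker's totals consistent, which is what ReplaceBytesUsed(0) already did in Close before this change.
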
@@ -255,7 +263,88 @@ func (e *HashAggExec) initForUnparallelExec() {
 	}
 }

+<<<<<<< HEAD
 func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
+=======
+func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrency int, ctx sessionctx.Context) {
+	memUsage := int64(0)
+
+	for i := 0; i < partialConcurrency; i++ {
+		partialResultsMap := make([]aggfuncs.AggPartialResultMapper, finalConcurrency)
+		for i := 0; i < finalConcurrency; i++ {
+			partialResultsMap[i] = make(aggfuncs.AggPartialResultMapper)
+		}
+
+		partialResultsBuffer, groupKeyBuf := getBuffer()
+		e.partialWorkers[i] = HashAggPartialWorker{
+			baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.PartialAggFuncs, e.MaxChunkSize(), e.memTracker),
+			idForTest: i,
+			ctx: ctx,
+			inputCh: e.partialInputChs[i],
+			outputChs: e.partialOutputChs,
+			giveBackCh: e.inputCh,
+			BInMaps: make([]int, finalConcurrency),
+			partialResultsBuffer: *partialResultsBuffer,
+			globalOutputCh: e.finalOutputCh,
+			partialResultsMap: partialResultsMap,
+			groupByItems: e.GroupByItems,
+			chk: e.NewChunkWithCapacity(e.Children(0).RetFieldTypes(), 0, e.MaxChunkSize()),
+			groupKeyBuf: *groupKeyBuf,
+			serializeHelpers: aggfuncs.NewSerializeHelper(),
+			isSpillPrepared: false,
+			spillHelper: e.spillHelper,
+			inflightChunkSync: e.inflightChunkSync,
+		}
+
+		memUsage += e.partialWorkers[i].chk.MemoryUsage()
+
+		e.partialWorkers[i].partialResultNumInRow = e.partialWorkers[i].getPartialResultSliceLenConsiderByteAlign()
+		for j := 0; j < finalConcurrency; j++ {
+			e.partialWorkers[i].BInMaps[j] = 0
+		}
+
+		// There is a bucket in the empty partialResultsMap.
+		failpoint.Inject("ConsumeRandomPanic", nil)
+		e.memTracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice * (1 << e.partialWorkers[i].BInMap))
+		if e.stats != nil {
+			e.partialWorkers[i].stats = &AggWorkerStat{}
+			e.stats.PartialStats = append(e.stats.PartialStats, e.partialWorkers[i].stats)
+		}
+		input := &HashAggInput{
+			chk: chunk.New(e.Children(0).RetFieldTypes(), 0, e.MaxChunkSize()),
+			giveBackCh: e.partialWorkers[i].inputCh,
+		}
+		memUsage += input.chk.MemoryUsage()
+		e.inputCh <- input
+	}
+
+	e.memTracker.Consume(memUsage)
+}
+
+func (e *HashAggExec) initFinalWorkers(finalConcurrency int) {
+	for i := 0; i < finalConcurrency; i++ {
+		e.finalWorkers[i] = HashAggFinalWorker{
+			baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.FinalAggFuncs, e.MaxChunkSize(), e.memTracker),
+			partialResultMap: make(aggfuncs.AggPartialResultMapper),
+			BInMap: 0,
+			inputCh: e.partialOutputChs[i],
+			outputCh: e.finalOutputCh,
+			finalResultHolderCh: make(chan *chunk.Chunk, 1),
+			spillHelper: e.spillHelper,
+			restoredAggResultMapperMem: 0,
+		}
+		// There is a bucket in the empty partialResultsMap.
+		e.memTracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice * (1 << e.finalWorkers[i].BInMap))
+		if e.stats != nil {
+			e.finalWorkers[i].stats = &AggWorkerStat{}
+			e.stats.FinalStats = append(e.stats.FinalStats, e.finalWorkers[i].stats)
+		}
+		e.finalWorkers[i].finalResultHolderCh <- chunk.New(e.RetFieldTypes(), 0, e.MaxChunkSize())
+	}
+}
+
+func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) error {
+>>>>>>> 985609a5a21 (executor: fix goroutine leak when exceed quota in hash agg (#58078))
 	sessionVars := e.Ctx().GetSessionVars()
 	finalConcurrency := sessionVars.HashAggFinalConcurrency()
 	partialConcurrency := sessionVars.HashAggPartialConcurrency()
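
Both worker constructors charge the tracker up front for the single bucket that an empty Go map already owns, and BInMap/BInMaps keep the bucket exponent so that later map growth can be charged in doubling steps. Below is a small sketch of that bookkeeping pattern using the same hack constant; the standalone tracker and the explicit grow step are illustrative only and not part of this diff.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/util/hack"
	"github.com/pingcap/tidb/pkg/util/memory"
)

func main() {
	tracker := memory.NewTracker(1, -1)

	// An empty map already owns one bucket, so charge 2^0 buckets up front,
	// as initPartialWorkers/initFinalWorkers do above.
	bInMap := 0
	tracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice * (1 << bInMap))

	// When the map roughly doubles, charge only the newly added buckets and
	// bump the exponent (illustrative grow step).
	tracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice * (1 << bInMap))
	bInMap++

	fmt.Println("tracked bytes:", tracker.BytesConsumed(), "bucket exponent:", bInMap)
}

Charging per bucket rather than per entry keeps the accounting cheap while still letting the quota (and the spill action) react before the maps grow unbounded.
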
@@ -351,6 +440,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context, waitGroup *sync.WaitGr
 		ok  bool
 		err error
 	)
+
 	defer func() {
 		if r := recover(); r != nil {
 			recoveryHashAgg(e.finalOutputCh, r)
@@ -384,6 +474,14 @@ func (e *HashAggExec) fetchChildData(ctx context.Context, waitGroup *sync.WaitGr
 		failpoint.Inject("ConsumeRandomPanic", nil)
 		e.memTracker.Consume(chk.MemoryUsage() - mSize)
 		input.giveBackCh <- chk
+<<<<<<< HEAD
+=======
+
+		if hasError := e.spillIfNeed(); hasError {
+			e.memTracker.Consume(-mSize)
+			return
+		}
+>>>>>>> 985609a5a21 (executor: fix goroutine leak when exceed quota in hash agg (#58078))
 	}
 }

@@ -692,3 +790,21 @@ func (e *HashAggExec) initRuntimeStats() {
 		e.stats = stats
 	}
 }
+<<<<<<< HEAD
+=======
+
+// IsSpillTriggeredForTest is for test.
+func (e *HashAggExec) IsSpillTriggeredForTest() bool {
+	for i := range e.spillHelper.lock.spilledChunksIO {
+		if len(e.spillHelper.lock.spilledChunksIO[i]) > 0 {
+			return true
+		}
+	}
+	return false
+}
+
+// IsInvalidMemoryUsageTrackingForTest is for test.
+func (e *HashAggExec) IsInvalidMemoryUsageTrackingForTest() bool {
+	return e.invalidMemoryUsageForTrackingTest
+}
+>>>>>>> 985609a5a21 (executor: fix goroutine leak when exceed quota in hash agg (#58078))
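
A sketch of how a test could combine the two helpers after driving a query under a small memory quota; the package path for HashAggExec, the testify usage, and the helper name are assumptions, only the assertion pattern itself comes from the code above.

package aggregate_test

import (
	"testing"

	"github.com/pingcap/tidb/pkg/executor/aggregate" // assumed package path for HashAggExec
	"github.com/stretchr/testify/require"
)

// assertSpilledAndTrackedCleanly is a hypothetical helper: it expects the query
// to have spilled under the quota and Close to have observed a sane tracker.
func assertSpilledAndTrackedCleanly(t *testing.T, e *aggregate.HashAggExec) {
	require.True(t, e.IsSpillTriggeredForTest(), "expected hash agg to spill to disk under the memory quota")
	require.NoError(t, e.Close())
	require.False(t, e.IsInvalidMemoryUsageTrackingForTest(), "memory tracker counter went negative during Close")
}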