@@ -146,7 +146,7 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
146
146
return nil
147
147
}
148
148
149
- func (e * IndexNestedLoopHashJoin ) startWorkers (ctx context.Context ) {
149
+ func (e * IndexNestedLoopHashJoin ) startWorkers (ctx context.Context , initBatchSize int ) {
150
150
concurrency := e .Ctx ().GetSessionVars ().IndexLookupJoinConcurrency ()
151
151
if e .stats != nil {
152
152
e .stats .concurrency = concurrency
@@ -164,7 +164,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
164
164
}
165
165
e .joinChkResourceCh = make ([]chan * chunk.Chunk , concurrency )
166
166
e .WorkerWg .Add (1 )
167
- ow := e .newOuterWorker (innerCh )
167
+ ow := e .newOuterWorker (innerCh , initBatchSize )
168
168
go util .WithRecovery (func () { ow .run (e .ctxWithCancel ) }, e .finishJoinWorkers )
169
169
170
170
for i := 0 ; i < concurrency ; i ++ {
@@ -221,7 +221,7 @@ func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() {
221
221
// Next implements the IndexNestedLoopHashJoin Executor interface.
222
222
func (e * IndexNestedLoopHashJoin ) Next (ctx context.Context , req * chunk.Chunk ) error {
223
223
if ! e .prepared {
224
- e .startWorkers (ctx )
224
+ e .startWorkers (ctx , req . RequiredRows () )
225
225
e .prepared = true
226
226
}
227
227
req .Reset ()
@@ -411,14 +411,16 @@ func (*indexHashJoinOuterWorker) pushToChan(ctx context.Context, task *indexHash
411
411
return false
412
412
}
413
413
414
- func (e * IndexNestedLoopHashJoin ) newOuterWorker (innerCh chan * indexHashJoinTask ) * indexHashJoinOuterWorker {
414
+ func (e * IndexNestedLoopHashJoin ) newOuterWorker (innerCh chan * indexHashJoinTask , initBatchSize int ) * indexHashJoinOuterWorker {
415
+ maxBatchSize := e .Ctx ().GetSessionVars ().IndexJoinBatchSize
416
+ batchSize := min (initBatchSize , maxBatchSize )
415
417
ow := & indexHashJoinOuterWorker {
416
418
outerWorker : outerWorker {
417
419
OuterCtx : e .OuterCtx ,
418
420
ctx : e .Ctx (),
419
421
executor : e .Children (0 ),
420
- batchSize : 32 ,
421
- maxBatchSize : e . Ctx (). GetSessionVars (). IndexJoinBatchSize ,
422
+ batchSize : batchSize ,
423
+ maxBatchSize : maxBatchSize ,
422
424
parentMemTracker : e .memTracker ,
423
425
lookup : & e .IndexLookUpJoin ,
424
426
},
0 commit comments