Skip to content

Commit 7419112

Browse files
authored
planner: inherit the index join info and stats bottom-up for index join inner side (#61148)
ref #60106
1 parent 555b81c commit 7419112

File tree

4 files changed

+121
-14
lines changed

4 files changed

+121
-14
lines changed

pkg/planner/core/exhaust_physical_plans.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1215,7 +1215,7 @@ func buildDataSource2IndexScanByIndexJoinProp(
12151215
// here we don't need to construct physical index join here anymore, because we will encapsulate it bottom-up.
12161216
// chosenPath and lastColManager of indexJoinResult should be returned to the caller (seen by index join to keep
12171217
// index join aware of indexColLens and compareFilters).
1218-
completeIndexJoinFeedBackInfo(innerTask.(*CopTask), indexJoinResult, indexJoinResult.chosenRanges.Range(), keyOff2IdxOff)
1218+
completeIndexJoinFeedBackInfo(innerTask.(*CopTask), indexJoinResult, indexJoinResult.chosenRanges, keyOff2IdxOff)
12191219
return innerTask
12201220
}
12211221

@@ -2157,6 +2157,13 @@ func tryToGetIndexJoin(p *logicalop.LogicalJoin, prop *property.PhysicalProperty
21572157
return filterIndexJoinBySessionVars(p.SCtx(), candidates), false
21582158
}
21592159

2160+
func enumerationContainIndexJoin(candidates []base.PhysicalPlan) bool {
2161+
return slices.ContainsFunc(candidates, func(candidate base.PhysicalPlan) bool {
2162+
_, _, ok := getIndexJoinSideAndMethod(candidate)
2163+
return ok
2164+
})
2165+
}
2166+
21602167
// handleFilterIndexJoinHints is trying to avoid generating index join or index hash join when no-index-join related
21612168
// hint is specified in the query. So we can do it in physic enumeration phase.
21622169
func handleFilterIndexJoinHints(p *logicalop.LogicalJoin, candidates []base.PhysicalPlan) []base.PhysicalPlan {
@@ -2186,6 +2193,10 @@ func recordIndexJoinHintWarnings(lp base.LogicalPlan, prop *property.PhysicalPro
21862193
if !ok {
21872194
return nil
21882195
}
2196+
if !p.PreferAny(h.PreferRightAsINLJInner, h.PreferRightAsINLHJInner, h.PreferRightAsINLMJInner,
2197+
h.PreferLeftAsINLJInner, h.PreferLeftAsINLHJInner, h.PreferLeftAsINLMJInner) {
2198+
return nil // no force index join hints
2199+
}
21892200
// Cannot find any valid index join plan with these force hints.
21902201
// Print warning message if any hints cannot work.
21912202
// If the required property is not empty, we will enforce it and try the hint again.

pkg/planner/core/task.go

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ var HeavyFunctionNameMap = map[string]struct{}{
6262
}
6363

6464
func attachPlan2Task(p base.PhysicalPlan, t base.Task) base.Task {
65+
// since almost all current physical plans will be attached to the bottom encapsulated task,
66+
// we do the stats inheritance here for all index join inner tasks.
67+
inheritStatsFromBottomTaskForIndexJoinInner(p, t)
6568
switch v := t.(type) {
6669
case *CopTask:
6770
if v.indexPlanFinished {
@@ -130,8 +133,8 @@ func (p *PhysicalUnionScan) Attach2Task(tasks ...base.Task) base.Task {
130133
sel.SetChildren(pj.Children()...)
131134
p.SetChildren(sel)
132135
p.SetStats(task.Plan().StatsInfo())
133-
rt, _ := task.(*RootTask)
134-
rt.SetPlan(p)
136+
rt := task.(*RootTask)
137+
rt.SetPlan(p) // root task plan current is p headed.
135138
pj.SetChildren(p)
136139
return pj.Attach2Task(task)
137140
}
@@ -160,6 +163,8 @@ func (p *PhysicalApply) Attach2Task(tasks ...base.Task) base.Task {
160163
p.schema = BuildPhysicalJoinSchema(p.JoinType, p)
161164
t := &RootTask{}
162165
t.SetPlan(p)
166+
// inherit left and right child's warnings.
167+
t.warnings.CopyFrom(&lTask.(*RootTask).warnings, &rTask.(*RootTask).warnings)
163168
return t
164169
}
165170

@@ -200,6 +205,7 @@ func indexHashJoinAttach2TaskV2(p *PhysicalIndexHashJoin, tasks ...base.Task) ba
200205
}
201206
t := &RootTask{}
202207
t.SetPlan(p)
208+
t.warnings.CopyFrom(&outerTask.(*RootTask).warnings, &innerTask.(*RootTask).warnings)
203209
return t
204210
}
205211

@@ -232,6 +238,7 @@ func indexJoinAttach2TaskV2(p *PhysicalIndexJoin, tasks ...base.Task) base.Task
232238
}
233239
t := &RootTask{}
234240
t.SetPlan(p)
241+
t.warnings.CopyFrom(&outerTask.(*RootTask).warnings, &innerTask.(*RootTask).warnings)
235242
return t
236243
}
237244

@@ -264,6 +271,7 @@ func (p *PhysicalHashJoin) Attach2Task(tasks ...base.Task) base.Task {
264271
p.SetChildren(lTask.Plan(), rTask.Plan())
265272
task := &RootTask{}
266273
task.SetPlan(p)
274+
task.warnings.CopyFrom(&rTask.(*RootTask).warnings, &lTask.(*RootTask).warnings)
267275
return task
268276
}
269277

@@ -504,6 +512,7 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...base.Task) base.Task {
504512
partTp: outerTask.partTp,
505513
hashCols: outerTask.hashCols,
506514
}
515+
task.warnings.CopyFrom(&rTask.warnings, &lTask.warnings)
507516
// Current TiFlash doesn't support receive Join executors' schema info directly from TiDB.
508517
// Instead, it calculates Join executors' output schema using algorithm like BuildPhysicalJoinSchema which
509518
// produces full semantic schema.
@@ -594,6 +603,7 @@ func (p *PhysicalHashJoin) attach2TaskForTiFlash(tasks ...base.Task) base.Task {
594603
indexPlanFinished: true,
595604
tablePlan: p,
596605
}
606+
task.warnings.CopyFrom(&rTask.warnings, &lTask.warnings)
597607
return task
598608
}
599609

@@ -604,6 +614,7 @@ func (p *PhysicalMergeJoin) Attach2Task(tasks ...base.Task) base.Task {
604614
p.SetChildren(lTask.Plan(), rTask.Plan())
605615
t := &RootTask{}
606616
t.SetPlan(p)
617+
t.warnings.CopyFrom(&rTask.(*RootTask).warnings, &lTask.(*RootTask).warnings)
607618
return t
608619
}
609620

@@ -1503,6 +1514,48 @@ func (sel *PhysicalSelection) Attach2Task(tasks ...base.Task) base.Task {
15031514
return attachPlan2Task(sel, t)
15041515
}
15051516

1517+
func inheritStatsFromBottomElemForIndexJoinInner(p base.PhysicalPlan, indexJoinInfo *IndexJoinInfo, stats *property.StatsInfo) {
1518+
var isIndexJoin bool
1519+
switch p.(type) {
1520+
case *PhysicalIndexJoin, *PhysicalIndexHashJoin, *PhysicalIndexMergeJoin:
1521+
isIndexJoin = true
1522+
default:
1523+
}
1524+
// indexJoinInfo != nil means the child Task comes from an index join inner side.
1525+
// !isIndexJoin means the childTask only be passed through to indexJoin as an END.
1526+
if !isIndexJoin && indexJoinInfo != nil {
1527+
switch p.(type) {
1528+
case *PhysicalSelection:
1529+
// todo: for simplicity, we can just inherit it from child.
1530+
p.StatsInfo().ScaleByExpectCnt(stats.RowCount)
1531+
case *PhysicalProjection:
1532+
// mainly about the rowEst, proj doesn't change that.
1533+
p.StatsInfo().ScaleByExpectCnt(stats.RowCount)
1534+
case *PhysicalHashAgg, *PhysicalStreamAgg:
1535+
// todo: for simplicity, we can just inherit it from child.
1536+
p.StatsInfo().ScaleByExpectCnt(stats.RowCount)
1537+
case *PhysicalUnionScan:
1538+
// todo: for simplicity, we can just inherit it from child.
1539+
p.StatsInfo().ScaleByExpectCnt(stats.RowCount)
1540+
default:
1541+
p.StatsInfo().ScaleByExpectCnt(stats.RowCount)
1542+
}
1543+
}
1544+
}
1545+
1546+
func inheritStatsFromBottomTaskForIndexJoinInner(p base.PhysicalPlan, t base.Task) {
1547+
var indexJoinInfo *IndexJoinInfo
1548+
switch v := t.(type) {
1549+
case *CopTask:
1550+
indexJoinInfo = v.IndexJoinInfo
1551+
case *RootTask:
1552+
indexJoinInfo = v.IndexJoinInfo
1553+
default:
1554+
// index join's inner side couldn't be a mppTask, leave it.
1555+
}
1556+
inheritStatsFromBottomElemForIndexJoinInner(p, indexJoinInfo, t.Plan().StatsInfo())
1557+
}
1558+
15061559
// CheckAggCanPushCop checks whether the aggFuncs and groupByItems can
15071560
// be pushed down to coprocessor.
15081561
func CheckAggCanPushCop(sctx base.PlanContext, aggFuncs []*aggregation.AggFuncDesc, groupByItems []expression.Expression, storeType kv.StoreType) bool {
@@ -2179,6 +2232,10 @@ func (p *PhysicalStreamAgg) Attach2Task(tasks ...base.Task) base.Task {
21792232
if partialAgg != nil {
21802233
if cop.tablePlan != nil {
21812234
cop.finishIndexPlan()
2235+
// the partialAgg attachment didn't follow the attachPlan2Task function, so here we actively call
2236+
// inheritStatsFromBottomElemForIndexJoinInner to inherit stats from the bottom plan for index
2237+
// join inner side. note: partialAgg will share stats with finalAgg.
2238+
inheritStatsFromBottomElemForIndexJoinInner(partialAgg, cop.IndexJoinInfo, cop.tablePlan.StatsInfo())
21822239
partialAgg.SetChildren(cop.tablePlan)
21832240
cop.tablePlan = partialAgg
21842241
// If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
@@ -2189,10 +2246,15 @@ func (p *PhysicalStreamAgg) Attach2Task(tasks ...base.Task) base.Task {
21892246
// the partial agg, and the schema will be broken.
21902247
cop.needExtraProj = false
21912248
} else {
2249+
// the partialAgg attachment didn't follow the attachPlan2Task function, so here we actively call
2250+
// inheritStatsFromBottomElemForIndexJoinInner to inherit stats from the bottom plan for index
2251+
// join inner side. note: partialAgg will share stats with finalAgg.
2252+
inheritStatsFromBottomElemForIndexJoinInner(partialAgg, cop.IndexJoinInfo, cop.indexPlan.StatsInfo())
21922253
partialAgg.SetChildren(cop.indexPlan)
21932254
cop.indexPlan = partialAgg
21942255
}
21952256
}
2257+
// COP Task -> Root Task, warnings inherited inside.
21962258
t = cop.ConvertToRootTask(p.SCtx())
21972259
attachPlan2Task(finalAgg, t)
21982260
}
@@ -2680,6 +2742,10 @@ func (p *PhysicalHashAgg) Attach2Task(tasks ...base.Task) base.Task {
26802742
if partialAgg != nil {
26812743
if cop.tablePlan != nil {
26822744
cop.finishIndexPlan()
2745+
// the partialAgg attachment didn't follow the attachPlan2Task function, so here we actively call
2746+
// inheritStatsFromBottomElemForIndexJoinInner to inherit stats from the bottom plan for index
2747+
// join inner side. note: partialAgg will share stats with finalAgg.
2748+
inheritStatsFromBottomElemForIndexJoinInner(partialAgg, cop.IndexJoinInfo, cop.tablePlan.StatsInfo())
26832749
partialAgg.SetChildren(cop.tablePlan)
26842750
cop.tablePlan = partialAgg
26852751
// If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
@@ -2690,6 +2756,10 @@ func (p *PhysicalHashAgg) Attach2Task(tasks ...base.Task) base.Task {
26902756
// the partial agg, and the schema will be broken.
26912757
cop.needExtraProj = false
26922758
} else {
2759+
// the partialAgg attachment didn't follow the attachPlan2Task function, so here we actively call
2760+
// inheritStatsFromBottomElemForIndexJoinInner to inherit stats from the bottom plan for index
2761+
// join inner side. note: partialAgg will share stats with finalAgg.
2762+
inheritStatsFromBottomElemForIndexJoinInner(partialAgg, cop.IndexJoinInfo, cop.indexPlan.StatsInfo())
26932763
partialAgg.SetChildren(cop.indexPlan)
26942764
cop.indexPlan = partialAgg
26952765
}
@@ -2744,17 +2814,20 @@ func (p *PhysicalCTEStorage) Attach2Task(tasks ...base.Task) base.Task {
27442814
t := tasks[0].Copy()
27452815
if mpp, ok := t.(*MppTask); ok {
27462816
p.SetChildren(t.Plan())
2747-
return &MppTask{
2817+
nt := &MppTask{
27482818
p: p,
27492819
partTp: mpp.partTp,
27502820
hashCols: mpp.hashCols,
27512821
tblColHists: mpp.tblColHists,
27522822
}
2823+
nt.warnings.CopyFrom(&mpp.warnings)
2824+
return nt
27532825
}
27542826
t.ConvertToRootTask(p.SCtx())
27552827
p.SetChildren(t.Plan())
27562828
ta := &RootTask{}
27572829
ta.SetPlan(p)
2830+
ta.warnings.CopyFrom(&t.(*RootTask).warnings)
27582831
return ta
27592832
}
27602833

@@ -2782,6 +2855,21 @@ func (p *PhysicalSequence) Attach2Task(tasks ...base.Task) base.Task {
27822855
hashCols: lastTask.hashCols,
27832856
tblColHists: lastTask.tblColHists,
27842857
}
2858+
tmpWarnings := make([]*simpleWarnings, 0, len(tasks))
2859+
for _, t := range tasks {
2860+
if mpp, ok := t.(*MppTask); ok {
2861+
tmpWarnings = append(tmpWarnings, &mpp.warnings)
2862+
continue
2863+
}
2864+
if root, ok := t.(*RootTask); ok {
2865+
tmpWarnings = append(tmpWarnings, &root.warnings)
2866+
continue
2867+
}
2868+
if cop, ok := t.(*CopTask); ok {
2869+
tmpWarnings = append(tmpWarnings, &cop.warnings)
2870+
}
2871+
}
2872+
mppTask.warnings.CopyFrom(tmpWarnings...)
27852873
return mppTask
27862874
}
27872875

@@ -2880,11 +2968,13 @@ func (t *MppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *MppTask
28802968
sender.SetChildren(t.p)
28812969
receiver := PhysicalExchangeReceiver{}.Init(ctx, t.p.StatsInfo())
28822970
receiver.SetChildren(sender)
2883-
return &MppTask{
2971+
nt := &MppTask{
28842972
p: receiver,
28852973
partTp: prop.MPPPartitionTp,
28862974
hashCols: prop.MPPPartitionCols,
28872975
}
2976+
nt.warnings.CopyFrom(&t.warnings)
2977+
return nt
28882978
}
28892979

28902980
// IndexJoinInfo is generated by index join's inner ds, which will build their own index choice based

pkg/planner/core/task_base.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ var (
3737
_ base.Task = &CopTask{}
3838
)
3939

40-
var _ context.WarnGetterAppender = &simpleWarnings{}
41-
4240
type simpleWarnings struct {
4341
warnings []*context.SQLWarn
4442
}
@@ -189,6 +187,11 @@ func (t *RootTask) MemoryUsage() (sum int64) {
189187
return sum
190188
}
191189

190+
// AppendWarning appends a warning
191+
func (t *RootTask) AppendWarning(err error) {
192+
t.warnings.AppendWarning(err)
193+
}
194+
192195
// ************************************* RootTask End ******************************************
193196

194197
// ************************************* MPPTask Start ******************************************
@@ -263,6 +266,11 @@ func (t *MppTask) MemoryUsage() (sum int64) {
263266
return
264267
}
265268

269+
// AppendWarning appends a warning
270+
func (t *MppTask) AppendWarning(err error) {
271+
t.warnings.AppendWarning(err)
272+
}
273+
266274
// ConvertToRootTaskImpl implements Task interface.
267275
func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) (rt *RootTask) {
268276
defer func() {
@@ -367,6 +375,11 @@ type CopTask struct {
367375
warnings simpleWarnings
368376
}
369377

378+
// AppendWarning appends a warning
379+
func (t *CopTask) AppendWarning(err error) {
380+
t.warnings.AppendWarning(err)
381+
}
382+
370383
// Invalid implements Task interface.
371384
func (t *CopTask) Invalid() bool {
372385
return t.tablePlan == nil && t.indexPlan == nil && len(t.idxMergePartPlans) == 0

pkg/util/context/warn.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,6 @@ type WarnAppender interface {
8585
AppendNote(msg error)
8686
}
8787

88-
// WarnGetterAppender provides a function to add a warning and get all warnings.
89-
type WarnGetterAppender interface {
90-
WarnAppender
91-
// GetWarnings gets all warnings. The slice is not copied, so it should not be modified.
92-
GetWarnings() []SQLWarn
93-
}
94-
9588
// WarnHandler provides a handler to append and get warnings.
9689
type WarnHandler interface {
9790
WarnAppender

0 commit comments

Comments
 (0)