Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ func (*ImplProjection) OnImplement(expr *memo.GroupExpr, reqProp *property.Physi
return nil, nil
}
proj := plannercore.PhysicalProjection{
Exprs: logicProj.Exprs,
CalculateNoDelay: logicProj.CalculateNoDelay,
AvoidColumnEvaluator: logicProj.AvoidColumnEvaluator,
Exprs: logicProj.Exprs,
CalculateNoDelay: logicProj.CalculateNoDelay,
}.Init(logicProj.SCtx(), logicProp.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt), logicProj.QueryBlockOffset(), childProp)
proj.SetSchema(logicProp.Schema)
return []memo.Implementation{impl.NewProjectionImpl(proj)}, nil
Expand Down
10 changes: 4 additions & 6 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,9 +1069,8 @@ func constructInnerProj(prop *property.PhysicalProperty, proj *logicalop.Logical
return child
}
physicalProj := PhysicalProjection{
Exprs: proj.Exprs,
CalculateNoDelay: proj.CalculateNoDelay,
AvoidColumnEvaluator: proj.AvoidColumnEvaluator,
Exprs: proj.Exprs,
CalculateNoDelay: proj.CalculateNoDelay,
}.Init(proj.SCtx(), proj.StatsInfo(), proj.QueryBlockOffset(), prop)
physicalProj.SetChildren(child)
physicalProj.SetSchema(proj.Schema())
Expand Down Expand Up @@ -2030,9 +2029,8 @@ func exhaustPhysicalPlans4LogicalProjection(lp base.LogicalPlan, prop *property.
ret := make([]base.PhysicalPlan, 0, len(newProps))
for _, newProp := range newProps {
proj := PhysicalProjection{
Exprs: p.Exprs,
CalculateNoDelay: p.CalculateNoDelay,
AvoidColumnEvaluator: p.AvoidColumnEvaluator,
Exprs: p.Exprs,
CalculateNoDelay: p.CalculateNoDelay,
}.Init(ctx, p.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), p.QueryBlockOffset(), newProp)
proj.SetSchema(p.Schema())
ret = append(ret, proj)
Expand Down
6 changes: 2 additions & 4 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ func (b *PlanBuilder) buildExpand(p base.LogicalPlan, gbyItems []expression.Expr
}
proj.SetSchema(projSchema)
proj.SetChildren(p)
// since expand will ref original col and make some change, do the copy in executor rather than ref the same chunk.column.
proj.AvoidColumnEvaluator = true
proj.Proj4Expand = true
newGbyItems := expression.RestoreGbyExpression(distinctGbyCols, gbyExprsRefPos)

Expand Down Expand Up @@ -1694,7 +1692,7 @@ func (b *PlanBuilder) buildProjection4Union(_ context.Context, u *LogicalUnionAl
}
}
b.optFlag |= flagEliminateProjection
proj := logicalop.LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(b.ctx, b.getSelectOffset())
proj := logicalop.LogicalProjection{Exprs: exprs}.Init(b.ctx, b.getSelectOffset())
proj.SetSchema(u.Schema().Clone())
// reset the schema type to make the "not null" flag right.
for i, expr := range exprs {
Expand Down Expand Up @@ -7265,7 +7263,7 @@ func (b *PlanBuilder) buildProjection4CTEUnion(_ context.Context, seed base.Logi
}
}
b.optFlag |= flagEliminateProjection
proj := logicalop.LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(b.ctx, b.getSelectOffset())
proj := logicalop.LogicalProjection{Exprs: exprs}.Init(b.ctx, b.getSelectOffset())
proj.SetSchema(resSchema)
proj.SetChildren(recur)
return proj, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/logical_union_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (p *LogicalUnionAll) PruneColumns(parentUsedCols []*expression.Column, opt
for j, col := range schema.Columns {
exprs[j] = col
}
proj := logicalop.LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(p.SCtx(), p.QueryBlockOffset())
proj := logicalop.LogicalProjection{Exprs: exprs}.Init(p.SCtx(), p.QueryBlockOffset())
proj.SetSchema(schema)

proj.SetChildren(child)
Expand Down
7 changes: 0 additions & 7 deletions pkg/planner/core/operator/logicalop/logical_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,6 @@ type LogicalProjection struct {
// See "https://dev.mysql.com/doc/refman/5.7/en/do.html" for more detail.
CalculateNoDelay bool

// AvoidColumnEvaluator is a temporary variable which is ONLY used to avoid
// building columnEvaluator for the expressions of Projection which is
// built by buildProjection4Union.
// This can be removed after column pool being supported.
// Related issue: TiDB#8141(https://github.com/pingcap/tidb/issues/8141)
AvoidColumnEvaluator bool

// Proj4Expand is used for expand to project same column reference, while these
// col may be filled with null so we couldn't just eliminate this projection itself.
Proj4Expand bool
Expand Down
17 changes: 17 additions & 0 deletions pkg/planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func postOptimize(ctx context.Context, sctx base.PlanContext, plan base.Physical
plan = InjectExtraProjection(plan)
mergeContinuousSelections(plan)
plan = eliminateUnionScanAndLock(sctx, plan)
plan = avoidColumnEvaluatorForProjBelowUnion(sctx, plan)
plan = enableParallelApply(sctx, plan)
handleFineGrainedShuffle(ctx, sctx, plan)
propagateProbeParents(plan, nil)
Expand Down Expand Up @@ -1085,6 +1086,22 @@ func physicalOptimize(logic base.LogicalPlan, planCounter *base.PlanCounterTp) (
return t.Plan(), cost, err
}

// avoidColumnEvaluatorForProjBelowUnion sets AvoidColumnEvaluator to false for the projection operator which is a child of Union operator.
func avoidColumnEvaluatorForProjBelowUnion(sctx base.PlanContext, p base.PhysicalPlan) base.PhysicalPlan {
iteratePhysicalPlan(p, func(p base.PhysicalPlan) bool {
switch x := p.(type) {
case *PhysicalUnionAll:
for _, child := range x.Children() {
if proj, ok := child.(*PhysicalProjection); ok {
proj.AvoidColumnEvaluator = true
}
}
}
return true
})
return p
}

// eliminateUnionScanAndLock set lock property for PointGet and BatchPointGet and eliminates UnionScan and Lock.
func eliminateUnionScanAndLock(sctx base.PlanContext, p base.PhysicalPlan) base.PhysicalPlan {
var pointGet *PointGetPlan
Expand Down
73 changes: 73 additions & 0 deletions pkg/planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,76 @@ func TestPhysicalTableScanExtractCorrelatedCols(t *testing.T) {
require.Equal(t, 1, len(correlated))
require.Equal(t, "test.t2.company_no", correlated[0].StringWithCtx(tk.Session().GetExprCtx().GetEvalCtx(), errors.RedactLogDisable))
}

func TestAvoidColumnEvaluatorForProjBelowUnion(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

getPhysicalPlan := func(sql string) base.Plan {
tk.MustExec(sql)
info := tk.Session().ShowProcess()
require.NotNil(t, info)
p, ok := info.Plan.(base.Plan)
require.True(t, ok)
return p
}

var findProjBelowUnion func(p base.Plan) (projsBelowUnion, normalProjs []*core.PhysicalProjection)
findProjBelowUnion = func(p base.Plan) (projsBelowUnion, normalProjs []*core.PhysicalProjection) {
if p == nil {
return projsBelowUnion, normalProjs
}
switch v := p.(type) {
case *core.PhysicalUnionAll:
for _, child := range v.Children() {
if proj, ok := child.(*core.PhysicalProjection); ok {
projsBelowUnion = append(projsBelowUnion, proj)
}
}
default:
for _, child := range p.(base.PhysicalPlan).Children() {
if proj, ok := child.(*core.PhysicalProjection); ok {
normalProjs = append(normalProjs, proj)
}
subProjsBelowUnion, subNormalProjs := findProjBelowUnion(child)
projsBelowUnion = append(projsBelowUnion, subProjsBelowUnion...)
normalProjs = append(normalProjs, subNormalProjs...)
}
}
return projsBelowUnion, normalProjs
}

checkResult := func(sql string) {
p := getPhysicalPlan(sql)
projsBelowUnion, normalProjs := findProjBelowUnion(p)
if proj, ok := p.(*core.PhysicalProjection); ok {
normalProjs = append(normalProjs, proj)
}
require.NotEmpty(t, projsBelowUnion)
for _, proj := range projsBelowUnion {
require.True(t, proj.AvoidColumnEvaluator)
}
for _, proj := range normalProjs {
require.False(t, proj.AvoidColumnEvaluator)
}
}

// Test setup
tk.MustExec("use test")
tk.MustExec(`drop table if exists t1, t2;`)
tk.MustExec(`create table t1 (cc1 int, cc2 text);`)
tk.MustExec(`insert into t1 values (1, 'aaaa'), (2, 'bbbb'), (3, 'cccc');`)
tk.MustExec(`create table t2 (cc1 int, cc2 text, primary key(cc1));`)
tk.MustExec(`insert into t2 values (2, '2');`)
tk.MustExec(`set tidb_executor_concurrency = 1;`)
tk.MustExec(`set tidb_window_concurrency = 100;`)

testCases := []string{
`select * from (SELECT DISTINCT cc2 as a, cc2 as b, cc1 as c FROM t2 UNION ALL SELECT count(1) over (partition by cc1), cc2, cc1 FROM t1) order by a, b, c;`,
`select a+1, b+1 from (select cc1 as a, cc2 as b from t1 union select cc2, cc1 from t1) tmp`,
}

for _, sql := range testCases {
checkResult(sql)
}
}
8 changes: 6 additions & 2 deletions pkg/planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,8 +1109,12 @@ func (ts *PhysicalTableScan) MemoryUsage() (sum int64) {
type PhysicalProjection struct {
physicalSchemaProducer

Exprs []expression.Expression
CalculateNoDelay bool
Exprs []expression.Expression
CalculateNoDelay bool

// AvoidColumnEvaluator is ONLY used to avoid building columnEvaluator
// for the expressions of Projection which is child of Union operator.
// Related issue: TiDB#8141(https://github.com/pingcap/tidb/issues/8141)
AvoidColumnEvaluator bool
}

Expand Down
5 changes: 0 additions & 5 deletions pkg/planner/core/rule_eliminate_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ func doPhysicalProjectionElimination(p base.PhysicalPlan) base.PhysicalPlan {
if p.Schema().Len() != 0 {
childProj.SetSchema(p.Schema())
}
// If any of the consecutive projection operators has the AvoidColumnEvaluator set to true,
// we need to set the AvoidColumnEvaluator of the remaining projection to true.
if proj.AvoidColumnEvaluator {
childProj.AvoidColumnEvaluator = true
}
}
for i, col := range p.Schema().Columns {
if p.SCtx().GetSessionVars().StmtCtx.ColRefFromUpdatePlan.Has(int(col.UniqueID)) && !child.Schema().Columns[i].Equal(nil, col) {
Expand Down
15 changes: 5 additions & 10 deletions pkg/planner/core/rule_inject_extra_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ func InjectProjBelowAgg(aggPlan base.PhysicalPlan, aggFuncs []*aggregation.AggFu
child := aggPlan.Children()[0]
prop := aggPlan.GetChildReqProps(0).CloneEssentialFields()
proj := PhysicalProjection{
Exprs: projExprs,
AvoidColumnEvaluator: false,
Exprs: projExprs,
}.Init(aggPlan.SCtx(), child.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), aggPlan.QueryBlockOffset(), prop)
proj.SetSchema(expression.NewSchema(projSchemaCols...))
proj.SetChildren(child)
Expand Down Expand Up @@ -241,8 +240,7 @@ func InjectProjBelowSort(p base.PhysicalPlan, orderByItems []*util.ByItems) base
topProjExprs = append(topProjExprs, col)
}
topProj := PhysicalProjection{
Exprs: topProjExprs,
AvoidColumnEvaluator: false,
Exprs: topProjExprs,
}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset(), nil)
topProj.SetSchema(p.Schema().Clone())
topProj.SetChildren(p)
Expand Down Expand Up @@ -274,8 +272,7 @@ func InjectProjBelowSort(p base.PhysicalPlan, orderByItems []*util.ByItems) base

childProp := p.GetChildReqProps(0).CloneEssentialFields()
bottomProj := PhysicalProjection{
Exprs: bottomProjExprs,
AvoidColumnEvaluator: false,
Exprs: bottomProjExprs,
}.Init(p.SCtx(), childPlan.StatsInfo().ScaleByExpectCnt(childProp.ExpectedCnt), p.QueryBlockOffset(), childProp)
bottomProj.SetSchema(expression.NewSchema(bottomProjSchemaCols...))
bottomProj.SetChildren(childPlan)
Expand Down Expand Up @@ -324,8 +321,7 @@ func TurnNominalSortIntoProj(p base.PhysicalPlan, onlyColumn bool, orderByItems

childProp := p.GetChildReqProps(0).CloneEssentialFields()
bottomProj := PhysicalProjection{
Exprs: bottomProjExprs,
AvoidColumnEvaluator: false,
Exprs: bottomProjExprs,
}.Init(p.SCtx(), childPlan.StatsInfo().ScaleByExpectCnt(childProp.ExpectedCnt), p.QueryBlockOffset(), childProp)
bottomProj.SetSchema(expression.NewSchema(bottomProjSchemaCols...))
bottomProj.SetChildren(childPlan)
Expand All @@ -337,8 +333,7 @@ func TurnNominalSortIntoProj(p base.PhysicalPlan, onlyColumn bool, orderByItems
topProjExprs = append(topProjExprs, col)
}
topProj := PhysicalProjection{
Exprs: topProjExprs,
AvoidColumnEvaluator: false,
Exprs: topProjExprs,
}.Init(p.SCtx(), childPlan.StatsInfo().ScaleByExpectCnt(childProp.ExpectedCnt), p.QueryBlockOffset(), childProp)
topProj.SetSchema(childPlan.Schema().Clone())
topProj.SetChildren(bottomProj)
Expand Down
5 changes: 2 additions & 3 deletions pkg/planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1496,9 +1496,8 @@ func (p *basePhysicalAgg) convertAvgForMPP() *PhysicalProjection {
exprs = append(exprs, p.schema.Columns[i])
}
proj := PhysicalProjection{
Exprs: exprs,
CalculateNoDelay: false,
AvoidColumnEvaluator: false,
Exprs: exprs,
CalculateNoDelay: false,
}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset(), p.GetChildReqProps(0).CloneEssentialFields())
proj.SetSchema(p.schema)

Expand Down