Skip to content

Commit a7d79ba

Browse files
wjhuang2016ti-srebot
authored andcommitted
cherry pick pingcap#33168 to release-4.0
Signed-off-by: ti-srebot <[email protected]>
1 parent 75f81d2 commit a7d79ba

File tree

7 files changed

+405
-8
lines changed

7 files changed

+405
-8
lines changed

executor/aggregate_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,7 @@ func (s *testSuiteAgg) TestHaving(c *C) {
892892
tk.MustQuery("select 1 from t group by c1 having sum(abs(c2 + c3)) = c1").Check(testkit.Rows("1"))
893893
}
894894

895+
<<<<<<< HEAD
895896
func (s *testSuiteAgg) TestIssue26496(c *C) {
896897
tk := testkit.NewTestKitWithInit(c, s.store)
897898

@@ -903,6 +904,13 @@ func (s *testSuiteAgg) TestIssue26496(c *C) {
903904

904905
func (s *testSuiteAgg) TestAggEliminator(c *C) {
905906
tk := testkit.NewTestKitWithInit(c, s.store)
907+
=======
908+
func TestAggEliminator(t *testing.T) {
909+
store, clean := testkit.CreateMockStore(t)
910+
defer clean()
911+
tk := testkit.NewTestKit(t, store)
912+
tk.MustExec("use test")
913+
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
906914

907915
tk.MustExec("create table t(a int primary key, b int)")
908916
tk.MustQuery("select min(a), min(a) from t").Check(testkit.Rows("<nil> <nil>"))

planner/core/logical_plan_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,7 @@ func (b *PlanBuilder) buildSelection(ctx context.Context, p LogicalPlan, where a
938938

939939
conditions := splitWhere(where)
940940
expressions := make([]expression.Expression, 0, len(conditions))
941-
selection := LogicalSelection{buildByHaving: aggMapper != nil}.Init(b.ctx, b.getSelectOffset())
941+
selection := LogicalSelection{}.Init(b.ctx, b.getSelectOffset())
942942
for _, cond := range conditions {
943943
expr, np, err := b.rewrite(ctx, cond, p, aggMapper, false)
944944
if err != nil {

planner/core/logical_plans.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,6 @@ type LogicalSelection struct {
407407
// but after we converted to CNF(Conjunctive normal form), it can be
408408
// split into a list of AND conditions.
409409
Conditions []expression.Expression
410-
411-
// having selection can't be pushed down, because it must above the aggregation.
412-
buildByHaving bool
413410
}
414411

415412
// ExtractCorrelatedCols implements LogicalPlan interface.

planner/core/rule_predicate_push_down.go

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func splitSetGetVarFunc(filters []expression.Expression) ([]expression.Expressio
7575
func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
7676
var child LogicalPlan
7777
var retConditions []expression.Expression
78+
<<<<<<< HEAD
7879
if p.buildByHaving {
7980
retConditions, child = p.children[0].PredicatePushDown(predicates)
8081
retConditions = append(retConditions, p.Conditions...)
@@ -83,6 +84,13 @@ func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression)
8384
retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...))
8485
retConditions = append(retConditions, canNotBePushDown...)
8586
}
87+
=======
88+
var originConditions []expression.Expression
89+
canBePushDown, canNotBePushDown := splitSetGetVarFunc(p.Conditions)
90+
originConditions = canBePushDown
91+
retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...), opt)
92+
retConditions = append(retConditions, canNotBePushDown...)
93+
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
8694
if len(retConditions) > 0 {
8795
p.Conditions = expression.PropagateConstant(p.ctx, retConditions)
8896
// Return table dual when filter is constant false or null.
@@ -597,3 +605,196 @@ func (p *LogicalMemTable) PredicatePushDown(predicates []expression.Expression)
597605
func (*ppdSolver) name() string {
598606
return "predicate_push_down"
599607
}
608+
<<<<<<< HEAD
609+
=======
610+
611+
func appendTableDualTraceStep(replaced LogicalPlan, dual LogicalPlan, conditions []expression.Expression, opt *logicalOptimizeOp) {
612+
action := func() string {
613+
return fmt.Sprintf("%v_%v is replaced by %v_%v", replaced.TP(), replaced.ID(), dual.TP(), dual.ID())
614+
}
615+
reason := func() string {
616+
buffer := bytes.NewBufferString("The conditions[")
617+
for i, cond := range conditions {
618+
if i > 0 {
619+
buffer.WriteString(",")
620+
}
621+
buffer.WriteString(cond.String())
622+
}
623+
buffer.WriteString("] are constant false or null")
624+
return buffer.String()
625+
}
626+
opt.appendStepToCurrent(dual.ID(), dual.TP(), reason, action)
627+
}
628+
629+
func appendSelectionPredicatePushDownTraceStep(p *LogicalSelection, conditions []expression.Expression, opt *logicalOptimizeOp) {
630+
action := func() string {
631+
return fmt.Sprintf("%v_%v is removed", p.TP(), p.ID())
632+
}
633+
reason := func() string {
634+
return ""
635+
}
636+
if len(conditions) > 0 {
637+
reason = func() string {
638+
buffer := bytes.NewBufferString("The conditions[")
639+
for i, cond := range conditions {
640+
if i > 0 {
641+
buffer.WriteString(",")
642+
}
643+
buffer.WriteString(cond.String())
644+
}
645+
buffer.WriteString(fmt.Sprintf("] in %v_%v are pushed down", p.TP(), p.ID()))
646+
return buffer.String()
647+
}
648+
}
649+
opt.appendStepToCurrent(p.ID(), p.TP(), reason, action)
650+
}
651+
652+
func appendDataSourcePredicatePushDownTraceStep(ds *DataSource, opt *logicalOptimizeOp) {
653+
if len(ds.pushedDownConds) < 1 {
654+
return
655+
}
656+
reason := func() string {
657+
return ""
658+
}
659+
action := func() string {
660+
buffer := bytes.NewBufferString("The conditions[")
661+
for i, cond := range ds.pushedDownConds {
662+
if i > 0 {
663+
buffer.WriteString(",")
664+
}
665+
buffer.WriteString(cond.String())
666+
}
667+
buffer.WriteString(fmt.Sprintf("] are pushed down across %v_%v", ds.TP(), ds.ID()))
668+
return buffer.String()
669+
}
670+
opt.appendStepToCurrent(ds.ID(), ds.TP(), reason, action)
671+
}
672+
673+
func appendAddSelectionTraceStep(p LogicalPlan, child LogicalPlan, sel *LogicalSelection, opt *logicalOptimizeOp) {
674+
reason := func() string {
675+
return ""
676+
}
677+
action := func() string {
678+
return fmt.Sprintf("add %v_%v to connect %v_%v and %v_%v", sel.TP(), sel.ID(), p.TP(), p.ID(), child.TP(), child.ID())
679+
}
680+
opt.appendStepToCurrent(sel.ID(), sel.TP(), reason, action)
681+
}
682+
683+
// AddPrefix4ShardIndexes add expression prefix for shard index. e.g. an index is test.uk(tidb_shard(a), a).
684+
// It transforms the sql "SELECT * FROM test WHERE a = 10" to
685+
// "SELECT * FROM test WHERE tidb_shard(a) = val AND a = 10", val is the value of tidb_shard(10).
686+
// It also transforms the sql "SELECT * FROM test WHERE a IN (10, 20, 30)" to
687+
// "SELECT * FROM test WHERE tidb_shard(a) = val1 AND a = 10 OR tidb_shard(a) = val2 AND a = 20"
688+
// @param[in] conds the original condtion of this datasource
689+
// @retval - the new condition after adding expression prefix
690+
func (ds *DataSource) AddPrefix4ShardIndexes(sc sessionctx.Context, conds []expression.Expression) []expression.Expression {
691+
if !ds.containExprPrefixUk {
692+
return conds
693+
}
694+
695+
var err error
696+
newConds := conds
697+
698+
for _, path := range ds.possibleAccessPaths {
699+
if !path.IsUkShardIndexPath {
700+
continue
701+
}
702+
newConds, err = ds.addExprPrefixCond(sc, path, newConds)
703+
if err != nil {
704+
logutil.BgLogger().Error("Add tidb_shard expression failed",
705+
zap.Error(err),
706+
zap.Uint64("connection id", sc.GetSessionVars().ConnectionID),
707+
zap.String("database name", ds.DBName.L),
708+
zap.String("table name", ds.tableInfo.Name.L),
709+
zap.String("index name", path.Index.Name.L))
710+
return conds
711+
}
712+
}
713+
714+
return newConds
715+
}
716+
717+
func (ds *DataSource) addExprPrefixCond(sc sessionctx.Context, path *util.AccessPath,
718+
conds []expression.Expression) ([]expression.Expression, error) {
719+
IdxCols, IdxColLens :=
720+
expression.IndexInfo2PrefixCols(ds.Columns, ds.schema.Columns, path.Index)
721+
if len(IdxCols) == 0 {
722+
return conds, nil
723+
}
724+
725+
adder := &exprPrefixAdder{
726+
sctx: sc,
727+
OrigConds: conds,
728+
cols: IdxCols,
729+
lengths: IdxColLens,
730+
}
731+
732+
return adder.addExprPrefix4ShardIndex()
733+
}
734+
735+
// AddExprPrefix4ShardIndex
736+
// if original condition is a LogicOr expression, such as `WHERE a = 1 OR a = 10`,
737+
// call the function AddExprPrefix4DNFCond to add prefix expression tidb_shard(a) = xxx for shard index.
738+
// Otherwise, if the condition is `WHERE a = 1`, `WHERE a = 1 AND b = 10`, `WHERE a IN (1, 2, 3)`......,
739+
// call the function AddExprPrefix4CNFCond to add prefix expression for shard index.
740+
func (adder *exprPrefixAdder) addExprPrefix4ShardIndex() ([]expression.Expression, error) {
741+
if len(adder.OrigConds) == 1 {
742+
if sf, ok := adder.OrigConds[0].(*expression.ScalarFunction); ok && sf.FuncName.L == ast.LogicOr {
743+
return adder.addExprPrefix4DNFCond(sf)
744+
}
745+
}
746+
return adder.addExprPrefix4CNFCond(adder.OrigConds)
747+
}
748+
749+
// AddExprPrefix4CNFCond
750+
// add the prefix expression for CNF condition, e.g. `WHERE a = 1`, `WHERE a = 1 AND b = 10`, ......
751+
// @param[in] conds the original condtion of the datasoure. e.g. `WHERE t1.a = 1 AND t1.b = 10 AND t2.a = 20`.
752+
// if current datasource is `t1`, conds is {t1.a = 1, t1.b = 10}. if current datasource is
753+
// `t2`, conds is {t2.a = 20}
754+
// @return - the new condition after adding expression prefix
755+
func (adder *exprPrefixAdder) addExprPrefix4CNFCond(conds []expression.Expression) ([]expression.Expression, error) {
756+
newCondtionds, err := ranger.AddExpr4EqAndInCondition(adder.sctx,
757+
conds, adder.cols)
758+
759+
return newCondtionds, err
760+
}
761+
762+
// AddExprPrefix4DNFCond
763+
// add the prefix expression for DNF condition, e.g. `WHERE a = 1 OR a = 10`, ......
764+
// The condition returned is `WHERE (tidb_shard(a) = 214 AND a = 1) OR (tidb_shard(a) = 142 AND a = 10)`
765+
// @param[in] condition the original condtion of the datasoure. e.g. `WHERE a = 1 OR a = 10`.
766+
// condtion is `a = 1 OR a = 10`
767+
// @return - the new condition after adding expression prefix. It's still a LogicOr expression.
768+
func (adder *exprPrefixAdder) addExprPrefix4DNFCond(condition *expression.ScalarFunction) ([]expression.Expression, error) {
769+
var err error
770+
dnfItems := expression.FlattenDNFConditions(condition)
771+
newAccessItems := make([]expression.Expression, 0, len(dnfItems))
772+
773+
for _, item := range dnfItems {
774+
if sf, ok := item.(*expression.ScalarFunction); ok {
775+
var accesses []expression.Expression
776+
if sf.FuncName.L == ast.LogicAnd {
777+
cnfItems := expression.FlattenCNFConditions(sf)
778+
accesses, err = adder.addExprPrefix4CNFCond(cnfItems)
779+
if err != nil {
780+
return []expression.Expression{condition}, err
781+
}
782+
newAccessItems = append(newAccessItems, expression.ComposeCNFCondition(adder.sctx, accesses...))
783+
} else if sf.FuncName.L == ast.EQ || sf.FuncName.L == ast.In {
784+
// only add prefix expression for EQ or IN function
785+
accesses, err = adder.addExprPrefix4CNFCond([]expression.Expression{sf})
786+
if err != nil {
787+
return []expression.Expression{condition}, err
788+
}
789+
newAccessItems = append(newAccessItems, expression.ComposeCNFCondition(adder.sctx, accesses...))
790+
} else {
791+
newAccessItems = append(newAccessItems, item)
792+
}
793+
} else {
794+
newAccessItems = append(newAccessItems, item)
795+
}
796+
}
797+
798+
return []expression.Expression{expression.ComposeDNFCondition(adder.sctx, newAccessItems...)}, nil
799+
}
800+
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)

planner/core/testdata/ordered_result_mode_suite_out.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,12 +396,21 @@
396396
},
397397
{
398398
"Plan": [
399+
<<<<<<< HEAD
399400
"Selection_8 6400.00 root lt(Column#6, 20)",
400401
"└─Sort_9 8000.00 root Column#5:asc, Column#6:asc, Column#7:asc",
401402
" └─HashAgg_15 8000.00 root group by:test.t1.d, funcs:min(Column#11)->Column#5, funcs:max(Column#12)->Column#6, funcs:sum(Column#13)->Column#7",
402403
" └─TableReader_16 8000.00 root data:HashAgg_11",
403404
" └─HashAgg_11 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#11, funcs:max(test.t1.b)->Column#12, funcs:sum(test.t1.c)->Column#13",
404405
" └─TableFullScan_14 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
406+
=======
407+
"Sort_9 6400.00 root Column#5, Column#6, Column#7",
408+
"└─Selection_11 6400.00 root lt(Column#6, 20)",
409+
" └─HashAgg_16 8000.00 root group by:test.t1.d, funcs:min(Column#11)->Column#5, funcs:max(Column#12)->Column#6, funcs:sum(Column#13)->Column#7",
410+
" └─TableReader_17 8000.00 root data:HashAgg_12",
411+
" └─HashAgg_12 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#11, funcs:max(test.t1.b)->Column#12, funcs:sum(test.t1.c)->Column#13",
412+
" └─TableFullScan_15 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
413+
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
405414
]
406415
},
407416
{

planner/core/testdata/plan_suite_unexported_out.json

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"Name": "TestEagerAggregation",
44
"Cases": [
55
"DataScan(t)->Aggr(sum(test.t.a),sum(plus(test.t.a, 1)),count(test.t.a))->Projection",
6-
"DataScan(t)->Aggr(sum(plus(test.t.a, test.t.b)),sum(plus(test.t.a, test.t.c)),count(test.t.a))->Projection->Sel([gt(Column#13, 0)])->Sort->Projection",
6+
"DataScan(t)->Aggr(sum(plus(test.t.a, test.t.b)),sum(plus(test.t.a, test.t.c)),count(test.t.a))->Sel([gt(Column#13, 0)])->Projection->Sort->Projection",
77
"Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
88
"Join{DataScan(a)->DataScan(b)->Aggr(sum(test.t.a),firstrow(test.t.c))}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
99
"Join{DataScan(a)->DataScan(b)->Aggr(sum(test.t.a),firstrow(test.t.c))}(test.t.c,test.t.c)->Aggr(sum(Column#26),firstrow(test.t.a))->Projection",
@@ -89,7 +89,7 @@
8989
"DataScan(t)->Aggr(sum(test.t.b),firstrow(test.t.a))->Sel([gt(cast(test.t.a, decimal(20,0) BINARY), Column#13)])->Projection->Projection",
9090
"DataScan(t)->Aggr(sum(test.t.b),firstrow(test.t.a))->Sel([gt(test.t.a, 1)])->Projection->Projection",
9191
"Dual->Sel([gt(test.t.a, 1)])->Projection",
92-
"DataScan(t)->Aggr(count(test.t.a),firstrow(test.t.a))->Projection->Sel([lt(Column#13, 1)])",
92+
"DataScan(t)->Aggr(count(test.t.a),firstrow(test.t.a))->Sel([lt(Column#13, 1)])->Projection",
9393
"Join{DataScan(t1)->DataScan(t2)}(test.t.a,test.t.a)->Projection",
9494
"Dual->Projection",
9595
"DataScan(t)->Projection->Projection->Window(min(test.t.a)->Column#14)->Sel([lt(test.t.a, 10) eq(test.t.b, Column#14)])->Projection->Projection",
@@ -194,7 +194,11 @@
194194
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over())->Sort->Projection",
195195
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over(partition by test.t.a))->Sort->Projection",
196196
"TableReader(Table(t)->StreamAgg)->StreamAgg->Window(sum(Column#13)->Column#15 over())->Sort->Projection",
197+
<<<<<<< HEAD
197198
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#38 over())->MaxOneRow}->Sel([Column#38])->Projection",
199+
=======
200+
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(10,0) BINARY))->Column#38 over())->MaxOneRow->Sel([Column#38])}->Projection",
201+
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
198202
"[planner:3594]You cannot use the alias 'w' of an expression containing a window function in this context.'",
199203
"[planner:1247]Reference 'sum_a' not supported (reference to window function)",
200204
"[planner:3579]Window name 'w2' is not defined.",
@@ -267,7 +271,11 @@
267271
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over())->Sort->Projection",
268272
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over(partition by test.t.a))->Sort->Projection",
269273
"TableReader(Table(t)->StreamAgg)->StreamAgg->Window(sum(Column#13)->Column#15 over())->Sort->Projection",
274+
<<<<<<< HEAD
270275
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#38 over())->MaxOneRow}->Sel([Column#38])->Projection",
276+
=======
277+
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(10,0) BINARY))->Column#38 over())->MaxOneRow->Sel([Column#38])}->Projection",
278+
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
271279
"[planner:3594]You cannot use the alias 'w' of an expression containing a window function in this context.'",
272280
"[planner:1247]Reference 'sum_a' not supported (reference to window function)",
273281
"[planner:3579]Window name 'w2' is not defined.",
@@ -493,12 +501,12 @@
493501
"test.t.f"
494502
]
495503
],
496-
"4": [
504+
"5": [
497505
[
498506
"test.t.f"
499507
]
500508
],
501-
"5": [
509+
"6": [
502510
[
503511
"test.t.f"
504512
]

0 commit comments

Comments
 (0)