Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,10 @@ func compareCandidates(sctx base.PlanContext, prop *property.PhysicalProperty, l
}

func isMatchProp(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) bool {
if ds.Table.Type().IsClusterTable() && !prop.IsSortItemEmpty() {
// TableScan with cluster table can't keep order.
return false
}
if prop.VectorProp.VSInfo != nil && path.Index != nil && path.Index.VectorInfo != nil {
if path.Index == nil || path.Index.VectorInfo == nil {
return false
Expand Down
77 changes: 76 additions & 1 deletion pkg/planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/charset"
Expand Down Expand Up @@ -1089,14 +1090,88 @@ func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *MppTask) bool {
return true
}

// Attach2Task implements physical plan
// For https://github.com/pingcap/tidb/issues/51723,
// This function only supports `CLUSTER_SLOW_QUERY`,
// it will change plan from
// TopN -> TableReader -> TableFullScan[cop] to
// TopN -> TableReader -> Limit[cop] -> TableFullScan[cop] + keepOrder
func (p *PhysicalTopN) pushLimitDownToTiDBCop(copTsk *CopTask) (base.Task, bool) {
if copTsk.indexPlan != nil || copTsk.tablePlan == nil {
return nil, false
}

var (
selOnTblScan *PhysicalSelection
selSelectivity float64
tblScan *PhysicalTableScan
err error
ok bool
)

copTsk.tablePlan, err = copTsk.tablePlan.Clone(p.SCtx())
if err != nil {
return nil, false
}
finalTblScanPlan := copTsk.tablePlan
for len(finalTblScanPlan.Children()) > 0 {
selOnTblScan, _ = finalTblScanPlan.(*PhysicalSelection)
finalTblScanPlan = finalTblScanPlan.Children()[0]
}

if tblScan, ok = finalTblScanPlan.(*PhysicalTableScan); !ok {
return nil, false
}

// Check the table is `CLUSTER_SLOW_QUERY` or not.
if tblScan.Table.Name.O != infoschema.ClusterTableSlowLog {
return nil, false
}

colsProp, ok := GetPropByOrderByItems(p.ByItems)
if !ok {
return nil, false
}
if len(colsProp.SortItems) != 1 || !colsProp.SortItems[0].Col.Equal(p.SCtx().GetExprCtx().GetEvalCtx(), tblScan.HandleCols.GetCol(0)) {
return nil, false
}
if selOnTblScan != nil && tblScan.StatsInfo().RowCount > 0 {
selSelectivity = selOnTblScan.StatsInfo().RowCount / tblScan.StatsInfo().RowCount
}
tblScan.Desc = colsProp.SortItems[0].Desc
tblScan.KeepOrder = true

childProfile := copTsk.Plan().StatsInfo()
newCount := p.Offset + p.Count
stats := util.DeriveLimitStats(childProfile, float64(newCount))
pushedLimit := PhysicalLimit{
Count: newCount,
}.Init(p.SCtx(), stats, p.QueryBlockOffset())
pushedLimit.SetSchema(copTsk.tablePlan.Schema())
copTsk = attachPlan2Task(pushedLimit, copTsk).(*CopTask)
child := pushedLimit.Children()[0]
child.SetStats(child.StatsInfo().ScaleByExpectCnt(float64(newCount)))
if selSelectivity > 0 && selSelectivity < 1 {
scaledRowCount := child.StatsInfo().RowCount / selSelectivity
tblScan.SetStats(tblScan.StatsInfo().ScaleByExpectCnt(scaledRowCount))
}
rootTask := copTsk.ConvertToRootTask(p.SCtx())
return attachPlan2Task(p, rootTask), true
}

// Attach2Task implements the PhysicalPlan interface.
func (p *PhysicalTopN) Attach2Task(tasks ...base.Task) base.Task {
t := tasks[0].Copy()
cols := make([]*expression.Column, 0, len(p.ByItems))
for _, item := range p.ByItems {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
needPushDown := len(cols) > 0
if copTask, ok := t.(*CopTask); ok && needPushDown && copTask.getStoreType() == kv.TiDB && len(copTask.rootTaskConds) == 0 {
newTask, changed := p.pushLimitDownToTiDBCop(copTask)
if changed {
return newTask
}
}
if copTask, ok := t.(*CopTask); ok && needPushDown && p.canPushDownToTiKV(copTask) && len(copTask.rootTaskConds) == 0 {
// If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and
// push it to table plan.
Expand Down
48 changes: 48 additions & 0 deletions tests/integrationtest/r/infoschema/cluster_tables.result
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,51 @@ select /*+ ignore_index(t, a) */ * from t where a = 1;
id a
create session binding from history using plan digest '20cf414ff6bd6fff3de17a266966020e81099b9fd1a29c4fd4b8aaf212f5c2c0';
drop binding for sql digest '83de0854921816c038565229b8008f5d679d373d16bf6b2a5cacd5937e11ea21';
explain select * from information_schema.cluster_slow_query order by time limit 1;
id estRows task access object operator info
TopN_7 1.00 root information_schema.cluster_slow_query.time, offset:0, count:1
└─TableReader_16 1.00 root data:Limit_15
└─Limit_15 1.00 cop[tidb] offset:0, count:1
└─MemTableScan_14 1.00 cop[tidb] table:CLUSTER_SLOW_QUERY
explain select * from information_schema.cluster_slow_query order by time;
id estRows task access object operator info
Sort_4 10000.00 root information_schema.cluster_slow_query.time
└─TableReader_8 10000.00 root data:MemTableScan_7
└─MemTableScan_7 10000.00 cop[tidb] table:CLUSTER_SLOW_QUERY
explain select * from information_schema.cluster_slow_query order by time desc limit 1;
id estRows task access object operator info
TopN_7 1.00 root information_schema.cluster_slow_query.time:desc, offset:0, count:1
└─TableReader_16 1.00 root data:Limit_15
└─Limit_15 1.00 cop[tidb] offset:0, count:1
└─MemTableScan_14 1.00 cop[tidb] table:CLUSTER_SLOW_QUERY
explain select * from information_schema.cluster_slow_query order by time desc;
id estRows task access object operator info
Sort_4 10000.00 root information_schema.cluster_slow_query.time:desc
└─TableReader_8 10000.00 root data:MemTableScan_7
└─MemTableScan_7 10000.00 cop[tidb] table:CLUSTER_SLOW_QUERY
explain select * from information_schema.cluster_slow_query WHERE (time between '2020-09-24 15:23:41.421396' and '2020-09-25 17:57:35.047111') and query != 'x' order by time limit 1;
id estRows task access object operator info
TopN_8 1.00 root information_schema.cluster_slow_query.time, offset:0, count:1
└─TableReader_18 1.00 root data:Limit_17
└─Limit_17 1.00 cop[tidb] offset:0, count:1
└─Selection_16 1.00 cop[tidb] ne(information_schema.cluster_slow_query.query, "x")
└─MemTableScan_15 1.50 cop[tidb] table:CLUSTER_SLOW_QUERY
explain select * from information_schema.cluster_slow_query WHERE (time between '2020-09-24 15:23:41.421396' and '2020-09-25 17:57:35.047111') and query != 'x' order by time;
id estRows task access object operator info
Sort_5 166.42 root information_schema.cluster_slow_query.time
└─TableReader_10 166.42 root data:Selection_9
└─Selection_9 166.42 cop[tidb] ne(information_schema.cluster_slow_query.query, "x")
└─MemTableScan_8 250.00 cop[tidb] table:CLUSTER_SLOW_QUERY
explain select * from information_schema.cluster_slow_query WHERE (time between '2020-09-24 15:23:41.421396' and '2020-09-25 17:57:35.047111') and query != 'x' order by time desc limit 1;
id estRows task access object operator info
TopN_8 1.00 root information_schema.cluster_slow_query.time:desc, offset:0, count:1
└─TableReader_18 1.00 root data:Limit_17
└─Limit_17 1.00 cop[tidb] offset:0, count:1
└─Selection_16 1.00 cop[tidb] ne(information_schema.cluster_slow_query.query, "x")
└─MemTableScan_15 1.50 cop[tidb] table:CLUSTER_SLOW_QUERY
explain select * from information_schema.cluster_slow_query WHERE (time between '2020-09-24 15:23:41.421396' and '2020-09-25 17:57:35.047111') and query != 'x' order by time desc;
id estRows task access object operator info
Sort_5 166.42 root information_schema.cluster_slow_query.time:desc
└─TableReader_10 166.42 root data:Selection_9
└─Selection_9 166.42 cop[tidb] ne(information_schema.cluster_slow_query.query, "x")
└─MemTableScan_8 250.00 cop[tidb] table:CLUSTER_SLOW_QUERY
10 changes: 10 additions & 0 deletions tests/integrationtest/t/infoschema/cluster_tables.test
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,13 @@ select /*+ ignore_index(t, a) */ * from t where a = 1;
create session binding from history using plan digest '20cf414ff6bd6fff3de17a266966020e81099b9fd1a29c4fd4b8aaf212f5c2c0';
drop binding for sql digest '83de0854921816c038565229b8008f5d679d373d16bf6b2a5cacd5937e11ea21';

# TestIssue51723
explain select * from information_schema.cluster_slow_query order by time limit 1;
explain select * from information_schema.cluster_slow_query order by time;
explain select * from information_schema.cluster_slow_query order by time desc limit 1;
explain select * from information_schema.cluster_slow_query order by time desc;
explain select * from information_schema.cluster_slow_query WHERE (time between '2020-09-24 15:23:41.421396' and '2020-09-25 17:57:35.047111') and query != 'x' order by time limit 1;
explain select * from information_schema.cluster_slow_query WHERE (time between '2020-09-24 15:23:41.421396' and '2020-09-25 17:57:35.047111') and query != 'x' order by time;
explain select * from information_schema.cluster_slow_query WHERE (time between '2020-09-24 15:23:41.421396' and '2020-09-25 17:57:35.047111') and query != 'x' order by time desc limit 1;
explain select * from information_schema.cluster_slow_query WHERE (time between '2020-09-24 15:23:41.421396' and '2020-09-25 17:57:35.047111') and query != 'x' order by time desc;