Skip to content

Commit d2ee6e9

Browse files
authored
Merge pull request pingcap#3 from windtalker/hanfei/join-merge
basic support for multi table broadcast join
2 parents bb22eb6 + 8a51e02 commit d2ee6e9

File tree

7 files changed

+114
-48
lines changed

7 files changed

+114
-48
lines changed

planner/core/common_plans.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,8 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, driverSide, indent st
855855
buildSide = plan.InnerChildIdx ^ 1
856856
case *PhysicalIndexHashJoin:
857857
buildSide = plan.InnerChildIdx ^ 1
858+
case *PhysicalBroadCastJoin:
859+
buildSide = plan.InnerChildIdx
858860
}
859861

860862
if buildSide != -1 {

planner/core/exhaust_physical_plans.go

Lines changed: 84 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ import (
3737
)
3838

3939
func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
40+
if prop.IsFlashOnlyProp() {
41+
return nil
42+
}
4043
childProp := prop.Clone()
4144
us := PhysicalUnionScan{
4245
Conditions: p.conditions,
@@ -1416,11 +1419,26 @@ func (p *LogicalJoin) tryToGetIndexJoin(prop *property.PhysicalProperty) (indexJ
14161419
// If the hint is not matched, it will get other candidates.
14171420
// If the hint is not figured, we will pick all candidates.
14181421
func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
1422+
if prop.IsFlashOnlyProp() && ((p.preferJoinType & preferMergeJoin) > 0 || (p.preferJoinType & preferHashJoin) > 0) {
1423+
return nil
1424+
}
1425+
joins := make([]PhysicalPlan, 0, 5)
1426+
if p.ctx.GetSessionVars().AllowBCJ {
1427+
broadCastJoins := p.tryToGetBroadCastJoin(prop)
1428+
if (p.preferJoinType & preferBCJoin) > 0 {
1429+
logutil.BgLogger().Info("prefer bc join", zap.Int("bc count", len(broadCastJoins)))
1430+
return broadCastJoins
1431+
}
1432+
joins = append(joins, broadCastJoins...)
1433+
}
1434+
if prop.IsFlashOnlyProp() {
1435+
return joins
1436+
}
1437+
14191438
mergeJoins := p.GetMergeJoin(prop, p.schema, p.Stats(), p.children[0].statsInfo(), p.children[1].statsInfo())
14201439
if (p.preferJoinType & preferMergeJoin) > 0 {
14211440
return mergeJoins
14221441
}
1423-
joins := make([]PhysicalPlan, 0, 5)
14241442
joins = append(joins, mergeJoins...)
14251443

14261444
indexJoins, forced := p.tryToGetIndexJoin(prop)
@@ -1435,31 +1453,25 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) []Ph
14351453
return hashJoins
14361454
}
14371455
joins = append(joins, hashJoins...)
1438-
1439-
if p.ctx.GetSessionVars().AllowBCJ {
1440-
broadCastJoins := p.tryToGetBroadCastJoin(prop)
1441-
joins = append(joins, broadCastJoins...)
1442-
if (p.preferJoinType & preferBCJoin) > 0 {
1443-
logutil.BgLogger().Info("prefer bc join", zap.Int("bc count", len(broadCastJoins)))
1444-
return broadCastJoins
1445-
}
1446-
}
14471456
return joins
14481457
}
14491458

1450-
func (p *LogicalJoin) tryToGetBroadCastJoin(prop * property.PhysicalProperty) []PhysicalPlan {
1451-
child0, ok0 := p.children[0].(*DataSource)
1452-
if !ok0 {
1453-
return nil
1459+
func getAllDataSourceRowCount(plan LogicalPlan) int64 {
1460+
if ds, ok := plan.(*DataSource); ok {
1461+
return ds.statsInfo().Count()
14541462
}
1455-
child1, ok1 := p.children[1].(*DataSource)
1456-
if !ok1 {
1457-
return nil
1463+
ret := int64(0)
1464+
for _, child := range plan.Children() {
1465+
ret += getAllDataSourceRowCount(child)
14581466
}
1467+
return ret
1468+
}
1469+
1470+
func (p *LogicalJoin) tryToGetBroadCastJoin(prop * property.PhysicalProperty) []PhysicalPlan {
14591471
if !prop.IsEmpty() {
14601472
return nil
14611473
}
1462-
if prop.TaskTp != property.RootTaskType && prop.TaskTp != property.CopTiFlashLocalReadTaskType && prop.TaskTp != property.CopTiFlashGlobalReadTaskType {
1474+
if prop.TaskTp != property.RootTaskType && !prop.IsFlashOnlyProp() {
14631475
return nil
14641476
}
14651477

@@ -1476,20 +1488,28 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop * property.PhysicalProperty) []
14761488
LeftJoinKeys: lkeys,
14771489
RightJoinKeys: rkeys,
14781490
}
1479-
if child0.stats.Count() < child1.stats.Count() {
1491+
// todo: currently, build side is the one has less rowcont and global read side
1492+
// is the one has less datasource row count(which mean less remote read), need
1493+
// to use cbo to decide the build side and global read side
1494+
if p.children[0].statsInfo().Count() < p.children[1].statsInfo().Count() {
14801495
baseJoin.InnerChildIdx = 0
14811496
} else {
14821497
baseJoin.InnerChildIdx = 1
14831498
}
1499+
globalIndex := baseJoin.InnerChildIdx
1500+
if prop.TaskTp != property.CopTiFlashGlobalReadTaskType && getAllDataSourceRowCount(p.children[globalIndex]) > getAllDataSourceRowCount(p.children[1 - globalIndex]) {
1501+
globalIndex = 1 - globalIndex
1502+
}
14841503
childrenReqProps := make([]*property.PhysicalProperty, 2)
1485-
childrenReqProps[baseJoin.InnerChildIdx] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType}
1504+
childrenReqProps[globalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType}
14861505
if prop.TaskTp == property.CopTiFlashGlobalReadTaskType {
1487-
childrenReqProps[1-baseJoin.InnerChildIdx] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType}
1506+
childrenReqProps[1-globalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType}
14881507
} else {
1489-
childrenReqProps[1-baseJoin.InnerChildIdx] = &property.PhysicalProperty{TaskTp: property.CopTiFlashLocalReadTaskType}
1508+
childrenReqProps[1-globalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashLocalReadTaskType}
14901509
}
14911510
join := PhysicalBroadCastJoin {
14921511
basePhysicalJoin: baseJoin,
1512+
globalChildIndex: globalIndex,
14931513
}.Init(p.ctx, p.stats, p.blockOffset, childrenReqProps...)
14941514
results := make([]PhysicalPlan, 0, 1)
14951515
results = append(results, join)
@@ -1500,6 +1520,9 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop * property.PhysicalProperty) []
15001520
// When a sort column will be replaced by scalar function, we refuse it.
15011521
// When a sort column will be replaced by a constant, we just remove it.
15021522
func (p *LogicalProjection) TryToGetChildProp(prop *property.PhysicalProperty) (*property.PhysicalProperty, bool) {
1523+
if prop.IsFlashOnlyProp() {
1524+
return nil, false
1525+
}
15031526
newProp := &property.PhysicalProperty{TaskTp: property.RootTaskType, ExpectedCnt: prop.ExpectedCnt}
15041527
newCols := make([]property.Item, 0, len(prop.Items))
15051528
for _, col := range prop.Items {
@@ -1529,9 +1552,10 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty
15291552
return []PhysicalPlan{proj}
15301553
}
15311554

1532-
func (lt *LogicalTopN) getPhysTopN() []PhysicalPlan {
1533-
ret := make([]PhysicalPlan, 0, 3)
1534-
for _, tp := range wholeTaskTypes {
1555+
func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPlan {
1556+
allTaskTypes := prop.GetAllPossibleChildTaskTypes()
1557+
ret := make([]PhysicalPlan, 0, len(allTaskTypes))
1558+
for _, tp := range allTaskTypes {
15351559
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: math.MaxFloat64}
15361560
topN := PhysicalTopN{
15371561
ByItems: lt.ByItems,
@@ -1543,14 +1567,15 @@ func (lt *LogicalTopN) getPhysTopN() []PhysicalPlan {
15431567
return ret
15441568
}
15451569

1546-
func (lt *LogicalTopN) getPhysLimits() []PhysicalPlan {
1547-
prop, canPass := GetPropByOrderByItems(lt.ByItems)
1570+
func (lt *LogicalTopN) getPhysLimits(prop *property.PhysicalProperty) []PhysicalPlan {
1571+
p, canPass := GetPropByOrderByItems(lt.ByItems)
15481572
if !canPass {
15491573
return nil
15501574
}
1575+
allTaskTypes := prop.GetAllPossibleChildTaskTypes()
15511576
ret := make([]PhysicalPlan, 0, 3)
1552-
for _, tp := range wholeTaskTypes {
1553-
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(lt.Count + lt.Offset), Items: prop.Items}
1577+
for _, tp := range allTaskTypes {
1578+
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(lt.Count + lt.Offset), Items: p.Items}
15541579
limit := PhysicalLimit{
15551580
Count: lt.Count,
15561581
Offset: lt.Offset,
@@ -1562,7 +1587,7 @@ func (lt *LogicalTopN) getPhysLimits() []PhysicalPlan {
15621587

15631588
// MatchItems checks if this prop's columns can match by items totally.
15641589
func MatchItems(p *property.PhysicalProperty, items []*ByItems) bool {
1565-
if len(items) < len(p.Items) {
1590+
if len(items) < len(p.Items) || p.IsFlashOnlyProp() {
15661591
return false
15671592
}
15681593
for i, col := range p.Items {
@@ -1576,7 +1601,7 @@ func MatchItems(p *property.PhysicalProperty, items []*ByItems) bool {
15761601

15771602
func (lt *LogicalTopN) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
15781603
if MatchItems(prop, lt.ByItems) {
1579-
return append(lt.getPhysTopN(), lt.getPhysLimits()...)
1604+
return append(lt.getPhysTopN(prop), lt.getPhysLimits(prop)...)
15801605
}
15811606
return nil
15821607
}
@@ -1587,7 +1612,7 @@ func (la *LogicalApply) GetHashJoin(prop *property.PhysicalProperty) *PhysicalHa
15871612
}
15881613

15891614
func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
1590-
if !prop.AllColsFromSchema(la.children[0].Schema()) { // for convenient, we don't pass through any prop
1615+
if !prop.AllColsFromSchema(la.children[0].Schema()) || prop.IsFlashOnlyProp() { // for convenient, we don't pass through any prop
15911616
return nil
15921617
}
15931618
join := la.GetHashJoin(prop)
@@ -1604,6 +1629,9 @@ func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) []
16041629
}
16051630

16061631
func (p *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
1632+
if prop.IsFlashOnlyProp() {
1633+
return nil
1634+
}
16071635
var byItems []property.Item
16081636
byItems = append(byItems, p.PartitionBy...)
16091637
byItems = append(byItems, p.OrderBy...)
@@ -1637,8 +1665,12 @@ func (la *LogicalAggregation) canPushToCop() bool {
16371665
}
16381666

16391667
func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
1668+
if prop.IsFlashOnlyProp() {
1669+
return nil
1670+
}
16401671
_, desc := prop.AllSameOrder()
1641-
enforcedAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes))
1672+
allTaskTypes := prop.GetAllPossibleChildTaskTypes()
1673+
enforcedAggs := make([]PhysicalPlan, 0, len(allTaskTypes))
16421674
childProp := &property.PhysicalProperty{
16431675
ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt),
16441676
Enforced: true,
@@ -1665,6 +1697,10 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope
16651697
}
16661698

16671699
func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
1700+
// todo support CopTiFlash task type in stream agg
1701+
if prop.IsFlashOnlyProp() {
1702+
return nil
1703+
}
16681704
all, desc := prop.AllSameOrder()
16691705
if !all {
16701706
return nil
@@ -1680,7 +1716,8 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
16801716
return nil
16811717
}
16821718

1683-
streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(wholeTaskTypes)-1)+len(wholeTaskTypes))
1719+
allTaskTypes := prop.GetAllPossibleChildTaskTypes()
1720+
streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(allTaskTypes)-1)+len(allTaskTypes))
16841721
childProp := &property.PhysicalProperty{
16851722
ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt),
16861723
}
@@ -1721,11 +1758,14 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
17211758
if !prop.IsEmpty() {
17221759
return nil
17231760
}
1724-
hashAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes))
1725-
taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType, property.CopTiFlashLocalReadTaskType, property.CopTiFlashGlobalReadTaskType}
1761+
hashAggs := make([]PhysicalPlan, 0, len(prop.GetAllPossibleChildTaskTypes()))
1762+
taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType, property.CopTiFlashLocalReadTaskType}
17261763
if !la.aggHints.preferAggToCop {
17271764
taskTypes = append(taskTypes, property.RootTaskType)
17281765
}
1766+
if prop.IsFlashOnlyProp() {
1767+
taskTypes = []property.TaskType{prop.TaskTp}
1768+
}
17291769
for _, taskTp := range taskTypes {
17301770
agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp})
17311771
agg.SetSchema(la.schema.Clone())
@@ -1795,8 +1835,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) []P
17951835
if !prop.IsEmpty() {
17961836
return nil
17971837
}
1798-
ret := make([]PhysicalPlan, 0, len(wholeTaskTypes))
1799-
for _, tp := range wholeTaskTypes {
1838+
allTaskTypes := prop.GetAllPossibleChildTaskTypes()
1839+
ret := make([]PhysicalPlan, 0, len(allTaskTypes))
1840+
for _, tp := range allTaskTypes {
18001841
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(p.Count + p.Offset)}
18011842
limit := PhysicalLimit{
18021843
Offset: p.Offset,
@@ -1808,6 +1849,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) []P
18081849
}
18091850

18101851
func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
1852+
if prop.IsFlashOnlyProp() {
1853+
return nil
1854+
}
18111855
childProp := prop.Clone()
18121856
lock := PhysicalLock{
18131857
Lock: p.Lock,
@@ -1819,7 +1863,7 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) []Ph
18191863

18201864
func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
18211865
// TODO: UnionAll can not pass any order, but we can change it to sort merge to keep order.
1822-
if !prop.IsEmpty() {
1866+
if !prop.IsEmpty() || prop.IsFlashOnlyProp() {
18231867
return nil
18241868
}
18251869
chReqProps := make([]*property.PhysicalProperty, 0, len(p.children))
@@ -1860,7 +1904,7 @@ func (ls *LogicalSort) exhaustPhysicalPlans(prop *property.PhysicalProperty) []P
18601904
}
18611905

18621906
func (p *LogicalMaxOneRow) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
1863-
if !prop.IsEmpty() {
1907+
if !prop.IsEmpty() || prop.IsFlashOnlyProp() {
18641908
return nil
18651909
}
18661910
mor := PhysicalMaxOneRow{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2})

planner/core/explain.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ func (p *PhysicalTableScan) OperatorInfo(normalized bool) string {
213213
if p.stats.StatsVersion == statistics.PseudoVersion && !normalized {
214214
buffer.WriteString("stats:pseudo, ")
215215
}
216+
if p.IsGlobalRead {
217+
buffer.WriteString("global read, ")
218+
}
216219
buffer.Truncate(buffer.Len() - 2)
217220
return buffer.String()
218221
}

planner/core/find_best_task.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,6 @@ var aggFuncFactor = map[string]float64{
6060
"default": 1.5,
6161
}
6262

63-
// wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get
64-
// these tasks one by one.
65-
var wholeTaskTypes = [...]property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType, property.RootTaskType}
66-
6763
var invalidTask = &rootTask{cst: math.MaxFloat64}
6864

6965
// GetPropByOrderByItems will check if this sort property can be pushed or not. In order to simplify the problem, we only
@@ -146,7 +142,7 @@ func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTas
146142
return bestTask, nil
147143
}
148144

149-
if prop.TaskTp != property.RootTaskType {
145+
if prop.TaskTp != property.RootTaskType && prop.TaskTp != property.CopTiFlashLocalReadTaskType && prop.TaskTp != property.CopTiFlashGlobalReadTaskType {
150146
// Currently all plan cannot totally push down.
151147
p.storeTask(prop, invalidTask)
152148
return invalidTask, nil

planner/core/physical_plans.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (p *PhysicalTableReader) GetTableScan() *PhysicalTableScan {
8787
curPlan = curPlan.Children()[0]
8888
} else {
8989
join := curPlan.(*PhysicalBroadCastJoin)
90-
curPlan = join.children[1-join.InnerChildIdx]
90+
curPlan = join.children[1-join.globalChildIndex]
9191
}
9292
}
9393
}
@@ -438,6 +438,7 @@ type PhysicalMergeJoin struct {
438438

439439
type PhysicalBroadCastJoin struct {
440440
basePhysicalJoin
441+
globalChildIndex int
441442
}
442443

443444
// PhysicalLock is the physical operator of lock, which is used for `select ... for update` clause.

planner/property/physical_property.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ import (
2020
"github.com/pingcap/tidb/util/codec"
2121
)
2222

23+
// wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get
24+
// these tasks one by one.
25+
var wholeTaskTypes = []TaskType{CopSingleReadTaskType, CopDoubleReadTaskType, RootTaskType}
26+
2327
// Item wraps the column and its order.
2428
type Item struct {
2529
Col *expression.Column
@@ -83,6 +87,19 @@ func (p *PhysicalProperty) AllColsFromSchema(schema *expression.Schema) bool {
8387
return true
8488
}
8589

90+
// IsFlashOnlyProp return true if this physical property is only allowed to generate flash related task
91+
func (p *PhysicalProperty) IsFlashOnlyProp() bool {
92+
return p.TaskTp == CopTiFlashLocalReadTaskType || p.TaskTp == CopTiFlashGlobalReadTaskType
93+
}
94+
95+
func (p *PhysicalProperty) GetAllPossibleChildTaskTypes() []TaskType {
96+
if p.TaskTp == RootTaskType {
97+
return wholeTaskTypes
98+
}
99+
// todo for CopSingleReadTaskType and CopDoubleReadTaskType, this function should never be called
100+
return []TaskType{p.TaskTp}
101+
}
102+
86103
// IsPrefix checks whether the order property is the prefix of another.
87104
func (p *PhysicalProperty) IsPrefix(prop *PhysicalProperty) bool {
88105
if len(p.Items) > len(prop.Items) {

planner/property/task_type.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@ const (
2929
CopDoubleReadTaskType
3030

3131
// CopTiFlashLocalReadTaskType stands for flash coprocessor that read data locally,
32-
// and only a part of the data is read in one cop task
32+
// and only a part of the data is read in one cop task, if the current task type is
33+
// CopTiFlashLocalReadTaskType, all its children prop's task type is CopTiFlashLocalReadTaskType
3334
CopTiFlashLocalReadTaskType
3435

3536
// CopTiFlashGlobalReadTaskType stands for flash coprocessor that read data globally
36-
// and all the data of given table will be read in one cop task
37+
// and all the data of given table will be read in one cop task, if the current task
38+
// type is CopTiFlashGlobalReadTaskType, all its children prop's task type is
39+
// CopTiFlashGlobalReadTaskType
3740
CopTiFlashGlobalReadTaskType
3841
)
3942

0 commit comments

Comments
 (0)