Skip to content

Commit 7216734

Browse files
authored
[fix](colocate join) fix wrong use of colocate join (#37361)
1 parent 5dd2780 commit 7216734

File tree

5 files changed

+137
-14
lines changed

5 files changed

+137
-14
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public PhysicalProperties visitPhysicalHashJoin(
279279
case RIGHT_SEMI_JOIN:
280280
case RIGHT_ANTI_JOIN:
281281
case RIGHT_OUTER_JOIN:
282-
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
282+
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
283283
return new PhysicalProperties(rightHashSpec);
284284
} else {
285285
// retain the left shuffle type, since the coordinator uses the leftmost node to schedule the fragment

fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ public Boolean visitPhysicalHashJoin(
293293
Optional<PhysicalProperties> updatedForLeft = Optional.empty();
294294
Optional<PhysicalProperties> updatedForRight = Optional.empty();
295295

296-
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
296+
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
297297
// check colocate join with scan
298298
return true;
299299
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)) {

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,17 +83,17 @@ private DistributionSpec convertDistribution(LogicalOlapScan olapScan) {
8383
List<Slot> output = olapScan.getOutput();
8484
List<Slot> baseOutput = olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId());
8585
List<ExprId> hashColumns = Lists.newArrayList();
86-
for (Slot slot : output) {
87-
for (Column column : hashDistributionInfo.getDistributionColumns()) {
86+
for (Column column : hashDistributionInfo.getDistributionColumns()) {
87+
for (Slot slot : output) {
8888
if (((SlotReference) slot).getColumn().get().getNameWithoutMvPrefix()
8989
.equals(column.getName())) {
9090
hashColumns.add(slot.getExprId());
9191
}
9292
}
9393
}
9494
if (hashColumns.size() != hashDistributionInfo.getDistributionColumns().size()) {
95-
for (Slot slot : baseOutput) {
96-
for (Column column : hashDistributionInfo.getDistributionColumns()) {
95+
for (Column column : hashDistributionInfo.getDistributionColumns()) {
96+
for (Slot slot : baseOutput) {
9797
// If the length of the column in the bucket key changes after DDL, the length cannot be
9898
// determined. As a result, some bucket fields are lost in the query execution plan.
9999
// So here we use the column name to avoid this problem
@@ -109,8 +109,8 @@ private DistributionSpec convertDistribution(LogicalOlapScan olapScan) {
109109
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
110110
List<Slot> output = olapScan.getOutput();
111111
List<ExprId> hashColumns = Lists.newArrayList();
112-
for (Slot slot : output) {
113-
for (Column column : hashDistributionInfo.getDistributionColumns()) {
112+
for (Column column : hashDistributionInfo.getDistributionColumns()) {
113+
for (Slot slot : output) {
114114
// If the length of the column in the bucket key changes after DDL, the length cannot be
115115
// determined. As a result, some bucket fields are lost in the query execution plan.
116116
// So here we use the column name to avoid this problem

fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
3434
import org.apache.doris.nereids.trees.expressions.Not;
3535
import org.apache.doris.nereids.trees.expressions.Slot;
36+
import org.apache.doris.nereids.trees.expressions.SlotReference;
3637
import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
3738
import org.apache.doris.nereids.trees.plans.JoinType;
3839
import org.apache.doris.nereids.trees.plans.Plan;
@@ -55,6 +56,7 @@
5556
import java.util.HashSet;
5657
import java.util.List;
5758
import java.util.Map;
59+
import java.util.Objects;
5860
import java.util.Set;
5961
import java.util.stream.Collectors;
6062

@@ -272,13 +274,14 @@ public static boolean shouldColocateJoin(AbstractPhysicalJoin<PhysicalPlan, Phys
272274
return false;
273275
}
274276
return couldColocateJoin((DistributionSpecHash) leftDistributionSpec,
275-
(DistributionSpecHash) rightDistributionSpec);
277+
(DistributionSpecHash) rightDistributionSpec, join.getHashJoinConjuncts());
276278
}
277279

278280
/**
279281
* could do colocate join with left and right child distribution spec.
280282
*/
281-
public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec) {
283+
public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec,
284+
List<Expression> conjuncts) {
282285
if (ConnectContext.get() == null
283286
|| ConnectContext.get().getSessionVariable().isDisableColocatePlan()) {
284287
return false;
@@ -300,12 +303,50 @@ public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, Distr
300303
boolean noNeedCheckColocateGroup = hitSameIndex && (leftTablePartitions.equals(rightTablePartitions))
301304
&& (leftTablePartitions.size() <= 1);
302305
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
303-
if (noNeedCheckColocateGroup
304-
|| (colocateIndex.isSameGroup(leftTableId, rightTableId)
305-
&& !colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId)))) {
306+
if (noNeedCheckColocateGroup) {
306307
return true;
307308
}
308-
return false;
309+
if (!colocateIndex.isSameGroup(leftTableId, rightTableId)
310+
|| colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId))) {
311+
return false;
312+
}
313+
314+
Set<Integer> equalIndices = new HashSet<>();
315+
for (Expression expr : conjuncts) {
316+
// only simple equality predicates between two slots can use colocate join
317+
if (!(expr instanceof EqualPredicate)) {
318+
return false;
319+
}
320+
Expression leftChild = ((EqualPredicate) expr).left();
321+
Expression rightChild = ((EqualPredicate) expr).right();
322+
if (!(leftChild instanceof SlotReference) || !(rightChild instanceof SlotReference)) {
323+
return false;
324+
}
325+
326+
SlotReference leftSlot = (SlotReference) leftChild;
327+
SlotReference rightSlot = (SlotReference) rightChild;
328+
Integer leftIndex = null;
329+
Integer rightIndex = null;
330+
if (leftSlot.getTable().isPresent() && leftSlot.getTable().get().getId() == leftHashSpec.getTableId()) {
331+
leftIndex = leftHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
332+
rightIndex = rightHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
333+
} else {
334+
leftIndex = rightHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
335+
rightIndex = leftHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
336+
}
337+
if (!Objects.equals(leftIndex, rightIndex)) {
338+
return false;
339+
}
340+
if (leftIndex != null) {
341+
equalIndices.add(leftIndex);
342+
}
343+
}
344+
// the ON conditions must cover all distribution columns
345+
if (equalIndices.containsAll(leftHashSpec.getExprIdToEquivalenceSet().values())) {
346+
return true;
347+
} else {
348+
return false;
349+
}
309350
}
310351

311352
public static Set<ExprId> getJoinOutputExprIdSet(Plan left, Plan right) {
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_colocate_join_of_column_order") {
19+
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
20+
// distributed by k1,k2
21+
sql """
22+
CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t1` (
23+
`k1` varchar(64) NULL,
24+
`k2` varchar(64) NULL,
25+
`v` int NULL
26+
) ENGINE=OLAP
27+
DUPLICATE KEY(`k1`,`k2`)
28+
COMMENT 'OLAP'
29+
DISTRIBUTED BY HASH(`k1`,`k2`) BUCKETS 1
30+
PROPERTIES (
31+
"replication_num" = "1",
32+
"colocate_with" = "group_column_order"
33+
);
34+
"""
35+
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """
36+
// distributed by k2,k1
37+
sql """
38+
CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t2` (
39+
`k1` varchar(64) NULL,
40+
`k2` varchar(64) NULL,
41+
`v` int NULL
42+
) ENGINE=OLAP
43+
DUPLICATE KEY(`k1`,`k2`)
44+
COMMENT 'OLAP'
45+
DISTRIBUTED BY HASH(`k2`,`k1`) BUCKETS 1
46+
PROPERTIES (
47+
"replication_num" = "1",
48+
"colocate_with" = "group_column_order"
49+
);
50+
"""
51+
sql """insert into test_colocate_join_of_column_order_t1 values('k1','k2',11);"""
52+
sql """insert into test_colocate_join_of_column_order_t2 values('k1','k2',11);"""
53+
54+
sql """set enable_nereids_planner=true; """
55+
explain {
56+
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k1 and a.k2=b.k2;")
57+
notContains "COLOCATE"
58+
}
59+
explain {
60+
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2;")
61+
notContains "COLOCATE"
62+
}
63+
explain {
64+
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k1;")
65+
notContains "COLOCATE"
66+
}
67+
explain {
68+
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.v=b.v;")
69+
notContains "COLOCATE"
70+
}
71+
explain {
72+
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1;")
73+
contains "COLOCATE"
74+
}
75+
explain {
76+
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1 and a.v=b.v;")
77+
contains "COLOCATE"
78+
}
79+
80+
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
81+
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """
82+
}

0 commit comments

Comments
 (0)