Skip to content

Commit 1bc3eb5

Browse files
seawindedataroaring
authored andcommitted
[feature](mtmv) Support variants rewrite by materialized view (#37929)
Support variants rewrite by materialized view Such as the mv def is as following: CREATE MATERIALIZED VIEW test1 BUILD IMMEDIATE REFRESH AUTO ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT id, type, actor, payload, payload['pull_request'] FROM github_events1; the query following can rewrite successfully and the data is right SELECT id, type, floor(cast(actor['id'] as int) + 100.5), actor['display_login'], payload['pull_request']['id'] FROM github_events1;
1 parent bcea911 commit 1bc3eb5

File tree

11 files changed

+1687
-45
lines changed

11 files changed

+1687
-45
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewAggregateRule.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
119119
queryTopPlan,
120120
materializationContext.getShuttledExprToScanExprMapping(),
121121
viewToQuerySlotMapping,
122-
true,
123122
queryStructInfo.getTableBitSet());
124123
boolean isRewrittenQueryExpressionValid = true;
125124
if (!rewrittenQueryExpressions.isEmpty()) {

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewJoinRule.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
4949
queryStructInfo.getTopPlan(),
5050
materializationContext.getShuttledExprToScanExprMapping(),
5151
targetToSourceMapping,
52-
true,
5352
queryStructInfo.getTableBitSet()
5453
);
5554
// Can not rewrite, bail out

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java

Lines changed: 119 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@
4444
import org.apache.doris.nereids.trees.expressions.Slot;
4545
import org.apache.doris.nereids.trees.expressions.SlotReference;
4646
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
47+
import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
4748
import org.apache.doris.nereids.trees.expressions.functions.scalar.NonNullable;
4849
import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable;
4950
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
5051
import org.apache.doris.nereids.trees.expressions.literal.Literal;
52+
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
5153
import org.apache.doris.nereids.trees.plans.JoinType;
5254
import org.apache.doris.nereids.trees.plans.Plan;
5355
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
@@ -56,6 +58,7 @@
5658
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
5759
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
5860
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
61+
import org.apache.doris.nereids.types.VariantType;
5962
import org.apache.doris.nereids.util.ExpressionUtils;
6063
import org.apache.doris.nereids.util.TypeUtils;
6164
import org.apache.doris.qe.SessionVariable;
@@ -114,7 +117,7 @@ public List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
114117
continue;
115118
}
116119
// check mv plan is valid or not
117-
if (!isMaterializationValid(cascadesContext, context)) {
120+
if (!isMaterializationValid(queryPlan, cascadesContext, context)) {
118121
continue;
119122
}
120123
// get query struct infos according to the view strut info, if valid query struct infos is empty, bail out
@@ -238,7 +241,7 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
238241
// Try to rewrite compensate predicates by using mv scan
239242
List<Expression> rewriteCompensatePredicates = rewriteExpression(compensatePredicates.toList(),
240243
queryPlan, materializationContext.getShuttledExprToScanExprMapping(),
241-
viewToQuerySlotMapping, true, queryStructInfo.getTableBitSet());
244+
viewToQuerySlotMapping, queryStructInfo.getTableBitSet());
242245
if (rewriteCompensatePredicates.isEmpty()) {
243246
materializationContext.recordFailReason(queryStructInfo,
244247
"Rewrite compensate predicate by view fail",
@@ -521,33 +524,20 @@ protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInf
521524
* @param sourcePlan the source plan witch the source expression belong to
522525
* @param targetExpressionMapping target expression mapping, if finding the expression in key set of the mapping
523526
* then use the corresponding value of mapping to replace it
524-
* @param targetExpressionNeedSourceBased if targetExpressionNeedSourceBased is true,
525-
* we should make the target expression map key to source based,
526-
* Note: the key expression in targetExpressionMapping should be shuttled. with the method
527-
* ExpressionUtils.shuttleExpressionWithLineage.
528-
* example as following:
529-
* source target
530-
* project(slot 1, 2) project(slot 3, 2, 1)
531-
* scan(table) scan(table)
532-
* then
533-
* transform source to:
534-
* project(slot 2, 1)
535-
* target
536527
*/
537528
protected List<Expression> rewriteExpression(List<? extends Expression> sourceExpressionsToWrite, Plan sourcePlan,
538-
ExpressionMapping targetExpressionMapping, SlotMapping targetToSourceMapping,
539-
boolean targetExpressionNeedSourceBased, BitSet sourcePlanBitSet) {
529+
ExpressionMapping targetExpressionMapping, SlotMapping targetToSourceMapping, BitSet sourcePlanBitSet) {
540530
// Firstly, rewrite the target expression using source with inverse mapping
541531
// then try to use the target expression to represent the query. if any of source expressions
542532
// can not be represented by target expressions, return null.
543533
// generate target to target replacement expression mapping, and change target expression to source based
544534
List<? extends Expression> sourceShuttledExpressions = ExpressionUtils.shuttleExpressionWithLineage(
545535
sourceExpressionsToWrite, sourcePlan, sourcePlanBitSet);
546-
ExpressionMapping expressionMappingKeySourceBased = targetExpressionNeedSourceBased
547-
? targetExpressionMapping.keyPermute(targetToSourceMapping) : targetExpressionMapping;
536+
ExpressionMapping expressionMappingKeySourceBased = targetExpressionMapping.keyPermute(targetToSourceMapping);
548537
// target to target replacement expression mapping, because mv is 1:1 so get first element
549538
List<Map<Expression, Expression>> flattenExpressionMap = expressionMappingKeySourceBased.flattenMap();
550-
Map<? extends Expression, ? extends Expression> targetToTargetReplacementMapping = flattenExpressionMap.get(0);
539+
Map<Expression, Expression> targetToTargetReplacementMappingQueryBased =
540+
flattenExpressionMap.get(0);
551541

552542
List<Expression> rewrittenExpressions = new ArrayList<>();
553543
for (Expression expressionShuttledToRewrite : sourceShuttledExpressions) {
@@ -557,8 +547,13 @@ protected List<Expression> rewriteExpression(List<? extends Expression> sourceEx
557547
}
558548
final Set<Object> slotsToRewrite =
559549
expressionShuttledToRewrite.collectToSet(expression -> expression instanceof Slot);
550+
551+
final Set<SlotReference> variants =
552+
expressionShuttledToRewrite.collectToSet(expression -> expression instanceof SlotReference
553+
&& ((SlotReference) expression).getDataType() instanceof VariantType);
554+
extendMappingByVariant(variants, targetToTargetReplacementMappingQueryBased);
560555
Expression replacedExpression = ExpressionUtils.replace(expressionShuttledToRewrite,
561-
targetToTargetReplacementMapping);
556+
targetToTargetReplacementMappingQueryBased);
562557
if (replacedExpression.anyMatch(slotsToRewrite::contains)) {
563558
// if contains any slot to rewrite, which means can not be rewritten by target, bail out
564559
return ImmutableList.of();
@@ -568,6 +563,94 @@ protected List<Expression> rewriteExpression(List<? extends Expression> sourceEx
568563
return rewrittenExpressions;
569564
}
570565

566+
/**
567+
* if query contains variant slot reference, extend the expression mapping for rewrte
568+
* such as targetToTargetReplacementMappingQueryBased is
569+
* id#0 -> id#8
570+
* type#1 -> type#9
571+
* payload#4 -> payload#10
572+
* query variants is payload['issue']['number']#20
573+
* then we can add payload['issue']['number']#20 -> element_at(element_at(payload#10, 'issue'), 'number')
574+
* to targetToTargetReplacementMappingQueryBased
575+
* */
576+
private void extendMappingByVariant(Set<SlotReference> queryVariants,
577+
Map<Expression, Expression> targetToTargetReplacementMappingQueryBased) {
578+
if (queryVariants.isEmpty()) {
579+
return;
580+
}
581+
Map<List<String>, Expression> viewNameToExprMap = new HashMap<>();
582+
for (Map.Entry<Expression, Expression> targetExpressionEntry :
583+
targetToTargetReplacementMappingQueryBased.entrySet()) {
584+
if (targetExpressionEntry.getKey() instanceof SlotReference
585+
&& ((SlotReference) targetExpressionEntry.getKey()).getDataType() instanceof VariantType) {
586+
SlotReference targetSlotReference = (SlotReference) targetExpressionEntry.getKey();
587+
List<String> nameIdentifier = new ArrayList<>(targetSlotReference.getQualifier());
588+
nameIdentifier.add(targetSlotReference.getName());
589+
nameIdentifier.addAll(targetSlotReference.getSubPath());
590+
viewNameToExprMap.put(nameIdentifier, targetExpressionEntry.getValue());
591+
}
592+
}
593+
if (viewNameToExprMap.isEmpty()) {
594+
return;
595+
}
596+
Map<List<String>, SlotReference> queryNameAndExpressionMap = new HashMap<>();
597+
for (SlotReference slotReference : queryVariants) {
598+
List<String> nameIdentifier = new ArrayList<>(slotReference.getQualifier());
599+
nameIdentifier.add(slotReference.getName());
600+
nameIdentifier.addAll(slotReference.getSubPath());
601+
queryNameAndExpressionMap.put(nameIdentifier, slotReference);
602+
}
603+
for (Map.Entry<List<String>, ? extends Expression> queryNameEntry : queryNameAndExpressionMap.entrySet()) {
604+
Expression minExpr = null;
605+
List<String> minCompensateName = null;
606+
for (Map.Entry<List<String>, Expression> entry : viewNameToExprMap.entrySet()) {
607+
if (!containsAllWithOrder(queryNameEntry.getKey(), entry.getKey())) {
608+
continue;
609+
}
610+
List<String> removedQueryName = new ArrayList<>(queryNameEntry.getKey());
611+
removedQueryName.removeAll(entry.getKey());
612+
if (minCompensateName == null) {
613+
minCompensateName = removedQueryName;
614+
minExpr = entry.getValue();
615+
}
616+
if (removedQueryName.size() < minCompensateName.size()) {
617+
minCompensateName = removedQueryName;
618+
minExpr = entry.getValue();
619+
}
620+
}
621+
if (minExpr != null) {
622+
targetToTargetReplacementMappingQueryBased.put(queryNameEntry.getValue(),
623+
constructElementAt(minExpr, minCompensateName));
624+
}
625+
}
626+
}
627+
628+
private static Expression constructElementAt(Expression target, List<String> atList) {
629+
Expression elementAt = target;
630+
for (String at : atList) {
631+
elementAt = new ElementAt(elementAt, new VarcharLiteral(at));
632+
}
633+
return elementAt;
634+
}
635+
636+
// source names is contain all target with order or not
637+
private static boolean containsAllWithOrder(List<String> sourceNames, List<String> targetNames) {
638+
if (sourceNames.size() < targetNames.size()) {
639+
return false;
640+
}
641+
for (int index = 0; index < targetNames.size(); index++) {
642+
String sourceName = sourceNames.get(index);
643+
String targetName = targetNames.get(index);
644+
if (sourceName == null || targetName == null) {
645+
return false;
646+
}
647+
if (!sourceName.equals(targetName)) {
648+
return false;
649+
}
650+
}
651+
return true;
652+
}
653+
571654
/**
572655
* Normalize expression with query, keep the consistency of exprId and nullable props with
573656
* query
@@ -753,7 +836,8 @@ protected boolean checkIfRewritten(Plan plan, MaterializationContext context) {
753836
}
754837

755838
// check mv plan is valid or not, this can use cache for performance
756-
private boolean isMaterializationValid(CascadesContext cascadesContext, MaterializationContext context) {
839+
private boolean isMaterializationValid(Plan queryPlan, CascadesContext cascadesContext,
840+
MaterializationContext context) {
757841
long materializationId = context.generateMaterializationIdentifier().hashCode();
758842
Boolean cachedCheckResult = cascadesContext.getMemo().materializationHasChecked(this.getClass(),
759843
materializationId);
@@ -764,6 +848,11 @@ private boolean isMaterializationValid(CascadesContext cascadesContext, Material
764848
context.recordFailReason(context.getStructInfo(),
765849
"View struct info is invalid", () -> String.format("view plan is %s",
766850
context.getStructInfo().getOriginalPlan().treeString()));
851+
// tmp to location question
852+
LOG.debug(String.format("View struct info is invalid, mv identifier is %s, query plan is %s,"
853+
+ "view plan is %s",
854+
context.generateMaterializationIdentifier(), queryPlan.treeString(),
855+
context.getStructInfo().getTopPlan().treeString()));
767856
cascadesContext.getMemo().recordMaterializationCheckResult(this.getClass(), materializationId,
768857
false);
769858
return false;
@@ -775,12 +864,20 @@ private boolean isMaterializationValid(CascadesContext cascadesContext, Material
775864
context.recordFailReason(context.getStructInfo(),
776865
"View struct info is invalid", () -> String.format("view plan is %s",
777866
context.getStructInfo().getOriginalPlan().treeString()));
867+
LOG.debug(String.format("View struct info is invalid, mv identifier is %s, query plan is %s,"
868+
+ "view plan is %s",
869+
context.generateMaterializationIdentifier(), queryPlan.treeString(),
870+
context.getStructInfo().getTopPlan().treeString()));
778871
return false;
779872
}
780873
if (!context.getStructInfo().isValid()) {
781874
context.recordFailReason(context.getStructInfo(),
782-
"View struct info is invalid", () -> String.format("view plan is %s",
875+
"View original struct info is invalid", () -> String.format("view plan is %s",
783876
context.getStructInfo().getOriginalPlan().treeString()));
877+
LOG.debug(String.format("View struct info is invalid, mv identifier is %s, query plan is %s,"
878+
+ "view plan is %s",
879+
context.generateMaterializationIdentifier(), queryPlan.treeString(),
880+
context.getStructInfo().getTopPlan().treeString()));
784881
return false;
785882
}
786883
return true;

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewScanRule.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
5050
queryStructInfo.getTopPlan(),
5151
materializationContext.getShuttledExprToScanExprMapping(),
5252
targetToSourceMapping,
53-
true,
5453
queryStructInfo.getTableBitSet()
5554
);
5655
// Can not rewrite, bail out

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/mapping/Mapping.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919

2020
import org.apache.doris.nereids.trees.expressions.ExprId;
2121
import org.apache.doris.nereids.trees.expressions.Slot;
22+
import org.apache.doris.nereids.trees.expressions.SlotReference;
2223
import org.apache.doris.nereids.trees.plans.RelationId;
2324
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
2425

26+
import com.google.common.collect.ImmutableList;
27+
28+
import java.util.ArrayList;
2529
import java.util.HashMap;
30+
import java.util.List;
2631
import java.util.Map;
2732
import java.util.Objects;
2833
import javax.annotation.Nullable;
@@ -41,13 +46,24 @@ public static final class MappedRelation {
4146
public final RelationId relationId;
4247
public final CatalogRelation belongedRelation;
4348
// Generate eagerly, will be used to generate slot mapping
44-
private final Map<String, Slot> slotNameToSlotMap = new HashMap<>();
49+
private final Map<List<String>, Slot> slotNameToSlotMap = new HashMap<>();
4550

51+
/**
52+
* Construct relation and slot map
53+
*/
4654
public MappedRelation(RelationId relationId, CatalogRelation belongedRelation) {
4755
this.relationId = relationId;
4856
this.belongedRelation = belongedRelation;
4957
for (Slot slot : belongedRelation.getOutput()) {
50-
slotNameToSlotMap.put(slot.getName(), slot);
58+
if (slot instanceof SlotReference) {
59+
// variant slot
60+
List<String> slotNames = new ArrayList<>();
61+
slotNames.add(slot.getName());
62+
slotNames.addAll(((SlotReference) slot).getSubPath());
63+
slotNameToSlotMap.put(slotNames, slot);
64+
} else {
65+
slotNameToSlotMap.put(ImmutableList.of(slot.getName()), slot);
66+
}
5167
}
5268
}
5369

@@ -63,7 +79,7 @@ public CatalogRelation getBelongedRelation() {
6379
return belongedRelation;
6480
}
6581

66-
public Map<String, Slot> getSlotNameToSlotMap() {
82+
public Map<List<String>, Slot> getSlotNameToSlotMap() {
6783
return slotNameToSlotMap;
6884
}
6985

0 commit comments

Comments
 (0)