|
40 | 40 | import org.apache.doris.planner.DataSink;
|
41 | 41 | import org.apache.doris.planner.DataStreamSink;
|
42 | 42 | import org.apache.doris.planner.ExchangeNode;
|
| 43 | +import org.apache.doris.planner.MultiCastDataSink; |
43 | 44 | import org.apache.doris.planner.OlapTableSink;
|
44 | 45 | import org.apache.doris.planner.PlanFragment;
|
45 | 46 | import org.apache.doris.qe.ConnectContext;
|
@@ -139,7 +140,28 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys
|
139 | 140 | // set schema and partition info for tablet id shuffle exchange
|
140 | 141 | if (fragment.getPlanRoot() instanceof ExchangeNode
|
141 | 142 | && fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
|
142 |
| - DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink()); |
| 143 | + DataSink childFragmentSink = fragment.getChild(0).getSink(); |
| 144 | + DataStreamSink dataStreamSink = null; |
| 145 | + if (childFragmentSink instanceof MultiCastDataSink) { |
| 146 | + MultiCastDataSink multiCastDataSink = (MultiCastDataSink) childFragmentSink; |
| 147 | + int outputExchangeId = (fragment.getPlanRoot()).getId().asInt(); |
| 148 | + // which DataStreamSink link to the output exchangeNode? |
| 149 | + for (DataStreamSink currentDataStreamSink : multiCastDataSink.getDataStreamSinks()) { |
| 150 | + int sinkExchangeId = currentDataStreamSink.getExchNodeId().asInt(); |
| 151 | + if (outputExchangeId == sinkExchangeId) { |
| 152 | + dataStreamSink = currentDataStreamSink; |
| 153 | + break; |
| 154 | + } |
| 155 | + } |
| 156 | + if (dataStreamSink == null) { |
| 157 | + throw new IllegalStateException("Can not find DataStreamSink in the MultiCastDataSink"); |
| 158 | + } |
| 159 | + } else if (childFragmentSink instanceof DataStreamSink) { |
| 160 | + dataStreamSink = (DataStreamSink) childFragmentSink; |
| 161 | + } else { |
| 162 | + throw new IllegalStateException("Unsupported DataSink: " + childFragmentSink); |
| 163 | + } |
| 164 | + |
143 | 165 | Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
|
144 | 166 | dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
|
145 | 167 | database.getId(), olapTableSink.getDstTable(), analyzer));
|
|
0 commit comments