|
41 | 41 | import com.fasterxml.jackson.databind.Module;
|
42 | 42 | import com.google.common.collect.ImmutableMap;
|
43 | 43 | import com.google.common.collect.Lists;
|
| 44 | +import com.google.common.collect.Maps; |
44 | 45 | import io.airlift.concurrent.BoundedExecutor;
|
45 | 46 | import io.airlift.concurrent.MoreFutures;
|
46 | 47 | import io.airlift.concurrent.Threads;
|
|
66 | 67 | import io.trino.spi.connector.ConstraintApplicationResult;
|
67 | 68 | import io.trino.spi.connector.DynamicFilter;
|
68 | 69 | import io.trino.spi.connector.LimitApplicationResult;
|
| 70 | +import io.trino.spi.connector.ProjectionApplicationResult; |
| 71 | +import io.trino.spi.expression.ConnectorExpression; |
| 72 | +import io.trino.spi.expression.Variable; |
69 | 73 | import io.trino.spi.predicate.TupleDomain;
|
70 | 74 | import io.trino.spi.type.TypeManager;
|
71 | 75 | import io.trino.split.BufferingSplitSource;
|
@@ -177,29 +181,22 @@ private void applyPushDown(ConnectorSession connectorSession) {
|
177 | 181 | + " after pushing down.");
|
178 | 182 | }
|
179 | 183 |
|
180 |
| - // TODO(ftw): push down projection |
181 |
| - // Map<String, ColumnHandle> columnHandleMap = source.getTargetTable().getColumnHandleMap(); |
182 |
| - // Map<String, ColumnHandle> assignments = Maps.newHashMap(); |
183 |
| - // if (source.getTargetTable().getName().equals("customer")) { |
184 |
| - // assignments.put("c_custkey", columnHandleMap.get("c_custkey")); |
185 |
| - // assignments.put("c_mktsegment", columnHandleMap.get("c_mktsegment")); |
186 |
| - // } else if (source.getTargetTable().getName().equals("orders")) { |
187 |
| - // assignments.put("o_orderkey", columnHandleMap.get("o_orderkey")); |
188 |
| - // assignments.put("o_custkey", columnHandleMap.get("o_custkey")); |
189 |
| - // assignments.put("o_orderdate", columnHandleMap.get("o_orderdate")); |
190 |
| - // assignments.put("o_shippriority", columnHandleMap.get("o_shippriority")); |
191 |
| - // } else if (source.getTargetTable().getName().equals("lineitem")) { |
192 |
| - // assignments.put("l_orderkey", columnHandleMap.get("l_orderkey")); |
193 |
| - // assignments.put("l_extendedprice", columnHandleMap.get("l_extendedprice")); |
194 |
| - // assignments.put("l_discount", columnHandleMap.get("l_discount")); |
195 |
| - // assignments.put("l_shipdate", columnHandleMap.get("l_shipdate")); |
196 |
| - // } |
197 |
| - // Optional<ProjectionApplicationResult<ConnectorTableHandle>> projectionResult |
198 |
| - // = connectorMetadata.applyProjection(connectorSession, source.getTrinoConnectorTableHandle(), |
199 |
| - // Lists.newArrayList(), assignments); |
200 |
| - // if (projectionResult.isPresent()) { |
201 |
| - // source.setTrinoConnectorTableHandle(projectionResult.get().getHandle()); |
202 |
| - // } |
| 184 | + // push down projection |
| 185 | + Map<String, ColumnHandle> columnHandleMap = source.getTargetTable().getColumnHandleMap(); |
| 186 | + Map<String, ColumnMetadata> columnMetadataMap = source.getTargetTable().getColumnMetadataMap(); |
| 187 | + Map<String, ColumnHandle> assignments = Maps.newLinkedHashMap(); |
| 188 | + List<ConnectorExpression> projections = Lists.newArrayList(); |
| 189 | + for (SlotDescriptor slotDescriptor : desc.getSlots()) { |
| 190 | + String colName = slotDescriptor.getColumn().getName(); |
| 191 | + assignments.put(colName, columnHandleMap.get(colName)); |
| 192 | + projections.add(new Variable(colName, columnMetadataMap.get(colName).getType())); |
| 193 | + } |
| 194 | + Optional<ProjectionApplicationResult<ConnectorTableHandle>> projectionResult |
| 195 | + = connectorMetadata.applyProjection(connectorSession, source.getTrinoConnectorTableHandle(), |
| 196 | + projections, assignments); |
| 197 | + if (projectionResult.isPresent()) { |
| 198 | + source.setTrinoConnectorTableHandle(projectionResult.get().getHandle()); |
| 199 | + } |
203 | 200 | }
|
204 | 201 |
|
205 | 202 | private SplitSource getTrinoSplitSource(Connector connector, Session session, ConnectorTableHandle table,
|
|
0 commit comments