Skip to content

Commit 98b0b7e

Browse files
committed
Add support for JOIN PUSHDOWN in Exasol connector
1 parent bc0df32 commit 98b0b7e

File tree

4 files changed

+203
-32
lines changed

4 files changed

+203
-32
lines changed

docs/src/main/sphinx/connector/exasol.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,16 @@ FROM
206206

207207
```{include} query-table-function-ordering.fragment
208208
```
209+
210+
(exasol-pushdown)=
211+
### Pushdown
212+
213+
The connector supports pushdown for the following operations:
214+
215+
- {ref}`join-pushdown`
216+
217+
```{include} pushdown-correctness-behavior.fragment
218+
```
219+
220+
```{include} join-pushdown-enabled-true.fragment
221+
```

plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,12 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
150150
return switch (connectorBehavior) {
151151
case SUPPORTS_UPDATE -> true;
152152
case SUPPORTS_ADD_COLUMN_WITH_POSITION,
153-
SUPPORTS_CREATE_MATERIALIZED_VIEW,
154-
SUPPORTS_CREATE_VIEW,
155-
SUPPORTS_DEFAULT_COLUMN_VALUE,
156-
SUPPORTS_MERGE,
157-
SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN,
158-
SUPPORTS_ROW_LEVEL_UPDATE -> false;
153+
SUPPORTS_CREATE_MATERIALIZED_VIEW,
154+
SUPPORTS_CREATE_VIEW,
155+
SUPPORTS_DEFAULT_COLUMN_VALUE,
156+
SUPPORTS_MERGE,
157+
SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN,
158+
SUPPORTS_ROW_LEVEL_UPDATE -> false;
159159
// Dynamic filters can be pushed down only if predicate push down is supported.
160160
// It is possible for a connector to have predicate push down support but not push down dynamic filters.
161161
// TODO default SUPPORTS_DYNAMIC_FILTER_PUSHDOWN to SUPPORTS_PREDICATE_PUSHDOWN
@@ -1275,7 +1275,7 @@ public void testJoinPushdown()
12751275
try (TestTable nationLowercaseTable = newTrinoTable(
12761276
// If a connector supports Join pushdown, but does not allow CTAS, we need to make the table creation here overridable.
12771277
"nation_lowercase",
1278-
"AS SELECT nationkey, lower(name) name, regionkey FROM nation")) {
1278+
getNationLowerCaseTableDefinition())) {
12791279
for (JoinOperator joinOperator : JoinOperator.values()) {
12801280
log.info("Testing joinOperator=%s", joinOperator);
12811281

@@ -1290,12 +1290,7 @@ public void testJoinPushdown()
12901290
.setSystemProperty("enable_dynamic_filtering", "false")
12911291
.build();
12921292

1293-
List<String> nonEqualities = Stream.concat(
1294-
Stream.of(JoinCondition.Operator.values())
1295-
.filter(operator -> operator != JoinCondition.Operator.EQUAL && operator != JoinCondition.Operator.IDENTICAL)
1296-
.map(JoinCondition.Operator::getValue),
1297-
Stream.of("IS DISTINCT FROM", "IS NOT DISTINCT FROM"))
1298-
.collect(toImmutableList());
1293+
List<String> nonEqualities = getSupportedJoinConditionNonEqualities();
12991294

13001295
// basic case
13011296
assertThat(query(session, format("SELECT r.name, n.name FROM nation n %s region r ON n.regionkey = r.regionkey", joinOperator))).isFullyPushedDown();
@@ -1409,6 +1404,35 @@ public void testJoinPushdown()
14091404
}
14101405
}
14111406

1407+
protected List<String> getSupportedJoinConditionNonEqualitiesWithoutDistinctFrom()
1408+
{
1409+
List<String> result = new ArrayList<>();
1410+
for (JoinCondition.Operator op : JoinCondition.Operator.values()) {
1411+
if (op != JoinCondition.Operator.EQUAL && op != JoinCondition.Operator.IDENTICAL) {
1412+
result.add(op.getValue());
1413+
}
1414+
}
1415+
return result;
1416+
}
1417+
1418+
protected List<String> getSupportedJoinConditionNonEqualitiesWithDistinctFrom()
1419+
{
1420+
List<String> result = new ArrayList<>(getSupportedJoinConditionNonEqualitiesWithoutDistinctFrom());
1421+
result.add("IS DISTINCT FROM");
1422+
result.add("IS NOT DISTINCT FROM");
1423+
return result;
1424+
}
1425+
1426+
protected List<String> getSupportedJoinConditionNonEqualities()
1427+
{
1428+
return getSupportedJoinConditionNonEqualitiesWithDistinctFrom();
1429+
}
1430+
1431+
protected String getNationLowerCaseTableDefinition()
1432+
{
1433+
return "AS SELECT nationkey, lower(name) name, regionkey FROM nation";
1434+
}
1435+
14121436
@Test
14131437
public void testComplexJoinPushdown()
14141438
{

plugin/trino-exasol/src/main/java/io/trino/plugin/exasol/ExasolClient.java

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.collect.ImmutableSet;
1717
import com.google.inject.Inject;
18+
import io.trino.plugin.base.expression.ConnectorExpressionRewriter;
1819
import io.trino.plugin.base.mapping.IdentifierMapping;
1920
import io.trino.plugin.jdbc.BaseJdbcClient;
2021
import io.trino.plugin.jdbc.BaseJdbcConfig;
@@ -31,6 +32,9 @@
3132
import io.trino.plugin.jdbc.QueryBuilder;
3233
import io.trino.plugin.jdbc.WriteFunction;
3334
import io.trino.plugin.jdbc.WriteMapping;
35+
import io.trino.plugin.jdbc.expression.JdbcConnectorExpressionRewriterBuilder;
36+
import io.trino.plugin.jdbc.expression.ParameterizedExpression;
37+
import io.trino.plugin.jdbc.expression.RewriteIn;
3438
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
3539
import io.trino.spi.TrinoException;
3640
import io.trino.spi.connector.AggregateFunction;
@@ -39,7 +43,11 @@
3943
import io.trino.spi.connector.ColumnPosition;
4044
import io.trino.spi.connector.ConnectorSession;
4145
import io.trino.spi.connector.ConnectorTableMetadata;
46+
import io.trino.spi.expression.ConnectorExpression;
47+
import io.trino.spi.type.CharType;
48+
import io.trino.spi.type.DecimalType;
4249
import io.trino.spi.type.Type;
50+
import io.trino.spi.type.VarcharType;
4351

4452
import java.sql.Connection;
4553
import java.sql.Date;
@@ -52,19 +60,36 @@
5260
import java.util.Set;
5361

5462
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
63+
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
5564
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanColumnMapping;
65+
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanWriteFunction;
66+
import static io.trino.plugin.jdbc.StandardColumnMappings.charWriteFunction;
5667
import static io.trino.plugin.jdbc.StandardColumnMappings.decimalColumnMapping;
5768
import static io.trino.plugin.jdbc.StandardColumnMappings.defaultCharColumnMapping;
5869
import static io.trino.plugin.jdbc.StandardColumnMappings.defaultVarcharColumnMapping;
5970
import static io.trino.plugin.jdbc.StandardColumnMappings.doubleColumnMapping;
71+
import static io.trino.plugin.jdbc.StandardColumnMappings.doubleWriteFunction;
6072
import static io.trino.plugin.jdbc.StandardColumnMappings.integerColumnMapping;
73+
import static io.trino.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
74+
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
75+
import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
6176
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
77+
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
78+
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
79+
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
6280
import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling;
6381
import static io.trino.plugin.jdbc.UnsupportedTypeHandling.CONVERT_TO_VARCHAR;
6482
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
6583
import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
84+
import static io.trino.spi.type.BigintType.BIGINT;
85+
import static io.trino.spi.type.BooleanType.BOOLEAN;
6686
import static io.trino.spi.type.DateType.DATE;
6787
import static io.trino.spi.type.DecimalType.createDecimalType;
88+
import static io.trino.spi.type.DoubleType.DOUBLE;
89+
import static io.trino.spi.type.IntegerType.INTEGER;
90+
import static io.trino.spi.type.SmallintType.SMALLINT;
91+
import static io.trino.spi.type.TinyintType.TINYINT;
92+
import static java.lang.String.format;
6893
import static java.util.Locale.ENGLISH;
6994

7095
public class ExasolClient
@@ -75,6 +100,9 @@ public class ExasolClient
75100
.add("SYS")
76101
.build();
77102

103+
private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
104+
private static final int EXASOL_MAX_SUPPORTED_VARCHAR_SIZE = 20000;
105+
78106
@Inject
79107
public ExasolClient(
80108
BaseJdbcConfig config,
@@ -84,6 +112,29 @@ public ExasolClient(
84112
RemoteQueryModifier queryModifier)
85113
{
86114
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
115+
this.connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder()
116+
.addStandardRules(this::quoted)
117+
.add(new RewriteIn())
118+
.withTypeClass("integer_type", ImmutableSet.of("tinyint", "smallint", "integer", "bigint"))
119+
.withTypeClass("numeric_type", ImmutableSet.of("tinyint", "smallint", "integer", "bigint", "decimal", "real", "double"))
120+
.map("$equal(left, right)").to("left = right")
121+
.map("$not_equal(left, right)").to("left <> right")
122+
.map("$less_than(left, right)").to("left < right")
123+
.map("$less_than_or_equal(left, right)").to("left <= right")
124+
.map("$greater_than(left, right)").to("left > right")
125+
.map("$greater_than_or_equal(left, right)").to("left >= right")
126+
.map("$add(left: integer_type, right: integer_type)").to("left + right")
127+
.map("$subtract(left: integer_type, right: integer_type)").to("left - right")
128+
.map("$multiply(left: integer_type, right: integer_type)").to("left * right")
129+
.map("$divide(left: integer_type, right: integer_type)").to("left / right")
130+
.map("$modulus(left: integer_type, right: integer_type)").to("left % right")
131+
.map("$negate(value: integer_type)").to("-value")
132+
.map("$like(value: varchar, pattern: varchar): boolean").to("value LIKE pattern")
133+
.map("$not($is_null(value))").to("value IS NOT NULL")
134+
.map("$not(value: boolean)").to("NOT value")
135+
.map("$is_null(value)").to("value IS NULL")
136+
.map("$nullif(first, second)").to("NULLIF(first, second)")
137+
.build();
87138
}
88139

89140
@Override
@@ -95,6 +146,12 @@ protected boolean filterSchema(String schemaName)
95146
return super.filterSchema(schemaName);
96147
}
97148

149+
@Override
150+
public Optional<ParameterizedExpression> convertPredicate(ConnectorSession session, ConnectorExpression expression, Map<String, ColumnHandle> assignments)
151+
{
152+
return connectorExpressionRewriter.rewrite(session, expression, assignments);
153+
}
154+
98155
@Override
99156
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
100157
{
@@ -188,8 +245,7 @@ protected void renameTable(ConnectorSession session, Connection connection, Stri
188245
@Override
189246
protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition)
190247
{
191-
// Deactivated because test 'testJoinPushdown()' requires write access which is not implemented for Exasol
192-
return false;
248+
return true;
193249
}
194250

195251
@Override
@@ -265,7 +321,48 @@ private static LongWriteFunction dateWriteFunctionUsingSqlDate()
265321
@Override
266322
public WriteMapping toWriteMapping(ConnectorSession session, Type type)
267323
{
268-
throw new TrinoException(NOT_SUPPORTED, "This connector does not support writing");
324+
if (type == BOOLEAN) {
325+
return WriteMapping.booleanMapping("boolean", booleanWriteFunction());
326+
}
327+
328+
if (type == TINYINT) {
329+
return WriteMapping.longMapping("tinyint", tinyintWriteFunction());
330+
}
331+
if (type == SMALLINT) {
332+
return WriteMapping.longMapping("smallint", smallintWriteFunction());
333+
}
334+
if (type == INTEGER) {
335+
return WriteMapping.longMapping("integer", integerWriteFunction());
336+
}
337+
if (type == BIGINT) {
338+
return WriteMapping.longMapping("bigint", bigintWriteFunction());
339+
}
340+
341+
if (type == DOUBLE) {
342+
return WriteMapping.doubleMapping("double", doubleWriteFunction());
343+
}
344+
345+
if (type instanceof DecimalType decimalType) {
346+
String dataType = format("decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale());
347+
if (decimalType.isShort()) {
348+
return WriteMapping.longMapping(dataType, shortDecimalWriteFunction(decimalType));
349+
}
350+
return WriteMapping.objectMapping(dataType, longDecimalWriteFunction(decimalType));
351+
}
352+
353+
if (type instanceof CharType charType) {
354+
return WriteMapping.sliceMapping("char(" + charType.getLength() + ")", charWriteFunction());
355+
}
356+
357+
if (type instanceof VarcharType varcharType) {
358+
int length = varcharType.isUnbounded()
359+
? EXASOL_MAX_SUPPORTED_VARCHAR_SIZE
360+
: Math.min(varcharType.getBoundedLength(), EXASOL_MAX_SUPPORTED_VARCHAR_SIZE);
361+
String dataType = "varchar(" + length + ")";
362+
return WriteMapping.sliceMapping(dataType, varcharWriteFunction());
363+
}
364+
365+
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
269366
}
270367

271368
@Override

plugin/trino-exasol/src/test/java/io/trino/plugin/exasol/TestExasolConnectorTest.java

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.trino.testing.sql.SqlExecutor;
2424
import io.trino.testing.sql.TestTable;
2525
import io.trino.testing.sql.TestView;
26+
import org.intellij.lang.annotations.Language;
2627
import org.junit.jupiter.api.Test;
2728
import org.junit.jupiter.api.parallel.Isolated;
2829

@@ -38,6 +39,7 @@
3839
import static java.util.stream.IntStream.range;
3940
import static org.assertj.core.api.Assertions.assertThat;
4041
import static org.assertj.core.api.Assertions.assertThatThrownBy;
42+
import static org.junit.jupiter.api.Assumptions.abort;
4143

4244
@Isolated
4345
final class TestExasolConnectorTest
@@ -59,27 +61,27 @@ protected QueryRunner createQueryRunner()
5961
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
6062
{
6163
return switch (connectorBehavior) {
64+
case SUPPORTS_JOIN_PUSHDOWN -> true;
6265
// Tests requires write access which is not implemented
6366
case SUPPORTS_AGGREGATION_PUSHDOWN,
64-
SUPPORTS_JOIN_PUSHDOWN,
65-
SUPPORTS_LIMIT_PUSHDOWN,
66-
SUPPORTS_TOPN_PUSHDOWN -> false;
67+
SUPPORTS_TOPN_PUSHDOWN,
68+
SUPPORTS_LIMIT_PUSHDOWN -> false;
6769

6870
// Parallel writing is not supported due to restrictions of the Exasol JDBC driver.
6971
case SUPPORTS_ADD_COLUMN,
70-
SUPPORTS_ARRAY,
71-
SUPPORTS_COMMENT_ON_TABLE,
72-
SUPPORTS_CREATE_SCHEMA,
73-
SUPPORTS_CREATE_TABLE,
74-
SUPPORTS_DELETE,
75-
SUPPORTS_INSERT,
76-
SUPPORTS_MAP_TYPE,
77-
SUPPORTS_NEGATIVE_DATE, // min date is 0001-01-01
78-
SUPPORTS_RENAME_COLUMN,
79-
SUPPORTS_RENAME_TABLE,
80-
SUPPORTS_ROW_TYPE,
81-
SUPPORTS_SET_COLUMN_TYPE,
82-
SUPPORTS_UPDATE -> false;
72+
SUPPORTS_ARRAY,
73+
SUPPORTS_COMMENT_ON_TABLE,
74+
SUPPORTS_CREATE_SCHEMA,
75+
SUPPORTS_CREATE_TABLE,
76+
SUPPORTS_DELETE,
77+
SUPPORTS_INSERT,
78+
SUPPORTS_MAP_TYPE,
79+
SUPPORTS_NEGATIVE_DATE, // min date is 0001-01-01
80+
SUPPORTS_RENAME_COLUMN,
81+
SUPPORTS_RENAME_TABLE,
82+
SUPPORTS_ROW_TYPE,
83+
SUPPORTS_SET_COLUMN_TYPE,
84+
SUPPORTS_UPDATE -> false;
8385

8486
default -> super.hasBehavior(connectorBehavior);
8587
};
@@ -126,6 +128,27 @@ protected TestTable createTableWithUnsupportedColumn()
126128
"(one NUMBER(19), two GEOMETRY, three VARCHAR(10 CHAR))");
127129
}
128130

131+
@Override
132+
// Override, because read-only Exasol connector does not support CREATE TABLE and INSERT statements
133+
protected TestTable newTrinoTable(String namePrefix, @Language("SQL") String tableDefinition, List<String> rowsToInsert)
134+
{
135+
return new TestTable(exasolServer.getSqlExecutor(), TEST_SCHEMA + "." + namePrefix, tableDefinition, rowsToInsert);
136+
}
137+
138+
@Override
139+
// Override to add test schema prefix to the name of the nation table
140+
protected String getNationLowerCaseTableDefinition()
141+
{
142+
return "AS SELECT nationkey, lower(name) name, regionkey FROM %s.nation".formatted(TEST_SCHEMA);
143+
}
144+
145+
@Override
146+
// Override, because Exasol doesn't support "IS DISTINCT FROM" and "IS NOT DISTINCT FROM" inequalities
147+
protected List<String> getSupportedJoinConditionNonEqualities()
148+
{
149+
return getSupportedJoinConditionNonEqualitiesWithoutDistinctFrom();
150+
}
151+
129152
@Test
130153
@Override
131154
public void testShowColumns()
@@ -327,6 +350,20 @@ void testNativeMultipleInClauses()
327350
onRemoteDatabase().execute(format("SELECT count(*) FROM %s.orders WHERE %s", TEST_SCHEMA, longInClauses));
328351
}
329352

353+
@Override
354+
@Test
355+
public void testNativeQueryInsertStatementTableExists()
356+
{
357+
abort("Read-only Exasol connector does not support CREATE TABLE and INSERT statements");
358+
}
359+
360+
@Override
361+
@Test
362+
public void testNativeQuerySelectFromTestTable()
363+
{
364+
abort("Read-only Exasol connector does not support CREATE TABLE and INSERT statements");
365+
}
366+
330367
private static String getLongInClause(int start, int length)
331368
{
332369
String longValues = range(start, start + length)

0 commit comments

Comments
 (0)