Skip to content

Commit 7e9d101

Browse files
committed
Add support for JOIN PUSHDOWN in Exasol connector
1 parent e1550a5 commit 7e9d101

File tree

4 files changed

+296
-5
lines changed

4 files changed

+296
-5
lines changed

docs/src/main/sphinx/connector/exasol.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,16 @@ FROM
206206

207207
```{include} query-table-function-ordering.fragment
208208
```
209+
210+
(exasol-pushdown)=
211+
### Pushdown
212+
213+
The connector supports pushdown for the following operations:
214+
215+
- {ref}`join-pushdown`
216+
217+
```{include} pushdown-correctness-behavior.fragment
218+
```
219+
220+
```{include} join-pushdown-enabled-true.fragment
221+
```

plugin/trino-exasol/src/main/java/io/trino/plugin/exasol/ExasolClient.java

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.collect.ImmutableSet;
1717
import com.google.inject.Inject;
18+
import io.trino.plugin.base.expression.ConnectorExpressionRewriter;
1819
import io.trino.plugin.base.mapping.IdentifierMapping;
1920
import io.trino.plugin.jdbc.BaseJdbcClient;
2021
import io.trino.plugin.jdbc.BaseJdbcConfig;
@@ -31,6 +32,9 @@
3132
import io.trino.plugin.jdbc.QueryBuilder;
3233
import io.trino.plugin.jdbc.WriteFunction;
3334
import io.trino.plugin.jdbc.WriteMapping;
35+
import io.trino.plugin.jdbc.expression.JdbcConnectorExpressionRewriterBuilder;
36+
import io.trino.plugin.jdbc.expression.ParameterizedExpression;
37+
import io.trino.plugin.jdbc.expression.RewriteIn;
3438
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
3539
import io.trino.spi.TrinoException;
3640
import io.trino.spi.connector.AggregateFunction;
@@ -39,7 +43,8 @@
3943
import io.trino.spi.connector.ColumnPosition;
4044
import io.trino.spi.connector.ConnectorSession;
4145
import io.trino.spi.connector.ConnectorTableMetadata;
42-
import io.trino.spi.type.Type;
46+
import io.trino.spi.expression.ConnectorExpression;
47+
import io.trino.spi.type.*;
4348

4449
import java.sql.Connection;
4550
import java.sql.Date;
@@ -52,19 +57,36 @@
5257
import java.util.Set;
5358

5459
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
60+
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
5561
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanColumnMapping;
62+
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanWriteFunction;
63+
import static io.trino.plugin.jdbc.StandardColumnMappings.charWriteFunction;
5664
import static io.trino.plugin.jdbc.StandardColumnMappings.decimalColumnMapping;
5765
import static io.trino.plugin.jdbc.StandardColumnMappings.defaultCharColumnMapping;
5866
import static io.trino.plugin.jdbc.StandardColumnMappings.defaultVarcharColumnMapping;
5967
import static io.trino.plugin.jdbc.StandardColumnMappings.doubleColumnMapping;
68+
import static io.trino.plugin.jdbc.StandardColumnMappings.doubleWriteFunction;
6069
import static io.trino.plugin.jdbc.StandardColumnMappings.integerColumnMapping;
70+
import static io.trino.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
71+
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
72+
import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
6173
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
74+
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
75+
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
76+
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
6277
import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling;
6378
import static io.trino.plugin.jdbc.UnsupportedTypeHandling.CONVERT_TO_VARCHAR;
6479
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
6580
import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
81+
import static io.trino.spi.type.BigintType.BIGINT;
82+
import static io.trino.spi.type.BooleanType.BOOLEAN;
6683
import static io.trino.spi.type.DateType.DATE;
6784
import static io.trino.spi.type.DecimalType.createDecimalType;
85+
import static io.trino.spi.type.DoubleType.DOUBLE;
86+
import static io.trino.spi.type.IntegerType.INTEGER;
87+
import static io.trino.spi.type.SmallintType.SMALLINT;
88+
import static io.trino.spi.type.TinyintType.TINYINT;
89+
import static java.lang.String.format;
6890
import static java.util.Locale.ENGLISH;
6991

7092
public class ExasolClient
@@ -75,6 +97,9 @@ public class ExasolClient
7597
.add("SYS")
7698
.build();
7799

100+
private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
101+
private static final int EXASOL_MAX_SUPPORTED_VARCHAR_SIZE = 20000;
102+
78103
@Inject
79104
public ExasolClient(
80105
BaseJdbcConfig config,
@@ -84,6 +109,30 @@ public ExasolClient(
84109
RemoteQueryModifier queryModifier)
85110
{
86111
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
112+
this.connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder()
113+
.addStandardRules(this::quoted)
114+
.add(new RewriteIn())
115+
.withTypeClass("integer_type", ImmutableSet.of("tinyint", "smallint", "integer", "bigint"))
116+
.withTypeClass("numeric_type", ImmutableSet.of("tinyint", "smallint", "integer", "bigint", "decimal", "real", "double"))
117+
.map("$equal(left, right)").to("left = right")
118+
.map("$not_equal(left, right)").to("left <> right")
119+
.map("$identical(left, right)").to("(left = right OR (left IS NULL AND right IS NULL))")
120+
.map("$less_than(left, right)").to("left < right")
121+
.map("$less_than_or_equal(left, right)").to("left <= right")
122+
.map("$greater_than(left, right)").to("left > right")
123+
.map("$greater_than_or_equal(left, right)").to("left >= right")
124+
.map("$add(left: integer_type, right: integer_type)").to("left + right")
125+
.map("$subtract(left: integer_type, right: integer_type)").to("left - right")
126+
.map("$multiply(left: integer_type, right: integer_type)").to("left * right")
127+
.map("$divide(left: integer_type, right: integer_type)").to("left / right")
128+
.map("$modulus(left: integer_type, right: integer_type)").to("left % right")
129+
.map("$negate(value: integer_type)").to("-value")
130+
.map("$like(value: varchar, pattern: varchar): boolean").to("value LIKE pattern")
131+
.map("$not($is_null(value))").to("value IS NOT NULL")
132+
.map("$not(value: boolean)").to("NOT value")
133+
.map("$is_null(value)").to("value IS NULL")
134+
.map("$nullif(first, second)").to("NULLIF(first, second)")
135+
.build();
87136
}
88137

89138
@Override
@@ -95,6 +144,12 @@ protected boolean filterSchema(String schemaName)
95144
return super.filterSchema(schemaName);
96145
}
97146

147+
@Override
148+
public Optional<ParameterizedExpression> convertPredicate(ConnectorSession session, ConnectorExpression expression, Map<String, ColumnHandle> assignments)
149+
{
150+
return connectorExpressionRewriter.rewrite(session, expression, assignments);
151+
}
152+
98153
@Override
99154
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
100155
{
@@ -188,8 +243,7 @@ protected void renameTable(ConnectorSession session, Connection connection, Stri
188243
@Override
189244
protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition)
190245
{
191-
// Deactivated because test 'testJoinPushdown()' requires write access which is not implemented for Exasol
192-
return false;
246+
return true;
193247
}
194248

195249
@Override
@@ -265,7 +319,48 @@ private static LongWriteFunction dateWriteFunctionUsingSqlDate()
265319
@Override
266320
public WriteMapping toWriteMapping(ConnectorSession session, Type type)
267321
{
268-
throw new TrinoException(NOT_SUPPORTED, "This connector does not support writing");
322+
if (type == BOOLEAN) {
323+
return WriteMapping.booleanMapping("boolean", booleanWriteFunction());
324+
}
325+
326+
if (type == TINYINT) {
327+
return WriteMapping.longMapping("tinyint", tinyintWriteFunction());
328+
}
329+
if (type == SMALLINT) {
330+
return WriteMapping.longMapping("smallint", smallintWriteFunction());
331+
}
332+
if (type == INTEGER) {
333+
return WriteMapping.longMapping("integer", integerWriteFunction());
334+
}
335+
if (type == BIGINT) {
336+
return WriteMapping.longMapping("bigint", bigintWriteFunction());
337+
}
338+
339+
if (type == DOUBLE) {
340+
return WriteMapping.doubleMapping("double", doubleWriteFunction());
341+
}
342+
343+
if (type instanceof DecimalType decimalType) {
344+
String dataType = format("decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale());
345+
if (decimalType.isShort()) {
346+
return WriteMapping.longMapping(dataType, shortDecimalWriteFunction(decimalType));
347+
}
348+
return WriteMapping.objectMapping(dataType, longDecimalWriteFunction(decimalType));
349+
}
350+
351+
if (type instanceof CharType charType) {
352+
return WriteMapping.sliceMapping("char(" + charType.getLength() + ")", charWriteFunction());
353+
}
354+
355+
if (type instanceof VarcharType varcharType) {
356+
int length = varcharType.isUnbounded()
357+
? EXASOL_MAX_SUPPORTED_VARCHAR_SIZE
358+
: Math.min(varcharType.getBoundedLength(), EXASOL_MAX_SUPPORTED_VARCHAR_SIZE);
359+
String dataType = "varchar(" + length + ")";
360+
return WriteMapping.sliceMapping(dataType, varcharWriteFunction());
361+
}
362+
363+
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
269364
}
270365

271366
@Override
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.exasol;
15+
16+
import io.airlift.slice.Slice;
17+
import io.airlift.slice.Slices;
18+
import io.trino.plugin.base.mapping.DefaultIdentifierMapping;
19+
import io.trino.plugin.jdbc.BaseJdbcConfig;
20+
import io.trino.plugin.jdbc.BooleanWriteFunction;
21+
import io.trino.plugin.jdbc.DefaultQueryBuilder;
22+
import io.trino.plugin.jdbc.DoubleWriteFunction;
23+
import io.trino.plugin.jdbc.JdbcClient;
24+
import io.trino.plugin.jdbc.LongWriteFunction;
25+
import io.trino.plugin.jdbc.ObjectWriteFunction;
26+
import io.trino.plugin.jdbc.SliceWriteFunction;
27+
import io.trino.plugin.jdbc.WriteMapping;
28+
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
29+
import io.trino.spi.connector.ConnectorSession;
30+
import io.trino.spi.type.Int128;
31+
import io.trino.spi.type.Type;
32+
import io.trino.testing.TestingConnectorSession;
33+
import org.junit.jupiter.api.Test;
34+
35+
import java.math.BigDecimal;
36+
import java.sql.PreparedStatement;
37+
import java.sql.SQLException;
38+
import java.sql.Types;
39+
40+
import static com.google.common.reflect.Reflection.newProxy;
41+
import static io.trino.spi.type.BigintType.BIGINT;
42+
import static io.trino.spi.type.BooleanType.BOOLEAN;
43+
import static io.trino.spi.type.CharType.createCharType;
44+
import static io.trino.spi.type.DecimalType.createDecimalType;
45+
import static io.trino.spi.type.DoubleType.DOUBLE;
46+
import static io.trino.spi.type.IntegerType.INTEGER;
47+
import static io.trino.spi.type.SmallintType.SMALLINT;
48+
import static io.trino.spi.type.TinyintType.TINYINT;
49+
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
50+
import static io.trino.spi.type.VarcharType.createVarcharType;
51+
import static org.assertj.core.api.Assertions.assertThat;
52+
import static org.junit.jupiter.api.Assertions.assertEquals;
53+
import static org.junit.jupiter.api.Assertions.assertTrue;
54+
55+
public class TestExasolClient
56+
{
57+
private static final JdbcClient CLIENT = new ExasolClient(
58+
new BaseJdbcConfig(),
59+
session -> {
60+
throw new UnsupportedOperationException();
61+
},
62+
new DefaultQueryBuilder(RemoteQueryModifier.NONE),
63+
new DefaultIdentifierMapping(),
64+
RemoteQueryModifier.NONE);
65+
66+
private static final ConnectorSession SESSION = TestingConnectorSession.SESSION;
67+
68+
@Test
69+
public void testTypedWriteMapping()
70+
throws SQLException
71+
{
72+
testLongWriteMapping(TINYINT, Types.TINYINT, "tinyint", "setByte", 1L, Byte.valueOf("1"));
73+
testLongWriteMapping(SMALLINT, Types.SMALLINT, "smallint", "setShort", 256L, Short.valueOf("256"));
74+
testLongWriteMapping(INTEGER, Types.INTEGER, "integer", "setInt", 123456L, 123456);
75+
testLongWriteMapping(BIGINT, Types.BIGINT, "bigint", "setLong", 123456789L, 123456789L);
76+
testLongWriteMapping(createDecimalType(16, 6), Types.DECIMAL, "decimal(16, 6)", "setBigDecimal", 123456123456L, BigDecimal.valueOf(123456.123456));
77+
testInt128WriteMapping(createDecimalType(36, 12), Types.DECIMAL, "decimal(36, 12)", "setBigDecimal", Int128.valueOf("123456789012345612345678901234567890"), new BigDecimal("123456789012345612345678.901234567890"));
78+
79+
testBooleanWriteMapping(Boolean.TRUE);
80+
testDoubleWriteMapping(1.2567);
81+
82+
testSliceWriteMapping(createCharType(25), Types.NCHAR, "char(25)", "test");
83+
testSliceWriteMapping(createUnboundedVarcharType(), Types.VARCHAR, "varchar(20000)", "u".repeat(20000));
84+
testSliceWriteMapping(createVarcharType(123), Types.VARCHAR, "varchar(123)", "9".repeat(123));
85+
}
86+
87+
private void testLongWriteMapping(Type type, int expectedJdbcType, String expectedDataType, String expectedJdbcMethodName,
88+
Long inputValue, Object expectedJdbcValue)
89+
throws SQLException
90+
{
91+
WriteMapping writeMapping = CLIENT.toWriteMapping(SESSION, type);
92+
assertEquals(writeMapping.getWriteFunction().getJavaType(), long.class);
93+
assertEquals(writeMapping.getDataType(), expectedDataType);
94+
assertTrue(writeMapping.getWriteFunction() instanceof LongWriteFunction);
95+
assertThat(writeMapping.getWriteFunction().getBindExpression()).isEqualTo("?");
96+
PreparedStatement statementMock = preparedStatementMock(expectedJdbcType, expectedJdbcMethodName, expectedJdbcValue);
97+
LongWriteFunction longWriteFunction = (LongWriteFunction) writeMapping.getWriteFunction();
98+
longWriteFunction.set(statementMock, expectedJdbcType, inputValue);
99+
}
100+
101+
private void testBooleanWriteMapping(boolean inputValue)
102+
throws SQLException
103+
{
104+
WriteMapping writeMapping = CLIENT.toWriteMapping(SESSION, BOOLEAN);
105+
assertEquals(writeMapping.getWriteFunction().getJavaType(), boolean.class);
106+
assertEquals(writeMapping.getDataType(), "boolean");
107+
assertTrue(writeMapping.getWriteFunction() instanceof BooleanWriteFunction);
108+
assertThat(writeMapping.getWriteFunction().getBindExpression()).isEqualTo("?");
109+
PreparedStatement statementMock = preparedStatementMock(Types.BOOLEAN, "setBoolean", inputValue);
110+
BooleanWriteFunction booleanWriteFunction = (BooleanWriteFunction) writeMapping.getWriteFunction();
111+
booleanWriteFunction.set(statementMock, Types.BOOLEAN, inputValue);
112+
}
113+
114+
private void testDoubleWriteMapping(double inputValue)
115+
throws SQLException
116+
{
117+
WriteMapping writeMapping = CLIENT.toWriteMapping(SESSION, DOUBLE);
118+
assertEquals(writeMapping.getWriteFunction().getJavaType(), double.class);
119+
assertEquals(writeMapping.getDataType(), "double");
120+
assertTrue(writeMapping.getWriteFunction() instanceof DoubleWriteFunction);
121+
assertThat(writeMapping.getWriteFunction().getBindExpression()).isEqualTo("?");
122+
PreparedStatement statementMock = preparedStatementMock(Types.DOUBLE, "setDouble", inputValue);
123+
DoubleWriteFunction doubleWriteFunction = (DoubleWriteFunction) writeMapping.getWriteFunction();
124+
doubleWriteFunction.set(statementMock, Types.DOUBLE, inputValue);
125+
}
126+
127+
private void testInt128WriteMapping(Type type, int expectedJdbcType, String expectedDataType, String expectedJdbcMethodName,
128+
Int128 inputValue, BigDecimal expectedJdbcValue)
129+
throws SQLException
130+
{
131+
WriteMapping writeMapping = CLIENT.toWriteMapping(SESSION, type);
132+
assertEquals(writeMapping.getWriteFunction().getJavaType(), Int128.class);
133+
assertEquals(writeMapping.getDataType(), expectedDataType);
134+
assertTrue(writeMapping.getWriteFunction() instanceof ObjectWriteFunction);
135+
assertThat(writeMapping.getWriteFunction().getBindExpression()).isEqualTo("?");
136+
PreparedStatement statementMock = preparedStatementMock(expectedJdbcType, expectedJdbcMethodName, expectedJdbcValue);
137+
ObjectWriteFunction objectWriteFunction = (ObjectWriteFunction) writeMapping.getWriteFunction();
138+
objectWriteFunction.set(statementMock, expectedJdbcType, inputValue);
139+
}
140+
141+
private void testSliceWriteMapping(Type type, int expectedJdbcType, String expectedDataType, String inputValue)
142+
throws SQLException
143+
{
144+
WriteMapping writeMapping = CLIENT.toWriteMapping(SESSION, type);
145+
assertEquals(writeMapping.getWriteFunction().getJavaType(), Slice.class);
146+
assertEquals(writeMapping.getDataType(), expectedDataType);
147+
assertTrue(writeMapping.getWriteFunction() instanceof SliceWriteFunction);
148+
assertThat(writeMapping.getWriteFunction().getBindExpression()).isEqualTo("?");
149+
PreparedStatement statementMock = preparedStatementMock(expectedJdbcType, "setString", inputValue);
150+
SliceWriteFunction sliceWriteFunction = (SliceWriteFunction) writeMapping.getWriteFunction();
151+
sliceWriteFunction.set(statementMock, expectedJdbcType, Slices.utf8Slice(inputValue));
152+
}
153+
154+
private PreparedStatement preparedStatementMock(int expectedJdbcType, String expectedMethodName, Object expectedValue)
155+
{
156+
return newProxy(PreparedStatement.class, (proxy, method, args) ->
157+
{
158+
assertThat(method.getName()).isEqualTo(expectedMethodName);
159+
assertThat(args.length).isEqualTo(2);
160+
assertThat(args[0]).isEqualTo(expectedJdbcType);
161+
assertThat(args[1])
162+
.describedAs("expected jdbc value")
163+
.isEqualTo(expectedValue);
164+
165+
return null;
166+
});
167+
}
168+
}

0 commit comments

Comments
 (0)