Skip to content

Commit 38cc01d

Browse files
authored
[postgres] add geometry type support. (#1554)
1 parent cdfc86f commit 38cc01d

File tree

5 files changed

+185
-71
lines changed

5 files changed

+185
-71
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2022 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.ververica.cdc.connectors.postgres.table;
18+
19+
import org.apache.flink.table.data.StringData;
20+
import org.apache.flink.table.types.logical.LogicalType;
21+
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.ObjectWriter;
24+
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
25+
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverterFactory;
26+
import io.debezium.data.geometry.Geography;
27+
import io.debezium.data.geometry.Geometry;
28+
import io.debezium.util.HexConverter;
29+
import org.apache.kafka.connect.data.Schema;
30+
import org.apache.kafka.connect.data.Struct;
31+
32+
import java.time.ZoneId;
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
import java.util.Optional;
36+
37+
/**
 * Used to create a {@link DeserializationRuntimeConverterFactory} specific to PostgreSQL.
 *
 * <p>The factory returned by {@link #instance()} supplies a custom converter for VARCHAR
 * columns only. That converter renders Debezium {@link Geometry} / {@link Geography} values as
 * a JSON string with the keys {@link #HEXEWKB} and {@link #SRID}; every other logical type
 * falls back to the default converter.
 */
38+
public class PostgreSQLDeserializationConverterFactory {
39+
40+
/**
 * Name of the SRID entry: used both as the field read from the Debezium struct and as the
 * key written into the emitted JSON.
 */
public static final String SRID = "srid";
41+
/** Key in the emitted JSON that holds the hex string of the struct's {@code wkb} bytes. */
public static final String HEXEWKB = "hexewkb";
42+
43+
/**
 * Creates the PostgreSQL-specific converter factory.
 *
 * @return a factory that yields the string converter for {@code VARCHAR} type roots and an
 *     empty {@link Optional} for every other type root
 */
public static DeserializationRuntimeConverterFactory instance() {
44+
return new DeserializationRuntimeConverterFactory() {
45+
46+
private static final long serialVersionUID = 1L;
47+
48+
// Only the type root of logicalType is inspected; serverTimeZone is not used here.
@Override
49+
public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
50+
LogicalType logicalType, ZoneId serverTimeZone) {
51+
switch (logicalType.getTypeRoot()) {
52+
case VARCHAR:
53+
return createStringConverter();
54+
default:
55+
// fallback to default converter
56+
return Optional.empty();
57+
}
58+
}
59+
};
60+
}
61+
62+
/**
 * Builds the converter used for VARCHAR columns.
 *
 * <p>Values whose schema name equals {@code Geometry.LOGICAL_NAME} or {@code
 * Geography.LOGICAL_NAME} are serialized to a JSON object string holding the hex-encoded
 * {@code wkb} bytes and the SRID (0 when the struct carries none). All other values are
 * converted with {@code toString()}.
 *
 * @return the converter, always present
 */
private static Optional<DeserializationRuntimeConverter> createStringConverter() {
63+
final ObjectMapper objectMapper = new ObjectMapper();
64+
// The writer is created once here and reused by every convert() call of the converter.
final ObjectWriter objectWriter = objectMapper.writer();
65+
return Optional.of(
66+
new DeserializationRuntimeConverter() {
67+
68+
private static final long serialVersionUID = 1L;
69+
70+
@Override
71+
public Object convert(Object dbzObj, Schema schema) throws Exception {
72+
// Geometry and Geography values from PostgreSQL are converted to
73+
// a String in JSON format
74+
if (Geometry.LOGICAL_NAME.equals(schema.name())
75+
|| Geography.LOGICAL_NAME.equals(schema.name())) {
76+
try {
77+
Struct geometryStruct = (Struct) dbzObj;
78+
byte[] wkb = geometryStruct.getBytes("wkb");
79+
// getInt32 may return null, hence the Optional wrapper
Optional<Integer> srid =
80+
Optional.ofNullable(geometryStruct.getInt32(SRID));
81+
Map<String, Object> geometryInfo = new HashMap<>(2);
82+
geometryInfo.put(HEXEWKB, HexConverter.convertToHexString(wkb));
83+
// a missing SRID is reported as 0
geometryInfo.put(SRID, srid.orElse(0));
84+
return StringData.fromString(
85+
objectWriter.writeValueAsString(geometryInfo));
86+
} catch (Exception e) {
// Any failure (bad cast, missing field, JSON error) is rethrown with
// the offending value in the message and the original cause attached.
87+
throw new IllegalArgumentException(
88+
String.format(
89+
"Failed to convert %s to geometry JSON.", dbzObj),
90+
e);
91+
}
92+
} else {
93+
// NOTE(review): a null dbzObj would throw a NullPointerException here;
// presumably nulls are filtered before this converter runs - confirm.
return StringData.fromString(dbzObj.toString());
94+
}
95+
}
96+
});
97+
}
98+
}

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
121121
.setPhysicalRowType(physicalDataType)
122122
.setMetadataConverters(metadataConverters)
123123
.setResultTypeInfo(typeInfo)
124+
.setUserDefinedConverterFactory(
125+
PostgreSQLDeserializationConverterFactory.instance())
124126
.setValueValidator(new PostgresValueValidator(schemaName, tableName))
125127
.build();
126128
DebeziumSourceFunction<RowData> sourceFunction =

flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import static org.junit.Assert.assertNotNull;
4444

4545
/**
46-
* Basic class for testing PostgresSQL source, this contains a PostgreSQL container which enables
46+
* Basic class for testing PostgreSQL source, this contains a PostgreSQL container which enables
4747
* binlog.
4848
*/
4949
public abstract class PostgresTestBase extends AbstractTestBase {
@@ -96,7 +96,7 @@ protected void initializePostgresTable(String sqlFile) {
9696
return m.matches() ? m.group(1) : x;
9797
})
9898
.collect(Collectors.joining("\n"))
99-
.split(";"))
99+
.split(";\n"))
100100
.collect(Collectors.toList());
101101
for (String stmt : statements) {
102102
statement.execute(stmt);

flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -247,26 +247,28 @@ public void testAllTypes() throws Throwable {
247247

248248
String sourceDDL =
249249
String.format(
250-
"CREATE TABLE full_types (\n"
251-
+ " id INTEGER NOT NULL,\n"
252-
+ " bytea_c BYTES,\n"
253-
+ " small_c SMALLINT,\n"
254-
+ " int_c INTEGER,\n"
255-
+ " big_c BIGINT,\n"
256-
+ " real_c FLOAT,\n"
257-
+ " double_precision DOUBLE,\n"
258-
+ " numeric_c DECIMAL(10, 5),\n"
259-
+ " decimal_c DECIMAL(10, 1),\n"
260-
+ " boolean_c BOOLEAN,\n"
261-
+ " text_c STRING,\n"
262-
+ " char_c STRING,\n"
263-
+ " character_c STRING,\n"
264-
+ " character_varying_c STRING,\n"
265-
+ " timestamp3_c TIMESTAMP(3),\n"
266-
+ " timestamp6_c TIMESTAMP(6),\n"
267-
+ " date_c DATE,\n"
268-
+ " time_c TIME(0),\n"
269-
+ " default_numeric_c DECIMAL\n"
250+
"CREATE TABLE full_types ("
251+
+ " id INTEGER NOT NULL,"
252+
+ " bytea_c BYTES,"
253+
+ " small_c SMALLINT,"
254+
+ " int_c INTEGER,"
255+
+ " big_c BIGINT,"
256+
+ " real_c FLOAT,"
257+
+ " double_precision DOUBLE,"
258+
+ " numeric_c DECIMAL(10, 5),"
259+
+ " decimal_c DECIMAL(10, 1),"
260+
+ " boolean_c BOOLEAN,"
261+
+ " text_c STRING,"
262+
+ " char_c STRING,"
263+
+ " character_c STRING,"
264+
+ " character_varying_c STRING,"
265+
+ " timestamp3_c TIMESTAMP(3),"
266+
+ " timestamp6_c TIMESTAMP(6),"
267+
+ " date_c DATE,"
268+
+ " time_c TIME(0),"
269+
+ " default_numeric_c DECIMAL,"
270+
+ " geography_c STRING,"
271+
+ " geometry_c STRING"
270272
+ ") WITH ("
271273
+ " 'connector' = 'postgres-cdc',"
272274
+ " 'hostname' = '%s',"
@@ -282,29 +284,31 @@ public void testAllTypes() throws Throwable {
282284
POSTGERS_CONTAINER.getUsername(),
283285
POSTGERS_CONTAINER.getPassword(),
284286
POSTGERS_CONTAINER.getDatabaseName(),
285-
"public",
287+
"inventory",
286288
"full_types");
287289
String sinkDDL =
288-
"CREATE TABLE sink (\n"
289-
+ " id INTEGER NOT NULL,\n"
290-
+ " bytea_c BYTES,\n"
291-
+ " small_c SMALLINT,\n"
292-
+ " int_c INTEGER,\n"
293-
+ " big_c BIGINT,\n"
294-
+ " real_c FLOAT,\n"
295-
+ " double_precision DOUBLE,\n"
296-
+ " numeric_c DECIMAL(10, 5),\n"
297-
+ " decimal_c DECIMAL(10, 1),\n"
298-
+ " boolean_c BOOLEAN,\n"
299-
+ " text_c STRING,\n"
300-
+ " char_c STRING,\n"
301-
+ " character_c STRING,\n"
302-
+ " character_varying_c STRING,\n"
303-
+ " timestamp3_c TIMESTAMP(3),\n"
304-
+ " timestamp6_c TIMESTAMP(6),\n"
305-
+ " date_c DATE,\n"
306-
+ " time_c TIME(0),\n"
307-
+ " default_numeric_c DECIMAL\n"
290+
"CREATE TABLE sink ("
291+
+ " id INTEGER NOT NULL,"
292+
+ " bytea_c BYTES,"
293+
+ " small_c SMALLINT,"
294+
+ " int_c INTEGER,"
295+
+ " big_c BIGINT,"
296+
+ " real_c FLOAT,"
297+
+ " double_precision DOUBLE,"
298+
+ " numeric_c DECIMAL(10, 5),"
299+
+ " decimal_c DECIMAL(10, 1),"
300+
+ " boolean_c BOOLEAN,"
301+
+ " text_c STRING,"
302+
+ " char_c STRING,"
303+
+ " character_c STRING,"
304+
+ " character_varying_c STRING,"
305+
+ " timestamp3_c TIMESTAMP(3),"
306+
+ " timestamp6_c TIMESTAMP(6),"
307+
+ " date_c DATE,"
308+
+ " time_c TIME(0),"
309+
+ " default_numeric_c DECIMAL,"
310+
+ " geography_c STRING,"
311+
+ " geometry_c STRING"
308312
+ ") WITH ("
309313
+ " 'connector' = 'values',"
310314
+ " 'sink-insert-only' = 'false'"
@@ -319,16 +323,16 @@ public void testAllTypes() throws Throwable {
319323

320324
try (Connection connection = getJdbcConnection();
321325
Statement statement = connection.createStatement()) {
322-
statement.execute("UPDATE full_types SET small_c=0 WHERE id=1;");
326+
statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE id=1;");
323327
}
324328

325329
waitForSinkSize("sink", 3);
326330

327331
List<String> expected =
328332
Arrays.asList(
329-
"+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500)",
330-
"-U(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500)",
331-
"+U(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500)");
333+
"+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
334+
"-U(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
335+
"+U(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})");
332336
List<String> actual = TestValuesTableFactory.getRawResults("sink");
333337
assertEquals(expected, actual);
334338

flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,43 @@
1414
-- ----------------------------------------------------------------------------------------------------------------
1515
-- DATABASE: column_type_test
1616
-- ----------------------------------------------------------------------------------------------------------------
17+
-- Generate a number of tables to cover as many of the PG types as possible
18+
DROP SCHEMA IF EXISTS inventory CASCADE;
19+
CREATE SCHEMA inventory;
20+
SET search_path TO inventory;
21+
CREATE EXTENSION postgis;
1722

18-
CREATE TABLE full_types (
19-
id INTEGER NOT NULL,
20-
bytea_c BYTEA,
21-
small_c SMALLINT,
22-
int_c INTEGER,
23-
big_c BIGINT,
24-
real_c REAL,
25-
double_precision DOUBLE PRECISION,
26-
numeric_c NUMERIC(10, 5),
27-
decimal_c DECIMAL(10, 1),
28-
boolean_c BOOLEAN,
29-
text_c TEXT,
30-
char_c CHAR,
31-
character_c CHARACTER(3),
23+
CREATE TABLE full_types
24+
(
25+
id INTEGER NOT NULL,
26+
bytea_c BYTEA,
27+
small_c SMALLINT,
28+
int_c INTEGER,
29+
big_c BIGINT,
30+
real_c REAL,
31+
double_precision DOUBLE PRECISION,
32+
numeric_c NUMERIC(10, 5),
33+
decimal_c DECIMAL(10, 1),
34+
boolean_c BOOLEAN,
35+
text_c TEXT,
36+
char_c CHAR,
37+
character_c CHARACTER(3),
3238
character_varying_c CHARACTER VARYING(20),
33-
timestamp3_c TIMESTAMP(3),
34-
timestamp6_c TIMESTAMP(6),
35-
date_c DATE,
36-
time_c TIME(0),
37-
default_numeric_c NUMERIC,
39+
timestamp3_c TIMESTAMP(3),
40+
timestamp6_c TIMESTAMP(6),
41+
date_c DATE,
42+
time_c TIME(0),
43+
default_numeric_c NUMERIC,
44+
geometry_c GEOMETRY(POINT, 3187),
45+
geography_c GEOGRAPHY(MULTILINESTRING),
3846
PRIMARY KEY (id)
3947
);
4048

41-
ALTER TABLE full_types REPLICA IDENTITY FULL;
49+
ALTER TABLE full_types
50+
REPLICA IDENTITY FULL;
4251

43-
INSERT INTO full_types VALUES (
44-
1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
45-
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
46-
'2020-07-17', '18:00:22', 500);
52+
INSERT INTO full_types
53+
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
54+
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
55+
'2020-07-17', '18:00:22', 500, 'SRID=3187;POINT(174.9479 -36.7208)'::geometry,
56+
'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);

0 commit comments

Comments
 (0)