Skip to content

Commit c50634d

Browse files
author
dujie
committed
[fix] #1261 add JdbcConfigurationUtil and add test for url param
1 parent b09669f commit c50634d

File tree

6 files changed

+77
-17
lines changed

6 files changed

+77
-17
lines changed

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/OracleSource.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.ververica.cdc.connectors.oracle;
2020

2121
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
22+
import com.ververica.cdc.connectors.oracle.util.JdbcConfigurationUtil;
2223
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
2324
import com.ververica.cdc.debezium.DebeziumSourceFunction;
2425
import com.ververica.cdc.debezium.internal.DebeziumOffset;
@@ -47,6 +48,7 @@ public static class Builder<T> {
4748
private String database;
4849
private String username;
4950
private String password;
51+
private String url;
5052
private String[] tableList;
5153
private String[] schemaList;
5254
private Properties dbzProperties;
@@ -64,6 +66,12 @@ public Builder<T> port(int port) {
6466
return this;
6567
}
6668

69+
/** Url to use when connecting to the Oracle database server. */
70+
public Builder<T> url(String url) {
71+
this.url = url;
72+
return this;
73+
}
74+
6775
/**
6876
* An optional list of regular expressions that match database names to be monitored; any
6977
* database name not included in the whitelist will be excluded from monitoring. By default
@@ -143,6 +151,9 @@ public DebeziumSourceFunction<T> build() {
143151
props.setProperty("database.user", checkNotNull(username));
144152
props.setProperty("database.password", checkNotNull(password));
145153
props.setProperty("database.port", String.valueOf(port));
154+
if (url != null) {
155+
props.setProperty("database.url", url);
156+
}
146157
props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
147158
props.setProperty("database.dbname", checkNotNull(database));
148159
if (schemaList != null) {
@@ -170,15 +181,11 @@ public DebeziumSourceFunction<T> build() {
170181
props.putAll(dbzProperties);
171182
}
172183

173-
String hostname = props.getProperty("database.hostname");
174-
String port = props.getProperty("database.port");
175-
String dbname = props.getProperty("database.dbname");
176-
177-
String url;
184+
// debezium default URL format is serviceName rather than SID, so we should set
185+
// default url format is sid
178186
if (!props.containsKey("database.url")) {
179-
url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname;
180-
// debezium default URL format is SID rather than serviceName, so we should set url
181-
props.setProperty("database.url", url);
187+
props.setProperty(
188+
"database.url", JdbcConfigurationUtil.getConnectionUrlWithSid(props));
182189
}
183190

184191
return new DebeziumSourceFunction<>(

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/OracleValidator.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.table.api.TableException;
2222
import org.apache.flink.table.api.ValidationException;
2323

24+
import com.ververica.cdc.connectors.oracle.util.JdbcConfigurationUtil;
2425
import com.ververica.cdc.debezium.Validator;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
@@ -65,15 +66,7 @@ public void validate() {
6566

6667
public static Connection openConnection(Properties properties) throws SQLException {
6768
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
68-
String url;
69-
if (properties.containsKey("database.url")) {
70-
url = properties.getProperty("database.url");
71-
} else {
72-
String hostname = properties.getProperty("database.hostname");
73-
String port = properties.getProperty("database.port");
74-
String dbname = properties.getProperty("database.dbname");
75-
url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname;
76-
}
69+
String url = JdbcConfigurationUtil.getConnectionUrlWithSid(properties);
7770
String userName = properties.getProperty("database.user");
7871
String userpwd = properties.getProperty("database.password");
7972
return DriverManager.getConnection(url, userName, userpwd);

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
5454

5555
private final ResolvedSchema physicalSchema;
56+
private final String url;
5657
private final int port;
5758
private final String hostname;
5859
private final String database;
@@ -75,6 +76,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
7576

7677
public OracleTableSource(
7778
ResolvedSchema physicalSchema,
79+
String url,
7880
int port,
7981
String hostname,
8082
String database,
@@ -85,6 +87,7 @@ public OracleTableSource(
8587
Properties dbzProperties,
8688
StartupOptions startupOptions) {
8789
this.physicalSchema = physicalSchema;
90+
this.url = url;
8891
this.port = port;
8992
this.hostname = checkNotNull(hostname);
9093
this.database = checkNotNull(database);
@@ -125,6 +128,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
125128
.build();
126129
OracleSource.Builder<RowData> builder =
127130
OracleSource.<RowData>builder()
131+
.url(url)
128132
.hostname(hostname)
129133
.port(port)
130134
.database(database)
@@ -161,6 +165,7 @@ public DynamicTableSource copy() {
161165
OracleTableSource source =
162166
new OracleTableSource(
163167
physicalSchema,
168+
url,
164169
port,
165170
hostname,
166171
database,
@@ -186,6 +191,7 @@ public boolean equals(Object o) {
186191
OracleTableSource that = (OracleTableSource) o;
187192
return port == that.port
188193
&& Objects.equals(physicalSchema, that.physicalSchema)
194+
&& Objects.equals(url, that.url)
189195
&& Objects.equals(hostname, that.hostname)
190196
&& Objects.equals(database, that.database)
191197
&& Objects.equals(username, that.username)
@@ -202,6 +208,7 @@ public boolean equals(Object o) {
202208
public int hashCode() {
203209
return Objects.hash(
204210
physicalSchema,
211+
url,
205212
port,
206213
hostname,
207214
database,

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
5454
.defaultValue(1521)
5555
.withDescription("Integer port number of the Oracle database server.");
5656

57+
private static final ConfigOption<String> URL =
58+
ConfigOptions.key("url")
59+
.stringType()
60+
.noDefaultValue()
61+
.withDescription(
62+
"Complete JDBC URL as an alternative to specifying hostname, port and database provided as a way to support alternative connection scenarios.");
63+
5764
private static final ConfigOption<String> USERNAME =
5865
ConfigOptions.key("username")
5966
.stringType()
@@ -107,12 +114,14 @@ public DynamicTableSource createDynamicTableSource(Context context) {
107114
String databaseName = config.get(DATABASE_NAME);
108115
String tableName = config.get(TABLE_NAME);
109116
String schemaName = config.get(SCHEMA_NAME);
117+
String url = config.get(URL);
110118
int port = config.get(PORT);
111119
StartupOptions startupOptions = getStartupOptions(config);
112120
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
113121

114122
return new OracleTableSource(
115123
physicalSchema,
124+
url,
116125
port,
117126
hostname,
118127
databaseName,
@@ -145,6 +154,7 @@ public Set<ConfigOption<?>> requiredOptions() {
145154
public Set<ConfigOption<?>> optionalOptions() {
146155
Set<ConfigOption<?>> options = new HashSet<>();
147156
options.add(PORT);
157+
options.add(URL);
148158
options.add(SCAN_STARTUP_MODE);
149159

150160
return options;
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.ververica.cdc.connectors.oracle.util;
20+
21+
import java.util.Properties;
22+
23+
public class JdbcConfigurationUtil {
24+
25+
public static String getConnectionUrlWithSid(Properties properties) {
26+
String url;
27+
if (properties.containsKey("database.url")) {
28+
url = properties.getProperty("database.url");
29+
} else {
30+
String hostname = properties.getProperty("database.hostname");
31+
String port = properties.getProperty("database.port");
32+
String dbname = properties.getProperty("database.dbname");
33+
url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname;
34+
}
35+
return url;
36+
}
37+
}

flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public void testCommonProperties() {
100100
OracleTableSource expectedSource =
101101
new OracleTableSource(
102102
SCHEMA,
103+
null,
103104
1521,
104105
MY_LOCALHOST,
105106
MY_DATABASE,
@@ -116,6 +117,7 @@ public void testCommonProperties() {
116117
public void testOptionalProperties() {
117118
Map<String, String> options = getAllOptions();
118119
options.put("port", "1521");
120+
options.put("url", "jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE);
119121
options.put("debezium.snapshot.mode", "initial");
120122

121123
DynamicTableSource actualSource = createTableSource(options);
@@ -124,6 +126,7 @@ public void testOptionalProperties() {
124126
OracleTableSource expectedSource =
125127
new OracleTableSource(
126128
SCHEMA,
129+
"jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE,
127130
1521,
128131
MY_LOCALHOST,
129132
MY_DATABASE,
@@ -146,6 +149,7 @@ public void testStartupFromInitial() {
146149
OracleTableSource expectedSource =
147150
new OracleTableSource(
148151
SCHEMA,
152+
null,
149153
1521,
150154
MY_LOCALHOST,
151155
MY_DATABASE,
@@ -168,6 +172,7 @@ public void testStartupFromLatestOffset() {
168172
OracleTableSource expectedSource =
169173
new OracleTableSource(
170174
SCHEMA,
175+
null,
171176
1521,
172177
MY_LOCALHOST,
173178
MY_DATABASE,
@@ -194,6 +199,7 @@ public void testMetadataColumns() {
194199
OracleTableSource expectedSource =
195200
new OracleTableSource(
196201
SCHEMA_WITH_METADATA,
202+
null,
197203
1521,
198204
MY_LOCALHOST,
199205
MY_DATABASE,

0 commit comments

Comments
 (0)