Skip to content

Commit a2ce1a0

Browse files
committed
[mongodb] Add support for mongodb+srv connection protocol.
1 parent 7cdeb0d commit a2ce1a0

File tree

15 files changed

+102
-16
lines changed

15 files changed

+102
-16
lines changed

docs/content/connectors/mongodb-cdc.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,13 @@ Connector Options
146146
<td>String</td>
147147
<td>Specify what connector to use, here should be <code>mongodb-cdc</code>.</td>
148148
</tr>
149+
<tr>
150+
<td>scheme</td>
151+
<td>optional</td>
152+
<td style="word-wrap: break-word;">mongodb</td>
153+
<td>String</td>
154+
<td>The protocol connected to MongoDB. eg. <code>mongodb or mongodb+srv.</code></td>
155+
</tr>
149156
<tr>
150157
<td>hosts</td>
151158
<td>required</td>

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
3737
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
3838
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
39+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
40+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
3941
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
4042
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
4143
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
@@ -63,7 +65,7 @@ public static <T> Builder<T> builder() {
6365

6466
/** Builder class of {@link MongoDBSource}. */
6567
public static class Builder<T> {
66-
68+
private String scheme;
6769
private String hosts;
6870
private String username;
6971
private String password;
@@ -81,6 +83,17 @@ public static class Builder<T> {
8183
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
8284
private DebeziumDeserializationSchema<T> deserializer;
8385

86+
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
87+
public Builder<T> scheme(String scheme) {
88+
checkArgument(
89+
MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
90+
String.format(
91+
"The scheme should either be %s or %s",
92+
MONGODB_SCHEME, MONGODB_SRV_SCHEME));
93+
this.scheme = scheme;
94+
return this;
95+
}
96+
8497
/** The comma-separated list of hostname and port pairs of mongodb servers. */
8598
public Builder<T> hosts(String hosts) {
8699
this.hosts = hosts;
@@ -261,7 +274,8 @@ public DebeziumSourceFunction<T> build() {
261274
props.setProperty(
262275
MongoSourceConfig.CONNECTION_URI_CONFIG,
263276
String.valueOf(
264-
buildConnectionString(username, password, hosts, connectionOptions)));
277+
buildConnectionString(
278+
username, password, scheme, hosts, connectionOptions)));
265279

266280
if (databaseList != null) {
267281
props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class MongoDBEnvelope {
4141

4242
public static final String MONGODB_SCHEME = "mongodb";
4343

44+
public static final String MONGODB_SRV_SCHEME = "mongodb+srv";
45+
4446
public static final String ID_FIELD = "_id";
4547

4648
public static final String DATA_FIELD = "_data";

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ public class MongoDBSourceBuilder<T> {
5252
private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory();
5353
private DebeziumDeserializationSchema<T> deserializer;
5454

55+
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
56+
public MongoDBSourceBuilder<T> scheme(String scheme) {
57+
this.configFactory.scheme(scheme);
58+
return this;
59+
}
60+
5561
/** The comma-separated list of hostname and port pairs of mongodb servers. */
5662
public MongoDBSourceBuilder<T> hosts(String hosts) {
5763
this.configFactory.hosts(hosts);

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class MongoDBSourceConfig implements SourceConfig {
3333

3434
private static final long serialVersionUID = 1L;
3535

36+
private final String scheme;
3637
private final String hosts;
3738
@Nullable private final String username;
3839
@Nullable private final String password;
@@ -49,6 +50,7 @@ public class MongoDBSourceConfig implements SourceConfig {
4950
private final int splitSizeMB;
5051

5152
MongoDBSourceConfig(
53+
String scheme,
5254
String hosts,
5355
@Nullable String username,
5456
@Nullable String password,
@@ -63,13 +65,14 @@ public class MongoDBSourceConfig implements SourceConfig {
6365
int heartbeatIntervalMillis,
6466
int splitMetaGroupSize,
6567
int splitSizeMB) {
68+
this.scheme = checkNotNull(scheme);
6669
this.hosts = checkNotNull(hosts);
6770
this.username = username;
6871
this.password = password;
6972
this.databaseList = databaseList;
7073
this.collectionList = collectionList;
7174
this.connectionString =
72-
buildConnectionString(username, password, hosts, connectionOptions)
75+
buildConnectionString(scheme, username, password, hosts, connectionOptions)
7376
.getConnectionString();
7477
this.batchSize = batchSize;
7578
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
@@ -81,6 +84,10 @@ public class MongoDBSourceConfig implements SourceConfig {
8184
this.splitSizeMB = splitSizeMB;
8285
}
8386

87+
public String getScheme() {
88+
return scheme;
89+
}
90+
8491
public String getHosts() {
8592
return hosts;
8693
}
@@ -166,6 +173,7 @@ public boolean equals(Object o) {
166173
&& heartbeatIntervalMillis == that.heartbeatIntervalMillis
167174
&& splitMetaGroupSize == that.splitMetaGroupSize
168175
&& splitSizeMB == that.splitSizeMB
176+
&& Objects.equals(scheme, that.scheme)
169177
&& Objects.equals(hosts, that.hosts)
170178
&& Objects.equals(username, that.username)
171179
&& Objects.equals(password, that.password)
@@ -177,6 +185,7 @@ public boolean equals(Object o) {
177185
@Override
178186
public int hashCode() {
179187
return Objects.hash(
188+
scheme,
180189
hosts,
181190
username,
182191
password,

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import java.util.List;
2626

2727
import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
28+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
29+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
2830
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
2931
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
3032
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
3133
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
3234
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
35+
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
3336
import static org.apache.flink.util.Preconditions.checkArgument;
3437
import static org.apache.flink.util.Preconditions.checkNotNull;
3538

@@ -39,6 +42,7 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
3942

4043
private static final long serialVersionUID = 1L;
4144

45+
private String scheme = SCHEME.defaultValue();
4246
private String hosts;
4347
private String username;
4448
private String password;
@@ -54,6 +58,17 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
5458
private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue();
5559
private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
5660

61+
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
62+
public MongoDBSourceConfigFactory scheme(String scheme) {
63+
checkArgument(
64+
MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
65+
String.format(
66+
"The scheme should either be %s or %s",
67+
MONGODB_SCHEME, MONGODB_SRV_SCHEME));
68+
this.scheme = scheme;
69+
return this;
70+
}
71+
5772
/** The comma-separated list of hostname and port pairs of mongodb servers. */
5873
public MongoDBSourceConfigFactory hosts(String hosts) {
5974
this.hosts = hosts;
@@ -196,6 +211,7 @@ public void validate() {
196211
@Override
197212
public MongoDBSourceConfig create(int subtaskId) {
198213
return new MongoDBSourceConfig(
214+
scheme,
199215
hosts,
200216
username,
201217
password,

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,19 @@
2020
import org.apache.flink.configuration.ConfigOption;
2121
import org.apache.flink.configuration.ConfigOptions;
2222

23+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
24+
2325
/** Configurations for {@link com.ververica.cdc.connectors.mongodb.source.MongoDBSource}. */
2426
public class MongoDBSourceOptions {
2527

28+
public static final ConfigOption<String> SCHEME =
29+
ConfigOptions.key("scheme")
30+
.stringType()
31+
.defaultValue(MONGODB_SCHEME)
32+
.withDescription(
33+
"The protocol connected to MongoDB. eg. mongodb or mongodb+srv. "
34+
+ "The +srv indicates to the client that the hostname that follows corresponds to a DNS SRV record. Defaults to mongodb.");
35+
2636
public static final ConfigOption<String> HOSTS =
2737
ConfigOptions.key("hosts")
2838
.stringType()

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void execute(Context context) throws Exception {
130130
SourceRecord snapshotRecord =
131131
createSourceRecord(
132132
createPartitionMap(
133+
sourceConfig.getScheme(),
133134
sourceConfig.getHosts(),
134135
collectionId.catalog(),
135136
collectionId.table()),

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ public void execute(Context context) throws Exception {
145145
changeRecord =
146146
createSourceRecord(
147147
createPartitionMap(
148+
sourceConfig.getScheme(),
148149
sourceConfig.getHosts(),
149150
namespace.getDatabaseName(),
150151
namespace.getCollectionName()),
@@ -277,7 +278,7 @@ private HeartbeatManager openHeartbeatManagerIfNeeded(
277278
changeStreamCursor,
278279
sourceConfig.getHeartbeatIntervalMillis(),
279280
HEARTBEAT_TOPIC_NAME,
280-
createHeartbeatPartitionMap(sourceConfig.getHosts()));
281+
createHeartbeatPartitionMap(sourceConfig.getScheme(), sourceConfig.getHosts()));
281282
}
282283
return null;
283284
}

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,9 @@ public static Map<String, String> createSourceOffsetMap(
175175
}
176176

177177
public static Map<String, String> createPartitionMap(
178-
String hosts, String database, String collection) {
178+
String scheme, String hosts, String database, String collection) {
179179
StringBuilder builder = new StringBuilder();
180-
builder.append("mongodb://");
180+
builder.append(String.format("%s://", scheme));
181181
builder.append(hosts);
182182
builder.append("/");
183183
if (StringUtils.isNotEmpty(database)) {
@@ -190,9 +190,9 @@ public static Map<String, String> createPartitionMap(
190190
return singletonMap(NAMESPACE_FIELD, builder.toString());
191191
}
192192

193-
public static Map<String, Object> createHeartbeatPartitionMap(String hosts) {
193+
public static Map<String, Object> createHeartbeatPartitionMap(String scheme, String hosts) {
194194
StringBuilder builder = new StringBuilder();
195-
builder.append("mongodb://");
195+
builder.append(String.format("%s://", scheme));
196196
builder.append(hosts);
197197
builder.append("/");
198198
builder.append(HEARTBEAT_TOPIC_NAME);

0 commit comments

Comments
 (0)