Skip to content

Commit 3c7ef0e

Browse files
committed
[mongodb] Add support for mongodb+srv connection protocol.
1 parent 7cdeb0d commit 3c7ef0e

File tree

15 files changed

+108
-23
lines changed

15 files changed

+108
-23
lines changed

docs/content/connectors/mongodb-cdc.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,13 @@ Connector Options
146146
<td>String</td>
147147
<td>Specify what connector to use, here should be <code>mongodb-cdc</code>.</td>
148148
</tr>
149+
<tr>
150+
<td>scheme</td>
151+
<td>optional</td>
152+
<td style="word-wrap: break-word;">mongodb</td>
153+
<td>String</td>
154+
<td>The protocol connected to MongoDB. eg. <code>mongodb or mongodb+srv.</code></td>
155+
</tr>
149156
<tr>
150157
<td>hosts</td>
151158
<td>required</td>

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,15 @@
3636
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
3737
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
3838
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
39+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
40+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
3941
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
4042
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
4143
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
4244
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
4345
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
4446
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
47+
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
4548
import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString;
4649
import static org.apache.flink.util.Preconditions.checkArgument;
4750

@@ -63,7 +66,7 @@ public static <T> Builder<T> builder() {
6366

6467
/** Builder class of {@link MongoDBSource}. */
6568
public static class Builder<T> {
66-
69+
private String scheme = SCHEME.defaultValue();
6770
private String hosts;
6871
private String username;
6972
private String password;
@@ -81,6 +84,17 @@ public static class Builder<T> {
8184
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
8285
private DebeziumDeserializationSchema<T> deserializer;
8386

87+
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
88+
public Builder<T> scheme(String scheme) {
89+
checkArgument(
90+
MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
91+
String.format(
92+
"The scheme should either be %s or %s",
93+
MONGODB_SCHEME, MONGODB_SRV_SCHEME));
94+
this.scheme = scheme;
95+
return this;
96+
}
97+
8498
/** The comma-separated list of hostname and port pairs of mongodb servers. */
8599
public Builder<T> hosts(String hosts) {
86100
this.hosts = hosts;
@@ -260,8 +274,7 @@ public DebeziumSourceFunction<T> build() {
260274

261275
props.setProperty(
262276
MongoSourceConfig.CONNECTION_URI_CONFIG,
263-
String.valueOf(
264-
buildConnectionString(username, password, hosts, connectionOptions)));
277+
buildConnectionString(username, password, scheme, hosts, connectionOptions));
265278

266279
if (databaseList != null) {
267280
props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class MongoDBEnvelope {
4141

4242
public static final String MONGODB_SCHEME = "mongodb";
4343

44+
public static final String MONGODB_SRV_SCHEME = "mongodb+srv";
45+
4446
public static final String ID_FIELD = "_id";
4547

4648
public static final String DATA_FIELD = "_data";

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ public class MongoDBSourceBuilder<T> {
5252
private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory();
5353
private DebeziumDeserializationSchema<T> deserializer;
5454

55+
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
56+
public MongoDBSourceBuilder<T> scheme(String scheme) {
57+
this.configFactory.scheme(scheme);
58+
return this;
59+
}
60+
5561
/** The comma-separated list of hostname and port pairs of mongodb servers. */
5662
public MongoDBSourceBuilder<T> hosts(String hosts) {
5763
this.configFactory.hosts(hosts);

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class MongoDBSourceConfig implements SourceConfig {
3333

3434
private static final long serialVersionUID = 1L;
3535

36+
private final String scheme;
3637
private final String hosts;
3738
@Nullable private final String username;
3839
@Nullable private final String password;
@@ -49,6 +50,7 @@ public class MongoDBSourceConfig implements SourceConfig {
4950
private final int splitSizeMB;
5051

5152
MongoDBSourceConfig(
53+
String scheme,
5254
String hosts,
5355
@Nullable String username,
5456
@Nullable String password,
@@ -63,14 +65,14 @@ public class MongoDBSourceConfig implements SourceConfig {
6365
int heartbeatIntervalMillis,
6466
int splitMetaGroupSize,
6567
int splitSizeMB) {
68+
this.scheme = checkNotNull(scheme);
6669
this.hosts = checkNotNull(hosts);
6770
this.username = username;
6871
this.password = password;
6972
this.databaseList = databaseList;
7073
this.collectionList = collectionList;
7174
this.connectionString =
72-
buildConnectionString(username, password, hosts, connectionOptions)
73-
.getConnectionString();
75+
buildConnectionString(username, password, scheme, hosts, connectionOptions);
7476
this.batchSize = batchSize;
7577
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
7678
this.pollMaxBatchSize = pollMaxBatchSize;
@@ -81,6 +83,10 @@ public class MongoDBSourceConfig implements SourceConfig {
8183
this.splitSizeMB = splitSizeMB;
8284
}
8385

86+
public String getScheme() {
87+
return scheme;
88+
}
89+
8490
public String getHosts() {
8591
return hosts;
8692
}
@@ -166,6 +172,7 @@ public boolean equals(Object o) {
166172
&& heartbeatIntervalMillis == that.heartbeatIntervalMillis
167173
&& splitMetaGroupSize == that.splitMetaGroupSize
168174
&& splitSizeMB == that.splitSizeMB
175+
&& Objects.equals(scheme, that.scheme)
169176
&& Objects.equals(hosts, that.hosts)
170177
&& Objects.equals(username, that.username)
171178
&& Objects.equals(password, that.password)
@@ -177,6 +184,7 @@ public boolean equals(Object o) {
177184
@Override
178185
public int hashCode() {
179186
return Objects.hash(
187+
scheme,
180188
hosts,
181189
username,
182190
password,

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import java.util.List;
2626

2727
import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
28+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
29+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
2830
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
2931
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
3032
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
3133
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
3234
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
35+
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
3336
import static org.apache.flink.util.Preconditions.checkArgument;
3437
import static org.apache.flink.util.Preconditions.checkNotNull;
3538

@@ -39,6 +42,7 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
3942

4043
private static final long serialVersionUID = 1L;
4144

45+
private String scheme = SCHEME.defaultValue();
4246
private String hosts;
4347
private String username;
4448
private String password;
@@ -54,6 +58,17 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
5458
private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue();
5559
private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
5660

61+
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
62+
public MongoDBSourceConfigFactory scheme(String scheme) {
63+
checkArgument(
64+
MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
65+
String.format(
66+
"The scheme should either be %s or %s",
67+
MONGODB_SCHEME, MONGODB_SRV_SCHEME));
68+
this.scheme = scheme;
69+
return this;
70+
}
71+
5772
/** The comma-separated list of hostname and port pairs of mongodb servers. */
5873
public MongoDBSourceConfigFactory hosts(String hosts) {
5974
this.hosts = hosts;
@@ -196,6 +211,7 @@ public void validate() {
196211
@Override
197212
public MongoDBSourceConfig create(int subtaskId) {
198213
return new MongoDBSourceConfig(
214+
scheme,
199215
hosts,
200216
username,
201217
password,

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,19 @@
2020
import org.apache.flink.configuration.ConfigOption;
2121
import org.apache.flink.configuration.ConfigOptions;
2222

23+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
24+
2325
/** Configurations for {@link com.ververica.cdc.connectors.mongodb.source.MongoDBSource}. */
2426
public class MongoDBSourceOptions {
2527

28+
public static final ConfigOption<String> SCHEME =
29+
ConfigOptions.key("scheme")
30+
.stringType()
31+
.defaultValue(MONGODB_SCHEME)
32+
.withDescription(
33+
"The protocol connected to MongoDB. eg. mongodb or mongodb+srv. "
34+
+ "The +srv indicates to the client that the hostname that follows corresponds to a DNS SRV record. Defaults to mongodb.");
35+
2636
public static final ConfigOption<String> HOSTS =
2737
ConfigOptions.key("hosts")
2838
.stringType()

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void execute(Context context) throws Exception {
130130
SourceRecord snapshotRecord =
131131
createSourceRecord(
132132
createPartitionMap(
133+
sourceConfig.getScheme(),
133134
sourceConfig.getHosts(),
134135
collectionId.catalog(),
135136
collectionId.table()),

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ public void execute(Context context) throws Exception {
145145
changeRecord =
146146
createSourceRecord(
147147
createPartitionMap(
148+
sourceConfig.getScheme(),
148149
sourceConfig.getHosts(),
149150
namespace.getDatabaseName(),
150151
namespace.getCollectionName()),
@@ -277,7 +278,7 @@ private HeartbeatManager openHeartbeatManagerIfNeeded(
277278
changeStreamCursor,
278279
sourceConfig.getHeartbeatIntervalMillis(),
279280
HEARTBEAT_TOPIC_NAME,
280-
createHeartbeatPartitionMap(sourceConfig.getHosts()));
281+
createHeartbeatPartitionMap(sourceConfig.getScheme(), sourceConfig.getHosts()));
281282
}
282283
return null;
283284
}

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,9 @@ public static Map<String, String> createSourceOffsetMap(
175175
}
176176

177177
public static Map<String, String> createPartitionMap(
178-
String hosts, String database, String collection) {
178+
String scheme, String hosts, String database, String collection) {
179179
StringBuilder builder = new StringBuilder();
180-
builder.append("mongodb://");
180+
builder.append(String.format("%s://", scheme));
181181
builder.append(hosts);
182182
builder.append("/");
183183
if (StringUtils.isNotEmpty(database)) {
@@ -190,9 +190,9 @@ public static Map<String, String> createPartitionMap(
190190
return singletonMap(NAMESPACE_FIELD, builder.toString());
191191
}
192192

193-
public static Map<String, Object> createHeartbeatPartitionMap(String hosts) {
193+
public static Map<String, Object> createHeartbeatPartitionMap(String scheme, String hosts) {
194194
StringBuilder builder = new StringBuilder();
195-
builder.append("mongodb://");
195+
builder.append(String.format("%s://", scheme));
196196
builder.append(hosts);
197197
builder.append("/");
198198
builder.append(HEARTBEAT_TOPIC_NAME);

0 commit comments

Comments
 (0)