Skip to content

Commit 80012b8

Browse files
Jiabao-SunleonardBang
authored andcommitted
[mongodb] Support to connect MongoDB without authentication (apache#642)
1 parent 6f4e04f commit 80012b8

File tree

2 files changed

+35
-4
lines changed

2 files changed

+35
-4
lines changed

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.ververica.cdc.connectors.mongodb;
2020

2121
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.annotation.VisibleForTesting;
2223

2324
import com.mongodb.ConnectionString;
2425
import com.mongodb.client.model.changestream.FullDocument;
@@ -321,14 +322,18 @@ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
321322
}
322323

323324
/** Build connection uri. */
324-
private ConnectionString buildConnectionUri() {
325+
@VisibleForTesting
326+
public ConnectionString buildConnectionUri() {
325327
StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://");
326328

327-
if (username != null && password != null) {
328-
sb.append(encodeValue(username)).append(":").append(encodeValue(password));
329+
if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
330+
sb.append(encodeValue(username))
331+
.append(":")
332+
.append(encodeValue(password))
333+
.append("@");
329334
}
330335

331-
sb.append("@").append(checkNotNull(hosts));
336+
sb.append(checkNotNull(hosts));
332337

333338
if (StringUtils.isNotEmpty(connectionOptions)) {
334339
sb.append("/?").append(connectionOptions);

flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/MongoDBSourceTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.flink.util.Preconditions;
3636

3737
import com.jayway.jsonpath.JsonPath;
38+
import com.mongodb.ConnectionString;
3839
import com.mongodb.client.MongoCollection;
3940
import com.mongodb.client.model.Filters;
4041
import com.mongodb.client.model.Updates;
@@ -347,6 +348,31 @@ public void go() throws Exception {
347348
}
348349
}
349350

351+
@Test
352+
public void testConnectionUri() {
353+
String hosts = MONGODB_CONTAINER.getHostAndPort();
354+
355+
ConnectionString case0 = MongoDBSource.builder().hosts(hosts).buildConnectionUri();
356+
assertEquals(String.format("mongodb://%s", hosts), case0.toString());
357+
358+
ConnectionString case1 =
359+
MongoDBSource.builder().username("").hosts(hosts).buildConnectionUri();
360+
assertEquals(String.format("mongodb://%s", hosts), case1.toString());
361+
362+
ConnectionString case2 =
363+
MongoDBSource.builder().password("").hosts(hosts).buildConnectionUri();
364+
assertEquals(String.format("mongodb://%s", hosts), case2.toString());
365+
366+
ConnectionString case3 =
367+
MongoDBSource.builder()
368+
.username(FLINK_USER)
369+
.password(FLINK_USER_PASSWORD)
370+
.hosts(hosts)
371+
.buildConnectionUri();
372+
assertEquals(FLINK_USER, case3.getUsername());
373+
assertEquals(FLINK_USER_PASSWORD, new String(case3.getPassword()));
374+
}
375+
350376
// ------------------------------------------------------------------------------------------
351377
// Utilities
352378
// ------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)