Skip to content

Commit c04dc6e

Browse files
Jiabao-Sunbobby-richard
authored andcommitted
[mongodb] Match multiple database and collection names using a regular expression in MongoDB. (apache#940)
Co-authored-by: Bobby Richard <[email protected]>
1 parent ab5448a commit c04dc6e

File tree

17 files changed

+857
-206
lines changed

17 files changed

+857
-206
lines changed

docs/content/connectors/mongodb-cdc.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,19 @@ Connector Options
151151
</tr>
152152
<tr>
153153
<td>database</td>
154-
<td>required</td>
154+
<td>optional</td>
155155
<td style="word-wrap: break-word;">(none)</td>
156156
<td>String</td>
157-
<td>Name of the database to watch for changes.</td>
157+
<td>Name of the database to watch for changes. If not set then all databases will be captured. <br>
158+
The database also supports regular expressions to monitor multiple databases matching the regular expression.</td>
158159
</tr>
159160
<tr>
160161
<td>collection</td>
161-
<td>required</td>
162+
<td>optional</td>
162163
<td style="word-wrap: break-word;">(none)</td>
163164
<td>String</td>
164-
<td>Name of the collection in the database to watch for changes.</td>
165+
<td>Name of the collection in the database to watch for changes. If not set then all collections will be captured.<br>
166+
The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers.</td>
165167
</tr>
166168
<tr>
167169
<td>connection.options</td>
@@ -324,7 +326,7 @@ This can filter only required data and improve the use of indexes by the copying
324326
In the following example, the `$match` aggregation operator ensures that only documents in which the closed field is set to false are copied.
325327

326328
```
327-
copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
329+
'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'
328330
```
329331

330332
### Change Streams

flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
3131
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
3232
import com.ververica.cdc.connectors.tests.utils.TestUtils;
33+
import org.apache.commons.lang3.StringUtils;
3334
import org.bson.Document;
3435
import org.bson.types.ObjectId;
3536
import org.junit.After;
@@ -82,14 +83,12 @@ public void before() {
8283
// please check your '/etc/hosts' file contains the line 'internet_ip(not 127.0.0.1)
8384
// hostname' e.g: '30.225.0.87 leonard.machine'
8485
mongodb =
85-
new MongoDBContainer()
86-
.withNetwork(NETWORK)
86+
new MongoDBContainer(NETWORK)
8787
.withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS)
8888
.withLogConsumer(new Slf4jLogConsumer(LOG));
8989

9090
Startables.deepStart(Stream.of(mongodb)).join();
9191

92-
executeCommandFileInMongoDB("mongo_setup", "admin");
9392
MongoClientSettings settings =
9493
MongoClientSettings.builder()
9594
.applyConnectionString(
@@ -225,7 +224,7 @@ private String executeCommandFileInMongoDB(String fileNameIgnoreSuffix, String d
225224
String command1 =
226225
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
227226
.map(String::trim)
228-
.filter(x -> !x.startsWith("//") && !x.isEmpty())
227+
.filter(x -> StringUtils.isNotBlank(x) && !x.trim().startsWith("//"))
229228
.map(
230229
x -> {
231230
final Matcher m = COMMENT_PATTERN.matcher(x);

flink-cdc-e2e-tests/src/test/resources/ddl/mongo_setup.js

Lines changed: 0 additions & 48 deletions
This file was deleted.

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,13 @@
3636
import java.io.UnsupportedEncodingException;
3737
import java.net.URLEncoder;
3838
import java.nio.charset.StandardCharsets;
39+
import java.util.Arrays;
40+
import java.util.List;
3941
import java.util.Locale;
4042
import java.util.Properties;
4143

44+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
45+
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
4246
import static org.apache.flink.util.Preconditions.checkArgument;
4347
import static org.apache.flink.util.Preconditions.checkNotNull;
4448

@@ -123,10 +127,9 @@ public static class Builder<T> {
123127
private String hosts;
124128
private String username;
125129
private String password;
126-
private String database;
127-
private String collection;
130+
private List<String> databaseList;
131+
private List<String> collectionList;
128132
private String connectionOptions;
129-
private String pipeline;
130133
private Integer batchSize;
131134
private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS_DEFAULT;
132135
private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE_DEFAULT;
@@ -167,24 +170,18 @@ public Builder<T> password(String password) {
167170
return this;
168171
}
169172

170-
/** Name of the database to watch for changes. */
171-
public Builder<T> database(String database) {
172-
this.database = database;
173-
return this;
174-
}
175-
176-
/** Name of the collection in the database to watch for changes. */
177-
public Builder<T> collection(String collection) {
178-
this.collection = collection;
173+
/** Regular expressions list that match database names to be monitored. */
174+
public Builder<T> databaseList(String... databaseList) {
175+
this.databaseList = Arrays.asList(databaseList);
179176
return this;
180177
}
181178

182179
/**
183-
* An array of objects describing the pipeline operations to run. eg. [{"$match":
184-
* {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}]
180+
* Regular expressions that match fully-qualified collection identifiers for collections to
181+
* be monitored. Each identifier is of the form {@code <databaseName>.<collectionName>}.
185182
*/
186-
public Builder<T> pipeline(String pipeline) {
187-
this.pipeline = pipeline;
183+
public Builder<T> collectionList(String... collectionList) {
184+
this.collectionList = Arrays.asList(collectionList);
188185
return this;
189186
}
190187

@@ -353,11 +350,17 @@ public DebeziumSourceFunction<T> build() {
353350
"connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName());
354351
props.setProperty("name", "mongodb_binlog_source");
355352

353+
ConnectionString connectionString = buildConnectionUri();
356354
props.setProperty(
357-
MongoSourceConfig.CONNECTION_URI_CONFIG, String.valueOf(buildConnectionUri()));
355+
MongoSourceConfig.CONNECTION_URI_CONFIG, String.valueOf(connectionString));
356+
357+
if (databaseList != null) {
358+
props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
359+
}
358360

359-
props.setProperty(MongoSourceConfig.DATABASE_CONFIG, checkNotNull(database));
360-
props.setProperty(MongoSourceConfig.COLLECTION_CONFIG, checkNotNull(collection));
361+
if (collectionList != null) {
362+
props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList));
363+
}
361364

362365
props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
363366
props.setProperty(
@@ -372,10 +375,6 @@ public DebeziumSourceFunction<T> build() {
372375
props.setProperty(
373376
MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA_VALUE_DEFAULT);
374377

375-
if (pipeline != null) {
376-
props.setProperty(MongoSourceConfig.PIPELINE_CONFIG, pipeline);
377-
}
378-
379378
if (batchSize != null) {
380379
props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, String.valueOf(batchSize));
381380
}

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818

1919
package com.ververica.cdc.connectors.mongodb.internal;
2020

21+
import com.mongodb.ConnectionString;
22+
import com.mongodb.MongoNamespace;
23+
import com.mongodb.client.MongoClient;
24+
import com.mongodb.client.MongoClients;
25+
import com.mongodb.kafka.connect.source.MongoSourceConfig;
2126
import com.mongodb.kafka.connect.source.MongoSourceTask;
2227
import io.debezium.connector.AbstractSourceInfo;
2328
import io.debezium.connector.SnapshotRecord;
@@ -29,21 +34,45 @@
2934
import org.apache.kafka.connect.source.SourceRecord;
3035
import org.apache.kafka.connect.source.SourceTask;
3136
import org.apache.kafka.connect.source.SourceTaskContext;
37+
import org.bson.conversions.Bson;
3238
import org.bson.json.JsonReader;
3339

3440
import java.lang.reflect.Field;
3541
import java.time.Instant;
42+
import java.util.ArrayList;
43+
import java.util.Arrays;
3644
import java.util.LinkedList;
3745
import java.util.List;
3846
import java.util.Map;
47+
import java.util.Optional;
3948
import java.util.concurrent.atomic.AtomicBoolean;
49+
import java.util.regex.Pattern;
50+
import java.util.stream.Collectors;
51+
52+
import static com.mongodb.client.model.Aggregates.match;
53+
import static com.mongodb.client.model.Filters.and;
54+
import static com.mongodb.client.model.Filters.regex;
55+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.ADD_NS_FIELD;
56+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.ADD_NS_FIELD_NAME;
57+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.bsonListToJson;
58+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.collectionNames;
59+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.collectionsFilter;
60+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.completionPattern;
61+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.databaseFilter;
62+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.databaseNames;
63+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.includeListAsPatterns;
64+
import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.isIncludeListExplicitlySpecified;
4065

4166
/**
4267
* Source Task that proxies mongodb kafka connector's {@link MongoSourceTask} to adapt to {@link
4368
* com.ververica.cdc.debezium.internal.DebeziumChangeFetcher}.
4469
*/
4570
public class MongoDBConnectorSourceTask extends SourceTask {
4671

72+
public static final String DATABASE_INCLUDE_LIST = "database.include.list";
73+
74+
public static final String COLLECTION_INCLUDE_LIST = "collection.include.list";
75+
4776
private static final String TRUE = "true";
4877

4978
private static final Schema HEARTBEAT_VALUE_SCHEMA =
@@ -77,6 +106,7 @@ public void initialize(SourceTaskContext context) {
77106

78107
@Override
79108
public void start(Map<String, String> props) {
109+
initCapturedCollections(props);
80110
target.start(props);
81111
isInSnapshotPhase = isCopying();
82112
}
@@ -226,4 +256,103 @@ private boolean isCopying() {
226256
throw new IllegalStateException("Cannot access isCopying field of SourceTask", e);
227257
}
228258
}
259+
260+
private void initCapturedCollections(Map<String, String> props) {
261+
ConnectionString connectionString =
262+
new ConnectionString(props.get(MongoSourceConfig.CONNECTION_URI_CONFIG));
263+
264+
String databaseIncludeList = props.get(DATABASE_INCLUDE_LIST);
265+
String collectionIncludeList = props.get(COLLECTION_INCLUDE_LIST);
266+
267+
List<String> databaseList =
268+
Optional.ofNullable(databaseIncludeList)
269+
.map(input -> Arrays.asList(input.split(",")))
270+
.orElse(null);
271+
272+
List<String> collectionList =
273+
Optional.ofNullable(collectionIncludeList)
274+
.map(input -> Arrays.asList(input.split(",")))
275+
.orElse(null);
276+
277+
if (collectionList != null) {
278+
// Watching collections changes
279+
List<String> discoveredDatabases;
280+
List<String> discoveredCollections;
281+
try (MongoClient mongoClient = MongoClients.create(connectionString)) {
282+
discoveredDatabases = databaseNames(mongoClient, databaseFilter(databaseList));
283+
discoveredCollections =
284+
collectionNames(
285+
mongoClient,
286+
discoveredDatabases,
287+
collectionsFilter(collectionList));
288+
}
289+
290+
// case: database = db0, collection = coll1
291+
if (isIncludeListExplicitlySpecified(collectionList, discoveredCollections)) {
292+
MongoNamespace namespace = new MongoNamespace(discoveredCollections.get(0));
293+
props.put(MongoSourceConfig.DATABASE_CONFIG, namespace.getDatabaseName());
294+
props.put(MongoSourceConfig.COLLECTION_CONFIG, namespace.getCollectionName());
295+
} else { // case: database = db0|db2, collection = (db0.coll[0-9])|(db1.coll[1-2])
296+
String namespacesRegex =
297+
includeListAsPatterns(collectionList).stream()
298+
.map(Pattern::pattern)
299+
.collect(Collectors.joining("|"));
300+
301+
List<Bson> pipeline = new ArrayList<>();
302+
pipeline.add(ADD_NS_FIELD);
303+
304+
Bson nsFilter = regex(ADD_NS_FIELD_NAME, namespacesRegex);
305+
if (databaseList != null) {
306+
String databaseRegex =
307+
includeListAsPatterns(databaseList).stream()
308+
.map(Pattern::pattern)
309+
.collect(Collectors.joining("|"));
310+
Bson dbFilter = regex("ns.db", databaseRegex);
311+
nsFilter = and(dbFilter, nsFilter);
312+
}
313+
pipeline.add(match(nsFilter));
314+
315+
props.put(MongoSourceConfig.PIPELINE_CONFIG, bsonListToJson(pipeline));
316+
317+
String copyExistingNamespaceRegex =
318+
discoveredCollections.stream()
319+
.map(ns -> completionPattern(ns).pattern())
320+
.collect(Collectors.joining("|"));
321+
322+
props.put(
323+
MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG,
324+
copyExistingNamespaceRegex);
325+
}
326+
} else if (databaseList != null) {
327+
// Watching databases changes
328+
List<String> discoveredDatabases;
329+
try (MongoClient mongoClient = MongoClients.create(connectionString)) {
330+
discoveredDatabases = databaseNames(mongoClient, databaseFilter(databaseList));
331+
}
332+
333+
if (isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
334+
props.put(MongoSourceConfig.DATABASE_CONFIG, discoveredDatabases.get(0));
335+
} else {
336+
String databaseRegex =
337+
includeListAsPatterns(databaseList).stream()
338+
.map(Pattern::pattern)
339+
.collect(Collectors.joining("|"));
340+
341+
List<Bson> pipeline = new ArrayList<>();
342+
pipeline.add(match(regex("ns.db", databaseRegex)));
343+
props.put(MongoSourceConfig.PIPELINE_CONFIG, bsonListToJson(pipeline));
344+
345+
String copyExistingNamespaceRegex =
346+
discoveredDatabases.stream()
347+
.map(db -> completionPattern(db + "\\..*").pattern())
348+
.collect(Collectors.joining("|"));
349+
350+
props.put(
351+
MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG,
352+
copyExistingNamespaceRegex);
353+
}
354+
} else {
355+
// Watching all changes on the cluster by default, we do nothing here
356+
}
357+
}
229358
}

0 commit comments

Comments
 (0)