Commit 6fad791

Use new config options hosts, user, password instead of uri
1 parent d3a1cd8 commit 6fad791

3 files changed: +93 -108 lines changed


docs/content/connectors/mongodb-cdc.md

Lines changed: 87 additions & 12 deletions
@@ -81,7 +81,9 @@ CREATE TABLE products (
   PRIMARY KEY(_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mongodb-cdc',
-  'uri' = 'mongodb://flinkuser:flinkpw@localhost:27017',
+  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
+  'user' = 'flinkuser',
+  'password' = 'flinkpw',
   'database' = 'inventory',
   'collection' = 'products'
 );
@@ -119,11 +121,31 @@ Connector Options
       <td>Specify what connector to use, here should be <code>'mongodb-cdc'</code>.</td>
     </tr>
     <tr>
-      <td>uri</td>
+      <td>hosts</td>
       <td>required</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>A MongoDB connection URI string.</td>
+      <td>The comma-separated list of hostname and port pairs of the MongoDB servers.<br>
+          e.g. localhost:27017,localhost:27018
+      </td>
+    </tr>
+    <tr>
+      <td>user</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the database user to be used when connecting to MongoDB.<br>
+          This is required only when MongoDB is configured to use authentication.
+      </td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password to be used when connecting to MongoDB.<br>
+          This is required only when MongoDB is configured to use authentication.
+      </td>
     </tr>
     <tr>
       <td>database</td>
@@ -139,6 +161,60 @@ Connector Options
       <td>String</td>
       <td>Name of the collection in the database to watch for changes.</td>
     </tr>
+    <tr>
+      <td>mongodb.replicaset</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specifies the name of the replica set.<br>
+          It is not required, but explicitly stating that the servers are in a
+          replica set can speed up connection times.
+      </td>
+    </tr>
+    <tr>
+      <td>mongodb.authsource</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">admin</td>
+      <td>String</td>
+      <td>Database (authentication source) containing MongoDB credentials.<br>
+          This is required only when MongoDB is configured to use authentication
+          with an authentication database other than admin.
+      </td>
+    </tr>
+    <tr>
+      <td>mongodb.connect.timeout.ms</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>String</td>
+      <td>The time in milliseconds to attempt a connection before timing out.<br>
+          Defaults to 10 seconds.
+      </td>
+    </tr>
+    <tr>
+      <td>mongodb.socket.timeout.ms</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">0</td>
+      <td>String</td>
+      <td>The time in milliseconds to attempt a send or receive on a socket before the attempt times out.<br>
+          A value of 0 disables this behavior.
+      </td>
+    </tr>
+    <tr>
+      <td>mongodb.ssl.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Connector will use SSL to connect to MongoDB instances.</td>
+    </tr>
+    <tr>
+      <td>mongodb.ssl.invalid.hostname.allowed</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>When SSL is enabled, this setting controls
+          whether strict hostname checking is disabled during the connection phase.
+      </td>
+    </tr>
     <tr>
       <td>errors.tolerance</td>
       <td>optional</td>
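For context on how the new options fit together, here is a minimal Flink SQL sketch (not part of this commit) that combines the required `hosts`, `user`, and `password` options with the optional `mongodb.*` options added above. The two-column schema, the host list, the credentials, and the replica set name `rs0` are assumed example values.

```sql
-- Illustrative sketch, not part of this commit: combining the new required and
-- optional connector options. The schema, hosts, credentials, and the replica
-- set name 'rs0' are assumed example values.
CREATE TABLE products (
  _id STRING,
  name STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'user' = 'flinkuser',
  'password' = 'flinkpw',
  -- optional: naming the replica set can speed up connection times
  'mongodb.replicaset' = 'rs0',
  -- optional: authentication database, defaults to admin
  'mongodb.authsource' = 'admin',
  -- optional: SSL is disabled by default
  'mongodb.ssl.enabled' = 'false',
  'database' = 'inventory',
  'collection' = 'products'
);
```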
@@ -208,15 +284,8 @@ Connector Options
       <td>Integer</td>
       <td>The length of time in milliseconds between sending heartbeat messages. Use 0 to disable.</td>
     </tr>
-    <tr>
-      <td>local-time-zone</td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">UTC</td>
-      <td>String</td>
-      <td>The local time zone.</td>
-    </tr>
   </tbody>
-</table>
+</table>
 </div>
 
 Note: `heartbeat.interval.ms` is highly recommended to set a proper value larger than 0 if the collection changes slowly.
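Following that note, a usage sketch (again, not part of this diff): the heartbeat can be enabled for a slowly changing collection directly in the table's `WITH` clause. The 30000 ms interval and the single-column schema are assumed example values.

```sql
-- Illustrative sketch, not part of this commit: enable heartbeats for a slowly
-- changing collection. The 30000 ms interval is an assumed example value.
CREATE TABLE products_slow (
  _id STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'user' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products',
  'heartbeat.interval.ms' = '30000'
);
```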
@@ -229,6 +298,10 @@ Features
 
 The MongoDB CDC connector is a Flink Source connector which will read database snapshot first and then continues to read change stream events with **exactly-once processing** even failures happen.
 
+### Whether to Snapshot on Startup
+
+The config option `copy.existing` specifies whether to take a snapshot when the MongoDB CDC consumer starts up. Defaults to `true`.
+
 ### Change Streams
 
 We integrates the [MongoDB's official Kafka Connector](https://docs.mongodb.com/kafka-connector/current/kafka-source/) to read snapshot or change events from MongoDB and drive it by Debezium's `EmbeddedEngine`.
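As a sketch of the new `copy.existing` option in use (not part of the commit): setting it to `false` skips the initial snapshot so the source reads change stream events only. The table name, schema, hosts, and credentials below are assumed for illustration.

```sql
-- Illustrative sketch, not part of this commit: skip the initial snapshot and
-- read change stream events only. 'copy.existing' defaults to 'true'.
CREATE TABLE products_stream_only (
  _id STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'user' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products',
  'copy.existing' = 'false'
);
```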
@@ -264,7 +337,9 @@ import com.ververica.cdc.connectors.mongodb.MongoDBSource;
 public class MongoDBSourceExample {
   public static void main(String[] args) throws Exception {
     SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
-        .connectionUri("mongodb://localhost:27017")
+        .hosts("localhost:27017")
+        .user("flink")
+        .password("flinkpw")
         .database("inventory")
         .collection("products")
         .deserializer(new StringDebeziumDeserializationSchema())

docs/content/tutorials/mongodb-tutorial-zh.md

Lines changed: 3 additions & 48 deletions
@@ -229,7 +229,9 @@ Flink SQL> CREATE TABLE shipments (
   PRIMARY KEY (_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mongodb-cdc',
-  'uri' = 'mongodb://mongouser:mongopw@localhost:27017',
+  'hosts' = 'localhost:27017',
+  'user' = 'mongodb',
+  'password' = 'mongopw',
   'database' = 'mgdb',
   'collection' = 'customers'
 );
@@ -293,50 +295,3 @@ db.customers.updateOne(
 --MySQL
 DELETE FROM orders WHERE order_id = 10004;
 ```
-
-7. Kafka changelog json format
-
-In the SQL CLI:
-
-```sql
---Flink SQL
-Flink SQL> CREATE TABLE kafka_gmv (
-  day_str STRING,
-  gmv DECIMAL(10, 5)
-) WITH (
-  'connector' = 'kafka',
-  'topic' = 'kafka_gmv',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'format' = 'changelog-json'
-);
-
-Flink SQL> INSERT INTO kafka_gmv
-SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
-FROM orders
-WHERE order_status = true
-GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
-
--- Read the changelog data from Kafka and check the materialized result
-Flink SQL> SELECT * FROM kafka_gmv;
-```
-Check the output in Kafka:
-
-```
-docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic kafka_gmv --bootstrap-server kafka:9094 --from-beginning'
-```
-
-Update the orders data and check the output in the SQL CLI and the Kafka console:
-```sql
--- MySQL
-UPDATE orders SET order_status = true WHERE order_id = 10001;
-UPDATE orders SET order_status = true WHERE order_id = 10002;
-UPDATE orders SET order_status = true WHERE order_id = 10003;
-
-INSERT INTO orders
-VALUES (default, '2020-07-30 17:33:00', 1001, 50.00, 104, true);
-
-UPDATE orders SET price = 40.00 WHERE order_id = 10005;
-
-DELETE FROM orders WHERE order_id = 10005;
-```

docs/content/tutorials/mongodb-tutorial.md

Lines changed: 3 additions & 48 deletions
@@ -229,7 +229,9 @@ Flink SQL> CREATE TABLE shipments (
   PRIMARY KEY (_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mongodb-cdc',
-  'uri' = 'mongodb://mongouser:mongopw@localhost:27017',
+  'hosts' = 'localhost:27017',
+  'user' = 'mongodb',
+  'password' = 'mongopw',
   'database' = 'mgdb',
   'collection' = 'customers'
 );
@@ -293,50 +295,3 @@ db.customers.updateOne(
 --MySQL
 DELETE FROM orders WHERE order_id = 10004;
 ```
-
-8. Kafka Changelog JSON format
-
-Execute following SQL in Flink SQL CLI:
-
-```sql
--- Flink SQL
-Flink SQL> CREATE TABLE kafka_gmv (
-  day_str STRING,
-  gmv DECIMAL(10, 5)
-) WITH (
-  'connector' = 'kafka',
-  'topic' = 'kafka_gmv',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'format' = 'changelog-json'
-);
-
-Flink SQL> INSERT INTO kafka_gmv
-SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
-FROM orders
-WHERE order_status = true
-GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
-
--- Consumer changelog data from Kafka, and check the result of materialized view:
-Flink SQL> SELECT * FROM kafka_gmv;
-```
-To consumer records in Kafka using `kafka-console-consumer`:
-
-```
-docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic kafka_gmv --bootstrap-server kafka:9094 --from-beginning'
-```
-
-Update orders data and check the output in Flink SQL CLI and Kafka console consumer:
-```sql
--- MySQL
-UPDATE orders SET order_status = true WHERE order_id = 10001;
-UPDATE orders SET order_status = true WHERE order_id = 10002;
-UPDATE orders SET order_status = true WHERE order_id = 10003;
-
-INSERT INTO orders
-VALUES (default, '2020-07-30 17:33:00', 1001, 50.00, 104, true);
-
-UPDATE orders SET price = 40.00 WHERE order_id = 10005;
-
-DELETE FROM orders WHERE order_id = 10005;
-```
