# MongoDB CDC Connector

The MongoDB CDC connector allows for reading snapshot data and incremental data from MongoDB. This document describes how to set up the MongoDB CDC connector to run SQL queries against MongoDB.

Dependencies
------------

In order to set up the MongoDB CDC connector, the following sections provide dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

### Maven dependency
<!-- fixme: correct the version -->
```xml
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mongodb-cdc</artifactId>
  <version>2.1.0</version>
</dependency>
```

### SQL Client JAR

<!-- fixme: correct the version -->
Download [flink-sql-connector-mongodb-cdc-2.1.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.1.0/flink-sql-connector-mongodb-cdc-2.1.0.jar) and put it under `<FLINK_HOME>/lib/`.

How to create a MongoDB CDC table
----------------

The MongoDB CDC table can be defined as follows:

```sql
-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE products (
  _id STRING, -- must be declared
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'uri' = 'mongodb://flinkuser:flinkpw@localhost:27017',
  'database' = 'inventory',
  'collection' = 'products'
);

-- read snapshot and change events from products collection
SELECT * FROM products;
```

Connector Options
----------------

<div class="highlight">
<table class="colwidths-auto docutils">
  <thead>
    <tr>
      <th class="text-left" style="width: 25%">Option</th>
      <th class="text-left" style="width: 8%">Required</th>
      <th class="text-left" style="width: 7%">Default</th>
      <th class="text-left" style="width: 10%">Type</th>
      <th class="text-left" style="width: 50%">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>connector</td>
      <td>required</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>Specify what connector to use, here it should be <code>'mongodb-cdc'</code>.</td>
    </tr>
    <tr>
      <td>uri</td>
      <td>required</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>A MongoDB connection URI string.</td>
    </tr>
    <tr>
      <td>database</td>
      <td>required</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>Name of the database to watch for changes.</td>
    </tr>
    <tr>
      <td>collection</td>
      <td>required</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>Name of the collection in the database to watch for changes.</td>
    </tr>
    <tr>
      <td>errors.tolerance</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">none</td>
      <td>String</td>
      <td>Whether to continue processing messages if an error is encountered.
      Accepted values are <code>none</code> or <code>all</code>.
      When set to <code>none</code>, the connector reports an error and blocks further processing of the rest of the records
      when it encounters an error. When set to <code>all</code>, the connector silently ignores any bad messages.
      </td>
    </tr>
    <tr>
      <td>errors.log.enable</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">true</td>
      <td>Boolean</td>
      <td>Whether details of failed operations should be written to the log file.</td>
    </tr>
    <tr>
      <td>copy.existing</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">true</td>
      <td>Boolean</td>
      <td>Whether to copy existing data from source collections.</td>
    </tr>
    <tr>
      <td>copy.existing.pipeline</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">(none)</td>
      <td>String</td>
      <td>An array of JSON objects describing the pipeline operations to run when copying existing data.
      For example, we can set <code>[{"$match": {"operationType": "insert"}}]</code> to observe only insert change events.
      </td>
    </tr>
    <tr>
      <td>copy.existing.max.threads</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">Number of processors</td>
      <td>Integer</td>
      <td>The number of threads to use when performing the data copy.</td>
    </tr>
    <tr>
      <td>copy.existing.queue.size</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">16000</td>
      <td>Integer</td>
      <td>The max size of the queue to use when copying data.</td>
    </tr>
    <tr>
      <td>poll.max.batch.size</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">1000</td>
      <td>Integer</td>
      <td>Maximum number of change stream documents to include in a single batch when polling for new data.</td>
    </tr>
    <tr>
      <td>poll.await.time.ms</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">1500</td>
      <td>Integer</td>
      <td>The amount of time to wait before checking for new results on the change stream.</td>
    </tr>
    <tr>
      <td>heartbeat.interval.ms</td>
      <td>optional</td>
      <td style="word-wrap: break-word;">0</td>
      <td>Integer</td>
      <td>The length of time in milliseconds between sending heartbeat messages. Use 0 to disable.</td>
    </tr>
  </tbody>
</table>
</div>

Note: it is highly recommended to set `heartbeat.interval.ms` to a value larger than 0 if the collection changes slowly.
The heartbeat event can push the `resumeToken` forward, so that the `resumeToken` does not expire when we recover the Flink job from a checkpoint or savepoint.
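
As a quick illustration, here is a sketch of the `products` table from above with a few of the optional options set; the option values are illustrative, not recommendations:

```sql
-- 'products' table as above, plus a few optional connector options;
-- the option values below are illustrative
CREATE TABLE products (
  _id STRING,
  name STRING,
  weight DECIMAL(10,3),
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'uri' = 'mongodb://flinkuser:flinkpw@localhost:27017',
  'database' = 'inventory',
  'collection' = 'products',
  'copy.existing' = 'true',          -- snapshot existing data first
  'poll.max.batch.size' = '500',     -- smaller change stream batches
  'heartbeat.interval.ms' = '5000'   -- keep the resumeToken fresh
);
```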

Features
--------

### Exactly-Once Processing

The MongoDB CDC connector is a Flink Source connector which reads the database snapshot first and then continues to read change stream events, with **exactly-once processing** even when failures happen.
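
Exactly-once delivery relies on Flink's checkpointing, which persists the change stream position so the source can resume consistently after a failure. A minimal sketch for the SQL Client (assuming a Flink version that supports this `SET` syntax; the interval value is illustrative):

```sql
-- enable periodic checkpoints so the source can resume from a
-- consistent change stream position after a failure
SET 'execution.checkpointing.interval' = '3s';
```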

### Change Streams

We integrate [MongoDB's official Kafka Connector](https://docs.mongodb.com/kafka-connector/current/kafka-source/) to read snapshot or change events from MongoDB, and drive it with Debezium's `EmbeddedEngine`.

Debezium's `EmbeddedEngine` provides a mechanism for running a single Kafka Connect `SourceConnector` within an application's process, and it can properly drive any standard Kafka Connect `SourceConnector`, even one not provided by Debezium.

We chose **MongoDB's official Kafka Connector** instead of **Debezium's MongoDB Connector** because they use different change data capture mechanisms.

- Debezium's MongoDB Connector reads the `oplog.rs` collection of each replica set's master node.
- MongoDB's Kafka Connector subscribes to MongoDB's `Change Stream`.

MongoDB's `oplog.rs` collection doesn't keep the changed record's before-update state, so it's hard to extract the full document state from a single `oplog.rs` record and convert it to the changelog stream accepted by Flink (insert-only, upsert, all).
Additionally, MongoDB 5 (released in July 2021) has changed the oplog format, so the current Debezium connector cannot be used with it.

**Change Stream** is a feature introduced in MongoDB 3.6 for replica sets and sharded clusters that allows applications to access real-time data changes without the complexity and risk of tailing the oplog.
Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them.

**Lookup Full Document for Update Operations** is a feature provided by **Change Stream** that lets us configure the change stream to return the most current majority-committed version of the updated document. Because of this, we can easily collect the latest full document and convert the change log to Flink's **Upsert Changelog Stream**.
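
For example, because the source emits an upsert changelog keyed by `_id`, a continuous aggregation over the `products` table declared earlier (a hypothetical query) keeps its result correct as documents are inserted, updated, or deleted:

```sql
-- hypothetical continuous query over the upsert changelog:
-- the count per currency is updated as products change in MongoDB
SELECT price.currency AS currency, COUNT(*) AS product_count
FROM products
GROUP BY price.currency;
```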

**Note That**

- A dynamic table that is converted into an upsert stream requires a unique key, so we must declare `_id` as the primary key when we use Flink SQL. We can't declare any other column as the primary key, because delete operations do not contain the key and value of any field other than `_id` and the `sharding key`.
- `changeStream` and `read` privileges are required by the MongoDB Kafka Connector. You can use the following example for simple authorization. For more detailed authorization, please refer to [MongoDB Database User Roles](https://docs.mongodb.com/manual/reference/built-in-roles/#database-user-roles).

```javascript
use admin;
db.createUser(
  {
    user: "flinkuser",
    pwd: "flinkpw",
    roles: [
      { role: "read", db: "admin" }, // read role includes the changeStream privilege
      { role: "readAnyDatabase", db: "admin" } // for snapshot reading
    ]
  }
);
```

By the way, Debezium's MongoDB change streams exploration, mentioned in [DBZ-435](https://issues.redhat.com/browse/DBZ-435), is on its roadmap.
Once it's done, we can provide both connectors for users to choose from.

### DataStream Source

The MongoDB CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;

public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .connectionUri("mongodb://localhost:27017")
                .database("inventory")
                .collection("products")
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}
```


Data Type Mapping
----------------

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
  <thead>
    <tr>
      <th class="text-left"><a href="https://docs.mongodb.com/manual/reference/bson-types/">BSON type</a></th>
      <th class="text-left"><a href="{% link dev/table/types.md %}">Flink SQL type</a></th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td></td>
      <td>TINYINT</td>
    </tr>
    <tr>
      <td></td>
      <td>SMALLINT</td>
    </tr>
    <tr>
      <td>Int</td>
      <td>INT</td>
    </tr>
    <tr>
      <td>Long</td>
      <td>BIGINT</td>
    </tr>
    <tr>
      <td></td>
      <td>FLOAT</td>
    </tr>
    <tr>
      <td>Double</td>
      <td>DOUBLE</td>
    </tr>
    <tr>
      <td>Decimal128</td>
      <td>DECIMAL(p, s)</td>
    </tr>
    <tr>
      <td>Boolean</td>
      <td>BOOLEAN</td>
    </tr>
    <tr>
      <td>Date<br>Timestamp</td>
      <td>DATE</td>
    </tr>
    <tr>
      <td>Date<br>Timestamp</td>
      <td>TIME</td>
    </tr>
    <tr>
      <td>Date</td>
      <td>TIMESTAMP_LTZ(3)</td>
    </tr>
    <tr>
      <td>Timestamp</td>
      <td>TIMESTAMP_LTZ(0)</td>
    </tr>
    <tr>
      <td>
        String<br>
        ObjectId<br>
        UUID<br>
        Symbol<br>
        MD5<br>
        JavaScript<br>
        Regex</td>
      <td>STRING</td>
    </tr>
    <tr>
      <td>BinData</td>
      <td>BYTES</td>
    </tr>
    <tr>
      <td>Object</td>
      <td>ROW</td>
    </tr>
    <tr>
      <td>Array</td>
      <td>ARRAY</td>
    </tr>
    <tr>
      <td>DBPointer</td>
      <td>ROW&lt;$ref STRING, $id STRING&gt;</td>
    </tr>
    <tr>
      <td>
        <a href="https://docs.mongodb.com/manual/reference/geojson/">GeoJSON</a>
      </td>
      <td>
        Point : ROW&lt;type STRING, coordinates ARRAY&lt;DOUBLE&gt;&gt;<br>
        Line : ROW&lt;type STRING, coordinates ARRAY&lt;ARRAY&lt;DOUBLE&gt;&gt;&gt;<br>
        ...
      </td>
    </tr>
  </tbody>
</table>
</div>
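
As an illustration of the mapping, here is a sketch of a DDL for a hypothetical `shipments` collection (all field names are assumptions, chosen only to exercise several BSON types):

```sql
-- hypothetical 'shipments' collection; field names are assumptions
CREATE TABLE shipments (
  _id STRING,                                                 -- ObjectId
  shipped_at TIMESTAMP_LTZ(3),                                -- BSON Date
  destination ROW<`type` STRING, coordinates ARRAY<DOUBLE>>,  -- GeoJSON Point
  label BYTES,                                                -- BSON BinData
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'uri' = 'mongodb://flinkuser:flinkpw@localhost:27017',
  'database' = 'inventory',
  'collection' = 'shipments'
);
```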


Reference
--------

- [Change Streams](https://docs.mongodb.com/manual/changeStreams/)
- [MongoDB Database User Roles](https://docs.mongodb.com/manual/reference/built-in-roles/#database-user-roles)
- [MongoDB Kafka Connector](https://docs.mongodb.com/kafka-connector/current/kafka-source/)
- [BSON Types](https://docs.mongodb.com/manual/reference/bson-types/)