
Conversation

@Jiabao-Sun (Contributor) commented Sep 4, 2021

MongoDB CDC Connector

The MongoDB CDC connector allows for reading snapshot data and incremental data from MongoDB. This document describes how to set up the MongoDB CDC connector to run SQL queries against MongoDB.

Dependencies

In order to set up the MongoDB CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mongodb-cdc</artifactId>
  <version>2.1.0</version>
</dependency>

SQL Client JAR

Download flink-sql-connector-mongodb-cdc-2.1.0.jar and put it under <FLINK_HOME>/lib/.

Setup MongoDB

Availability

  • MongoDB version

    MongoDB version >= 3.6

    We use the change streams feature (new in version 3.6) to capture change data.

  • Cluster Deployment

    Replica sets or sharded clusters are required.

  • Storage Engine

    WiredTiger storage engine is required.

  • Replica set protocol version

    Replica set protocol version 1 (pv1) is required.

    Starting in version 4.0, MongoDB only supports pv1. pv1 is the default for all new replica sets created with MongoDB 3.2 or later.

  • Privileges

    changeStream and read privileges are required by the MongoDB Kafka Connector.

    You can use the following example for simple authorization.

    For more detailed authorization, please refer to MongoDB Database User Roles.

    use admin;
    db.createUser({
      user: "flinkuser",
      pwd: "flinkpw",
      roles: [
        { role: "read", db: "admin" }, //read role includes changeStream privilege 
        { role: "readAnyDatabase", db: "admin" } //for snapshot reading
      ]
    });
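
Since the flinkuser above is created in the admin database, the authentication database can be supplied through the connector's connection.options option (documented below). A minimal sketch, assuming authSource, a standard MongoDB connection-string option; the full table syntax is explained in the next section:

-- a sketch: authenticate against the admin database via 'connection.options'
CREATE TABLE products (
  _id STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products',
  'connection.options' = 'authSource=admin'
);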

How to create a MongoDB CDC table

A MongoDB CDC table can be defined as follows:

-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE products (
  _id STRING, -- must be declared
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

-- read snapshot and change events from products collection
SELECT * FROM products;

Note that

MongoDB's change event records don't contain the update-before message, so we can only convert them to Flink's UPSERT changelog stream.
An upsert stream requires a unique key, so we must declare _id as the primary key.
We can't declare any other column as the primary key, because delete operations do not contain any key or value other than _id and the sharding key.

Connector Options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specify what connector to use; here it should be 'mongodb-cdc'. |
| hosts | required | (none) | String | The comma-separated list of hostname and port pairs of the MongoDB servers, e.g. localhost:27017,localhost:27018. |
| username | optional | (none) | String | Name of the database user to use when connecting to MongoDB. Required only when MongoDB is configured to use authentication. |
| password | optional | (none) | String | Password to use when connecting to MongoDB. Required only when MongoDB is configured to use authentication. |
| database | required | (none) | String | Name of the database to watch for changes. |
| collection | required | (none) | String | Name of the collection in the database to watch for changes. |
| connection.options | optional | (none) | String | The ampersand-separated connection options of MongoDB, e.g. replicaSet=test&connectTimeoutMS=300000. |
| errors.tolerance | optional | none | String | Whether to continue processing messages if an error is encountered. Accepts none or all. When set to none, the connector reports an error and blocks further processing of the remaining records when it encounters an error. When set to all, the connector silently ignores any bad messages. |
| errors.log.enable | optional | true | Boolean | Whether details of failed operations should be written to the log file. |
| copy.existing | optional | true | Boolean | Whether to copy existing data from the source collections. |
| copy.existing.pipeline | optional | (none) | String | An array of JSON objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient, e.g. [{"$match": {"closed": "false"}}] ensures that only documents in which the closed field is set to false are copied. |
| copy.existing.max.threads | optional | Processor count | Integer | The number of threads to use when performing the data copy. |
| copy.existing.queue.size | optional | 16000 | Integer | The max size of the queue to use when copying data. |
| poll.max.batch.size | optional | 1000 | Integer | Maximum number of change stream documents to include in a single batch when polling for new data. |
| poll.await.time.ms | optional | 1500 | Integer | The amount of time to wait before checking for new results on the change stream. |
| heartbeat.interval.ms | optional | 0 | Integer | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |

Note: it is highly recommended to set heartbeat.interval.ms to a value larger than 0 if the collection changes slowly.
The heartbeat event can push the resumeToken forward, which keeps the resumeToken from expiring when we recover the Flink job from a checkpoint or savepoint.
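
For example, for a collection that changes slowly, the heartbeat can be enabled directly in the table options. A minimal sketch, assuming a hypothetical orders collection (the 5000 ms interval is an illustrative value, not a recommendation):

-- a sketch: enable heartbeats so the resumeToken keeps advancing
CREATE TABLE orders (
  _id STRING,
  status STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'orders',
  'heartbeat.interval.ms' = '5000'
);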

Features

Exactly-Once Processing

The MongoDB CDC connector is a Flink Source connector which reads the database snapshot first and then continues to read change stream events with exactly-once processing, even when failures happen.
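
Exactly-once recovery relies on Flink's checkpointing mechanism. A minimal sketch of enabling it from the Flink SQL Client, using Flink's standard execution.checkpointing.interval option (the 3 s interval is illustrative):

-- a sketch: enable periodic checkpoints so the source can resume
-- from a consistent position after a failure
SET 'execution.checkpointing.interval' = '3s';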

Snapshot When Startup Or Not

The config option copy.existing specifies whether to take a snapshot when the MongoDB CDC consumer starts up. It defaults to true.
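
For example, to skip the snapshot phase and start reading from the current change stream position only, copy.existing can be turned off. A minimal sketch (the orders table is hypothetical):

-- a sketch: disable the initial snapshot
CREATE TABLE orders (
  _id STRING,
  status STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'inventory',
  'collection' = 'orders',
  'copy.existing' = 'false'
);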

Snapshot Data Filters

The config option copy.existing.pipeline describes the filters applied when copying existing data.
This can improve the use of indexes by the copying manager and make copying more efficient.

In the following example, the $match aggregation operator ensures that only documents in which the closed field is set to false are copied.
copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
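
In Flink SQL, the same filter can be passed through the WITH clause. A minimal sketch (the orders table and closed column are hypothetical; note that the option value is a JSON array literal):

-- a sketch: copy only documents whose closed field is "false"
CREATE TABLE orders (
  _id STRING,
  closed STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'inventory',
  'collection' = 'orders',
  'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'
);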

Change Streams

We integrate MongoDB's official Kafka Connector to read snapshot and change events from MongoDB, and drive it with Debezium's EmbeddedEngine.

Debezium's EmbeddedEngine provides a mechanism for running a single Kafka Connect SourceConnector within an application's process, and it can properly drive any standard Kafka Connect SourceConnector, even one not provided by Debezium.

We chose MongoDB's official Kafka Connector instead of Debezium's MongoDB Connector because they use different change data capture mechanisms.

  • Debezium's MongoDB Connector reads the oplog.rs collection of each replica set's master node.
  • MongoDB's Kafka Connector subscribes to MongoDB's Change Stream.

MongoDB's oplog.rs collection doesn't keep the changed record's update-before state, so it's hard to extract the full document state from a single oplog.rs record and convert it to a changelog stream accepted by Flink (Insert Only, Upsert, All).
Additionally, MongoDB 5 (released in July 2021) has changed the oplog format, so the current Debezium connector cannot be used with it.

Change Streams is a feature introduced in MongoDB 3.6 for replica sets and sharded clusters that allows applications to access real-time data changes without the complexity and risk of tailing the oplog.
Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them.

Lookup Full Document for Update Operations is a feature of Change Streams that lets us configure the change stream to return the most current majority-committed version of the updated document. Because of this, we can easily collect the latest full document and convert the changelog to Flink's Upsert Changelog Stream.

By the way, the Debezium MongoDB change streams exploration mentioned in DBZ-435 is on the roadmap.
Once it's done, we can support both and let users choose.

DataStream Source

The MongoDB CDC connector can also be used as a DataStream source. You can create a SourceFunction as shown below:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;

public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("localhost:27017")
                .username("flink")
                .password("flinkpw")
                .database("inventory")
                .collection("products")
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}

Data Type Mapping

| BSON type | Flink SQL type |
| --- | --- |
|  | TINYINT |
|  | SMALLINT |
| Int | INT |
| Long | BIGINT |
|  | FLOAT |
| Double | DOUBLE |
| Decimal128 | DECIMAL(p, s) |
| Boolean | BOOLEAN |
| Date, Timestamp | DATE |
| Date, Timestamp | TIME |
| Date | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
| String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
| BinData | BYTES |
| Object | ROW |
| Array | ARRAY |
| DBPointer | ROW<$ref STRING, $id STRING> |
| GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>, Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>>, ... |
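
For example, a GeoJSON Point field can be declared with the ROW type from the table above. A minimal sketch (the places collection and location field are hypothetical; the type field name is escaped with backticks in case it clashes with the SQL keyword):

-- a sketch: map a GeoJSON Point to a Flink ROW type
CREATE TABLE places (
  _id STRING,
  name STRING,
  location ROW<`type` STRING, coordinates ARRAY<DOUBLE>>, -- GeoJSON Point
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'inventory',
  'collection' = 'places'
);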

@Jiabao-Sun Jiabao-Sun marked this pull request as draft September 5, 2021 05:17
@Jiabao-Sun Jiabao-Sun marked this pull request as ready for review September 5, 2021 17:41
- "27017:27017"
environment:
- MONGO_INITDB_ROOT_USERNAME=mongouser
- MONGO_INITDB_ROOT_PASSWORD=mongopw
wuchong (Member) commented:
I have updated the existing tutorial pages to have a better meaningful title. You can create a new page for mongodb, the title can be "Streaming ETL from MongoDB to Elasticsearch"

Jiabao-Sun (Contributor, Author) replied:
Thanks @wuchong, I'll move the tutorial to that.

@wuchong (Member) commented Sep 6, 2021

Besides, you can preview the docs locally by following the instructions at https://github.com/ververica/flink-cdc-connectors/blob/master/docs/README.md

@Jiabao-Sun Jiabao-Sun force-pushed the mongodb-cdc-docs branch 2 times, most recently from c7257ad to 228b5b1 Compare September 8, 2021 11:16
@Jiabao-Sun Jiabao-Sun requested a review from wuchong September 10, 2021 10:49
@wuchong (Member) left a review:
Thanks for the work @Jiabao-Sun, I left some comments.

This can improve the use of indexes by the copying manager and make copying more efficient.

In the following example, the $match aggregation operator ensures that only documents in which the closed field is set to false are copied.
`copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]`
wuchong (Member) commented:
Maybe we should quote `$match`; this part of the rendering is broken.

[screenshot of the broken rendering]

Jiabao-Sun (Contributor, Author) replied:
Fixed.

- For MongoDB's Kafka Connector, it subscribe `Change Stream` of MongoDB.

MongoDB's `oplog.rs` collection doesn't keep the changed record's update before state, so it's hard to extract the full document state by a single `oplog.rs` record and convert it to change log stream accepted by Flink (Insert Only, Upsert, All).
Additionally, MongoDB 5 (released in July) has changed the oplog format, so the current Debezium connector cannot be used with it.
wuchong (Member) commented:
released in July 2021.

Jiabao-Sun (Contributor, Author) replied:
Fixed.

<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">BSON type<a href="https://docs.mongodb.com/manual/reference/bson-types/"></a></th>
wuchong (Member) commented:
Explain a bit why we map from BSON type at the front?

Jiabao-Sun (Contributor, Author) replied:
Fixed.

Comment on lines 8 to 24
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=1234
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
wuchong (Member) commented:
Could you remove all content about postgres and mysql because they are not needed?

Jiabao-Sun (Contributor, Author) replied:
ok

Jiabao-Sun (Contributor, Author) added:
Fixed.

.password("flinkpw")
.database("inventory")
.collection("products")
.deserializer(new StringDebeziumDeserializationSchema())
wuchong (Member) commented:
We are going to replace the recommended deserializer with JsonDebeziumDeserializationSchema, which is more user-friendly. Could you take a look at #410 and update this part?

Jiabao-Sun (Contributor, Author) replied:
Fixed.

2. Remove all content about postgres and mysql in mongodb-tutorials
3. Brief introduction to type conversion
4. Use JsonDebeziumDeserializationSchema instead of StringDebeziumDeserializationSchema
@Jiabao-Sun Jiabao-Sun requested a review from wuchong September 10, 2021 17:50
@Jiabao-Sun (Contributor, Author) commented:
Hi @wuchong, the problems you mentioned above have been fixed.
If you are available, please review it again.

@wuchong (Member) left a review:
Thanks for the great work @Jiabao-Sun.
LGTM.

@wuchong wuchong merged commit f9d17f4 into apache:master Sep 13, 2021
@wuchong wuchong added this to the v2.1.0 milestone Sep 13, 2021
@Jiabao-Sun Jiabao-Sun deleted the mongodb-cdc-docs branch February 17, 2022 15:18
ChaomingZhangCN pushed a commit to ChaomingZhangCN/flink-cdc that referenced this pull request Jan 13, 2025