[ISSUE-11] MongoDB cdc connector of Flink 1.13.0 #225
Conversation
Hi, Jark. @wuchong
@Jiabao-Sun, I went through the API and examples, and it looks very good. I think it's already in good shape.
I've compiled and tested it in my environment and it seems to be working well! Flink 1.13
Thanks for your reply.
org.apache.flink.table.api.ValidationException: Table 'MongoDB-CDC' declares metadata columns, but the underlying DynamicTableSource doesn't implement the SupportsReadingMetadata interface. Therefore, metadata cannot be read from the given source. Is this intended?
@carloscbl This ability has not been implemented yet, but I think it's a helpful suggestion. We could provide some metadata fields such as the ones the Debezium format describes (see the sketch below).
CC @wuchong @leonardBang
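For context, a rough sketch of what implementing SupportsReadingMetadata on the MongoDB table source could look like; the class name and metadata keys below are illustrative assumptions, not the connector's final design:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.types.DataType;

/** Illustrative only: how a MongoDB table source could expose metadata columns. */
public class MongoDBMetadataSketch implements SupportsReadingMetadata {

    @Override
    public Map<String, DataType> listReadableMetadata() {
        // Keys loosely modeled on what the Debezium format exposes; the real
        // connector may choose different names and types.
        Map<String, DataType> metadata = new LinkedHashMap<>();
        metadata.put("database_name", DataTypes.STRING());
        metadata.put("collection_name", DataTypes.STRING());
        metadata.put("op_ts", DataTypes.TIMESTAMP_LTZ(3));
        return metadata;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // A real source would remember the requested keys and the produced type so
        // the runtime deserializer can append the metadata fields to each row.
    }
}
```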
Q1. This may relate to #72. Please help check:
Q2. The Debezium MongoDB connector cannot extract the full document of a changed record.
@Jiabao-Sun Thanks for your reply!
@yangxusun9 However, there may be other causes for this problem. If you can reproduce it, could you provide a detailed log for troubleshooting?
Upgrade mongo-kafka-connect from 1.4.0 to 1.6.1 and mongo-driver from 4.2.1 to 4.3.1 to support MongoDB 5.0.
UTC for default
It turned out to be a problem with the platform we were using. If you call the cancel command directly, the job can be cancelled normally.
Hi @yangxusun9
Thanks @Jiabao-Sun for the great and high-quality work! The pull request looks good to me in general. I don't have much knowledge about MongoDB, so I only left some comments from the Flink side.
Cheers!
// then exit the snapshot phase.
if (!isCopying()) {
    outSourceRecords =
            Collections.singletonList(markLastSnapshotRecord(lastSnapshotRecord));
lastSnapshotRecord may be null here?
Thanks, Jark. lastSnapshotRecord may be null when the source collection is empty and a short poll waiting time is set.
Fixed.
// Snapshot Phase Ended, Condition 1:
// Received non-snapshot record, exit snapshot phase immediately.
if (lastSnapshotRecord != null) {
    outSourceRecords.add(markLastSnapshotRecord(lastSnapshotRecord));
Do we emit the lastSnapshotRecord twice here?
Sorry, I checked the code twice and it seems we don't emit the lastSnapshotRecord twice.
The lastSnapshotRecord that is emitted may be a snapshot record of the previous loop or the last snapshot record of the previous batch.
The lastSnapshotRecord has been renamed to currentLastSnapshotRecord to eliminate the ambiguity.
"connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName()); | ||
props.setProperty("name", "mongodb_binlog_source"); | ||
|
||
props.setProperty(MongoSourceConfig.CONNECTION_URI_CONFIG, checkNotNull(connectionUri)); |
It seems we didn't set mongodb.name, which is used to identify the replica set?
Sorry Jark, we chose MongoDB's Kafka Connector rather than Debezium's MongoDB Connector.
https://docs.mongodb.com/kafka-connector/current/kafka-source/
The mongodb.name property is used as the Kafka topic prefix (topic.prefix) in MongoDB's Kafka Connector, and I think it's not required for our Flink CDC connector.
The mechanisms of the two connectors are different:
- Debezium's MongoDB Connector reads the oplog.rs collection of each replica set's primary node.
- MongoDB's Kafka Connector subscribes to MongoDB's change streams.
MongoDB's oplog.rs collection does not keep a changed record's before-update state, so it's hard to extract the full document state from a single oplog.rs record and convert it to Flink's upsert changelog.
Additionally, MongoDB 5 (released in July) has changed the oplog format, so the current Debezium connector cannot be used with it.
MongoDB's change streams provide an updateLookup feature that returns the most current majority-committed version of the updated document, which makes it easy to convert a change stream event to Flink's upsert changelog.
https://docs.mongodb.com/manual/changeStreams/
https://docs.mongodb.com/manual/reference/change-events/
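To illustrate the updateLookup behaviour described above, here is a minimal sketch using the MongoDB Java sync driver (the connection string, database, and collection names are placeholders):

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;

public class ChangeStreamSketch {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> collection =
                    client.getDatabase("inventory").getCollection("products");

            // updateLookup asks the server to attach the current majority-committed
            // version of the updated document to every update event.
            for (ChangeStreamDocument<Document> event :
                    collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP)) {
                System.out.println(event.getOperationType() + ": " + event.getFullDocument());
            }
        }
    }
}
```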
By the way, change streams support for Debezium's MongoDB connector is on its roadmap.
Once it's done, we can provide two engines for users to choose from.
https://issues.redhat.com/browse/DBZ-435
Thanks for the detailed explanation.
 * A builder to build a SourceFunction which can read snapshot and continue to consume change stream
 * events.
 */
public class MongoDBSource {
Add the @PublicEvolving annotation.
Fixed
+ " md5Field STRING,\n" | ||
+ " dateField DATE,\n" | ||
+ " dateBefore1970 DATE,\n" | ||
+ " timestampField TIMESTAMP,\n" |
Add a test for TIMESTAMP_LTZ as well.
Fixed
    return TimestampData.fromEpochMillis(docObj.asDateTime().getValue());
}
if (docObj.isTimestamp()) {
    return TimestampData.fromEpochMillis(docObj.asTimestamp().getTime() * 1000L);
What does the underlying timestamp value mean? Is there any documentation to reference?
Usually, TimestampData.fromEpochMillis is used for TIMESTAMP_LTZ types, where the epoch milliseconds represent a Java Instant. We recommend using TimestampData.fromLocalDateTime for TIMESTAMP types.
https://docs.mongodb.com/manual/reference/bson-types/
Timestamp
BSON has a special timestamp type for internal MongoDB use and is not associated with the regular Date type. This internal timestamp type is a 64 bit value where:
- the most significant 32 bits are a time_t value (seconds since the Unix epoch)
- the least significant 32 bits are an incrementing ordinal for operations within a given second.
The BSON timestamp type is represented by a Java long: the high 32 bits represent Unix epoch seconds, and the low 32 bits are an incrementing value that keeps time increasing and avoids duplication.
DateTime
BSON Date is a 64-bit integer that represents the number of milliseconds since the Unix epoch (Jan 1, 1970). This results in a representable date range of about 290 million years into the past and future.
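For illustration, a small sketch (not the PR's exact code) of how these two BSON types map to epoch milliseconds with the BSON Java API:

```java
import org.bson.BsonDateTime;
import org.bson.BsonTimestamp;

public class BsonTimeSketch {

    /** BSON DateTime already stores epoch milliseconds. */
    public static long toEpochMillis(BsonDateTime dateTime) {
        return dateTime.getValue();
    }

    /**
     * BSON Timestamp stores epoch seconds in its high 32 bits (getTime()), so it is
     * scaled to milliseconds here; the low 32 bits carry no sub-second time information.
     */
    public static long toEpochMillis(BsonTimestamp timestamp) {
        return timestamp.getTime() * 1000L;
    }
}
```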
Thanks Jark for the detailed explanation of TIMESTAMP_LTZ.
As the documentation quoted above shows, they are semantically equivalent:
- BSON DateTime - TIMESTAMP_LTZ(3)
- BSON Timestamp - TIMESTAMP_LTZ(0)
We treated BSON Timestamp as TIMESTAMP_LTZ(0) because its minimum precision is seconds and the incrementing value does not represent an exact time interval.
So I think the transformation in the code can work.
If there is a deviation in my understanding, please help correct me.
Thanks @Jiabao-Sun, I think it makes sense to map BSON DateTime and Timestamp to Flink TIMESTAMP_LTZ because they all represent time since the epoch.
However, if we map them to Flink TIMESTAMP (i.e. TIMESTAMP WITHOUT TIME ZONE), then we need to do a conversion (epoch seconds to local clock time), just like converting a Java Instant to a Java LocalDateTime. This conversion requires a time zone, i.e. which time zone the instant should be displayed in as a string. Usually, the time zone is configured by users and not hard-coded, e.g. table.local-time-zone in Flink SQL and server-time-zone in mysql-cdc.
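A minimal sketch of that conversion, with zoneId standing in for the user-configured time zone (illustrative, not the connector's actual code):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

import org.apache.flink.table.data.TimestampData;

public class TimestampConversionSketch {

    /**
     * Interprets epoch milliseconds in the user-configured time zone and produces a
     * TIMESTAMP (without time zone) value via TimestampData.fromLocalDateTime.
     */
    public static TimestampData toTimestampData(long epochMillis, ZoneId zoneId) {
        LocalDateTime localDateTime =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zoneId);
        return TimestampData.fromLocalDateTime(localDateTime);
    }
}
```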
Thanks @wuchong, I'll add a server-time-zone configuration for users to configure.
Thanks @wuchong for the detailed explanation.
I added a new config option local-time-zone to let users decide which time zone TIMESTAMP values are converted with.
I think this is semantically more similar to table.local-time-zone in Flink SQL than to server-time-zone of MySQL, so I named it local-time-zone.
By the way, is there a method provided by Flink to get table.local-time-zone at runtime, so that we don't need to add an additional config option?
Yes, you can get it via org.apache.flink.table.factories.DynamicTableFactory.Context#getConfiguration in the table factory.
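For example, a hedged sketch of resolving the session time zone inside a table factory, assuming Flink's TableConfigOptions.LOCAL_TIME_ZONE option (its default value "default" means the local system zone):

```java
import java.time.ZoneId;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.factories.DynamicTableFactory;

public class LocalTimeZoneSketch {

    /** Reads table.local-time-zone from the factory context configuration. */
    public static ZoneId resolveLocalTimeZone(DynamicTableFactory.Context context) {
        ReadableConfig config = context.getConfiguration();
        String zone = config.get(TableConfigOptions.LOCAL_TIME_ZONE);
        return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
                ? ZoneId.systemDefault()
                : ZoneId.of(zone);
    }
}
```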
Thanks @wuchong.
Removed the table option 'local-time-zone'; the connector now uses the time zone from table.local-time-zone at runtime.
Besides, I created #393 for the mongodb-cdc connector; would you like to take that as well?
Thanks, I'd be happy to work on it.
Thanks a lot for the review and the helpful suggestions. Best
props.setProperty("name", "mongodb_binlog_source"); | ||
|
||
props.setProperty(MongoSourceConfig.CONNECTION_URI_CONFIG, checkNotNull(connectionUri)); | ||
props.setProperty(MongoSourceConfig.DATABASE_CONFIG, checkNotNull(database)); |
Hi @Jiabao-Sun, some MongoDB deployments require an account and password to connect. Does this not consider setting the account and password?
Hi @XuQianJin-Stars, please use this pattern: mongodb://username:password@localhost/
MongoDBSource.Builder<RowData> builder =
        MongoDBSource.<RowData>builder()
                .connectionUri(uri)
@Jiabao-Sun There is also the URI here. For a sharded cluster, there will be multiple ip:port pairs.
Hi @XuQianJin-Stars,
The uri represents MongoDB's connection string; see https://docs.mongodb.com/manual/reference/connection-string/#standard-connection-string-format for details.
For a replica set:
mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet
For a sharded cluster:
mongodb://mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017
This uri pattern looks a little complicated. I'm afraid most MongoDB newbies can't set a correct URI on the first try.
Can we decouple the different parts of the URI, e.g. hosts, replica-set, username, password?
Decoupling also makes things more flexible, e.g. we may want to mask secrets in logging and display in the future.
Hi @Jiabao-Sun I also think it is better to set the username and password separately.
Thanks, I will add separate configurations for that.
New config options have been added to replace the uri:
- hosts: The comma-separated list of hostname and port pairs of the MongoDB servers.
- user: Database user to be used when connecting to MongoDB.
- password: Password to be used when connecting to MongoDB.
- mongodb.replicaset: Specifies the name of the replica set. It is not required, but explicitly stating it can speed up the connection to the servers configured by hosts.
- mongodb.authsource: Required only when MongoDB is configured to use authentication with an authentication database other than admin. Defaults to admin.
- mongodb.connect.timeout.ms: The time in milliseconds to attempt a connection before timing out. Defaults to 10000 (10 seconds).
- mongodb.socket.timeout.ms: The time in milliseconds to attempt a send or receive on a socket before the attempt times out. Defaults to 0.
- mongodb.ssl.enabled: Whether the connector will use SSL to connect to MongoDB instances.
- mongodb.ssl.invalid.hostname.allowed: When SSL is enabled, this setting controls whether strict hostname checking is disabled during the connection phase.
Flink-VVP
@wuchong @XuQianJin-Stars
+ "eg. localhost:27017,localhost:27018"); | ||
|
||
private static final ConfigOption<String> USER = | ||
ConfigOptions.key("user") |
How about username, to keep it aligned with the other connectors in this repo?
Fixed
"Name of the collection in the database to watch for changes."); | ||
|
||
private static final ConfigOption<String> REPLICA_SET = | ||
ConfigOptions.key("mongodb.replicaset") |
My intention was just to separate out the secret options. It seems we have introduced a bunch of options, and there are more to come.
To keep the connector options concise, flexible, and powerful, what do you think about just introducing a connection.options option? It can set one or more arbitrary MongoDB connection options [1].
Connection options are pairs of the form name=value. Separate options with the ampersand (&) character: name1=value1&name2=value2. In the following example, a connection includes the replicaSet and connectTimeoutMS options:
'connection.options' = 'replicaSet=test&connectTimeoutMS=300000'
In the documentation, we can include the MongoDB connection options link [1] when describing this option.
Sorry @wuchong,
I think this ampersand-style config option may cause confusion for expert MongoDB users, who may be more used to connection strings.
I have two proposals.
- Use mongodb.connect as a config prefix to mark connection options.
For example:
CREATE TABLE WITH (
hosts = 'localhost:27017,localhost:27018',
username = 'user',
password = 'pwd',
mongodb.connect.authsource = 'nonadmin',
mongodb.connect.replicaset = 'rs',
mongodb.connect.ssl = 'false'
);
- Provide a config option uri as an alternative to hosts.
We can directly use uri with connection parameters and append the `username` and `password` as `userInfo` to the connection URI.
For example:
CREATE TABLE WITH (
uri = 'mongodb://localhost:27017,localhost:27018/?authsource=nonadmin&replicaset=rs',
username = 'user',
password = 'pwd'
);
It's equivalent to the following, which has no connection options:
CREATE TABLE WITH (
hosts = 'localhost:27017,localhost:27018',
username = 'user',
password = 'pwd'
);
What do you think?
I'm fine with both connection.options and the mongodb.connect. prefix. Neither way requires us to do config mapping, and we can transparently pass through MongoDB's options. What do you think about the two ways? @leonardBang @XuQianJin-Stars
@Jiabao-Sun a side question: does MongoDB allow lower-case options? e.g. mongodb://db1.example.net:27017,db2.example.net:2500/?replicaset=test&connecttimeoutms=300000
@wuchong
Yes, it's case-insensitive.
As connection-string-options says:
Connection options are pairs in the following form: name=value.
The option name is case insensitive when using a driver.
@wuchong @leonardBang @XuQianJin-Stars
The connection.options option has been added in the latest commit.
If you think the mongodb.connect. prefix is better, I can change it.
The changes look good to me. I will merge it once the build passes. Thanks for the great work and long-time patience @Jiabao-Sun.
Thanks @wuchong @leonardBang @XuQianJin-Stars @carloscbl @yangxusun9 for your help. It is a great honor to be able to complete it. Cheers!
Support MongoDB CDC connector of Flink 1.13.0 #11
Summary
Use Debezium's EmbeddedEngine to drive a SourceConnector from the MongoDB Kafka Connector.
The MongoDB Kafka Connector's CDC mechanism is MongoDB Change Streams, which are available for replica sets and sharded clusters.
For fully detailed documentation of the MongoDB Kafka Connector, please see its Documentation.
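As an illustration of that idea only (not the PR's actual wiring), the sketch below plugs the upstream MongoDB Kafka source connector into Debezium's embedded engine API and simply prints the change records instead of forwarding them to Flink:

```java
import java.util.Properties;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class EmbeddedEngineSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("name", "mongodb_binlog_source");
        // Plug the MongoDB Kafka source connector into Debezium's embedded engine.
        props.setProperty("connector.class", "com.mongodb.kafka.connect.MongoSourceConnector");
        props.setProperty("connection.uri", "mongodb://username:password@localhost:27017");
        props.setProperty("database", "inventory");
        props.setProperty("collection", "products");
        // Keep offsets in memory for this sketch; a real deployment persists them.
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");

        try (DebeziumEngine<ChangeEvent<String, String>> engine =
                DebeziumEngine.create(Json.class)
                        .using(props)
                        // The real connector hands each record to Flink here.
                        .notifying(record -> System.out.println(record.value()))
                        .build()) {
            // Blocks the calling thread; usually submitted to an executor instead.
            engine.run();
        }
    }
}
```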
Example
Prerequisites
Create a user:
Flink SQL Example:
DataStream Example:
Connector Options
- hosts: e.g. localhost:27017,localhost:27018
- username: required only when MongoDB is configured to use authentication.
- password: required only when MongoDB is configured to use authentication.
- connection.options: ampersand-separated MongoDB connection options, e.g. replicaSet=test&connectTimeoutMS=300000.
- A copy-existing pipeline such as [{"$match": {"closed": "false"}}] ensures that only documents in which the closed field is set to false are copied.
Reference
MongoDB Kafka Connector
MongoDB Change Streams