Skip to content

Conversation

leonardBang
Copy link
Contributor

@leonardBang leonardBang commented Oct 11, 2021

Expose MySqlSource 2.0 DataStream API which support incremental snapshot/parallel reading/ lock-free feature

@leonardBang leonardBang requested a review from wuchong October 11, 2021 13:02
Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will help to update code.

// --------------------------------------------
// utils for debezium reader
// --------------------------------------------
public static BinaryLogClient getBinaryClient(Configuration dbzConfiguration) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create would be better than get? It's confusing that client is a member of configuraiton.

connectorConfig.password());
}

public static MySqlConnection getConnection(Configuration dbzConfiguration) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move all the utils to DebeziumUtils? There already are openMySqlConnection and closeMySqlConnection.

configuration.get(MySqlSourceOptions.DATABASE_NAME),
configuration.get(MySqlSourceOptions.TABLE_NAME)));
sourceConfig.getDbzConfig().getString("database.whitelist"),
sourceConfig.getDbzConfig().getString("table.whitelist")));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make all the config decouple with debezium configuration? So that we don't always use debezium config key to get config value.


public Builder capturedDatabases(String capturedDatabases) {
if (capturedDatabases != null) {
dbzProperties.put("database.whitelist", capturedDatabases);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you follow the way of MySqlSource that maintain all the config values as member fields, so that we can easily to guarantee nullability and accessibility. We can build the debezium configuration in the construction.


/** The configuration properties that used by {@link MySqlParallelSource}. */
@Internal
public class MySqlParallelSourceConfig implements Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MySqlSourceConfig? I think it also represents configs for non-parallal mysql source?

}

/** Builder class of {@link MySqlParallelSource}. */
public static final class Builder<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Builder should be a public API, but lay in an internal API. Could you move it out to be a public API and call it MySqlParallelSourceBuilder.

private String serverTimeZone = SERVER_TIME_ZONE.defaultValue();
private Duration connectTimeout = MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue();
private boolean includeSchemaChanges = false;
private Properties dbzProperties = new Properties();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be final.

switch (startupOptions.startupMode) {
case INITIAL:
startupMode = "initial";
dbzProperties.put("scan.startup.mode", startupMode);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to put an our config into Debezium properties?

final String capturedTables = tableList == null ? null : String.join(",", tableList);

final MySqlParallelSourceConfig parallelSourceConfig =
new MySqlParallelSourceConfig.Builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make this builder holding MySqlParallelSourceConfig and delegates most methods (excepts deserializer) to the config. Then we don't need to maintain duplicate builder codes.


private final String startupMode;
private final boolean includeSchemaChanges;
private final String serverIdRange;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better to make the server id range to be structured, and then we can add getServerId method on it ServerIdRange#getServerId(int subtaskId);

@wuchong
Copy link
Member

wuchong commented Oct 13, 2021

I have appended several commits, please have a look @leonardBang .

Comment on lines 387 to 389
The Incremental Snapshot Reading feature of MySQL CDC Source only exposes in SQL currently, if you're using DataStream, please use legacy MySQL Source:

```java
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The description can safely delete

Comment on lines 38 to 40
* @deprecated please use {@link com.ververica.cdc.connectors.mysql.source.MySqlSource#builder()}
* instead which supports more rich features, e.g. parallel reading from historical data. The
* {@link MySqlSource} will be dropped in the future version.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @deprecated please use {@link com.ververica.cdc.connectors.mysql.source.MySqlSource#builder()}
* instead which supports more rich features, e.g. parallel reading from historical data. The
* {@link MySqlSource} will be dropped in the future version.
* @deprecated please use {@link com.ververica.cdc.connectors.mysql.source.MySqlSource}
* instead which supports more rich features, e.g. parallel reading from historical data. The
* {@link MySqlSource} will be dropped in the future version.

@leonardBang
Copy link
Contributor Author

Thanks @wuchong for the improvements, I'll merge once the build pass

@leonardBang leonardBang merged commit 5ce7adb into apache:master Oct 13, 2021
@leonardBang leonardBang changed the title [mysql] Expose MySqlParallelSource DataStream API [mysql] Expose MySqlSource 2.0 DataStream API (incremental snapshot/parallel reading/ lock-free) Oct 15, 2021
ChaomingZhangCN pushed a commit to ChaomingZhangCN/flink-cdc that referenced this pull request Jan 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants