Skip to content

Commit 5ce7adb

Browse files
leonardBang and wuchong authored
[mysql] Expose MySqlSource API that supports incremental snapshot (#495)
Co-authored-by: Jark Wu <[email protected]>
1 parent 9cf48a3 commit 5ce7adb

39 files changed

+1453
-768
lines changed

README.md

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,33 +65,35 @@ Include following Maven dependency (available through Maven Central):
6565
```
6666

6767
```java
68+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
6869
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
69-
import org.apache.flink.streaming.api.functions.source.SourceFunction;
7070
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
71-
import com.ververica.cdc.connectors.mysql.MySqlSource;
71+
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
7272

73-
public class MySqlBinlogSourceExample {
73+
public class MySqlSourceExample {
7474
public static void main(String[] args) throws Exception {
75-
Properties debeziumProperties = new Properties();
76-
debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock
77-
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
75+
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
7876
.hostname("yourHostname")
7977
.port(yourPort)
8078
.databaseList("yourDatabaseName") // set captured database
8179
.tableList("yourDatabaseName.yourTableName") // set captured table
8280
.username("yourUsername")
8381
.password("yourPassword")
8482
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
85-
.debeziumProperties(debeziumProperties)
8683
.build();
8784

8885
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8986

87+
// enable checkpoint
88+
env.enableCheckpointing(3000);
89+
9090
env
91-
.addSource(sourceFunction)
91+
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
92+
// set 4 parallel source tasks
93+
.setParallelism(4)
9294
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
9395

94-
env.execute();
96+
env.execute("Print MySQL Snapshot + Binlog");
9597
}
9698
}
9799
```

docs/content/connectors/mysql-cdc.md

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -384,36 +384,36 @@ _Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapsh
384384

385385
### DataStream Source
386386

387-
The Incremental Snapshot Reading feature of MySQL CDC Source only exposes in SQL currently, if you're using DataStream, please use legacy MySQL Source:
388-
389387
```java
388+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
390389
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
391-
import org.apache.flink.streaming.api.functions.source.SourceFunction;
392390
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
393-
import com.ververica.cdc.connectors.mysql.MySqlSource;
391+
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
394392

395-
public class MySqlBinlogSourceExample {
393+
public class MySqlSourceExample {
396394
public static void main(String[] args) throws Exception {
397-
Properties debeziumProperties = new Properties();
398-
debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock
399-
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
400-
.hostname("yourHostname")
401-
.port(yourPort)
402-
.databaseList("yourDatabaseName") // set captured database
403-
.tableList("yourDatabaseName.yourTableName") // set captured table
404-
.username("yourUsername")
405-
.password("yourPassword")
406-
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
407-
.debeziumProperties(debeziumProperties)
408-
.build();
395+
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
396+
.hostname("yourHostname")
397+
.port(yourPort)
398+
.databaseList("yourDatabaseName") // set captured database
399+
.tableList("yourDatabaseName.yourTableName") // set captured table
400+
.username("yourUsername")
401+
.password("yourPassword")
402+
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
403+
.build();
409404

410405
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
411406

407+
// enable checkpoint
408+
env.enableCheckpointing(3000);
409+
412410
env
413-
.addSource(sourceFunction)
411+
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
412+
// set 4 parallel source tasks
413+
.setParallelism(4)
414414
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
415415

416-
env.execute();
416+
env.execute("Print MySQL Snapshot + Binlog");
417417
}
418418
}
419419
```

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlSource.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,35 @@
2828
import java.util.Map;
2929
import java.util.Properties;
3030

31-
import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME;
3231
import static com.ververica.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_KEY;
3332
import static com.ververica.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_VALUE;
3433
import static org.apache.flink.util.Preconditions.checkNotNull;
3534

36-
/** A builder to build a SourceFunction which can read snapshot and continue to consume binlog. */
35+
/**
36+
* A builder to build a SourceFunction which can read snapshot and continue to consume binlog.
37+
*
38+
* @deprecated please use {@link com.ververica.cdc.connectors.mysql.source.MySqlSource} instead
39+
* which supports richer features, e.g. parallel reading of historical data. The {@link
40+
* MySqlSource} will be dropped in a future version.
41+
*/
42+
@Deprecated
3743
public class MySqlSource {
3844

45+
private static final String DATABASE_SERVER_NAME = "mysql_binlog_source";
46+
3947
public static <T> Builder<T> builder() {
4048
return new Builder<>();
4149
}
4250

43-
/** Builder class of {@link MySqlSource}. */
51+
/**
52+
* Builder class of {@link MySqlSource}.
53+
*
54+
* @deprecated please use {@link
55+
* com.ververica.cdc.connectors.mysql.source.MySqlSource#builder()} instead which supports
56+
* richer features, e.g. parallel reading of historical data. The {@link
57+
* MySqlSource.Builder} will be dropped in a future version.
58+
*/
59+
@Deprecated
4460
public static class Builder<T> {
4561

4662
private int port = 3306; // default 3306 port

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlValidator.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@
1818

1919
package com.ververica.cdc.connectors.mysql;
2020

21-
import org.apache.flink.configuration.Configuration;
2221
import org.apache.flink.table.api.TableException;
2322
import org.apache.flink.table.api.ValidationException;
2423

25-
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
26-
2724
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
2825
import com.ververica.cdc.debezium.Validator;
26+
import io.debezium.config.Configuration;
2927
import io.debezium.connector.mysql.MySqlConnection;
3028

3129
import java.sql.SQLException;
@@ -43,19 +41,16 @@ public class MySqlValidator implements Validator {
4341
private static final String BINLOG_FORMAT_ROW = "ROW";
4442
private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL";
4543

46-
private final Configuration configuration;
47-
48-
public MySqlValidator(Properties properties) {
49-
this(Configuration.fromMap(Maps.fromProperties(properties)));
50-
}
44+
private final Properties dbzProperties;
5145

52-
public MySqlValidator(Configuration configuration) {
53-
this.configuration = configuration;
46+
public MySqlValidator(Properties dbzProperties) {
47+
this.dbzProperties = dbzProperties;
5448
}
5549

5650
@Override
5751
public void validate() {
58-
try (MySqlConnection connection = DebeziumUtils.openMySqlConnection(configuration)) {
52+
try (MySqlConnection connection =
53+
DebeziumUtils.openMySqlConnection(Configuration.from(dbzProperties))) {
5954
checkVersion(connection);
6055
checkBinlogFormat(connection);
6156
checkBinlogRowImage(connection);

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,36 @@
1818

1919
package com.ververica.cdc.connectors.mysql.debezium;
2020

21-
import org.apache.flink.configuration.Configuration;
2221
import org.apache.flink.util.FlinkRuntimeException;
2322

23+
import com.github.shyiko.mysql.binlog.BinaryLogClient;
2424
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
25+
import io.debezium.config.Configuration;
2526
import io.debezium.connector.mysql.MySqlConnection;
2627
import io.debezium.connector.mysql.MySqlConnectorConfig;
27-
import io.debezium.relational.RelationalTableFilters;
28+
import io.debezium.connector.mysql.MySqlDatabaseSchema;
29+
import io.debezium.connector.mysql.MySqlTopicSelector;
30+
import io.debezium.connector.mysql.MySqlValueConverters;
31+
import io.debezium.jdbc.JdbcValueConverters;
32+
import io.debezium.jdbc.TemporalPrecisionMode;
33+
import io.debezium.relational.TableId;
34+
import io.debezium.schema.TopicSelector;
35+
import io.debezium.util.SchemaNameAdjuster;
2836
import org.slf4j.Logger;
2937
import org.slf4j.LoggerFactory;
3038

3139
import java.sql.SQLException;
3240

33-
import static com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.toDebeziumConfig;
34-
3541
/** Utilities related to Debezium. */
3642
public class DebeziumUtils {
3743

3844
private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class);
3945

40-
/**
41-
* Creates {@link RelationalTableFilters} from configuration. The {@link RelationalTableFilters}
42-
* can be used to filter tables according to "table.whitelist" and "database.whitelist" options.
43-
*/
44-
public static RelationalTableFilters createTableFilters(Configuration configuration) {
45-
io.debezium.config.Configuration debeziumConfig = toDebeziumConfig(configuration);
46-
final MySqlConnectorConfig mySqlConnectorConfig = new MySqlConnectorConfig(debeziumConfig);
47-
return mySqlConnectorConfig.getTableFilters();
48-
}
49-
5046
/** Creates and opens a new {@link MySqlConnection}. */
51-
public static MySqlConnection openMySqlConnection(Configuration configuration) {
47+
public static MySqlConnection openMySqlConnection(Configuration dbzConfiguration) {
5248
MySqlConnection jdbc =
5349
new MySqlConnection(
54-
new MySqlConnection.MySqlConnectionConfiguration(
55-
toDebeziumConfig(configuration)));
50+
new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
5651
try {
5752
jdbc.connect();
5853
} catch (SQLException e) {
@@ -75,6 +70,37 @@ public static void closeMySqlConnection(MySqlConnection jdbc) {
7570
}
7671
}
7772

73+
/** Creates a new {@link MySqlConnection} without opening the connection. */
74+
public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) {
75+
return new MySqlConnection(
76+
new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
77+
}
78+
79+
/** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */
80+
public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) {
81+
final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dbzConfiguration);
82+
return new BinaryLogClient(
83+
connectorConfig.hostname(),
84+
connectorConfig.port(),
85+
connectorConfig.username(),
86+
connectorConfig.password());
87+
}
88+
89+
/** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */
90+
public static MySqlDatabaseSchema createMySqlDatabaseSchema(
91+
MySqlConnectorConfig dbzMySqlConfig, MySqlConnection connection) {
92+
boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
93+
TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
94+
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
95+
MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig);
96+
return new MySqlDatabaseSchema(
97+
dbzMySqlConfig,
98+
valueConverters,
99+
topicSelector,
100+
schemaNameAdjuster,
101+
tableIdCaseInsensitive);
102+
}
103+
78104
/** Fetch current binlog offsets in MySql Server. */
79105
public static BinlogOffset currentBinlogOffset(MySqlConnection jdbc) {
80106
final String showMasterStmt = "SHOW MASTER STATUS";
@@ -104,4 +130,30 @@ public static BinlogOffset currentBinlogOffset(MySqlConnection jdbc) {
104130
e);
105131
}
106132
}
133+
134+
// --------------------------------------------------------------------------------------------
135+
136+
private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
137+
TemporalPrecisionMode timePrecisionMode = dbzMySqlConfig.getTemporalPrecisionMode();
138+
JdbcValueConverters.DecimalMode decimalMode = dbzMySqlConfig.getDecimalMode();
139+
String bigIntUnsignedHandlingModeStr =
140+
dbzMySqlConfig
141+
.getConfig()
142+
.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
143+
MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
144+
MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
145+
bigIntUnsignedHandlingModeStr);
146+
JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
147+
bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
148+
149+
boolean timeAdjusterEnabled =
150+
dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
151+
return new MySqlValueConverters(
152+
decimalMode,
153+
timePrecisionMode,
154+
bigIntUnsignedMode,
155+
dbzMySqlConfig.binaryHandlingMode(),
156+
timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
157+
MySqlValueConverters::defaultParsingErrorHandler);
158+
}
107159
}

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(
176176
// otherwise, we may not be able to find the corresponding schema
177177
Configuration dezConf =
178178
statefulTaskContext
179-
.getDezConf()
179+
.getSourceConfig()
180+
.getDbzConfiguration()
180181
.edit()
181182
.with("table.whitelist", currentSnapshotSplit.getTableId())
182183
.build();

0 commit comments

Comments
 (0)