Skip to content

Commit 1f5d2dd

Browse files
committed
[mysql] Expose MySqlParallelSource DataStream API
1 parent 5beaeaf commit 1f5d2dd

26 files changed

+976
-603
lines changed

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,15 @@
2828
import java.util.Map;
2929
import java.util.Properties;
3030

31-
import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME;
3231
import static com.ververica.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_KEY;
3332
import static com.ververica.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_VALUE;
3433
import static org.apache.flink.util.Preconditions.checkNotNull;
3534

3635
/** A builder to build a SourceFunction which can read snapshot and continue to consume binlog. */
3736
public class MySqlSource {
3837

38+
private static final String DATABASE_SERVER_NAME = "mysql_binlog_source";
39+
3940
public static <T> Builder<T> builder() {
4041
return new Builder<>();
4142
}

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlValidator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ public MySqlValidator(Configuration configuration) {
5555

5656
@Override
5757
public void validate() {
58-
try (MySqlConnection connection = DebeziumUtils.openMySqlConnection(configuration)) {
58+
try (MySqlConnection connection =
59+
DebeziumUtils.openMySqlConnection(
60+
io.debezium.config.Configuration.from(configuration.toMap()))) {
5961
checkVersion(connection);
6062
checkBinlogFormat(connection);
6163
checkBinlogRowImage(connection);

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,26 @@
1818

1919
package com.ververica.cdc.connectors.mysql.debezium;
2020

21-
import org.apache.flink.configuration.Configuration;
2221
import org.apache.flink.util.FlinkRuntimeException;
2322

2423
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
24+
import io.debezium.config.Configuration;
2525
import io.debezium.connector.mysql.MySqlConnection;
26-
import io.debezium.connector.mysql.MySqlConnectorConfig;
27-
import io.debezium.relational.RelationalTableFilters;
2826
import org.slf4j.Logger;
2927
import org.slf4j.LoggerFactory;
3028

3129
import java.sql.SQLException;
3230

33-
import static com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.toDebeziumConfig;
34-
3531
/** Utilities related to Debezium. */
3632
public class DebeziumUtils {
3733

3834
private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class);
3935

40-
/**
41-
* Creates {@link RelationalTableFilters} from configuration. The {@link RelationalTableFilters}
42-
* can be used to filter tables according to "table.whitelist" and "database.whitelist" options.
43-
*/
44-
public static RelationalTableFilters createTableFilters(Configuration configuration) {
45-
io.debezium.config.Configuration debeziumConfig = toDebeziumConfig(configuration);
46-
final MySqlConnectorConfig mySqlConnectorConfig = new MySqlConnectorConfig(debeziumConfig);
47-
return mySqlConnectorConfig.getTableFilters();
48-
}
49-
5036
/** Creates and opens a new {@link MySqlConnection}. */
5137
public static MySqlConnection openMySqlConnection(Configuration configuration) {
5238
MySqlConnection jdbc =
5339
new MySqlConnection(
54-
new MySqlConnection.MySqlConnectionConfiguration(
55-
toDebeziumConfig(configuration)));
40+
new MySqlConnection.MySqlConnectionConfiguration(configuration));
5641
try {
5742
jdbc.connect();
5843
} catch (SQLException e) {

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(
176176
// otherwise, we may can't find corresponding schema
177177
Configuration dezConf =
178178
statefulTaskContext
179-
.getDezConf()
179+
.getSourceConfig()
180+
.getDbzConfig()
180181
.edit()
181182
.with("table.whitelist", currentSnapshotSplit.getTableId())
182183
.build();

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java

Lines changed: 12 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818

1919
package com.ververica.cdc.connectors.mysql.debezium.task.context;
2020

21-
import org.apache.flink.configuration.Configuration;
22-
2321
import com.github.shyiko.mysql.binlog.BinaryLogClient;
2422
import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
2523
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
26-
import com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions;
24+
import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceConfig;
2725
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
2826
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
2927
import io.debezium.connector.AbstractSourceInfo;
@@ -36,18 +34,14 @@
3634
import io.debezium.connector.mysql.MySqlOffsetContext;
3735
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
3836
import io.debezium.connector.mysql.MySqlTopicSelector;
39-
import io.debezium.connector.mysql.MySqlValueConverters;
4037
import io.debezium.data.Envelope;
41-
import io.debezium.jdbc.JdbcValueConverters;
42-
import io.debezium.jdbc.TemporalPrecisionMode;
4338
import io.debezium.pipeline.DataChangeEvent;
4439
import io.debezium.pipeline.ErrorHandler;
4540
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
4641
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
4742
import io.debezium.pipeline.source.spi.EventMetadataProvider;
4843
import io.debezium.pipeline.spi.OffsetContext;
4944
import io.debezium.relational.TableId;
50-
import io.debezium.relational.history.AbstractDatabaseHistory;
5145
import io.debezium.schema.DataCollectionId;
5246
import io.debezium.schema.TopicSelector;
5347
import io.debezium.util.Clock;
@@ -62,7 +56,6 @@
6256
import java.util.Map;
6357

6458
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
65-
import static io.debezium.config.CommonConnectorConfig.TOMBSTONES_ON_DELETE;
6659

6760
/**
6861
* A stateful task context that contains entries the debezium mysql connector task required.
@@ -75,7 +68,7 @@ public class StatefulTaskContext {
7568
private static final Logger LOG = LoggerFactory.getLogger(StatefulTaskContext.class);
7669
private static final Clock clock = Clock.SYSTEM;
7770

78-
private final io.debezium.config.Configuration dezConf;
71+
private final MySqlParallelSourceConfig sourceConfig;
7972
private final MySqlConnectorConfig connectorConfig;
8073
private final MySqlEventMetadataProvider metadataProvider;
8174
private final SchemaNameAdjuster schemaNameAdjuster;
@@ -93,11 +86,11 @@ public class StatefulTaskContext {
9386
private ErrorHandler errorHandler;
9487

9588
public StatefulTaskContext(
96-
Configuration configuration,
89+
MySqlParallelSourceConfig sourceConfig,
9790
BinaryLogClient binaryLogClient,
9891
MySqlConnection connection) {
99-
this.dezConf = toDebeziumConfig(configuration);
100-
this.connectorConfig = new MySqlConnectorConfig(dezConf);
92+
this.sourceConfig = sourceConfig;
93+
this.connectorConfig = sourceConfig.getMySqlConnectorConfig();
10194
this.schemaNameAdjuster = SchemaNameAdjuster.create();
10295
this.metadataProvider = new MySqlEventMetadataProvider();
10396
this.binaryLogClient = binaryLogClient;
@@ -106,19 +99,14 @@ public StatefulTaskContext(
10699

107100
public void configure(MySqlSplit mySqlSplit) {
108101
// initial stateful objects
109-
final boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
110102
this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
111-
final MySqlValueConverters valueConverters = getValueConverters(connectorConfig);
112103
EmbeddedFlinkDatabaseHistory.registerHistory(
113-
dezConf.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
104+
sourceConfig
105+
.getDbzConfig()
106+
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
114107
mySqlSplit.getTableSchemas().values());
115108
this.databaseSchema =
116-
new MySqlDatabaseSchema(
117-
connectorConfig,
118-
valueConverters,
119-
topicSelector,
120-
schemaNameAdjuster,
121-
tableIdCaseInsensitive);
109+
MySqlParallelSourceConfig.getMySqlDatabaseSchema(connectorConfig, connection);
122110
this.offsetContext =
123111
loadStartingOffsetState(new MySqlOffsetContext.Loader(connectorConfig), mySqlSplit);
124112
validateAndLoadDatabaseHistory(offsetContext, databaseSchema);
@@ -139,7 +127,7 @@ public void configure(MySqlSplit mySqlSplit) {
139127
() ->
140128
taskContext.configureLoggingContext(
141129
"mysql-cdc-connector-task"))
142-
// no buffer any more, we use signal event
130+
// do not buffer any element, we use signal event
143131
// .buffering()
144132
.build();
145133
this.dispatcher =
@@ -218,30 +206,6 @@ private boolean isBinlogAvailable(MySqlOffsetContext offset) {
218206
return found;
219207
}
220208

221-
private static MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) {
222-
TemporalPrecisionMode timePrecisionMode = configuration.getTemporalPrecisionMode();
223-
JdbcValueConverters.DecimalMode decimalMode = configuration.getDecimalMode();
224-
String bigIntUnsignedHandlingModeStr =
225-
configuration
226-
.getConfig()
227-
.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
228-
MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
229-
MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
230-
bigIntUnsignedHandlingModeStr);
231-
JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
232-
bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
233-
234-
final boolean timeAdjusterEnabled =
235-
configuration.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
236-
return new MySqlValueConverters(
237-
decimalMode,
238-
timePrecisionMode,
239-
bigIntUnsignedMode,
240-
configuration.binaryHandlingMode(),
241-
timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
242-
MySqlValueConverters::defaultParsingErrorHandler);
243-
}
244-
245209
/** Copied from debezium for accessing here. */
246210
public static class MySqlEventMetadataProvider implements EventMetadataProvider {
247211
public static final String SERVER_ID_KEY = "server_id";
@@ -297,8 +261,8 @@ public static Clock getClock() {
297261
return clock;
298262
}
299263

300-
public io.debezium.config.Configuration getDezConf() {
301-
return dezConf;
264+
public MySqlParallelSourceConfig getSourceConfig() {
265+
return sourceConfig;
302266
}
303267

304268
public MySqlConnectorConfig getConnectorConfig() {
@@ -354,48 +318,4 @@ public StreamingChangeEventSourceMetrics getStreamingChangeEventSourceMetrics()
354318
public SchemaNameAdjuster getSchemaNameAdjuster() {
355319
return schemaNameAdjuster;
356320
}
357-
358-
// ------------ utils ---------
359-
public static BinaryLogClient getBinaryClient(Configuration configuration) {
360-
final MySqlConnectorConfig connectorConfig =
361-
new MySqlConnectorConfig(toDebeziumConfig(configuration));
362-
return new BinaryLogClient(
363-
connectorConfig.hostname(),
364-
connectorConfig.port(),
365-
connectorConfig.username(),
366-
connectorConfig.password());
367-
}
368-
369-
public static MySqlConnection getConnection(Configuration configuration) {
370-
return new MySqlConnection(
371-
new MySqlConnection.MySqlConnectionConfiguration(toDebeziumConfig(configuration)));
372-
}
373-
374-
public static MySqlDatabaseSchema getMySqlDatabaseSchema(
375-
Configuration configuration, MySqlConnection connection) {
376-
io.debezium.config.Configuration dezConf = toDebeziumConfig(configuration);
377-
MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dezConf);
378-
boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
379-
TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
380-
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
381-
MySqlValueConverters valueConverters = getValueConverters(connectorConfig);
382-
return new MySqlDatabaseSchema(
383-
connectorConfig,
384-
valueConverters,
385-
topicSelector,
386-
schemaNameAdjuster,
387-
tableIdCaseInsensitive);
388-
}
389-
390-
public static io.debezium.config.Configuration toDebeziumConfig(Configuration configuration) {
391-
return io.debezium.config.Configuration.from(configuration.toMap())
392-
.edit()
393-
.with(AbstractDatabaseHistory.INTERNAL_PREFER_DDL, true)
394-
.with(TOMBSTONES_ON_DELETE, false)
395-
.with("database.responseBuffering", "adaptive")
396-
.with(
397-
"database.fetchSize",
398-
configuration.getInteger(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE))
399-
.build();
400-
}
401321
}

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlSchema.java

Lines changed: 5 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,22 @@
2020

2121
import org.apache.flink.util.FlinkRuntimeException;
2222

23-
import io.debezium.config.Configuration;
23+
import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceConfig;
2424
import io.debezium.connector.mysql.MySqlConnection;
2525
import io.debezium.connector.mysql.MySqlConnectorConfig;
2626
import io.debezium.connector.mysql.MySqlDatabaseSchema;
2727
import io.debezium.connector.mysql.MySqlOffsetContext;
28-
import io.debezium.connector.mysql.MySqlTopicSelector;
29-
import io.debezium.connector.mysql.MySqlValueConverters;
30-
import io.debezium.jdbc.JdbcValueConverters;
31-
import io.debezium.jdbc.TemporalPrecisionMode;
3228
import io.debezium.relational.TableId;
3329
import io.debezium.relational.history.TableChanges.TableChange;
3430
import io.debezium.schema.SchemaChangeEvent;
35-
import io.debezium.schema.TopicSelector;
36-
import io.debezium.util.SchemaNameAdjuster;
3731

3832
import java.sql.SQLException;
3933
import java.time.Instant;
4034
import java.util.HashMap;
4135
import java.util.List;
4236
import java.util.Map;
4337

38+
import static com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceConfig.getMySqlDatabaseSchema;
4439
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote;
4540

4641
/** A component used to get schema by table path. */
@@ -50,18 +45,9 @@ public class MySqlSchema {
5045
private final MySqlConnection jdbc;
5146
private final Map<TableId, TableChange> schemasByTableId;
5247

53-
public MySqlSchema(Configuration dbzConf, MySqlConnection jdbc) {
54-
this.connectorConfig = new MySqlConnectorConfig(dbzConf);
55-
TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
56-
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
57-
MySqlValueConverters valueConverters = getValueConverters(connectorConfig);
58-
this.databaseSchema =
59-
new MySqlDatabaseSchema(
60-
connectorConfig,
61-
valueConverters,
62-
topicSelector,
63-
schemaNameAdjuster,
64-
jdbc.isTableIdCaseSensitive());
48+
public MySqlSchema(MySqlParallelSourceConfig sourceConfig, MySqlConnection jdbc) {
49+
this.connectorConfig = sourceConfig.getMySqlConnectorConfig();
50+
this.databaseSchema = getMySqlDatabaseSchema(connectorConfig, jdbc);
6551
this.jdbc = jdbc;
6652
this.schemasByTableId = new HashMap<>();
6753
}
@@ -118,28 +104,4 @@ private TableChange readTableSchema(TableId tableId) {
118104

119105
return tableChangeMap.get(tableId);
120106
}
121-
122-
private static MySqlValueConverters getValueConverters(MySqlConnectorConfig connectorConfig) {
123-
TemporalPrecisionMode timePrecisionMode = connectorConfig.getTemporalPrecisionMode();
124-
JdbcValueConverters.DecimalMode decimalMode = connectorConfig.getDecimalMode();
125-
String bigIntUnsignedHandlingModeStr =
126-
connectorConfig
127-
.getConfig()
128-
.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
129-
MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
130-
MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
131-
bigIntUnsignedHandlingModeStr);
132-
JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
133-
bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
134-
135-
final boolean timeAdjusterEnabled =
136-
connectorConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
137-
return new MySqlValueConverters(
138-
decimalMode,
139-
timePrecisionMode,
140-
bigIntUnsignedMode,
141-
connectorConfig.binaryHandlingMode(),
142-
timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
143-
MySqlValueConverters::defaultParsingErrorHandler);
144-
}
145107
}

0 commit comments

Comments
 (0)