Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,33 +65,35 @@ Include following Maven dependency (available through Maven Central):
```

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
Properties debeziumProperties = new Properties();
debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.debeziumProperties(debeziumProperties)
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enable checkpoint
env.enableCheckpointing(3000);

env
.addSource(sourceFunction)
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

env.execute();
env.execute("Print MySQL Snapshot + Binlog");
}
}
```
Expand Down
38 changes: 19 additions & 19 deletions docs/content/connectors/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,36 +384,36 @@ _Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapsh

### DataStream Source

The Incremental Snapshot Reading feature of MySQL CDC Source only exposes in SQL currently, if you're using DataStream, please use legacy MySQL Source:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
Properties debeziumProperties = new Properties();
debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.debeziumProperties(debeziumProperties)
.build();
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enable checkpoint
env.enableCheckpointing(3000);

env
.addSource(sourceFunction)
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

env.execute();
env.execute("Print MySQL Snapshot + Binlog");
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,35 @@
import java.util.Map;
import java.util.Properties;

import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME;
import static com.ververica.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_KEY;
import static com.ververica.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_VALUE;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** A builder to build a SourceFunction which can read snapshot and continue to consume binlog. */
/**
* A builder to build a SourceFunction which can read snapshot and continue to consume binlog.
*
* @deprecated please use {@link com.ververica.cdc.connectors.mysql.source.MySqlSource} instead
* which supports more rich features, e.g. parallel reading from historical data. The {@link
* MySqlSource} will be dropped in the future version.
*/
@Deprecated
public class MySqlSource {

private static final String DATABASE_SERVER_NAME = "mysql_binlog_source";

public static <T> Builder<T> builder() {
return new Builder<>();
}

/** Builder class of {@link MySqlSource}. */
/**
* Builder class of {@link MySqlSource}.
*
* @deprecated please use {@link
* com.ververica.cdc.connectors.mysql.source.MySqlSource#builder()} instead which supports
* more rich features, e.g. parallel reading from historical data. The {@link
* MySqlSource.Builder} will be dropped in the future version.
*/
@Deprecated
public static class Builder<T> {

private int port = 3306; // default 3306 port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@

package com.ververica.cdc.connectors.mysql;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;

import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.debezium.Validator;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;

import java.sql.SQLException;
Expand All @@ -43,19 +41,16 @@ public class MySqlValidator implements Validator {
private static final String BINLOG_FORMAT_ROW = "ROW";
private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL";

private final Configuration configuration;

public MySqlValidator(Properties properties) {
this(Configuration.fromMap(Maps.fromProperties(properties)));
}
private final Properties dbzProperties;

public MySqlValidator(Configuration configuration) {
this.configuration = configuration;
public MySqlValidator(Properties dbzProperties) {
this.dbzProperties = dbzProperties;
}

@Override
public void validate() {
try (MySqlConnection connection = DebeziumUtils.openMySqlConnection(configuration)) {
try (MySqlConnection connection =
DebeziumUtils.openMySqlConnection(Configuration.from(dbzProperties))) {
checkVersion(connection);
checkBinlogFormat(connection);
checkBinlogRowImage(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,36 @@

package com.ververica.cdc.connectors.mysql.debezium;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkRuntimeException;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlTopicSelector;
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;

import static com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.toDebeziumConfig;

/** Utilities related to Debezium. */
public class DebeziumUtils {

private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class);

/**
* Creates {@link RelationalTableFilters} from configuration. The {@link RelationalTableFilters}
* can be used to filter tables according to "table.whitelist" and "database.whitelist" options.
*/
public static RelationalTableFilters createTableFilters(Configuration configuration) {
io.debezium.config.Configuration debeziumConfig = toDebeziumConfig(configuration);
final MySqlConnectorConfig mySqlConnectorConfig = new MySqlConnectorConfig(debeziumConfig);
return mySqlConnectorConfig.getTableFilters();
}

/** Creates and opens a new {@link MySqlConnection}. */
public static MySqlConnection openMySqlConnection(Configuration configuration) {
public static MySqlConnection openMySqlConnection(Configuration dbzConfiguration) {
MySqlConnection jdbc =
new MySqlConnection(
new MySqlConnection.MySqlConnectionConfiguration(
toDebeziumConfig(configuration)));
new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
try {
jdbc.connect();
} catch (SQLException e) {
Expand All @@ -75,6 +70,37 @@ public static void closeMySqlConnection(MySqlConnection jdbc) {
}
}

/** Creates a new {@link MySqlConnection}, but not open the connection. */
public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) {
return new MySqlConnection(
new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
}

/** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */
public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) {
final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dbzConfiguration);
return new BinaryLogClient(
connectorConfig.hostname(),
connectorConfig.port(),
connectorConfig.username(),
connectorConfig.password());
}

/** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */
public static MySqlDatabaseSchema createMySqlDatabaseSchema(
MySqlConnectorConfig dbzMySqlConfig, MySqlConnection connection) {
boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig);
return new MySqlDatabaseSchema(
dbzMySqlConfig,
valueConverters,
topicSelector,
schemaNameAdjuster,
tableIdCaseInsensitive);
}

/** Fetch current binlog offsets in MySql Server. */
public static BinlogOffset currentBinlogOffset(MySqlConnection jdbc) {
final String showMasterStmt = "SHOW MASTER STATUS";
Expand Down Expand Up @@ -104,4 +130,30 @@ public static BinlogOffset currentBinlogOffset(MySqlConnection jdbc) {
e);
}
}

// --------------------------------------------------------------------------------------------

private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
TemporalPrecisionMode timePrecisionMode = dbzMySqlConfig.getTemporalPrecisionMode();
JdbcValueConverters.DecimalMode decimalMode = dbzMySqlConfig.getDecimalMode();
String bigIntUnsignedHandlingModeStr =
dbzMySqlConfig
.getConfig()
.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
bigIntUnsignedHandlingModeStr);
JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
bigIntUnsignedHandlingMode.asBigIntUnsignedMode();

boolean timeAdjusterEnabled =
dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
return new MySqlValueConverters(
decimalMode,
timePrecisionMode,
bigIntUnsignedMode,
dbzMySqlConfig.binaryHandlingMode(),
timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
MySqlValueConverters::defaultParsingErrorHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(
// otherwise, we may can't find corresponding schema
Configuration dezConf =
statefulTaskContext
.getDezConf()
.getSourceConfig()
.getDbzConfiguration()
.edit()
.with("table.whitelist", currentSnapshotSplit.getTableId())
.build();
Expand Down
Loading