
Commit 0438416

[cdc-base] Flink CDC base registers the identical history engine on multiple tasks (#1340)
This closes #1130
1 parent 70ebfef commit 0438416

7 files changed: +204 -12 lines

flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java

Lines changed: 3 additions & 1 deletion

@@ -18,6 +18,7 @@
 
 import org.apache.flink.annotation.Experimental;
 
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
 import com.ververica.cdc.connectors.base.config.SourceConfig;
 import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
 import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
@@ -69,5 +70,6 @@ public interface DataSourceDialect<ID extends DataCollectionId, S, C extends Sou
     FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);
 
     /** The task context used for fetch task to fetch data from external systems. */
-    FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase);
+    FetchTask.Context createFetchTaskContext(
+            SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig);
 }

flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/JdbcDataSourceDialect.java

Lines changed: 2 additions & 1 deletion

@@ -75,5 +75,6 @@ default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
     FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);
 
     @Override
-    JdbcSourceFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase);
+    JdbcSourceFetchTaskContext createFetchTaskContext(
+            SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig);
 }
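Both dialect interfaces now take the task-scoped JdbcSourceConfig, so the fetch task context is built from the config of the subtask running the split rather than from state captured inside the dialect. The sketch below shows the shape of an implementation under the new contract; the class it would live in and HypotheticalFetchTaskContext are made-up names, and only the two-argument signature and the openJdbcConnection(...) default method come from the diffs above.

    // Sketch of an override inside some JdbcDataSourceDialect implementation
    // (HypotheticalFetchTaskContext is a made-up class used only for illustration).
    @Override
    public JdbcSourceFetchTaskContext createFetchTaskContext(
            SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
        // Everything the fetch task needs is derived from the config passed in for
        // this task, never from a field shared by all subtasks.
        JdbcConnection connection = openJdbcConnection(taskSourceConfig);
        return new HypotheticalFetchTaskContext(taskSourceConfig, this, connection);
    }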

flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/JdbcIncrementalSource.java

Lines changed: 1 addition & 1 deletion

@@ -109,7 +109,7 @@ public SourceReader createReader(SourceReaderContext readerContext) {
         Supplier<JdbcSourceSplitReader> splitReaderSupplier =
                 () ->
                         new JdbcSourceSplitReader(
-                                readerContext.getIndexOfSubtask(), dataSourceDialect);
+                                readerContext.getIndexOfSubtask(), dataSourceDialect, sourceConfig);
         return new JdbcIncrementalSourceReader<>(
                 elementsQueue,
                 splitReaderSupplier,

flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java

Lines changed: 7 additions & 3 deletions

@@ -22,6 +22,7 @@
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
 import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
@@ -51,11 +52,14 @@ public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSp
     @Nullable private Fetcher<SourceRecord, SourceSplitBase> currentFetcher;
     @Nullable private String currentSplitId;
     private final JdbcDataSourceDialect dataSourceDialect;
+    private final JdbcSourceConfig sourceConfig;
 
-    public JdbcSourceSplitReader(int subtaskId, JdbcDataSourceDialect dataSourceDialect) {
+    public JdbcSourceSplitReader(
+            int subtaskId, JdbcDataSourceDialect dataSourceDialect, JdbcSourceConfig sourceConfig) {
         this.subtaskId = subtaskId;
         this.splits = new ArrayDeque<>();
         this.dataSourceDialect = dataSourceDialect;
+        this.sourceConfig = sourceConfig;
     }
 
     @Override
@@ -114,7 +118,7 @@ protected void checkSplitOrStartNext() throws IOException {
         if (nextSplit.isSnapshotSplit()) {
             if (currentFetcher == null) {
                 final JdbcSourceFetchTaskContext taskContext =
-                        dataSourceDialect.createFetchTaskContext(nextSplit);
+                        dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig);
                 currentFetcher = new JdbcSourceScanFetcher(taskContext, subtaskId);
             }
         } else {
@@ -124,7 +128,7 @@ protected void checkSplitOrStartNext() throws IOException {
                 currentFetcher.close();
             }
             final JdbcSourceFetchTaskContext taskContext =
-                    dataSourceDialect.createFetchTaskContext(nextSplit);
+                    dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig);
             currentFetcher = new JdbcSourceStreamFetcher(taskContext, subtaskId);
             LOG.info("Stream fetcher is created.");
         }
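Read together with the JdbcIncrementalSource change above, each reader subtask now owns a JdbcSourceConfig and hands it to the dialect whenever a fetcher is created. The sketch below compresses that flow; `readerContext`, `dialect`, `taskConfig`, and `split` are assumed to be in scope, and the remark about the Debezium history engine is an interpretation of the issue title (#1130), not code from this commit.

    // Per-subtask wiring sketch (assumed in-scope variables, not a verbatim excerpt).
    int subtaskId = readerContext.getIndexOfSubtask();
    JdbcSourceSplitReader splitReader =
            new JdbcSourceSplitReader(subtaskId, dialect, taskConfig);

    // When the reader later starts a snapshot or stream split it calls
    //     dialect.createFetchTaskContext(split, taskConfig);
    // so the context -- and, presumably, the Debezium history engine it registers --
    // is tied to this task's config instead of one config reused by every subtask.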

flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java

Lines changed: 11 additions & 1 deletion

@@ -46,6 +46,7 @@
 import static com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.SIGNAL_EVENT_VALUE_SCHEMA_NAME;
 import static com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.WATERMARK_KIND;
 import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
 import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -137,8 +138,17 @@ public static TableId getTableId(SourceRecord dataRecord) {
         Struct value = (Struct) dataRecord.value();
         Struct source = value.getStruct(Envelope.FieldName.SOURCE);
         String dbName = source.getString(DATABASE_NAME_KEY);
+        // Oracle needs the schema name
+        String schemaName = getSchemaName(source);
         String tableName = source.getString(TABLE_NAME_KEY);
-        return new TableId(dbName, null, tableName);
+        return new TableId(dbName, schemaName, tableName);
+    }
+
+    public static String getSchemaName(Struct source) {
+        if (source.schema().fields().stream().anyMatch(r -> SCHEMA_NAME_KEY.equals(r.name()))) {
+            return source.getString(SCHEMA_NAME_KEY);
+        }
+        return null;
+    }
 
     public static Object[] getSplitKey(
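The new getSchemaName(...) helper only reads the schema field when the Debezium source struct actually declares it, which keeps MySQL-style records on the old behavior (null schema) while letting Oracle-style records fill the TableId schema component. A small self-contained sketch of that behavior follows; the class name and the literal database, schema, and table values are invented for illustration, while the field keys come from the AbstractSourceInfo constants already imported in the diff.

    import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
    import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
    import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;

    import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class SchemaNameSketch {
        public static void main(String[] args) {
            // MySQL-style source struct: no schema field, so getSchemaName(...) returns
            // null and getTableId(...) keeps a null schema component, as before.
            Schema mysqlSchema =
                    SchemaBuilder.struct()
                            .field(DATABASE_NAME_KEY, Schema.STRING_SCHEMA)
                            .field(TABLE_NAME_KEY, Schema.STRING_SCHEMA)
                            .build();
            Struct mysqlSource =
                    new Struct(mysqlSchema)
                            .put(DATABASE_NAME_KEY, "inventory")
                            .put(TABLE_NAME_KEY, "products");
            System.out.println(SourceRecordUtils.getSchemaName(mysqlSource)); // null

            // Oracle-style source struct: the schema field is present, so it is
            // propagated into the TableId instead of the previous hard-coded null.
            Schema oracleSchema =
                    SchemaBuilder.struct()
                            .field(DATABASE_NAME_KEY, Schema.STRING_SCHEMA)
                            .field(SCHEMA_NAME_KEY, Schema.STRING_SCHEMA)
                            .field(TABLE_NAME_KEY, Schema.STRING_SCHEMA)
                            .build();
            Struct oracleSource =
                    new Struct(oracleSchema)
                            .put(DATABASE_NAME_KEY, "ORCLCDB")
                            .put(SCHEMA_NAME_KEY, "DEBEZIUM")
                            .put(TABLE_NAME_KEY, "PRODUCTS");
            System.out.println(SourceRecordUtils.getSchemaName(oracleSource)); // DEBEZIUM
        }
    }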

flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/MySqlChangeEventSourceExampleTest.java

Lines changed: 174 additions & 1 deletion

@@ -20,14 +20,27 @@
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
 
 import com.ververica.cdc.connectors.base.experimental.MySqlSourceBuilder;
+import com.ververica.cdc.connectors.base.experimental.utils.MySqlConnectionUtils;
 import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
 import com.ververica.cdc.connectors.base.testutils.MySqlContainer;
 import com.ververica.cdc.connectors.base.testutils.MySqlVersion;
 import com.ververica.cdc.connectors.base.testutils.UniqueDatabase;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.jdbc.JdbcConnection;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -37,8 +50,20 @@
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 
+import java.sql.SQLException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /** Example Tests for {@link JdbcIncrementalSource}. */
 public class MySqlChangeEventSourceExampleTest {
 
@@ -70,7 +95,7 @@ public static void startContainers() {
 
     @Test
     @Ignore("Test ignored because it won't stop and is used for manual test")
-    public void testConsumingAllEvents() throws Exception {
+    public void testConsumingScanEvents() throws Exception {
         inventoryDatabase.createAndInitialize();
         JdbcIncrementalSource<String> mySqlChangeEventSource =
                 new MySqlSourceBuilder()
@@ -100,6 +125,154 @@ public void testConsumingAllEvents() throws Exception {
         env.execute("Print MySQL Snapshot + Binlog");
     }
 
+    @Test
+    @Ignore("Test ignored because it won't stop and is used for manual test")
+    public void testConsumingAllEvents() throws Exception {
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("description", DataTypes.STRING()),
+                        DataTypes.FIELD("weight", DataTypes.FLOAT()));
+
+        inventoryDatabase.createAndInitialize();
+        final String tableId = inventoryDatabase.getDatabaseName() + ".products";
+        JdbcIncrementalSource<RowData> mySqlChangeEventSource =
+                new MySqlSourceBuilder()
+                        .hostname(MYSQL_CONTAINER.getHost())
+                        .port(MYSQL_CONTAINER.getDatabasePort())
+                        .databaseList(inventoryDatabase.getDatabaseName())
+                        .tableList(tableId)
+                        .username(inventoryDatabase.getUsername())
+                        .password(inventoryDatabase.getPassword())
+                        .serverId("5401-5404")
+                        .deserializer(buildRowDataDebeziumDeserializeSchema(dataType))
+                        .includeSchemaChanges(true) // output the schema changes as well
+                        .splitSize(2)
+                        .build();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // enable checkpoint
+        env.enableCheckpointing(3000);
+        // set the source parallelism to 4
+        CloseableIterator<RowData> iterator =
+                env.fromSource(
+                                mySqlChangeEventSource,
+                                WatermarkStrategy.noWatermarks(),
+                                "MySqlParallelSource")
+                        .setParallelism(4)
+                        .executeAndCollect(); // collect record
+
+        String[] snapshotExpectedRecords =
+                new String[] {
+                    "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                    "+I[102, car battery, 12V car battery, 8.1]",
+                    "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+                    "+I[104, hammer, 12oz carpenter's hammer, 0.75]",
+                    "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
+                    "+I[106, hammer, 16oz carpenter's hammer, 1.0]",
+                    "+I[107, rocks, box of assorted rocks, 5.3]",
+                    "+I[108, jacket, water resistent black wind breaker, 0.1]",
+                    "+I[109, spare tire, 24 inch spare tire, 22.2]"
+                };
+
+        // step-1: consume snapshot data
+        List<RowData> snapshotRowDataList = new ArrayList();
+        for (int i = 0; i < snapshotExpectedRecords.length && iterator.hasNext(); i++) {
+            snapshotRowDataList.add(iterator.next());
+        }
+
+        List<String> snapshotActualRecords = formatResult(snapshotRowDataList, dataType);
+        assertEqualsInAnyOrder(Arrays.asList(snapshotExpectedRecords), snapshotActualRecords);
+
+        // step-2: make 6 change events in one MySQL transaction
+        makeBinlogEvents(getConnection(), tableId);
+
+        String[] binlogExpectedRecords =
+                new String[] {
+                    "-U[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+                    "+U[103, cart, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+                    "+I[110, spare tire, 28 inch spare tire, 26.2]",
+                    "-D[110, spare tire, 28 inch spare tire, 26.2]",
+                    "-U[103, cart, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
+                    "+U[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]"
+                };
+
+        // step-3: consume binlog change events
+        List<RowData> binlogRowDataList = new ArrayList();
+        for (int i = 0; i < binlogExpectedRecords.length && iterator.hasNext(); i++) {
+            binlogRowDataList.add(iterator.next());
+        }
+        List<String> binlogActualRecords = formatResult(binlogRowDataList, dataType);
+        assertEqualsInAnyOrder(Arrays.asList(binlogExpectedRecords), binlogActualRecords);
+
+        // stop the worker
+        iterator.close();
+    }
+
+    private RowDataDebeziumDeserializeSchema buildRowDataDebeziumDeserializeSchema(
+            DataType dataType) {
+        LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
+        InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
+        return RowDataDebeziumDeserializeSchema.newBuilder()
+                .setPhysicalRowType((RowType) dataType.getLogicalType())
+                .setResultTypeInfo(typeInfo)
+                .build();
+    }
+
+    private List<String> formatResult(List<RowData> records, DataType dataType) {
+        RowRowConverter rowRowConverter = RowRowConverter.create(dataType);
+        rowRowConverter.open(Thread.currentThread().getContextClassLoader());
+        return records.stream()
+                .map(rowRowConverter::toExternal)
+                .map(Object::toString)
+                .collect(Collectors.toList());
+    }
+
+    private MySqlConnection getConnection() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("database.hostname", MYSQL_CONTAINER.getHost());
+        properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        properties.put("database.user", inventoryDatabase.getUsername());
+        properties.put("database.password", inventoryDatabase.getPassword());
+        properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
+        io.debezium.config.Configuration configuration =
+                io.debezium.config.Configuration.from(properties);
+        return MySqlConnectionUtils.createMySqlConnection(configuration);
+    }
+
+    private void makeBinlogEvents(JdbcConnection connection, String tableId) throws SQLException {
+        try {
+            connection.setAutoCommit(false);
+
+            // make binlog events
+            connection.execute(
+                    "UPDATE " + tableId + " SET name = 'cart' where id = 103",
+                    "INSERT INTO "
+                            + tableId
+                            + " VALUES(110,'spare tire','28 inch spare tire','26.2')",
+                    "DELETE FROM " + tableId + " where id = 110",
+                    "UPDATE " + tableId + " SET name = '12-pack drill bits' where id = 103");
+            connection.commit();
+        } finally {
+            connection.close();
+        }
+    }
+
+    public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEqualsInOrder(
+                expected.stream().sorted().collect(Collectors.toList()),
+                actual.stream().sorted().collect(Collectors.toList()));
+    }
+
+    public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+    }
+
     private static MySqlContainer createMySqlContainer(MySqlVersion version) {
         return (MySqlContainer)
                 new MySqlContainer(version)
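The test also adds two small assertion helpers: assertEqualsInAnyOrder sorts both sides before delegating to the exact-order check, so snapshot records that arrive in a different order from the four parallel readers still compare equal. A tiny illustrative use, with shortened placeholder record strings rather than real test output, as it might be called inside the test class:

    // Illustrative only: would live inside the test class, next to the helpers above.
    List<String> expected = Arrays.asList("+I[101, scooter]", "+I[102, car battery]");
    List<String> actual = Arrays.asList("+I[102, car battery]", "+I[101, scooter]");
    assertEqualsInAnyOrder(expected, actual); // passes: same elements, order ignored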

flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java

Lines changed: 6 additions & 4 deletions

@@ -136,12 +136,14 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
     }
 
     @Override
-    public MySqlSourceFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase) {
+    public MySqlSourceFetchTaskContext createFetchTaskContext(
+            SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
         final MySqlConnection jdbcConnection =
-                createMySqlConnection(sourceConfig.getDbzConfiguration());
+                createMySqlConnection(taskSourceConfig.getDbzConfiguration());
         final BinaryLogClient binaryLogClient =
-                createBinaryClient(sourceConfig.getDbzConfiguration());
-        return new MySqlSourceFetchTaskContext(sourceConfig, this, jdbcConnection, binaryLogClient);
+                createBinaryClient(taskSourceConfig.getDbzConfiguration());
+        return new MySqlSourceFetchTaskContext(
+                taskSourceConfig, this, jdbcConnection, binaryLogClient);
     }
 
     @Override
