
Commit ce92fdd

loserwang1024 authored and e-mhui committed
[postgres] Not drop replication slot for stream split (apache#2436)
1 parent 5a53bf9 commit ce92fdd

File tree

4 files changed: +136 -22 lines changed

flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java

Lines changed: 5 additions & 1 deletion

@@ -46,7 +46,7 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {

     protected final JdbcSourceConfig sourceConfig;
     protected final JdbcDataSourceDialect dataSourceDialect;
-    protected final CommonConnectorConfig dbzConnectorConfig;
+    protected CommonConnectorConfig dbzConnectorConfig;
     protected final SchemaNameAdjuster schemaNameAdjuster;

     public JdbcSourceFetchTaskContext(
@@ -156,6 +156,10 @@ public CommonConnectorConfig getDbzConnectorConfig() {
         return dbzConnectorConfig;
     }

+    public void setDbzConnectorConfig(CommonConnectorConfig dbzConnectorConfig) {
+        this.dbzConnectorConfig = dbzConnectorConfig;
+    }
+
     public SchemaNameAdjuster getSchemaNameAdjuster() {
         return SchemaNameAdjuster.create();
     }
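
The config field drops its final modifier and gains a setter so that a dialect-specific context can install an adjusted Debezium config after construction, and every later fetch task picks up the adjusted copy through the existing getter. A minimal, self-contained sketch of that handoff, using illustrative stand-in types rather than the real connector classes:

// Stand-ins only: a String plays the role of CommonConnectorConfig here.
class FetchContextSketch {
    private String dbzConnectorConfig = "base-config";

    // mirrors the new setter: configure() may replace the config per split type
    void setDbzConnectorConfig(String config) {
        this.dbzConnectorConfig = config;
    }

    String getDbzConnectorConfig() {
        return dbzConnectorConfig;
    }

    public static void main(String[] args) {
        FetchContextSketch ctx = new FetchContextSketch();
        // a Postgres-style configure() installs a split-specific config once...
        ctx.setDbzConnectorConfig("stream-split-config");
        // ...and the backfill/stream read task consumes whatever was installed
        System.out.println(ctx.getDbzConnectorConfig()); // stream-split-config
    }
}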

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java

Lines changed: 2 additions & 21 deletions

@@ -29,15 +29,13 @@
 import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset;
 import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
 import com.ververica.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
-import io.debezium.config.Configuration;
 import io.debezium.connector.postgresql.PostgresConnectorConfig;
 import io.debezium.connector.postgresql.PostgresOffsetContext;
 import io.debezium.connector.postgresql.PostgresPartition;
 import io.debezium.connector.postgresql.PostgresSchema;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
 import io.debezium.connector.postgresql.spi.SlotState;
-import io.debezium.heartbeat.Heartbeat;
 import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
 import io.debezium.pipeline.source.spi.ChangeEventSource;
@@ -60,8 +58,6 @@
 import java.util.ArrayList;
 import java.util.Objects;

-import static io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP;
-import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
 import static io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady;
 import static io.debezium.connector.postgresql.Utils.currentOffset;
 import static io.debezium.connector.postgresql.Utils.refreshSchema;
@@ -164,24 +160,9 @@ private void executeBackfillTask(
                 PostgresOffsetUtils.getPostgresOffsetContext(
                         loader, backfillSplit.getStartingOffset());

-        // we should only capture events for the current table,
-        // otherwise, we may not find corresponding schema
-        PostgresSourceConfig config = (PostgresSourceConfig) ctx.getSourceConfig();
-        Configuration dbzConf =
-                ctx.getDbzConnectorConfig()
-                        .getConfig()
-                        .edit()
-                        .with("table.include.list", split.getTableId().toString())
-                        .with(SLOT_NAME.name(), config.getSlotNameForBackfillTask())
-                        // drop slot for backfill stream split
-                        .with(DROP_SLOT_ON_STOP.name(), true)
-                        // Disable heartbeat event in snapshot split fetcher
-                        .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
-                        .build();
-
         final PostgresStreamFetchTask.StreamSplitReadTask backfillReadTask =
                 new PostgresStreamFetchTask.StreamSplitReadTask(
-                        new PostgresConnectorConfig(dbzConf),
+                        ctx.getDbzConnectorConfig(),
                         ctx.getSnapShotter(),
                         ctx.getConnection(),
                         ctx.getDispatcher(),
@@ -195,7 +176,7 @@ private void executeBackfillTask(
         LOG.info(
                 "Execute backfillReadTask for split {} with slot name {}",
                 split,
-                dbzConf.getString(SLOT_NAME.name()));
+                ((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask());
         backfillReadTask.execute(
                 new PostgresChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext);
     }
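
With the per-task rebuild gone, the backfill read task no longer flips DROP_SLOT_ON_STOP to true for itself; it simply reuses whatever config the fetch task context prepared in configure(). For reference, a minimal, self-contained sketch of the io.debezium.config.Configuration edit pattern the removed block relied on (the property values here are placeholders):

import io.debezium.config.Configuration;

public class ConfigEditSketch {
    public static void main(String[] args) {
        // Debezium configurations are immutable; edit() returns a builder
        // seeded with the current values, so overrides yield a new config.
        Configuration base =
                Configuration.create()
                        .with("slot.name", "flink")
                        .with("slot.drop.on.stop", "true")
                        .build();
        Configuration adjusted =
                base.edit()
                        // the removed code forced this to true for backfill;
                        // the stream split now runs with it forced to false
                        .with("slot.drop.on.stop", "false")
                        .build();
        System.out.println(adjusted.getString("slot.drop.on.stop")); // false
    }
}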

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java

Lines changed: 16 additions & 0 deletions

@@ -128,6 +128,11 @@ public void configure(SourceSplitBase sourceSplitBase) {
                             dbzConfig
                                     .getConfig()
                                     .edit()
+                                    .with(
+                                            "table.include.list",
+                                            ((SnapshotSplit) sourceSplitBase)
+                                                    .getTableId()
+                                                    .toString())
                                     .with(
                                             SLOT_NAME.name(),
                                             ((PostgresSourceConfig) sourceConfig)
@@ -137,8 +142,19 @@ public void configure(SourceSplitBase sourceSplitBase) {
                                     // Disable heartbeat event in snapshot split fetcher
                                     .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                                     .build());
+        } else {
+            dbzConfig =
+                    new PostgresConnectorConfig(
+                            dbzConfig
+                                    .getConfig()
+                                    .edit()
+                                    // never drop slot for stream split, which is also the global split
+                                    .with(DROP_SLOT_ON_STOP.name(), false)
+                                    .build());
         }

+        LOG.info("PostgresConnectorConfig is {}", dbzConfig.getConfig().asProperties().toString());
+        setDbzConnectorConfig(dbzConfig);
         PostgresConnectorConfig.SnapshotMode snapshotMode =
                 PostgresConnectorConfig.SnapshotMode.parse(
                         dbzConfig.getConfig().getString(SNAPSHOT_MODE));
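
Net effect: snapshot splits keep their narrowed table list and per-task slot, while the stream split — the single global split — always runs with slot.drop.on.stop=false, overriding even an explicit debezium.slot.drop.on.stop=true from the user. Whether the slot actually survives a stop can be checked against pg_replication_slots; a hedged verification sketch, assuming a local PostgreSQL and a slot named 'flink' (both are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SlotSurvivalCheck {
    public static void main(String[] args) throws Exception {
        // connection details are placeholders for a test database
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:postgresql://localhost:5432/postgres",
                                "postgres", "postgres");
                PreparedStatement stmt =
                        conn.prepareStatement(
                                "SELECT active, confirmed_flush_lsn"
                                        + " FROM pg_replication_slots WHERE slot_name = ?")) {
            stmt.setString(1, "flink"); // the connector's slot.name
            try (ResultSet rs = stmt.executeQuery()) {
                // with the fix, the slot should still be listed after the
                // stream split reader stops, so a restart can resume from WAL
                System.out.println(rs.next() ? "slot survived" : "slot was dropped");
            }
        }
    }
}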

flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java

Lines changed: 113 additions & 0 deletions

@@ -198,6 +198,64 @@ public void testConsumingTableWithoutPrimaryKey() {
         }
     }

+    @Test
+    public void testDebeziumSlotDropOnStop() throws Exception {
+        String scanStartupMode = DEFAULT_SCAN_STARTUP_MODE;
+        customDatabase.createAndInitialize();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(2);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        String sourceDDL =
+                format(
+                        "CREATE TABLE customers ("
+                                + " id BIGINT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " primary key (id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.startup.mode' = '%s',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '100',"
+                                + " 'slot.name' = '%s', "
+                                + " 'debezium.slot.drop.on.stop' = 'true'"
+                                + ")",
+                        customDatabase.getHost(),
+                        customDatabase.getDatabasePort(),
+                        customDatabase.getUsername(),
+                        customDatabase.getPassword(),
+                        customDatabase.getDatabaseName(),
+                        SCHEMA_NAME,
+                        "customers",
+                        scanStartupMode,
+                        getSlotName());
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from customers");
+
+        // first step: check the snapshot data
+        if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+            checkSnapshotData(
+                    tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
+        }
+
+        // second step: check the stream data
+        checkStreamDataWithDDLDuringFailover(
+                tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
+
+        tableResult.getJobClient().get().cancel().get();
+    }
+
     private void testPostgresParallelSource(
             FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
             throws Exception {
@@ -371,6 +429,61 @@ private void checkStreamData(
         assertTrue(!hasNextData(iterator));
     }

+    private void checkStreamDataWithDDLDuringFailover(
+            TableResult tableResult,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
+        waitUntilJobRunning(tableResult);
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+
+        for (String tableId : captureCustomerTables) {
+            makeFirstPartStreamEvents(
+                    getConnection(),
+                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        }
+
+        // wait for the stream reading
+        Thread.sleep(2000L);
+
+        if (failoverPhase == FailoverPhase.STREAM) {
+            triggerFailover(
+                    failoverType,
+                    jobId,
+                    miniClusterResource.getMiniCluster(),
+                    () -> {
+                        for (String tableId : captureCustomerTables) {
+                            try {
+                                makeSecondPartStreamEvents(
+                                        getConnection(),
+                                        customDatabase.getDatabaseName()
+                                                + '.'
+                                                + SCHEMA_NAME
+                                                + '.'
+                                                + tableId);
+                            } catch (SQLException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                        sleepMs(200);
+                    });
+            waitUntilJobRunning(tableResult);
+        }
+
+        List<String> expectedStreamData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerTables.length; i++) {
+            expectedStreamData.addAll(firstPartStreamEvents);
+            expectedStreamData.addAll(secondPartStreamEvents);
+        }
+        // wait for the stream reading
+        Thread.sleep(2000L);
+
+        assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
+        assertTrue(!hasNextData(iterator));
+    }
+
     private void sleepMs(long millis) {
         try {
             Thread.sleep(millis);
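
The test deliberately passes 'debezium.slot.drop.on.stop' = 'true' through the DDL — options carrying the debezium. prefix are forwarded to the embedded Debezium engine with the prefix stripped — and then asserts that stream reading survives a JobManager failover, which only works if the connector forces slot.drop.on.stop back to false for the stream split. A minimal sketch of that prefix passthrough (the map contents are illustrative, not the connector's actual option-handling code):

import java.util.HashMap;
import java.util.Map;

public class PrefixPassthroughSketch {
    public static void main(String[] args) {
        Map<String, String> ddlOptions = new HashMap<>();
        ddlOptions.put("debezium.slot.drop.on.stop", "true");
        ddlOptions.put("slot.name", "flink");

        // forward 'debezium.'-prefixed options to Debezium, prefix stripped
        Map<String, String> dbzProps = new HashMap<>();
        for (Map.Entry<String, String> e : ddlOptions.entrySet()) {
            if (e.getKey().startsWith("debezium.")) {
                dbzProps.put(e.getKey().substring("debezium.".length()), e.getValue());
            }
        }
        // the user asked for slot.drop.on.stop=true; the fix under test
        // overrides it to false for the stream split so the slot survives
        System.out.println(dbzProps); // {slot.drop.on.stop=true}
    }
}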
