Skip to content

Commit 2dc86bf

Browse files
committed
[postgres] Fix the slot name conflict bug
1 parent daba72e commit 2dc86bf

File tree

6 files changed

+314
-19
lines changed

6 files changed

+314
-19
lines changed

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
import java.util.List;
5454
import java.util.Map;
5555

56+
import static io.debezium.connector.postgresql.PostgresConnectorConfig.PLUGIN_NAME;
57+
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
5658
import static io.debezium.connector.postgresql.PostgresObjectUtils.createReplicationConnection;
5759
import static io.debezium.connector.postgresql.PostgresObjectUtils.newPostgresValueConverterBuilder;
5860
import static io.debezium.connector.postgresql.Utils.currentOffset;
@@ -94,6 +96,10 @@ public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
9496
return jdbc;
9597
}
9698

99+
public PostgresConnection openJdbcConnection() {
100+
return (PostgresConnection) openJdbcConnection(sourceConfig);
101+
}
102+
97103
public PostgresReplicationConnection openPostgresReplicationConnection() {
98104
try {
99105
PostgresConnection jdbcConnection =
@@ -215,4 +221,12 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
215221
streamFetchTask.commitCurrentOffset();
216222
}
217223
}
224+
225+
public String getSlotName() {
226+
return sourceConfig.getDbzProperties().getProperty(SLOT_NAME.name());
227+
}
228+
229+
public String getPluginName() {
230+
return sourceConfig.getDbzProperties().getProperty(PLUGIN_NAME.name());
231+
}
218232
}

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@
2424
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
2525
import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
2626
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
27+
import io.debezium.connector.postgresql.connection.PostgresConnection;
2728
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
29+
import io.debezium.connector.postgresql.spi.SlotState;
30+
31+
import java.sql.SQLException;
2832

2933
/**
3034
* The Postgres source enumerator that enumerates receive the split request and assign the split to
@@ -56,6 +60,20 @@ public void start() {
5660
* reading the globalStreamSplit to catch all data changes.
5761
*/
5862
private void createSlotForGlobalStreamSplit() {
63+
SlotState slotInfo = null;
64+
try (PostgresConnection connection = postgresDialect.openJdbcConnection()) {
65+
slotInfo =
66+
connection.getReplicationSlotState(
67+
postgresDialect.getSlotName(), postgresDialect.getPluginName());
68+
} catch (SQLException e) {
69+
// do nothing
70+
}
71+
72+
// skip creating the replication slot when the slot exists.
73+
if (slotInfo != null) {
74+
return;
75+
}
76+
5977
try {
6078
PostgresReplicationConnection replicationConnection =
6179
postgresDialect.openPostgresReplicationConnection();

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void execute(Context context) throws Exception {
109109
ctx.getDispatcher(),
110110
ctx.getSnapshotChangeEventSourceMetrics(),
111111
split,
112-
ctx.getSlotName(),
112+
((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask(),
113113
ctx.getPluginName());
114114

115115
SnapshotSplitChangeEventSourceContext changeEventSourceContext =

flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
2323
import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
2424
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
25+
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
2526
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
26-
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
2727
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
2828
import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
2929
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
@@ -121,6 +121,22 @@ private PostgresOffsetContext loadStartingOffsetState(
121121
public void configure(SourceSplitBase sourceSplitBase) {
122122
LOG.debug("Configuring PostgresSourceFetchTaskContext for split: {}", sourceSplitBase);
123123
PostgresConnectorConfig dbzConfig = getDbzConnectorConfig();
124+
if (sourceSplitBase instanceof SnapshotSplit) {
125+
dbzConfig =
126+
new PostgresConnectorConfig(
127+
dbzConfig
128+
.getConfig()
129+
.edit()
130+
.with(
131+
SLOT_NAME.name(),
132+
((PostgresSourceConfig) sourceConfig)
133+
.getSlotNameForBackfillTask())
134+
// drop slot for backfill stream split
135+
.with(DROP_SLOT_ON_STOP.name(), true)
136+
// Disable heartbeat event in snapshot split fetcher
137+
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
138+
.build());
139+
}
124140

125141
PostgresConnectorConfig.SnapshotMode snapshotMode =
126142
PostgresConnectorConfig.SnapshotMode.parse(
@@ -164,21 +180,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
164180
this.taskContext,
165181
jdbcConnection,
166182
this.snapShotter.shouldSnapshot(),
167-
sourceSplitBase instanceof StreamSplit
168-
? dbzConfig
169-
: new PostgresConnectorConfig(
170-
dbzConfig
171-
.getConfig()
172-
.edit()
173-
.with(
174-
SLOT_NAME.name(),
175-
((PostgresSourceConfig) sourceConfig)
176-
.getSlotNameForBackfillTask())
177-
// drop slot for backfill stream split
178-
.with(DROP_SLOT_ON_STOP.name(), true)
179-
// Disable heartbeat event in snapshot split fetcher
180-
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
181-
.build())));
183+
dbzConfig));
182184

183185
this.queue =
184186
new ChangeEventQueue.Builder<DataChangeEvent>()
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@
7777
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
7878

7979
/** Tests for {@link PostgreSQLSource} which also heavily tests {@link DebeziumSourceFunction}. */
80-
public class PostgresSQLSourceTest extends PostgresTestBase {
81-
private static final Logger LOG = LoggerFactory.getLogger(PostgresSQLSourceTest.class);
80+
public class PostgreSQLSourceTest extends PostgresTestBase {
81+
private static final Logger LOG = LoggerFactory.getLogger(PostgreSQLSourceTest.class);
8282
private static final String SLOT_NAME = "flink";
8383
// These tests only passes at the docker postgres:9.6
8484
private static final PostgreSQLContainer<?> POSTGRES_CONTAINER_OLD =

0 commit comments

Comments
 (0)