Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import java.util.List;
import java.util.Map;

import static io.debezium.connector.postgresql.PostgresConnectorConfig.PLUGIN_NAME;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
import static io.debezium.connector.postgresql.PostgresObjectUtils.createReplicationConnection;
import static io.debezium.connector.postgresql.PostgresObjectUtils.newPostgresValueConverterBuilder;
import static io.debezium.connector.postgresql.Utils.currentOffset;
Expand Down Expand Up @@ -94,6 +96,10 @@ public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
return jdbc;
}

public PostgresConnection openJdbcConnection() {
return (PostgresConnection) openJdbcConnection(sourceConfig);
}

public PostgresReplicationConnection openPostgresReplicationConnection() {
try {
PostgresConnection jdbcConnection =
Expand Down Expand Up @@ -215,4 +221,12 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
streamFetchTask.commitCurrentOffset();
}
}

public String getSlotName() {
return sourceConfig.getDbzProperties().getProperty(SLOT_NAME.name());
}

public String getPluginName() {
return sourceConfig.getDbzProperties().getProperty(PLUGIN_NAME.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;

import java.sql.SQLException;

/**
* The Postgres source enumerator that enumerates receive the split request and assign the split to
Expand Down Expand Up @@ -56,6 +60,24 @@ public void start() {
* reading the globalStreamSplit to catch all data changes.
*/
private void createSlotForGlobalStreamSplit() {
SlotState slotInfo = null;
try (PostgresConnection connection = postgresDialect.openJdbcConnection()) {
slotInfo =
connection.getReplicationSlotState(
postgresDialect.getSlotName(), postgresDialect.getPluginName());
} catch (SQLException e) {
throw new RuntimeException(
String.format(
"Fail to get the replication slot info, the slot name is %s.",
postgresDialect.getSlotName()),
e);
}

// skip creating the replication slot when the slot exists.
if (slotInfo != null) {
return;
}

try {
PostgresReplicationConnection replicationConnection =
postgresDialect.openPostgresReplicationConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void execute(Context context) throws Exception {
ctx.getDispatcher(),
ctx.getSnapshotChangeEventSourceMetrics(),
split,
ctx.getSlotName(),
((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask(),
ctx.getPluginName());

SnapshotSplitChangeEventSourceContext changeEventSourceContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
Expand Down Expand Up @@ -121,6 +121,22 @@ private PostgresOffsetContext loadStartingOffsetState(
public void configure(SourceSplitBase sourceSplitBase) {
LOG.debug("Configuring PostgresSourceFetchTaskContext for split: {}", sourceSplitBase);
PostgresConnectorConfig dbzConfig = getDbzConnectorConfig();
if (sourceSplitBase instanceof SnapshotSplit) {
dbzConfig =
new PostgresConnectorConfig(
dbzConfig
.getConfig()
.edit()
.with(
SLOT_NAME.name(),
((PostgresSourceConfig) sourceConfig)
.getSlotNameForBackfillTask())
// drop slot for backfill stream split
.with(DROP_SLOT_ON_STOP.name(), true)
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build());
}

PostgresConnectorConfig.SnapshotMode snapshotMode =
PostgresConnectorConfig.SnapshotMode.parse(
Expand Down Expand Up @@ -164,21 +180,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
this.taskContext,
jdbcConnection,
this.snapShotter.shouldSnapshot(),
sourceSplitBase instanceof StreamSplit
? dbzConfig
: new PostgresConnectorConfig(
dbzConfig
.getConfig()
.edit()
.with(
SLOT_NAME.name(),
((PostgresSourceConfig) sourceConfig)
.getSlotNameForBackfillTask())
// drop slot for backfill stream split
.with(DROP_SLOT_ON_STOP.name(), true)
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build())));
dbzConfig));

this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;

/** Tests for {@link PostgreSQLSource} which also heavily tests {@link DebeziumSourceFunction}. */
public class PostgresSQLSourceTest extends PostgresTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PostgresSQLSourceTest.class);
public class PostgreSQLSourceTest extends PostgresTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PostgreSQLSourceTest.class);
private static final String SLOT_NAME = "flink";
// These tests only passes at the docker postgres:9.6
private static final PostgreSQLContainer<?> POSTGRES_CONTAINER_OLD =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ververica.cdc.connectors.postgres;

import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;

import io.debezium.config.Configuration;
Expand Down Expand Up @@ -137,4 +138,40 @@ protected PostgresConnection createConnection(Map<String, String> properties) {
Configuration config = Configuration.from(properties);
return new PostgresConnection(JdbcConfiguration.adapt(config), "test-connection");
}

protected void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(300);
}
}

protected void waitForSinkResult(String sinkName, List<String> expected)
throws InterruptedException {
List<String> actual = TestValuesTableFactory.getResults(sinkName);
actual = actual.stream().sorted().collect(Collectors.toList());
while (actual.size() != expected.size() || !actual.equals(expected)) {
actual =
TestValuesTableFactory.getResults(sinkName).stream()
.sorted()
.collect(Collectors.toList());
Thread.sleep(1000);
}
}

protected void waitForSinkSize(String sinkName, int expectedSize) throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}

protected int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -674,28 +674,4 @@ public void testUpsertMode() throws Exception {

result.getJobClient().get().cancel().get();
}

private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(300);
}
}

private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}

private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
}
Loading