Skip to content

Commit b1bcb4f

Browse files
committed
fix review
1 parent 190fffb commit b1bcb4f

File tree

3 files changed

+37
-56
lines changed

3 files changed

+37
-56
lines changed

flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.ververica.cdc.connectors.postgres;
1818

19+
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
1920
import org.apache.flink.test.util.AbstractTestBase;
2021

2122
import io.debezium.config.Configuration;
@@ -137,4 +138,40 @@ protected PostgresConnection createConnection(Map<String, String> properties) {
137138
Configuration config = Configuration.from(properties);
138139
return new PostgresConnection(JdbcConfiguration.adapt(config), "test-connection");
139140
}
141+
142+
protected void waitForSnapshotStarted(String sinkName) throws InterruptedException {
143+
while (sinkSize(sinkName) == 0) {
144+
Thread.sleep(300);
145+
}
146+
}
147+
148+
protected void waitForSinkResult(String sinkName, List<String> expected)
149+
throws InterruptedException {
150+
List<String> actual = TestValuesTableFactory.getResults(sinkName);
151+
actual = actual.stream().sorted().collect(Collectors.toList());
152+
while (actual.size() != expected.size() || !actual.equals(expected)) {
153+
actual =
154+
TestValuesTableFactory.getResults(sinkName).stream()
155+
.sorted()
156+
.collect(Collectors.toList());
157+
Thread.sleep(1000);
158+
}
159+
}
160+
161+
protected void waitForSinkSize(String sinkName, int expectedSize) throws InterruptedException {
162+
while (sinkSize(sinkName) < expectedSize) {
163+
Thread.sleep(100);
164+
}
165+
}
166+
167+
protected int sinkSize(String sinkName) {
168+
synchronized (TestValuesTableFactory.class) {
169+
try {
170+
return TestValuesTableFactory.getRawResults(sinkName).size();
171+
} catch (IllegalArgumentException e) {
172+
// job is not started yet
173+
return 0;
174+
}
175+
}
176+
}
140177
}

flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -674,28 +674,4 @@ public void testUpsertMode() throws Exception {
674674

675675
result.getJobClient().get().cancel().get();
676676
}
677-
678-
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
679-
while (sinkSize(sinkName) == 0) {
680-
Thread.sleep(300);
681-
}
682-
}
683-
684-
private static void waitForSinkSize(String sinkName, int expectedSize)
685-
throws InterruptedException {
686-
while (sinkSize(sinkName) < expectedSize) {
687-
Thread.sleep(100);
688-
}
689-
}
690-
691-
private static int sinkSize(String sinkName) {
692-
synchronized (TestValuesTableFactory.class) {
693-
try {
694-
return TestValuesTableFactory.getRawResults(sinkName).size();
695-
} catch (IllegalArgumentException e) {
696-
// job is not started yet
697-
return 0;
698-
}
699-
}
700-
}
701677
}

flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.List;
4040
import java.util.Optional;
4141
import java.util.concurrent.ExecutionException;
42-
import java.util.stream.Collectors;
4342

4443
import static org.hamcrest.Matchers.containsInAnyOrder;
4544
import static org.junit.Assert.assertThat;
@@ -228,35 +227,4 @@ private String triggerSavepointWithRetry(JobClient jobClient, String savepointDi
228227
}
229228
return null;
230229
}
231-
232-
private static void waitForSinkResult(String sinkName, List<String> expected)
233-
throws InterruptedException {
234-
List<String> actual = TestValuesTableFactory.getResults(sinkName);
235-
actual = actual.stream().sorted().collect(Collectors.toList());
236-
while (actual.size() != expected.size() || !actual.equals(expected)) {
237-
actual =
238-
TestValuesTableFactory.getResults(sinkName).stream()
239-
.sorted()
240-
.collect(Collectors.toList());
241-
Thread.sleep(1000);
242-
}
243-
}
244-
245-
private static void waitForSinkSize(String sinkName, int expectedSize)
246-
throws InterruptedException {
247-
while (sinkSize(sinkName) < expectedSize) {
248-
Thread.sleep(100);
249-
}
250-
}
251-
252-
private static int sinkSize(String sinkName) {
253-
synchronized (TestValuesTableFactory.class) {
254-
try {
255-
return TestValuesTableFactory.getRawResults(sinkName).size();
256-
} catch (IllegalArgumentException e) {
257-
// job is not started yet
258-
return 0;
259-
}
260-
}
261-
}
262230
}

0 commit comments

Comments
 (0)