Skip to content

Commit 35cf6d3

Browse files
authored
Polish Kafka examples (#9340)
1 parent 49b5a16 commit 35cf6d3

File tree

4 files changed

+25
-31
lines changed

4 files changed

+25
-31
lines changed

examples/kafka-cluster/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ dependencies {
1515
testImplementation 'com.google.guava:guava:23.0'
1616
testImplementation 'ch.qos.logback:logback-classic:1.3.14'
1717
testImplementation 'org.junit.jupiter:junit-jupiter:5.11.0'
18+
testImplementation 'org.awaitility:awaitility:4.2.2'
1819
}
1920

2021
test {

examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.example.kafkacluster;
22

33
import lombok.SneakyThrows;
4-
import org.rnorth.ducttape.unreliables.Unreliables;
4+
import org.awaitility.Awaitility;
55
import org.testcontainers.containers.Container;
66
import org.testcontainers.containers.GenericContainer;
77
import org.testcontainers.containers.KafkaContainer;
@@ -11,11 +11,12 @@
1111

1212
import java.time.Duration;
1313
import java.util.Collection;
14-
import java.util.concurrent.TimeUnit;
1514
import java.util.stream.Collectors;
1615
import java.util.stream.IntStream;
1716
import java.util.stream.Stream;
1817

18+
import static org.assertj.core.api.Assertions.assertThat;
19+
1920
/**
2021
* Provides an easy way to launch a Kafka cluster with multiple brokers.
2122
*/
@@ -87,10 +88,10 @@ public void start() {
8788
// sequential start to avoid resource contention on CI systems with weaker hardware
8889
brokers.forEach(GenericContainer::start);
8990

90-
Unreliables.retryUntilTrue(
91-
30,
92-
TimeUnit.SECONDS,
93-
() -> {
91+
Awaitility
92+
.await()
93+
.atMost(Duration.ofSeconds(30))
94+
.untilAsserted(() -> {
9495
Container.ExecResult result =
9596
this.zookeeper.execInContainer(
9697
"sh",
@@ -101,9 +102,8 @@ public void start() {
101102
);
102103
String brokers = result.getStdout();
103104

104-
return brokers != null && brokers.split(",").length == this.brokersNum;
105-
}
106-
);
105+
assertThat(brokers.split(",")).hasSize(this.brokersNum);
106+
});
107107
}
108108

109109
@Override

examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
import org.apache.kafka.clients.producer.ProducerRecord;
1414
import org.apache.kafka.common.serialization.StringDeserializer;
1515
import org.apache.kafka.common.serialization.StringSerializer;
16+
import org.awaitility.Awaitility;
1617
import org.junit.jupiter.api.Test;
17-
import org.rnorth.ducttape.unreliables.Unreliables;
1818

1919
import java.time.Duration;
2020
import java.util.Collection;
@@ -100,24 +100,17 @@ protected void testKafkaFunctionality(String bootstrapServers, int partitions, i
100100

101101
producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
102102

103-
Unreliables.retryUntilTrue(
104-
10,
105-
TimeUnit.SECONDS,
106-
() -> {
103+
Awaitility
104+
.await()
105+
.atMost(Duration.ofSeconds(10))
106+
.untilAsserted(() -> {
107107
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
108108

109-
if (records.isEmpty()) {
110-
return false;
111-
}
112-
113109
assertThat(records)
114110
.hasSize(1)
115111
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
116112
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
117-
118-
return true;
119-
}
120-
);
113+
});
121114

122115
consumer.unsubscribe();
123116
}

examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.example.kafkacluster;
22

33
import org.apache.kafka.common.Uuid;
4-
import org.rnorth.ducttape.unreliables.Unreliables;
4+
import org.awaitility.Awaitility;
55
import org.testcontainers.containers.Container;
66
import org.testcontainers.containers.GenericContainer;
77
import org.testcontainers.containers.KafkaContainer;
@@ -11,10 +11,11 @@
1111

1212
import java.time.Duration;
1313
import java.util.Collection;
14-
import java.util.concurrent.TimeUnit;
1514
import java.util.stream.Collectors;
1615
import java.util.stream.IntStream;
1716

17+
import static org.assertj.core.api.Assertions.assertThat;
18+
1819
public class KafkaContainerKraftCluster implements Startable {
1920

2021
private final int brokersNum;
@@ -79,10 +80,10 @@ public void start() {
7980
// Needs to start all the brokers at once
8081
brokers.parallelStream().forEach(GenericContainer::start);
8182

82-
Unreliables.retryUntilTrue(
83-
30,
84-
TimeUnit.SECONDS,
85-
() -> {
83+
Awaitility
84+
.await()
85+
.atMost(Duration.ofSeconds(30))
86+
.untilAsserted(() -> {
8687
Container.ExecResult result =
8788
this.brokers.stream()
8889
.findFirst()
@@ -94,9 +95,8 @@ public void start() {
9495
);
9596
String brokers = result.getStdout().replace("\n", "");
9697

97-
return brokers != null && Integer.valueOf(brokers) == this.brokersNum;
98-
}
99-
);
98+
assertThat(brokers).asInt().isEqualTo(this.brokersNum);
99+
});
100100
}
101101

102102
@Override

0 commit comments

Comments
 (0)