Skip to content

Commit b2c3fcc

Browse files
iliaxiliaxHaarolean
authored
BE: ACL enablement check fixes (#4034)
Co-authored-by: iliax <[email protected]> Co-authored-by: Roman Zabaluev <[email protected]>
1 parent 1cd303a commit b2c3fcc

File tree

1 file changed

+37
-30
lines changed

1 file changed

+37
-30
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import com.provectus.kafka.ui.util.KafkaVersion;
1616
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
1717
import java.io.Closeable;
18+
import java.time.Duration;
19+
import java.time.temporal.ChronoUnit;
1820
import java.util.ArrayList;
1921
import java.util.Collection;
2022
import java.util.HashMap;
@@ -129,38 +131,41 @@ private record ConfigRelatedInfo(String version,
129131
Set<SupportedFeature> features,
130132
boolean topicDeletionIsAllowed) {
131133

132-
private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
133-
return loadBrokersConfig(ac, List.of(controllerId))
134-
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
135-
.flatMap(configs -> {
136-
String version = "1.0-UNKNOWN";
137-
boolean topicDeletionEnabled = true;
138-
for (ConfigEntry entry : configs) {
139-
if (entry.name().contains("inter.broker.protocol.version")) {
140-
version = entry.value();
141-
}
142-
if (entry.name().equals("delete.topic.enable")) {
143-
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
144-
}
145-
}
146-
var builder = ConfigRelatedInfo.builder()
147-
.version(version)
148-
.topicDeletionIsAllowed(topicDeletionEnabled);
149-
return SupportedFeature.forVersion(ac, version)
150-
.map(features -> builder.features(features).build());
151-
});
134+
static final Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS);
135+
136+
private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
137+
return ReactiveAdminClient.describeClusterImpl(ac, Set.of())
138+
.flatMap(desc -> {
139+
// choosing node from which we will get configs (starting with controller)
140+
var targetNodeId = Optional.ofNullable(desc.controller)
141+
.map(Node::id)
142+
.orElse(desc.getNodes().iterator().next().id());
143+
return loadBrokersConfig(ac, List.of(targetNodeId))
144+
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
145+
.flatMap(configs -> {
146+
String version = "1.0-UNKNOWN";
147+
boolean topicDeletionEnabled = true;
148+
for (ConfigEntry entry : configs) {
149+
if (entry.name().contains("inter.broker.protocol.version")) {
150+
version = entry.value();
151+
}
152+
if (entry.name().equals("delete.topic.enable")) {
153+
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
154+
}
155+
}
156+
final String finalVersion = version;
157+
final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
158+
return SupportedFeature.forVersion(ac, version)
159+
.map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
160+
});
161+
})
162+
.cache(UPDATE_DURATION);
152163
}
153164
}
154165

155166
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
156-
return describeClusterImpl(adminClient, Set.of())
157-
// choosing node from which we will get configs (starting with controller)
158-
.flatMap(descr -> descr.controller != null
159-
? Mono.just(descr.controller)
160-
: Mono.justOrEmpty(descr.nodes.stream().findFirst())
161-
)
162-
.flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
163-
.map(info -> new ReactiveAdminClient(adminClient, info));
167+
Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
168+
return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
164169
}
165170

166171

@@ -170,7 +175,7 @@ private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullab
170175
.doOnError(th -> !(th instanceof SecurityDisabledException)
171176
&& !(th instanceof InvalidRequestException)
172177
&& !(th instanceof UnsupportedVersionException),
173-
th -> log.warn("Error checking if security enabled", th))
178+
th -> log.debug("Error checking if security enabled", th))
174179
.onErrorReturn(false);
175180
}
176181

@@ -202,6 +207,8 @@ public static <T> Mono<T> toMono(KafkaFuture<T> future) {
202207

203208
@Getter(AccessLevel.PACKAGE) // visible for testing
204209
private final AdminClient client;
210+
private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
211+
205212
private volatile ConfigRelatedInfo configRelatedInfo;
206213

207214
public Set<SupportedFeature> getClusterFeatures() {
@@ -228,7 +235,7 @@ public Mono<Void> updateInternalStats(@Nullable Node controller) {
228235
if (controller == null) {
229236
return Mono.empty();
230237
}
231-
return ConfigRelatedInfo.extract(client, controller.id())
238+
return configRelatedInfoMono
232239
.doOnNext(info -> this.configRelatedInfo = info)
233240
.then();
234241
}

0 commit comments

Comments
 (0)