Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
Expand Down Expand Up @@ -3108,34 +3109,52 @@ public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;

Set<TopicPartition> pendingPartitions = new HashSet<>(replicaDirInfoByPartition.keySet());
Map<String, Throwable> directoryFailures = new HashMap<>();

for (Map.Entry<String, LogDirDescription> responseEntry : logDirDescriptions(response).entrySet()) {
String logDir = responseEntry.getKey();
LogDirDescription logDirInfo = responseEntry.getValue();

// No replica info will be provided if the log directory is offline
if (logDirInfo.error() instanceof KafkaStorageException)
continue;
if (logDirInfo.error() != null)
handleFailure(new IllegalStateException(
"The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));

for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
TopicPartition tp = replicaInfoEntry.getKey();
ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
if (replicaLogDirInfo == null) {
log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp);
} else if (replicaInfo.isFuture()) {
replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
logDir,
replicaInfo.offsetLag()));
} else {
replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir,
replicaInfo.offsetLag(),
replicaLogDirInfo.getFutureReplicaLogDir(),
replicaLogDirInfo.getFutureReplicaOffsetLag()));
if (logDirInfo.error() instanceof ClusterAuthorizationException)
handleFailure(logDirInfo.error());

if (logDirInfo.error() == null) {
for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
TopicPartition tp = replicaInfoEntry.getKey();
ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
if (replicaLogDirInfo == null) {
log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp);
} else if (replicaInfo.isFuture()) {
replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
logDir,
replicaInfo.offsetLag()));
} else {
replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir,
replicaInfo.offsetLag(),
replicaLogDirInfo.getFutureReplicaLogDir(),
replicaLogDirInfo.getFutureReplicaOffsetLag()));
}
pendingPartitions.remove(tp);
}
} else {
directoryFailures.put(logDir, logDirInfo.error());
}
}

if (!pendingPartitions.isEmpty() && !directoryFailures.isEmpty()) {
List<String> errorAtDir = new ArrayList<>();
directoryFailures.forEach((k, v) -> errorAtDir.add(v.getClass().getName() + " at " + k));
Throwable error = new IllegalStateException("The error " + String.join(", ", errorAtDir) + " in the response from broker " + brokerId + " is illegal");
for (TopicPartition tp: pendingPartitions) {
KafkaFutureImpl<ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
future.completeExceptionally(error);
}
}

Expand Down
Loading