KAFKA-19631 Admin#describeReplicaLogDirs does not handle error correctly #20461
base: trunk
@jack2012aa: Thanks for the patch.
A few comments left.
}

if (!pendingPartitions.isEmpty() && !directoryFailures.isEmpty()) {
    ArrayList<String> errorAtDir = new ArrayList<>();
nit:
- ArrayList<String> errorAtDir = new ArrayList<>();
+ List<String> errorAtDir = new ArrayList<>();
for (Map.Entry<String, Throwable> entry : directoryFailures.entrySet()) {
    errorAtDir.add(entry.getValue().getClass().getName() + " at " + entry.getKey());
}
nit:
- for (Map.Entry<String, Throwable> entry : directoryFailures.entrySet()) {
-     errorAtDir.add(entry.getValue().getClass().getName() + " at " + entry.getKey());
- }
+ directoryFailures.forEach((k, v) -> errorAtDir.add(v.getClass().getName() + " at " + k));
Throwable e = assertThrows(Exception.class, () -> values.get(tpr).get());
assertInstanceOf(ClusterAuthorizationException.class, e.getCause());
Please use TestUtils#assertFutureThrows instead.
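To illustrate what such a helper saves over the assertThrows plus assertInstanceOf pair, here is a minimal, self-contained sketch. The class FutureAssertions and its method are hypothetical stand-ins written for this example, not Kafka's TestUtils, whose exact signature may differ between versions. The sketch waits on the future, expects it to fail, and checks the type of the wrapped cause in one step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureAssertions {
    // Hypothetical stand-in for a helper such as TestUtils#assertFutureThrows:
    // waits on the future, expects it to fail, and returns the cause if it has the expected type.
    public static <T extends Throwable> T assertFutureThrows(Class<T> expected, Future<?> future) {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (expected.isInstance(cause)) {
                return expected.cast(cause);
            }
            throw new AssertionError("Expected " + expected.getName() + " but got " + cause, cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        throw new AssertionError("Expected the future to fail with " + expected.getName());
    }

    public static void main(String[] args) {
        // A future that fails the way an unauthorized request might (SecurityException is a placeholder).
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new SecurityException("not authorized"));

        SecurityException cause = assertFutureThrows(SecurityException.class, failed);
        System.out.println(cause.getMessage());
    }
}
```

Returning the typed cause lets a test make further assertions on the exception without unwrapping the ExecutionException by hand.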
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, broker1log0),
env.cluster().nodeById(tpr.brokerId()));

DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(singletonList(tpr));
Use List.of instead of singletonList.
DescribeLogDirsResponseData.DescribeLogDirsResult failedResult = new DescribeLogDirsResponseData.DescribeLogDirsResult()
    .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
    .setLogDir(broker1log1);
DescribeLogDirsResponse response = new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setResults(asList(successfulResult, failedResult)));
Use List.of instead of asList.
DescribeLogDirsResponse response = new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setResults(asList(successfulResult, failedResult)));
env.kafkaClient().prepareResponseFrom(response, env.cluster().nodeById(successfulTpr.brokerId()));

DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(successfulTpr, failedTpr));
Use List.of instead of asList.
Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values();

assertNotNull(values.get(successfulTpr).get());
assertThrows(Exception.class, () -> values.get(failedTpr).get());
Do you think we should assert the class of this exception?
Yes, an assertion on the exception type could better ensure that the future fails for the expected reason. It is included in the new commit.
Description
Admin#describeReplicaLogDirs fails all futures when any error happens, regardless of the exception type or the failed directory. In particular, as the issue requires, CLUSTER_AUTHORIZATION_FAILED should be handled independently.
Changes
Tests
Two tests are added: one checks that the user can retrieve the CLUSTER_AUTHORIZATION_FAILED error via getCause; the other checks that a failure in an unrelated directory does not affect other requests.
CLUSTER_AUTHORIZATION_FAILED
It is handled by an additional if statement now.
Single Directory-level Error
It is hard to map failed directories back to the requested topic partition replicas. If a directory or a broker fails, the response won't contain information about any of its replicas. The new approach fails the replicas that are not mentioned in healthy responses if the response contains any error code. It preserves the existing behavior of returning an empty result for replicas missing from the response when no error code is present.
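The directory-level rule described above can be sketched in plain Java. This is a simplified illustration under stated assumptions, not the code in the patch: DirResult, the string replica keys, the IllegalStateException used as the failure, and the null "empty result" are all hypothetical stand-ins for the Kafka types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class ReplicaLogDirSketch {
    // Hypothetical stand-in for one directory entry in a response; error == null means healthy.
    public static final class DirResult {
        final String logDir;
        final Throwable error;
        final Set<String> replicas;

        public DirResult(String logDir, Throwable error, Set<String> replicas) {
            this.logDir = logDir;
            this.error = error;
            this.replicas = replicas;
        }
    }

    public static void complete(Map<String, CompletableFuture<String>> futures, List<DirResult> results) {
        Set<String> pending = new HashSet<>(futures.keySet());
        Map<String, Throwable> directoryFailures = new HashMap<>();

        for (DirResult result : results) {
            if (result.error != null) {
                // A failed directory lists no replicas, so only the failure itself is recorded.
                directoryFailures.put(result.logDir, result.error);
                continue;
            }
            for (String replica : result.replicas) {
                CompletableFuture<String> future = futures.get(replica);
                if (future != null) {
                    future.complete(result.logDir);
                    pending.remove(replica);
                }
            }
        }

        if (!pending.isEmpty() && !directoryFailures.isEmpty()) {
            // Replicas absent from every healthy directory are failed with a summary of the errors.
            List<String> errorAtDir = new ArrayList<>();
            directoryFailures.forEach((dir, e) -> errorAtDir.add(e.getClass().getName() + " at " + dir));
            Throwable failure = new IllegalStateException("Directory errors: " + errorAtDir);
            for (String replica : pending) {
                futures.get(replica).completeExceptionally(failure);
            }
        } else {
            // No error code in the response: unreplied replicas get an empty result (null here).
            for (String replica : pending) {
                futures.get(replica).complete(null);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> futures = new HashMap<>();
        futures.put("topic-0@broker1", new CompletableFuture<>());
        futures.put("topic-1@broker1", new CompletableFuture<>());

        complete(futures, List.of(
            new DirResult("/log0", null, Set.of("topic-0@broker1")),
            new DirResult("/log1", new IllegalStateException("log dir not found"), Set.of())));

        System.out.println(futures.get("topic-0@broker1").join());
        System.out.println(futures.get("topic-1@broker1").isCompletedExceptionally());
    }
}
```

In this sketch the replica found in the healthy directory completes normally, while the one that no healthy directory mentions fails because a directory error was reported. With no directory errors at all, the second branch keeps the empty-result behavior.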