diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 90f83eac935e7..fc5a2d50ba07d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -91,6 +91,7 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidTopicException; @@ -3108,6 +3109,10 @@ public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) { @Override public void handleResponse(AbstractResponse abstractResponse) { DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; + + Set pendingPartitions = new HashSet<>(replicaDirInfoByPartition.keySet()); + Map directoryFailures = new HashMap<>(); + for (Map.Entry responseEntry : logDirDescriptions(response).entrySet()) { String logDir = responseEntry.getKey(); LogDirDescription logDirInfo = responseEntry.getValue(); @@ -3115,27 +3120,41 @@ public void handleResponse(AbstractResponse abstractResponse) { // No replica info will be provided if the log directory is offline if (logDirInfo.error() instanceof KafkaStorageException) continue; - if (logDirInfo.error() != null) - handleFailure(new IllegalStateException( - "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal")); - - for (Map.Entry replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { - TopicPartition tp = replicaInfoEntry.getKey(); - ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); - ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); - if (replicaLogDirInfo == null) { - log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); - } else if (replicaInfo.isFuture()) { - replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), - replicaLogDirInfo.getCurrentReplicaOffsetLag(), - logDir, - replicaInfo.offsetLag())); - } else { - replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, - replicaInfo.offsetLag(), - replicaLogDirInfo.getFutureReplicaLogDir(), - replicaLogDirInfo.getFutureReplicaOffsetLag())); + if (logDirInfo.error() instanceof ClusterAuthorizationException) + handleFailure(logDirInfo.error()); + + if (logDirInfo.error() == null) { + for (Map.Entry replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { + TopicPartition tp = replicaInfoEntry.getKey(); + ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); + ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); + if (replicaLogDirInfo == null) { + log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); + } else if (replicaInfo.isFuture()) { + replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), + replicaLogDirInfo.getCurrentReplicaOffsetLag(), + logDir, + replicaInfo.offsetLag())); + } else { + replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, + replicaInfo.offsetLag(), + replicaLogDirInfo.getFutureReplicaLogDir(), + replicaLogDirInfo.getFutureReplicaOffsetLag())); + } + pendingPartitions.remove(tp); } + } else { + directoryFailures.put(logDir, logDirInfo.error()); + } + } + + if (!pendingPartitions.isEmpty() && !directoryFailures.isEmpty()) { + List errorAtDir = new ArrayList<>(); + directoryFailures.forEach((k, v) -> errorAtDir.add(v.getClass().getName() + " at " + k)); + Throwable error = new IllegalStateException("The error " + String.join(", ", errorAtDir) + " in the response from broker " + brokerId + " is illegal"); + for (TopicPartition tp: pendingPartitions) { + KafkaFutureImpl future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId)); + future.completeExceptionally(error); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index e7fa11177d3ed..2699f55ce0b55 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -309,6 +309,7 @@ import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.kafka.clients.admin.KafkaAdminClient.DEFAULT_LEAVE_GROUP_REASON; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -869,7 +870,7 @@ public void testTimeoutWithoutMetadata() throws Exception { KafkaFuture future = env.adminClient().createTopics( singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(1000)).all(); - TestUtils.assertFutureThrows(TimeoutException.class, future); + assertFutureThrows(TimeoutException.class, future); } } @@ -952,7 +953,7 @@ public void testPropagatedMetadataFetchException() throws Exception { KafkaFuture future = env.adminClient().createTopics( singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(1000)).all(); - TestUtils.assertFutureThrows(SaslAuthenticationException.class, future); + assertFutureThrows(SaslAuthenticationException.class, future); } } @@ -982,7 +983,7 @@ public void testCreateTopicsPartialResponse() throws Exception { new NewTopic("myTopic2", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(10000)); topicsResult.values().get("myTopic").get(); - TestUtils.assertFutureThrows(ApiException.class, topicsResult.values().get("myTopic2")); + assertFutureThrows(ApiException.class, topicsResult.values().get("myTopic2")); } } @@ -1086,7 +1087,7 @@ public void testCreateTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(result.values().get("topic1").get()); assertNull(result.values().get("topic2").get()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -1131,9 +1132,9 @@ public void testCreateTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -1157,9 +1158,9 @@ public void testCreateTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex new CreateTopicsOptions().retryOnQuotaViolation(false)); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -1194,14 +1195,14 @@ public void testDeleteTopics() throws Exception { prepareDeleteTopicsResponse("myTopic", Errors.TOPIC_DELETION_DISABLED)); future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(TopicDeletionDisabledException.class, future); + assertFutureThrows(TopicDeletionDisabledException.class, future); env.kafkaClient().prepareResponse( expectDeleteTopicsRequestWithTopics("myTopic"), prepareDeleteTopicsResponse("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION)); future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, future); + assertFutureThrows(UnknownTopicOrPartitionException.class, future); // With topic IDs Uuid topicId = Uuid.randomUuid(); @@ -1218,14 +1219,14 @@ public void testDeleteTopics() throws Exception { prepareDeleteTopicsResponseWithTopicId(topicId, Errors.TOPIC_DELETION_DISABLED)); future = env.adminClient().deleteTopics(TopicCollection.ofTopicIds(singletonList(topicId)), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(TopicDeletionDisabledException.class, future); + assertFutureThrows(TopicDeletionDisabledException.class, future); env.kafkaClient().prepareResponse( expectDeleteTopicsRequestWithTopicIds(topicId), prepareDeleteTopicsResponseWithTopicId(topicId, Errors.UNKNOWN_TOPIC_ID)); future = env.adminClient().deleteTopics(TopicCollection.ofTopicIds(singletonList(topicId)), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, future); + assertFutureThrows(UnknownTopicIdException.class, future); } } @@ -1244,7 +1245,7 @@ public void testDeleteTopicsPartialResponse() throws Exception { asList("myTopic", "myOtherTopic"), new DeleteTopicsOptions()); result.topicNameValues().get("myTopic").get(); - TestUtils.assertFutureThrows(ApiException.class, result.topicNameValues().get("myOtherTopic")); + assertFutureThrows(ApiException.class, result.topicNameValues().get("myOtherTopic")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1258,7 +1259,7 @@ public void testDeleteTopicsPartialResponse() throws Exception { TopicCollection.ofTopicIds(asList(topicId1, topicId2)), new DeleteTopicsOptions()); resultIds.topicIdValues().get(topicId1).get(); - TestUtils.assertFutureThrows(ApiException.class, resultIds.topicIdValues().get(topicId2)); + assertFutureThrows(ApiException.class, resultIds.topicIdValues().get(topicId2)); } } @@ -1290,7 +1291,7 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(result.topicNameValues().get("topic1").get()); assertNull(result.topicNameValues().get("topic2").get()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1320,7 +1321,7 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(resultIds.topicIdValues().get(topicId1).get()); assertNull(resultIds.topicIdValues().get(topicId2).get()); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); + assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); } } @@ -1362,9 +1363,9 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO time.sleep(defaultApiTimeout + 1); assertNull(result.topicNameValues().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2")); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1398,9 +1399,9 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO time.sleep(defaultApiTimeout + 1); assertNull(resultIds.topicIdValues().get(topicId1).get()); - e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2)); + e = assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2)); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); + assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); } } @@ -1421,9 +1422,9 @@ public void testDeleteTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex new DeleteTopicsOptions().retryOnQuotaViolation(false)); assertNull(result.topicNameValues().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2")); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1441,9 +1442,9 @@ public void testDeleteTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex new DeleteTopicsOptions().retryOnQuotaViolation(false)); assertNull(resultIds.topicIdValues().get(topicId1).get()); - e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2)); + e = assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2)); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); + assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); } } @@ -1475,14 +1476,14 @@ public void testInvalidTopicNames() throws Exception { List sillyTopicNames = asList("", null); Map> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).topicNameValues(); for (String sillyTopicName : sillyTopicNames) { - TestUtils.assertFutureThrows(InvalidTopicException.class, deleteFutures.get(sillyTopicName)); + assertFutureThrows(InvalidTopicException.class, deleteFutures.get(sillyTopicName)); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); Map> describeFutures = env.adminClient().describeTopics(sillyTopicNames).topicNameValues(); for (String sillyTopicName : sillyTopicNames) { - TestUtils.assertFutureThrows(InvalidTopicException.class, describeFutures.get(sillyTopicName)); + assertFutureThrows(InvalidTopicException.class, describeFutures.get(sillyTopicName)); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); @@ -1493,7 +1494,7 @@ public void testInvalidTopicNames() throws Exception { Map> createFutures = env.adminClient().createTopics(newTopics).values(); for (String sillyTopicName : sillyTopicNames) { - TestUtils.assertFutureThrows(InvalidTopicException.class, createFutures.get(sillyTopicName)); + assertFutureThrows(InvalidTopicException.class, createFutures.get(sillyTopicName)); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); } @@ -1789,7 +1790,7 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiErrorHandling() thro asList(topicName1, topicName0), new DescribeTopicsOptions() ); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.allTopicNames()); + assertFutureThrows(TopicAuthorizationException.class, result.allTopicNames()); } } @@ -1888,10 +1889,10 @@ public void testDescribeAcls() throws Exception { env.kafkaClient().prepareResponse(new DescribeAclsResponse(new DescribeAclsResponseData() .setErrorCode(Errors.SECURITY_DISABLED.code()) .setErrorMessage("Security is disabled"), ApiKeys.DESCRIBE_ACLS.latestVersion())); - TestUtils.assertFutureThrows(SecurityDisabledException.class, env.adminClient().describeAcls(FILTER2).values()); + assertFutureThrows(SecurityDisabledException.class, env.adminClient().describeAcls(FILTER2).values()); // Test a call where we supply an invalid filter. - TestUtils.assertFutureThrows(InvalidRequestException.class, env.adminClient().describeAcls(UNKNOWN_FILTER).values()); + assertFutureThrows(InvalidRequestException.class, env.adminClient().describeAcls(UNKNOWN_FILTER).values()); } } @@ -1983,9 +1984,9 @@ public void testCreateAcls() throws Exception { new CreateAclsResponseData.AclCreationResult())))); results = env.adminClient().createAcls(asList(ACL1, ACL2)); assertCollectionIs(results.values().keySet(), ACL1, ACL2); - TestUtils.assertFutureThrows(SecurityDisabledException.class, results.values().get(ACL1)); + assertFutureThrows(SecurityDisabledException.class, results.values().get(ACL1)); results.values().get(ACL2).get(); - TestUtils.assertFutureThrows(SecurityDisabledException.class, results.all()); + assertFutureThrows(SecurityDisabledException.class, results.all()); } } @@ -2013,8 +2014,8 @@ public void testDeleteAcls() throws Exception { assertEquals(ACL1, filter1Results.values().get(0).binding()); assertNull(filter1Results.values().get(1).exception()); assertEquals(ACL2, filter1Results.values().get(1).binding()); - TestUtils.assertFutureThrows(SecurityDisabledException.class, filterResults.get(FILTER2)); - TestUtils.assertFutureThrows(SecurityDisabledException.class, results.all()); + assertFutureThrows(SecurityDisabledException.class, filterResults.get(FILTER2)); + assertFutureThrows(SecurityDisabledException.class, results.all()); // Test a call where one deletion result has an error. env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData() @@ -2034,7 +2035,7 @@ public void testDeleteAcls() throws Exception { ApiKeys.DELETE_ACLS.latestVersion())); results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2)); assertTrue(results.values().get(FILTER2).get().values().isEmpty()); - TestUtils.assertFutureThrows(SecurityDisabledException.class, results.all()); + assertFutureThrows(SecurityDisabledException.class, results.all()); // Test a call where there are no errors. env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData() @@ -2105,7 +2106,7 @@ public void testElectLeaders() throws Exception { electionType, Set.of(topic1, topic2), new ElectLeadersOptions().timeoutMs(100)); - TestUtils.assertFutureThrows(TimeoutException.class, results.partitions()); + assertFutureThrows(TimeoutException.class, results.partitions()); } } } @@ -2170,7 +2171,7 @@ public void testDescribeConfigsPartialResponse() { topic2)).values(); assertEquals(Set.of(topic, topic2), result.keySet()); result.get(topic); - TestUtils.assertFutureThrows(ApiException.class, result.get(topic2)); + assertFutureThrows(ApiException.class, result.get(topic2)); } } @@ -2517,6 +2518,55 @@ public void testDescribeReplicaLogDirsUnexpected() throws ExecutionException, In } } + @Test + public void testDescribeReplicaLogDirsWithAuthorizationException() throws ExecutionException, InterruptedException { + TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + env.kafkaClient().prepareResponseFrom( + prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, broker1log0), + env.cluster().nodeById(tpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(List.of(tpr)); + Map> values = result.values(); + + assertFutureThrows(ClusterAuthorizationException.class, values.get(tpr)); + } + + } + + @Test + public void testDescribeReplicaLogDirsWithSingleDirException() throws ExecutionException, InterruptedException { + int brokerId = 1; + TopicPartitionReplica successfulTpr = new TopicPartitionReplica("topic", 12, brokerId); + TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed", 12, brokerId); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + String broker1log1 = "/var/data/kafka1"; + int broker1Log0PartitionSize = 987654321; + int broker1Log0OffsetLag = 24; + + DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult = prepareDescribeLogDirsResult( + successfulTpr, broker1log0, broker1Log0PartitionSize, broker1Log0OffsetLag, false); + DescribeLogDirsResponseData.DescribeLogDirsResult failedResult = new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code()) + .setLogDir(broker1log1); + DescribeLogDirsResponse response = new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setResults(List.of(successfulResult, failedResult))); + env.kafkaClient().prepareResponseFrom(response, env.cluster().nodeById(successfulTpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(List.of(successfulTpr, failedTpr)); + Map> values = result.values(); + + assertNotNull(values.get(successfulTpr).get()); + Throwable e = assertFutureThrows(IllegalStateException.class, values.get(failedTpr)); + assertTrue(e.getMessage().equals("The error org.apache.kafka.common.errors.LogDirNotFoundException at " + broker1log1 + " in the response from broker " + brokerId + " is illegal")); + } + } + @Test public void testCreatePartitions() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { @@ -2578,7 +2628,7 @@ public void testCreatePartitionsRetryThrottlingExceptionWhenEnabled() throws Exc assertNull(result.values().get("topic1").get()); assertNull(result.values().get("topic2").get()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -2624,9 +2674,9 @@ public void testCreatePartitionsRetryThrottlingExceptionWhenEnabledUntilRequestT time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -2651,9 +2701,9 @@ public void testCreatePartitionsDontRetryThrottlingExceptionWhenDisabled() throw counts, new CreatePartitionsOptions().retryOnQuotaViolation(false)); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); + ThrottlingQuotaExceededException e = assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2")); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); + assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -2688,7 +2738,7 @@ public void testDeleteRecordsTopicAuthorizationError() { recordsToDelete.put(partition, RecordsToDelete.beforeOffset(10L)); DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, results.lowWatermarks().get(partition)); + assertFutureThrows(TopicAuthorizationException.class, results.lowWatermarks().get(partition)); } } @@ -2734,7 +2784,7 @@ public void testDeleteRecordsMultipleSends() throws Exception { DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete); assertEquals(3L, results.lowWatermarks().get(tp0).get().lowWatermark()); - TestUtils.assertFutureThrows(SaslAuthenticationException.class, results.lowWatermarks().get(tp1)); + assertFutureThrows(SaslAuthenticationException.class, results.lowWatermarks().get(tp1)); } } @@ -2856,13 +2906,13 @@ public void testDescribeTopicsByIds() throws ExecutionException, InterruptedExce DescribeTopicsResult result1 = env.adminClient().describeTopics( TopicCollection.ofTopicIds(singletonList(nonExistID))); - TestUtils.assertFutureThrows(UnknownTopicIdException.class, result1.allTopicIds()); + assertFutureThrows(UnknownTopicIdException.class, result1.allTopicIds()); Exception e = assertThrows(Exception.class, () -> result1.allTopicIds().get(), "describe with non-exist topic ID should throw exception"); assertEquals(String.format("org.apache.kafka.common.errors.UnknownTopicIdException: TopicId %s not found.", nonExistID), e.getMessage()); DescribeTopicsResult result2 = env.adminClient().describeTopics( TopicCollection.ofTopicIds(singletonList(Uuid.ZERO_UUID))); - TestUtils.assertFutureThrows(InvalidTopicException.class, result2.allTopicIds()); + assertFutureThrows(InvalidTopicException.class, result2.allTopicIds()); e = assertThrows(Exception.class, () -> result2.allTopicIds().get(), "describe with non-exist topic ID should throw exception"); assertEquals("The given topic id 'AAAAAAAAAAAAAAAAAAAAAA' cannot be represented in a request.", e.getCause().getMessage()); @@ -2924,10 +2974,10 @@ public void testDescribeClusterHandleError() { .setErrorMessage(errorMessage))); final DescribeClusterResult result = env.adminClient().describeCluster(); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.clusterId(), errorMessage); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.controller(), errorMessage); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.nodes(), errorMessage); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.authorizedOperations(), errorMessage); + assertFutureThrows(InvalidRequestException.class, result.clusterId(), errorMessage); + assertFutureThrows(InvalidRequestException.class, result.controller(), errorMessage); + assertFutureThrows(InvalidRequestException.class, result.nodes(), errorMessage); + assertFutureThrows(InvalidRequestException.class, result.authorizedOperations(), errorMessage); } } @@ -3088,7 +3138,7 @@ public void testListGroups() throws Exception { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(6, listings.size()); @@ -3122,7 +3172,7 @@ public void testListGroupsMetadataFailure() throws Exception { Collections.emptyList())); final ListGroupsResult result = env.adminClient().listGroups(); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -3279,7 +3329,7 @@ public void testListGroupsWithTypesOlderBrokerVersion() throws Exception { ListGroupsOptions options = new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE)); ListGroupsResult result = env.adminClient().listGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); // But a type filter which is just classic groups is permitted with an older broker, because they // only know about classic groups so the types filter can be omitted. @@ -3314,7 +3364,7 @@ public void testListGroupsWithTypesOlderBrokerVersion() throws Exception { options = new ListGroupsOptions().withTypes(Set.of(GroupType.CONSUMER)); result = env.adminClient().listGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3331,7 +3381,7 @@ public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers request -> request instanceof DescribeClusterRequest); final DescribeClusterResult result = env.adminClient().describeCluster(new DescribeClusterOptions().includeFencedBrokers(true)); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.nodes()); + assertFutureThrows(UnsupportedVersionException.class, result.nodes()); } } @@ -3428,7 +3478,7 @@ public void testListConsumerGroups() throws Exception { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(3, listings.size()); @@ -3463,7 +3513,7 @@ public void testListConsumerGroupsMetadataFailure() throws Exception { List.of())); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups()); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -3636,7 +3686,7 @@ public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE)); result = env.adminClient().listGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3682,7 +3732,7 @@ public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception options = ListGroupsOptions.forConsumerGroups().withTypes(Set.of(GroupType.SHARE)); result = env.adminClient().listGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3780,7 +3830,7 @@ public void testListConsumerGroupsDeprecated() throws Exception { env.cluster().nodeById(3)); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(3, listings.size()); @@ -3816,7 +3866,7 @@ public void testListConsumerGroupsDeprecatedMetadataFailure() throws Exception { List.of())); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -3953,7 +4003,7 @@ public void testListConsumerGroupsDeprecatedWithStatesOlderBrokerVersion() throw options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE)); result = env.adminClient().listConsumerGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3999,7 +4049,7 @@ public void testListConsumerGroupsDeprecatedWithTypesOlderBrokerVersion() throws options = new ListConsumerGroupsOptions().withTypes(Set.of(GroupType.SHARE)); result = env.adminClient().listConsumerGroups(options); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); // But a type filter which is just classic groups is permitted with an older broker, because they // only know about classic groups so the types filter can be omitted. @@ -4058,7 +4108,7 @@ public void testOffsetCommitNumRetries() throws Exception { offsets.put(tp1, new OffsetAndMetadata(123L)); final AlterConsumerGroupOffsetsResult result = env.adminClient().alterConsumerGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -4081,7 +4131,7 @@ public void testStreamsOffsetCommitNumRetries() throws Exception { offsets.put(tp1, new OffsetAndMetadata(123L)); final AlterStreamsGroupOffsetsResult result = env.adminClient().alterStreamsGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -4112,9 +4162,9 @@ public void testOffsetCommitWithMultipleErrors() throws Exception { .alterConsumerGroupOffsets(GROUP_ID, offsets); assertNull(result.partitionResult(foo0).get()); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.partitionResult(foo1)); + assertFutureThrows(UnknownTopicOrPartitionException.class, result.partitionResult(foo1)); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.all()); + assertFutureThrows(UnknownTopicOrPartitionException.class, result.all()); } } @@ -4145,9 +4195,9 @@ public void testStreamsOffsetCommitWithMultipleErrors() throws Exception { .alterStreamsGroupOffsets(GROUP_ID, offsets); assertNull(result.partitionResult(foo0).get()); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.partitionResult(foo1)); + assertFutureThrows(UnknownTopicOrPartitionException.class, result.partitionResult(foo1)); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.all()); + assertFutureThrows(UnknownTopicOrPartitionException.class, result.all()); } } @@ -4275,7 +4325,7 @@ public void testDescribeConsumerGroupNumRetries() throws Exception { final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList(GROUP_ID)); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -4639,7 +4689,7 @@ public void testDescribeNonConsumerGroups() throws Exception { final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList(GROUP_ID)); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.describedGroups().get(GROUP_ID)); + assertFutureThrows(IllegalArgumentException.class, result.describedGroups().get(GROUP_ID)); } } @@ -4660,7 +4710,7 @@ public void testDescribeGroupsWithBothUnsupportedApis() throws InterruptedExcept request -> request instanceof DescribeGroupsRequest); DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList(GROUP_ID)); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.describedGroups().get(GROUP_ID)); + assertFutureThrows(UnsupportedVersionException.class, result.describedGroups().get(GROUP_ID)); } } @@ -4864,7 +4914,7 @@ public void testListConsumerGroupOffsetsNumRetries() throws Exception { final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID); - TestUtils.assertFutureThrows(TimeoutException.class, result.partitionsToOffsetAndMetadata()); + assertFutureThrows(TimeoutException.class, result.partitionsToOffsetAndMetadata()); } } @@ -4971,7 +5021,7 @@ public void testListConsumerGroupOffsetsNonRetriableErrors() throws Exception { ListConsumerGroupOffsetsResult errorResult = env.adminClient().listConsumerGroupOffsets(GROUP_ID); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionsToOffsetAndMetadata()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionsToOffsetAndMetadata()); } } } @@ -5345,7 +5395,7 @@ public void testDeleteConsumerGroupsNumRetries() throws Exception { final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -5372,7 +5422,7 @@ public void testDeleteStreamsGroupsNumRetries() throws Exception { final DeleteStreamsGroupsResult result = env.adminClient().deleteStreamsGroups(groupIds); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -5530,7 +5580,7 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.deletedGroups().get("groupId")); + assertFutureThrows(GroupAuthorizationException.class, errorResult.deletedGroups().get("groupId")); // Retriable errors should be retried env.kafkaClient().prepareResponse( @@ -5626,7 +5676,7 @@ public void testDeleteStreamsGroupsWithOlderBroker() throws Exception { prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); DeleteStreamsGroupsResult errorResult = env.adminClient().deleteStreamsGroups(groupIds); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.deletedGroups().get("groupId")); + assertFutureThrows(GroupAuthorizationException.class, errorResult.deletedGroups().get("groupId")); // Retriable errors should be retried env.kafkaClient().prepareResponse( @@ -5796,7 +5846,7 @@ public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception { final DeleteConsumerGroupOffsetsResult result = env.adminClient() .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -5818,7 +5868,7 @@ public void testDeleteStreamsGroupOffsetsNumRetries() throws Exception { final DeleteStreamsGroupOffsetsResult result = env.adminClient() .deleteStreamsGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -5957,8 +6007,8 @@ public void testDeleteConsumerGroupOffsets() throws Exception { GROUP_ID, Stream.of(tp1, tp2).collect(Collectors.toSet())); assertNull(errorResult.partitionResult(tp1).get()); - TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.partitionResult(tp2)); + assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.all()); + assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.partitionResult(tp2)); assertThrows(IllegalArgumentException.class, () -> errorResult.partitionResult(tp3)); } } @@ -6002,8 +6052,8 @@ public void testDeleteStreamsGroupOffsets() throws Exception { GROUP_ID, Stream.of(tp1, tp2).collect(Collectors.toSet())); assertNull(errorResult.partitionResult(tp1).get()); - TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.partitionResult(tp2)); + assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.all()); + assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.partitionResult(tp2)); assertThrows(IllegalArgumentException.class, () -> errorResult.partitionResult(tp3)); } } @@ -6120,8 +6170,8 @@ public void testDeleteConsumerGroupOffsetsNonRetriableErrors() throws Exception DeleteConsumerGroupOffsetsResult errorResult = env.adminClient() .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); + assertFutureThrows(error.exception().getClass(), errorResult.all()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -6147,8 +6197,8 @@ public void testDeleteStreamsGroupOffsetsNonRetriableErrors() throws Exception { DeleteStreamsGroupOffsetsResult errorResult = env.adminClient() .deleteStreamsGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); + assertFutureThrows(error.exception().getClass(), errorResult.all()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -6224,8 +6274,8 @@ public void testDeleteConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() th final DeleteConsumerGroupOffsetsResult errorResult = env.adminClient() .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); + assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -6244,8 +6294,8 @@ public void testDeleteStreamsGroupOffsetsFindCoordinatorNonRetriableErrors() thr final DeleteStreamsGroupOffsetsResult errorResult = env.adminClient() .deleteStreamsGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); + assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -6561,7 +6611,7 @@ public void testListStreamsGroups() throws Exception { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(4, listings.size()); @@ -6596,7 +6646,7 @@ public void testListStreamsGroupsMetadataFailure() throws Exception { Collections.emptyList())); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -6655,7 +6705,7 @@ public void testListStreamsGroupsWithStatesOlderBrokerVersion() { .setGroupId("streams-group-1")))), env.cluster().nodeById(0)); ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -6971,7 +7021,7 @@ public void testListShareGroups() throws Exception { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(4, listings.size()); @@ -7006,7 +7056,7 @@ public void testListShareGroupsMetadataFailure() throws Exception { Collections.emptyList())); final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); - TestUtils.assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.all()); } } @@ -7065,7 +7115,7 @@ public void testListShareGroupsWithStatesOlderBrokerVersion() { .setGroupId("share-group-1")))), env.cluster().nodeById(0)); ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -7301,10 +7351,10 @@ public void testIncrementalAlterConfigs() throws Exception { configs.put(groupResource, singletonList(alterConfigOp4)); AlterConfigsResult result = env.adminClient().incrementalAlterConfigs(configs); - TestUtils.assertFutureThrows(ClusterAuthorizationException.class, result.values().get(brokerResource)); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.values().get(topicResource)); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.values().get(metricResource)); - TestUtils.assertFutureThrows(InvalidConfigurationException.class, result.values().get(groupResource)); + assertFutureThrows(ClusterAuthorizationException.class, result.values().get(brokerResource)); + assertFutureThrows(InvalidRequestException.class, result.values().get(topicResource)); + assertFutureThrows(InvalidRequestException.class, result.values().get(metricResource)); + assertFutureThrows(InvalidConfigurationException.class, result.values().get(groupResource)); // Test a call where there are no errors. responseData = new IncrementalAlterConfigsResponseData(); @@ -7398,7 +7448,7 @@ public void testRemoveMembersFromGroupNumRetries() throws Exception { final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -7534,8 +7584,8 @@ public void testRemoveMembersFromGroupNonRetriableErrors() throws Exception { final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); - TestUtils.assertFutureThrows(error.exception().getClass(), result.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), result.memberResult(memberToRemove)); + assertFutureThrows(error.exception().getClass(), result.all()); + assertFutureThrows(error.exception().getClass(), result.memberResult(memberToRemove)); } } } @@ -7570,8 +7620,8 @@ public void testRemoveMembersFromGroup() throws Exception { MemberToRemove memberOne = new MemberToRemove(instanceOne); MemberToRemove memberTwo = new MemberToRemove(instanceTwo); - TestUtils.assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberOne)); - TestUtils.assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberTwo)); + assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberOne)); + assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberTwo)); MemberResponse responseOne = new MemberResponse() .setGroupInstanceId(instanceOne) @@ -7592,8 +7642,8 @@ public void testRemoveMembersFromGroup() throws Exception { new RemoveMembersFromConsumerGroupOptions(membersToRemove) ); - TestUtils.assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.all()); - TestUtils.assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.memberResult(memberOne)); + assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.all()); + assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.memberResult(memberOne)); assertNull(memberLevelErrorResult.memberResult(memberTwo).get()); // Return with missing member. @@ -7607,9 +7657,9 @@ public void testRemoveMembersFromGroup() throws Exception { new RemoveMembersFromConsumerGroupOptions(membersToRemove) ); - TestUtils.assertFutureThrows(IllegalArgumentException.class, missingMemberResult.all()); + assertFutureThrows(IllegalArgumentException.class, missingMemberResult.all()); // The memberOne was not included in the response. - TestUtils.assertFutureThrows(IllegalArgumentException.class, missingMemberResult.memberResult(memberOne)); + assertFutureThrows(IllegalArgumentException.class, missingMemberResult.memberResult(memberOne)); assertNull(missingMemberResult.memberResult(memberTwo).get()); @@ -7761,8 +7811,8 @@ public void testAlterPartitionReassignments() throws Exception { AlterPartitionReassignmentsResult result1 = env.adminClient().alterPartitionReassignments(reassignments); Future future1 = result1.all(); Future future2 = result1.values().get(tp1); - TestUtils.assertFutureThrows(UnknownServerException.class, future1); - TestUtils.assertFutureThrows(UnknownServerException.class, future2); + assertFutureThrows(UnknownServerException.class, future1); + assertFutureThrows(UnknownServerException.class, future2); // 2. NOT_CONTROLLER error handling AlterPartitionReassignmentsResponseData controllerErrResponseData = @@ -7813,7 +7863,7 @@ public void testAlterPartitionReassignments() throws Exception { ); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(partitionLevelErrData)); AlterPartitionReassignmentsResult partitionLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments); - TestUtils.assertFutureThrows(InvalidReplicaAssignmentException.class, partitionLevelErrResult.values().get(tp1)); + assertFutureThrows(InvalidReplicaAssignmentException.class, partitionLevelErrResult.values().get(tp1)); partitionLevelErrResult.values().get(tp2).get(); // 4. top-level error @@ -7832,9 +7882,9 @@ public void testAlterPartitionReassignments() throws Exception { ); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(topLevelErrResponseData)); AlterPartitionReassignmentsResult topLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments); - assertEquals(errorMessage, TestUtils.assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.all()).getMessage()); - assertEquals(errorMessage, TestUtils.assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.values().get(tp1)).getMessage()); - assertEquals(errorMessage, TestUtils.assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.values().get(tp2)).getMessage()); + assertEquals(errorMessage, assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.all()).getMessage()); + assertEquals(errorMessage, assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.values().get(tp1)).getMessage()); + assertEquals(errorMessage, assertFutureThrows(ClusterAuthorizationException.class, topLevelErrResult.values().get(tp2)).getMessage()); // 5. unrepresentable topic name error TopicPartition invalidTopicTP = new TopicPartition("", 0); @@ -7853,8 +7903,8 @@ public void testAlterPartitionReassignments() throws Exception { ); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(singlePartResponseData)); AlterPartitionReassignmentsResult unrepresentableTopicResult = env.adminClient().alterPartitionReassignments(invalidTopicReassignments); - TestUtils.assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidTopicTP)); - TestUtils.assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidPartitionTP)); + assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidTopicTP)); + assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidPartitionTP)); unrepresentableTopicResult.values().get(tp1).get(); // Test success scenario @@ -7923,7 +7973,7 @@ public void testListPartitionReassignments() throws Exception { env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(unknownTpData)); ListPartitionReassignmentsResult unknownTpResult = env.adminClient().listPartitionReassignments(Set.of(tp1, tp2)); - TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, unknownTpResult.reassignments()); + assertFutureThrows(UnknownTopicOrPartitionException.class, unknownTpResult.reassignments()); // 3. Success ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData() @@ -7974,7 +8024,7 @@ public void testAlterConsumerGroupOffsets() throws Exception { assertNull(result.all().get()); assertNull(result.partitionResult(tp1).get()); assertNull(result.partitionResult(tp2).get()); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(tp3)); + assertFutureThrows(IllegalArgumentException.class, result.partitionResult(tp3)); } } @@ -8006,7 +8056,7 @@ public void testAlterStreamsGroupOffsets() throws Exception { assertNull(result.all().get()); assertNull(result.partitionResult(tp1).get()); assertNull(result.partitionResult(tp2).get()); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(tp3)); + assertFutureThrows(IllegalArgumentException.class, result.partitionResult(tp3)); } } @@ -8118,8 +8168,8 @@ public void testAlterConsumerGroupOffsetsNonRetriableErrors() throws Exception { AlterConsumerGroupOffsetsResult errorResult = env.adminClient() .alterConsumerGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); + assertFutureThrows(error.exception().getClass(), errorResult.all()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -8146,8 +8196,8 @@ public void testAlterStreamsGroupOffsetsNonRetriableErrors() throws Exception { AlterStreamsGroupOffsetsResult errorResult = env.adminClient() .alterStreamsGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); - TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); + assertFutureThrows(error.exception().getClass(), errorResult.all()); + assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -8229,8 +8279,8 @@ public void testAlterConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() thr final AlterConsumerGroupOffsetsResult errorResult = env.adminClient() .alterConsumerGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); + assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -8251,8 +8301,8 @@ public void testAlterStreamsGroupOffsetsFindCoordinatorNonRetriableErrors() thro final AlterStreamsGroupOffsetsResult errorResult = env.adminClient() .alterStreamsGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); + assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -8426,7 +8476,7 @@ public void testListOffsetsNonRetriableErrors() throws Exception { partitions.put(tp0, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.all()); + assertFutureThrows(TopicAuthorizationException.class, result.all()); } } @@ -8454,7 +8504,7 @@ public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -8502,7 +8552,7 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex put(tp1, OffsetSpec.latest()); }}); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); + assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get(); assertEquals(345L, tp1Offset.offset()); @@ -8564,7 +8614,7 @@ public void testListOffsetsHandlesFulfillmentTimeouts() throws Exception { put(tp1, OffsetSpec.latest()); } }); - TestUtils.assertFutureThrows(TimeoutException.class, result.partitionResult(tp0)); + assertFutureThrows(TimeoutException.class, result.partitionResult(tp0)); ListOffsetsResultInfo tp1Result = result.partitionResult(tp1).get(); assertEquals(345L, tp1Result.offset()); assertEquals(543, tp1Result.leaderEpoch().get().intValue()); @@ -8627,7 +8677,7 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() { ListOffsetsResult result = env.adminClient().listOffsets( Collections.singletonMap(tp0, OffsetSpec.latest())); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); + assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); } } @@ -8954,56 +9004,56 @@ public void testDescribeMetadataQuorumFailure() { body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.NONE, false, false, false, false, false)); KafkaFuture future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(InvalidRequestException.class, future); + assertFutureThrows(InvalidRequestException.class, future); // Test incorrect topic count env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, false, false, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test incorrect topic name env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, true, false, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test incorrect partition count env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, true, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test incorrect partition index env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, true, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test partition level error env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.INVALID_REQUEST, false, false, false, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(InvalidRequestException.class, future); + assertFutureThrows(InvalidRequestException.class, future); // Test all incorrect and no errors env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, true, true, true, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(UnknownServerException.class, future); + assertFutureThrows(UnknownServerException.class, future); // Test all incorrect and both errors env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.INVALID_REQUEST, true, true, true, true, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(InvalidRequestException.class, future); + assertFutureThrows(InvalidRequestException.class, future); } } @@ -9218,7 +9268,7 @@ public void testListOffsetsMetadataNonRetriableErrors( partitions.put(tp1, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); - TestUtils.assertFutureThrows(expectedFailure, result.all()); + assertFutureThrows(expectedFailure, result.all()); } } @@ -9286,8 +9336,8 @@ public void testListOffsetsPartialResponse() throws Exception { partitions.put(tp1, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); assertNotNull(result.partitionResult(tp0).get()); - TestUtils.assertFutureThrows(ApiException.class, result.partitionResult(tp1)); - TestUtils.assertFutureThrows(ApiException.class, result.all()); + assertFutureThrows(ApiException.class, result.partitionResult(tp1)); + assertFutureThrows(ApiException.class, result.all()); } } @@ -9406,7 +9456,7 @@ private void testApiTimeout(int requestTimeoutMs, }, "Timed out waiting for Metadata request to be sent"); time.sleep(requestTimeoutMs + 1); - TestUtils.assertFutureThrows(TimeoutException.class, result.future); + assertFutureThrows(TimeoutException.class, result.future); } } @@ -9446,7 +9496,7 @@ public void testRequestTimeoutExceedingDefaultApiTimeout() throws Exception { // Now sleep the remaining time for the request timeout to expire time.sleep(60000); - TestUtils.assertFutureThrows(TimeoutException.class, result.future); + assertFutureThrows(TimeoutException.class, result.future); } } @@ -9536,8 +9586,8 @@ public void testAlterClientQuotas() throws Exception { AlterClientQuotasResult result = env.adminClient().alterClientQuotas(entries); result.values().get(goodEntity); - TestUtils.assertFutureThrows(ClusterAuthorizationException.class, result.values().get(unauthorizedEntity)); - TestUtils.assertFutureThrows(InvalidRequestException.class, result.values().get(invalidEntity)); + assertFutureThrows(ClusterAuthorizationException.class, result.values().get(unauthorizedEntity)); + assertFutureThrows(InvalidRequestException.class, result.values().get(invalidEntity)); // ensure immutable assertThrows(UnsupportedOperationException.class, () -> result.values().put(newClientQuotaEntity(ClientQuotaEntity.USER, "user-3"), null)); @@ -9576,7 +9626,7 @@ public void testAlterReplicaLogDirsLogDirNotFound() throws Exception { logDirs.put(tpr1, "/data1"); AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); assertNull(result.values().get(tpr0).get()); - TestUtils.assertFutureThrows(LogDirNotFoundException.class, result.values().get(tpr1)); + assertFutureThrows(LogDirNotFoundException.class, result.values().get(tpr1)); } } @@ -9607,7 +9657,7 @@ public void testAlterReplicaLogDirsPartialResponse() throws Exception { logDirs.put(tpr2, "/data1"); AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); assertNull(result.values().get(tpr1).get()); - TestUtils.assertFutureThrows(ApiException.class, result.values().get(tpr2)); + assertFutureThrows(ApiException.class, result.values().get(tpr2)); } } @@ -9643,7 +9693,7 @@ public void testAlterReplicaLogDirsPartialFailure() throws Exception { // Advance time past the default api timeout to time out the inflight request time.sleep(defaultApiTimeout + 1); - TestUtils.assertFutureThrows(TimeoutException.class, result.values().get(tpr1)); + assertFutureThrows(TimeoutException.class, result.values().get(tpr1)); assertNull(result.values().get(tpr2).get()); } } @@ -9831,7 +9881,7 @@ public void testDescribeLogDirsPartialFailure() throws Exception { // Advance time past the default api timeout to time out the inflight request time.sleep(defaultApiTimeout + 1); - TestUtils.assertFutureThrows(TimeoutException.class, result.descriptions().get(0)); + assertFutureThrows(TimeoutException.class, result.descriptions().get(0)); assertNotNull(result.descriptions().get(1).get()); } } @@ -9890,7 +9940,7 @@ public void testUnregisterBrokerFailure() { UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId); // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); } } @@ -9924,7 +9974,7 @@ public void testUnregisterBrokerTimeoutAndFailureRetry() { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + assertFutureThrows(UnknownServerException.class, result.all()); } } @@ -9941,7 +9991,7 @@ public void testUnregisterBrokerTimeoutMaxRetry() { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -9958,7 +10008,7 @@ public void testUnregisterBrokerTimeoutMaxWait() { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); } } @@ -10018,7 +10068,7 @@ public void testDescribeProducersTimeout(boolean timeoutInMetadataLookup) throws "Future failed to timeout after expiration of timeout"); assertTrue(result.all().isCompletedExceptionally()); - TestUtils.assertFutureThrows(TimeoutException.class, result.all()); + assertFutureThrows(TimeoutException.class, result.all()); assertFalse(env.kafkaClient().hasInFlightRequests()); } } @@ -10756,7 +10806,7 @@ public void testListClientMetricsResourcesNotSupported() { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -10820,7 +10870,7 @@ public void testListConfigResourcesNotSupported() { Set.of(ConfigResource.Type.UNKNOWN), new ListConfigResourcesOptions()); assertNotNull(result.all()); - TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -10966,7 +11016,7 @@ public void testAddRaftVoterRequest(boolean fail, boolean sendClusterId) throws options); assertNotNull(result.all()); if (fail) { - TestUtils.assertFutureThrows(DuplicateVoterException.class, result.all()); + assertFutureThrows(DuplicateVoterException.class, result.all()); } else { result.all().get(); } @@ -11051,7 +11101,7 @@ public void testRemoveRaftVoterRequest(boolean fail, boolean sendClusterId) thro options); assertNotNull(result.all()); if (fail) { - TestUtils.assertFutureThrows(VoterNotFoundException.class, result.all()); + assertFutureThrows(VoterNotFoundException.class, result.all()); } else { result.all().get(); } @@ -11375,7 +11425,7 @@ public void testAlterShareGroupOffsets() throws Exception { assertNull(result.partitionResult(fooTopicPartition0).get()); assertNull(result.partitionResult(fooTopicPartition1).get()); assertNull(result.partitionResult(barPartition0).get()); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0)); + assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0)); } } @@ -11395,9 +11445,9 @@ public void testAlterShareGroupOffsetsWithTopLevelError() throws Exception { env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data)); final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L)); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, result.all()); - TestUtils.assertFutureThrows(GroupAuthorizationException.class, result.partitionResult(fooTopicPartition1)); - TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0)); + assertFutureThrows(GroupAuthorizationException.class, result.all()); + assertFutureThrows(GroupAuthorizationException.class, result.partitionResult(fooTopicPartition1)); + assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0)); } } @@ -11422,9 +11472,9 @@ public void testAlterShareGroupOffsetsWithErrorInOnePartition() throws Exception env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data)); final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L)); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.all()); + assertFutureThrows(TopicAuthorizationException.class, result.all()); assertNull(result.partitionResult(fooTopicPartition0).get()); - TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.partitionResult(fooTopicPartition1)); + assertFutureThrows(TopicAuthorizationException.class, result.partitionResult(fooTopicPartition1)); assertNull(result.partitionResult(barPartition0).get()); } } @@ -11521,7 +11571,7 @@ public void testDeleteShareGroupOffsetsWithErrorInGroup() throws Exception { env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data)); final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooName, barName)); - TestUtils.assertFutureThrows(Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass(), result.all()); + assertFutureThrows(Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass(), result.all()); } } @@ -11552,8 +11602,8 @@ public void testDeleteShareGroupOffsetsWithErrorInOneTopic() throws Exception { env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data)); final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooName, barName)); - TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.all()); - TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.topicResult(fooName)); + assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.all()); + assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.topicResult(fooName)); assertNull(result.topicResult(barName).get()); } }