Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hadoop-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
<amazon-s3-analyticsaccelerator-s3.version>1.2.1</amazon-s3-analyticsaccelerator-s3.version>
<amazon-s3-analyticsaccelerator-s3.version>1.3.0</amazon-s3-analyticsaccelerator-s3.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
<hsqldb.version>2.7.1</hsqldb.version>
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,10 +993,10 @@ public InputStreamType streamType() {
private class FactoryCallbacks implements StreamFactoryCallbacks {

@Override
public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
public S3Client getOrCreateSyncClient() throws IOException {
// Needs support of the CRT before the requireCRT can be used
LOG.debug("Stream factory requested async client");
return clientManager().getOrCreateAsyncClient();
return clientManager().getOrCreateS3Client();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.S3SyncSdkObjectClient;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -118,7 +119,7 @@ private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()

private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() {
return () -> new S3SeekableInputStreamFactory(
new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
new S3SyncSdkObjectClient(callbacks().getOrCreateSyncClient()),
seekableInputStreamConfiguration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.io.IOException;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;

import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.StreamCapabilities;
Expand Down Expand Up @@ -80,12 +80,11 @@ ObjectInputStream readObject(ObjectReadParameters parameters)
interface StreamFactoryCallbacks {

/**
* Get the Async S3Client, raising a failure to create as an IOException.
* @param requireCRT is the CRT required.
* Get the Sync S3Client, raising a failure to create as an IOException.
* @return the Async S3 client
* @throws IOException failure to create the client.
*/
S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException;
S3Client getOrCreateSyncClient() throws IOException;

void incrementFactoryStatistic(Statistic statistic);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.fs.s3a.VectoredIOContext;
import org.apache.hadoop.fs.s3a.prefetch.PrefetchingInputStreamFactory;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import software.amazon.awssdk.services.s3.S3Client;

import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_CUSTOM_FACTORY;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
Expand Down Expand Up @@ -329,9 +330,8 @@ public FactoryFailsToInstantiate() {
* Callbacks from {@link ObjectInputStreamFactory} instances.
*/
private static final class Callbacks implements ObjectInputStreamFactory.StreamFactoryCallbacks {

@Override
public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
public S3Client getOrCreateSyncClient() throws IOException {
throw new UnsupportedOperationException("not implemented");
}

Expand Down