diff --git a/.changes/next-release/feature-s3-fe521b0.json b/.changes/next-release/feature-s3-fe521b0.json new file mode 100644 index 000000000000..0c36868acb09 --- /dev/null +++ b/.changes/next-release/feature-s3-fe521b0.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "s3", + "contributor": "", + "description": "Add CRT shouldStream config as CRT_MEMORY_BUFFER_DISABLED SDK advanced client option" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/config/SdkAdvancedAsyncClientOption.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/config/SdkAdvancedAsyncClientOption.java index fb2a1135eeba..87ad499bfff4 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/config/SdkAdvancedAsyncClientOption.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/config/SdkAdvancedAsyncClientOption.java @@ -55,6 +55,15 @@ public final class SdkAdvancedAsyncClientOption extends ClientOption { public static final SdkAdvancedAsyncClientOption FUTURE_COMPLETION_EXECUTOR = new SdkAdvancedAsyncClientOption<>(Executor.class); + /** + * Advanced configuration for the native S3CrtAsyncClient which only applies for multipart uploads. When set to true, + * the client will skip buffering the part in native memory before sending the request. Default to false on small objects, + * and true when the object size exceed a certain threshold. When set to true, the client will also skip + * buffering for small objects. + */ + public static final SdkAdvancedAsyncClientOption CRT_MEMORY_BUFFER_DISABLED = + new SdkAdvancedAsyncClientOption<>(Boolean.class); + private SdkAdvancedAsyncClientOption(Class valueClass) { super(valueClass); } diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientPutObjectIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientPutObjectIntegrationTest.java index 911214dade11..6f09f757ef5e 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientPutObjectIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientPutObjectIntegrationTest.java @@ -36,6 +36,7 @@ import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.crt.CrtResource; @@ -79,6 +80,25 @@ public static void teardown() throws IOException { @Test void putObject_fileRequestBody_objectSentCorrectly() throws Exception { + S3AsyncClient crtClientWithMemoryBufferDisabled = S3CrtAsyncClient.builder() + .credentialsProvider(AwsTestBase.CREDENTIALS_PROVIDER_CHAIN) + .region(S3IntegrationTestBase.DEFAULT_REGION) + .advancedOption(SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED, true) + .build(); + + AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath()); + crtClientWithMemoryBufferDisabled.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join(); + + ResponseInputStream objContent = S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toInputStream()); + + byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath())); + + Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); + } + + @Test + void putObject_withMemoryBufferDisabled_fileRequestBody_objectSentCorrectly() throws Exception { AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath()); s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join(); @@ -90,6 +110,7 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception { Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); } + @Test void putObject_file_objectSentCorrectly() throws Exception { s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), testFile.toPath()).join(); diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java index b4cedc3ec1c0..66569eb39f97 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java @@ -17,6 +17,7 @@ import java.net.URI; import java.nio.file.Path; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @@ -25,6 +26,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; import software.amazon.awssdk.core.checksums.ResponseChecksumValidation; +import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity; import software.amazon.awssdk.identity.spi.IdentityProvider; import software.amazon.awssdk.regions.Region; @@ -361,6 +363,21 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer The type of the option. + */ + S3CrtAsyncClientBuilder advancedOption(SdkAdvancedAsyncClientOption option, T value); + + /** + * Configure the map of advanced override options. This will override all values currently configured. The values in the + * map must match the key type of the map, or a runtime exception will be raised. + */ + S3CrtAsyncClientBuilder advancedOptions(Map, ?> advancedOptions); @Override S3AsyncClient build(); diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java index 676e060e3218..2436a27487fa 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java @@ -31,6 +31,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -47,6 +48,7 @@ import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; import software.amazon.awssdk.core.checksums.ResponseChecksumValidation; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.interceptor.Context; import software.amazon.awssdk.core.interceptor.ExecutionAttribute; @@ -78,6 +80,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.utils.AttributeMap; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.Validate; @@ -222,7 +225,8 @@ private static S3CrtAsyncHttpClient.Builder initializeS3CrtAsyncHttpClient(Defau .readBufferSizeInBytes(builder.readBufferSizeInBytes) .httpConfiguration(builder.httpConfiguration) .thresholdInBytes(builder.thresholdInBytes) - .maxNativeMemoryLimitInBytes(builder.maxNativeMemoryLimitInBytes); + .maxNativeMemoryLimitInBytes(builder.maxNativeMemoryLimitInBytes) + .advancedOptions(builder.advancedOptions.build()); if (builder.retryConfiguration != null) { nativeClientBuilder.standardRetryOptions( @@ -257,6 +261,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB private Executor futureCompletionExecutor; private Boolean disableS3ExpressSessionAuth; + private AttributeMap.Builder advancedOptions = AttributeMap.builder(); @Override public DefaultS3CrtClientBuilder credentialsProvider(AwsCredentialsProvider credentialsProvider) { @@ -388,6 +393,18 @@ public DefaultS3CrtClientBuilder disableS3ExpressSessionAuth(Boolean disableS3Ex return this; } + @Override + public DefaultS3CrtClientBuilder advancedOption(SdkAdvancedAsyncClientOption option, T value) { + this.advancedOptions.put(option, value); + return this; + } + + @Override + public DefaultS3CrtClientBuilder advancedOptions(Map, ?> advancedOptions) { + this.advancedOptions.putAll(advancedOptions); + return this; + } + @Override public S3CrtAsyncClient build() { return new DefaultS3CrtAsyncClient(this); diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java index 1fed55813d33..1963e7e7b357 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java @@ -48,6 +48,7 @@ import software.amazon.awssdk.crt.http.HttpProxyEnvironmentVariableSetting; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.s3.ChecksumConfig; +import software.amazon.awssdk.crt.s3.FileIoOptions; import software.amazon.awssdk.crt.s3.ResumeToken; import software.amazon.awssdk.crt.s3.S3Client; import software.amazon.awssdk.crt.s3.S3ClientOptions; @@ -127,6 +128,8 @@ private S3ClientOptions createS3ClientOption() { .ifPresent(options::withConnectTimeoutMs); Optional.ofNullable(s3NativeClientConfiguration.httpMonitoringOptions()) .ifPresent(options::withHttpMonitoringOptions); + Optional.ofNullable(s3NativeClientConfiguration.memoryBufferDisabled()) + .ifPresent(memoryBufferDisabled -> options.withFileIoOptions(new FileIoOptions(memoryBufferDisabled, 0.0, false))); return options; } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3NativeClientConfiguration.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3NativeClientConfiguration.java index f43cbf84d46e..e3c5bea0641f 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3NativeClientConfiguration.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3NativeClientConfiguration.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.services.s3.internal.crt; +import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED; import static software.amazon.awssdk.crtcore.CrtConfigurationUtils.resolveHttpMonitoringOptions; import static software.amazon.awssdk.crtcore.CrtConfigurationUtils.resolveProxy; @@ -33,6 +34,7 @@ import software.amazon.awssdk.identity.spi.IdentityProvider; import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration; +import software.amazon.awssdk.utils.AttributeMap; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.SdkAutoCloseable; import software.amazon.awssdk.utils.Validate; @@ -64,6 +66,7 @@ public class S3NativeClientConfiguration implements SdkAutoCloseable { private final HttpMonitoringOptions httpMonitoringOptions; private final Boolean useEnvironmentVariableProxyOptionsValues; private final long maxNativeMemoryLimitInBytes; + private final Boolean memoryBufferDisabled; public S3NativeClientConfiguration(Builder builder) { this.signingRegion = builder.signingRegion == null ? DefaultAwsRegionProviderChain.builder().build().getRegion().id() : @@ -113,6 +116,8 @@ public S3NativeClientConfiguration(Builder builder) { } this.standardRetryOptions = builder.standardRetryOptions; this.useEnvironmentVariableProxyOptionsValues = resolveUseEnvironmentVariableValues(builder); + this.memoryBufferDisabled = + builder.advancedOptions == null ? null : builder.advancedOptions.get(CRT_MEMORY_BUFFER_DISABLED); } private static Boolean resolveUseEnvironmentVariableValues(Builder builder) { @@ -191,6 +196,10 @@ public Long readBufferSizeInBytes() { return readBufferSizeInBytes; } + public Boolean memoryBufferDisabled() { + return memoryBufferDisabled; + } + @Override public void close() { clientBootstrap.close(); @@ -213,6 +222,8 @@ public static final class Builder { private Long thresholdInBytes; private Long maxNativeMemoryLimitInBytes; + private AttributeMap advancedOptions; + private Builder() { } @@ -274,5 +285,10 @@ public Builder thresholdInBytes(Long thresholdInBytes) { this.thresholdInBytes = thresholdInBytes; return this; } + + public Builder advancedOptions(AttributeMap advancedOptions) { + this.advancedOptions = advancedOptions; + return this; + } } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClientTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClientTest.java index 776b69c4a10a..8f69ff5b4807 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClientTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClientTest.java @@ -19,6 +19,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED; import static software.amazon.awssdk.http.Header.CONTENT_LENGTH; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME; @@ -68,6 +69,7 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration; import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.utils.AttributeMap; public class S3CrtAsyncHttpClientTest { private static final URI DEFAULT_ENDPOINT = URI.create("https://127.0.0.1:443"); @@ -124,7 +126,7 @@ public void defaultRequest_shouldSetMetaRequestOptionsCorrectly(Integer port) { .collect(HashMap::new, (m, h) -> m.put(h.getName(), h.getValue()) , Map::putAll); - String expectedPort = port == null || port.equals(443) ? "" : ":" + port; + String expectedPort = port == null || port.equals(443) ? "" : ":" + port; assertThat(headers).hasSize(4) .containsEntry("Host", DEFAULT_ENDPOINT.getHost() + expectedPort) .containsEntry("custom-header", "foobar") @@ -135,7 +137,7 @@ public void defaultRequest_shouldSetMetaRequestOptionsCorrectly(Integer port) { @Test public void getObject_shouldSetMetaRequestTypeCorrectly() { AsyncExecuteRequest asyncExecuteRequest = getExecuteRequestBuilder().putHttpExecutionAttribute(OPERATION_NAME, - "GetObject").build(); + "GetObject").build(); S3MetaRequestOptions actual = makeRequest(asyncExecuteRequest); assertThat(actual.getMetaRequestType()).isEqualTo(S3MetaRequestOptions.MetaRequestType.GET_OBJECT); @@ -145,7 +147,7 @@ public void getObject_shouldSetMetaRequestTypeCorrectly() { @Test public void putObject_shouldSetMetaRequestTypeCorrectly() { AsyncExecuteRequest asyncExecuteRequest = getExecuteRequestBuilder().putHttpExecutionAttribute(OPERATION_NAME, - "PutObject").build(); + "PutObject").build(); S3MetaRequestOptions actual = makeRequest(asyncExecuteRequest); assertThat(actual.getMetaRequestType()).isEqualTo(S3MetaRequestOptions.MetaRequestType.PUT_OBJECT); @@ -318,7 +320,7 @@ public void operationWithResponseAlgorithms_optInFromRequest_shouldHonor() { s3NativeClientConfiguration = S3NativeClientConfiguration.builder() .endpointOverride(DEFAULT_ENDPOINT) .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", - "test"))) + "test"))) .build(); asyncHttpClient = new S3CrtAsyncHttpClient(s3Client, S3CrtAsyncHttpClient.builder() @@ -443,6 +445,7 @@ void build_shouldPassThroughParameters() { .minimumThroughputTimeout(Duration.ofSeconds(2))) .proxyConfiguration(p -> p.host("127.0.0.1").port(8080)) .build()) + .advancedOptions(AttributeMap.builder().put(CRT_MEMORY_BUFFER_DISABLED, true).build()) .build(); try (S3CrtAsyncHttpClient client = (S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build()) { @@ -466,6 +469,26 @@ void build_shouldPassThroughParameters() { assertThat(clientOptions.getMaxConnections()).isEqualTo(100); assertThat(clientOptions.getThroughputTargetGbps()).isEqualTo(3.5); assertThat(clientOptions.getMemoryLimitInBytes()).isEqualTo(5L * 1024 * 1024 * 1024); + assertThat(clientOptions.getFileIoOptions()).isNotNull(); + assertThat(clientOptions.getFileIoOptions().getShouldStream()).isTrue(); + assertThat(clientOptions.getFileIoOptions().getDiskThroughputGbps()).isZero(); + assertThat(clientOptions.getFileIoOptions().getDirectIo()).isFalse(); + } + } + + @Test + void build_advancedOptionsNotSet_shouldUseDefault() { + String signingRegion = "us-west-2"; + S3NativeClientConfiguration configuration = + S3NativeClientConfiguration.builder() + .signingRegion(signingRegion) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", + "test"))) + .build(); + try (S3CrtAsyncHttpClient client = + (S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build()) { + S3ClientOptions clientOptions = client.s3ClientOptions(); + assertThat(clientOptions.getFileIoOptions()).isNull(); } } @@ -554,8 +577,8 @@ void build_ProxyConfigurationWithEnvironmentVariables(S3CrtHttpConfiguration s3C .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) .build(); - try(S3CrtAsyncHttpClient client = - (S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build()) { + try (S3CrtAsyncHttpClient client = + (S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build()) { S3ClientOptions clientOptions = client.s3ClientOptions(); if (environmentVariableType == null) { assertThat(clientOptions.getHttpProxyEnvironmentVariableSetting()).isNull();