From 78fba948a097bfe8168e2fd621eccf74606a849d Mon Sep 17 00:00:00 2001 From: Luke Sikina Date: Fri, 27 Dec 2024 13:54:52 -0500 Subject: [PATCH] [ALS-7200] - Switch to async upload - Allows for 5G+ uploads to S3 --- uploader/pom.xml | 2 +- .../dataupload/aws/AWSClientBuilder.java | 15 ++++++---- .../dataupload/aws/S3StateVerifier.java | 6 ++-- .../dataupload/hpds/HttpClientConfig.java | 12 ++++---- .../dataupload/upload/DataUploadService.java | 3 +- .../dataupload/aws/AWSClientBuilderTest.java | 30 +++++++++---------- .../upload/DataUploadServiceTest.java | 15 ++++++---- 7 files changed, 46 insertions(+), 37 deletions(-) diff --git a/uploader/pom.xml b/uploader/pom.xml index c7f2b9e..616d657 100644 --- a/uploader/pom.xml +++ b/uploader/pom.xml @@ -59,7 +59,7 @@ software.amazon.awssdk - apache-client + netty-nio-client ${aws.version} diff --git a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/aws/AWSClientBuilder.java b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/aws/AWSClientBuilder.java index 85bf254..d787b69 100644 --- a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/aws/AWSClientBuilder.java +++ b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/aws/AWSClientBuilder.java @@ -8,6 +8,9 @@ import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; @@ -25,15 +28,15 @@ public class AWSClientBuilder { private final Map sites; private final StsClientProvider stsClientProvider; - private final S3ClientBuilder s3ClientBuilder; - private final SdkHttpClient sdkHttpClient; + private final S3AsyncClientBuilder s3ClientBuilder; + private final SdkAsyncHttpClient sdkHttpClient; @Autowired public AWSClientBuilder( Map sites, StsClientProvider stsClientProvider, - S3ClientBuilder s3ClientBuilder, - @Autowired(required = false) SdkHttpClient sdkHttpClient + S3AsyncClientBuilder s3ClientBuilder, + @Autowired(required = false) SdkAsyncHttpClient sdkHttpClient ) { this.sites = sites; this.stsClientProvider = stsClientProvider; @@ -41,7 +44,7 @@ public AWSClientBuilder( this.sdkHttpClient = sdkHttpClient; } - public Optional buildClientForSite(String siteName) { + public Optional buildClientForSite(String siteName) { log.info("Building client for site {}", siteName); if (!sites.containsKey(siteName)) { log.warn("Could not find site {}", siteName); @@ -78,7 +81,7 @@ public Optional buildClientForSite(String siteName) { return Optional.of(buildFromProvider(provider)); } - private S3Client buildFromProvider(StaticCredentialsProvider provider) { + private S3AsyncClient buildFromProvider(StaticCredentialsProvider provider) { if (sdkHttpClient == null) { return s3ClientBuilder.credentialsProvider(provider).build(); } diff --git a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/aws/S3StateVerifier.java b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/aws/S3StateVerifier.java index 40085eb..5d04811 100644 --- a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/aws/S3StateVerifier.java +++ b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/aws/S3StateVerifier.java @@ -7,6 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.*; @@ -17,6 +18,7 @@ import java.time.temporal.ChronoUnit; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; @ConditionalOnProperty(name = "production", havingValue = "true") @Component @@ -59,7 +61,6 @@ private Optional deleteFileFromBucket(String s, SiteAWSInfo info) { DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(info.bucket()).key(s).build(); return clientBuilder.buildClientForSite(info.siteName()) .map(c -> c.deleteObject(request)) - .map(DeleteObjectResponse::deleteMarker) .map((ignored) -> s); } @@ -74,7 +75,8 @@ private String waitABit(String s) { private Optional uploadFileFromPath(Path p, SiteAWSInfo info) { LOG.info("Verifying upload capabilities"); - RequestBody body = RequestBody.fromFile(p.toFile()); + AsyncRequestBody body = AsyncRequestBody.fromFile(p.toFile()); + PutObjectRequest request = PutObjectRequest.builder() .bucket(info.bucket()) .serverSideEncryption(ServerSideEncryption.AWS_KMS) diff --git a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/hpds/HttpClientConfig.java b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/hpds/HttpClientConfig.java index e1f16f0..a780715 100644 --- a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/hpds/HttpClientConfig.java +++ b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/hpds/HttpClientConfig.java @@ -14,9 +14,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; +import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; @Configuration public class HttpClientConfig { @@ -45,17 +45,17 @@ public HttpClient getHttpClient() { @Bean - public SdkHttpClient getSdkClient() { + public SdkAsyncHttpClient getSdkClient() { if (!StringUtils.hasLength(proxyUser)) { return null; } LOG.info("Found proxy user {}, will configure sdk proxy", proxyUser); - ProxyConfiguration proxy = ProxyConfiguration.builder() + var proxy = ProxyConfiguration.builder() .useSystemPropertyValues(true) .username(proxyUser) .password(proxyPassword) .build(); - return ApacheHttpClient.builder() + return NettyNioAsyncHttpClient.builder() .proxyConfiguration(proxy) .build(); } diff --git a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadService.java b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadService.java index a5cadab..aaf527f 100644 --- a/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadService.java +++ b/uploader/src/main/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadService.java @@ -14,6 +14,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3ClientBuilder; @@ -119,7 +120,7 @@ private static void deleteFile(Path data) { private boolean uploadFileFromPath(Path p, SiteAWSInfo site, String dir) { try { - RequestBody body = RequestBody.fromFile(p.toFile()); + AsyncRequestBody body = AsyncRequestBody.fromFile(p.toFile()); PutObjectRequest request = PutObjectRequest.builder() .bucket(site.bucket()) .serverSideEncryption(ServerSideEncryption.AWS_KMS) diff --git a/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/aws/AWSClientBuilderTest.java b/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/aws/AWSClientBuilderTest.java index 2f39dae..6038d5b 100644 --- a/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/aws/AWSClientBuilderTest.java +++ b/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/aws/AWSClientBuilderTest.java @@ -13,8 +13,8 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; @@ -38,7 +38,7 @@ class AWSClientBuilderTest { StsClientProvider stsClientProvider; @MockBean - S3ClientBuilder s3ClientBuilder; + S3AsyncClientBuilder S3AsyncClientBuilder; @Autowired AWSClientBuilder subject; @@ -48,8 +48,8 @@ void shouldNotBuildClientIfSiteDNE() { Mockito.when(sites.get("Narnia")) .thenReturn(null); - Optional actual = subject.buildClientForSite("Narnia"); - Optional expected = Optional.empty(); + Optional actual = subject.buildClientForSite("Narnia"); + Optional expected = Optional.empty(); Assertions.assertEquals(expected, actual); } @@ -72,8 +72,8 @@ void shouldNotBuildClientIfRoleRequestFails() { Mockito.when(stsClientProvider.createClient()) .thenReturn(Optional.of(stsClient)); - Optional actual = subject.buildClientForSite("bch"); - Optional expected = Optional.empty(); + Optional actual = subject.buildClientForSite("bch"); + Optional expected = Optional.empty(); Assertions.assertEquals(expected, actual); } @@ -111,14 +111,14 @@ void shouldBuildClient() { StaticCredentialsProvider provider = StaticCredentialsProvider.create(sessionCredentials); ArgumentMatcher credsMatcher = (AwsCredentialsProvider p) -> p.toString().equals(provider.toString()); - S3Client s3Client = Mockito.mock(S3Client.class); - Mockito.when(s3ClientBuilder.credentialsProvider(Mockito.argThat(credsMatcher))) - .thenReturn(s3ClientBuilder); - Mockito.when(s3ClientBuilder.build()) - .thenReturn(s3Client); - - Optional actual = subject.buildClientForSite("bch"); - Optional expected = Optional.of(s3Client); + S3AsyncClient S3AsyncClient = Mockito.mock(S3AsyncClient.class); + Mockito.when(S3AsyncClientBuilder.credentialsProvider(Mockito.argThat(credsMatcher))) + .thenReturn(S3AsyncClientBuilder); + Mockito.when(S3AsyncClientBuilder.build()) + .thenReturn(S3AsyncClient); + + Optional actual = subject.buildClientForSite("bch"); + Optional expected = Optional.of(S3AsyncClient); Assertions.assertEquals(expected, actual); } diff --git a/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadServiceTest.java b/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadServiceTest.java index a4144c1..2394f06 100644 --- a/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadServiceTest.java +++ b/uploader/src/test/java/edu/harvard/dbmi/avillach/dataupload/upload/DataUploadServiceTest.java @@ -16,7 +16,9 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.util.ReflectionTestUtils; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; @@ -26,6 +28,7 @@ import java.nio.file.Path; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; @SpringBootTest @@ -44,7 +47,7 @@ class DataUploadServiceTest { private Path sharingRoot; @Mock - S3Client s3Client; + S3AsyncClient s3Client; @Mock AWSClientBuilder s3; @@ -102,14 +105,14 @@ void shouldNotUploadDataIfAWSUpset(@TempDir Path tempDir) throws IOException, In Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString()); Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true); Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client)); - Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class))) + Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(AsyncRequestBody.class))) .thenThrow(AwsServiceException.builder().build()); subject.uploadData(q, DataType.Phenotypic, "bch"); Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying); Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading); - Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)); + Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(AsyncRequestBody.class)); Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Error); Mockito.verify(uploadLock, Mockito.times(1)).acquire(); Mockito.verify(uploadLock, Mockito.times(1)).release(); @@ -129,14 +132,14 @@ void shouldUploadData(@TempDir Path tempDir) throws IOException, InterruptedExce Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString()); Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true); Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client)); - Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class))) - .thenReturn(Mockito.mock(PutObjectResponse.class)); + Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(AsyncRequestBody.class))) + .thenReturn(CompletableFuture.completedFuture(Mockito.mock(PutObjectResponse.class))); subject.uploadData(q, DataType.Phenotypic, "bch"); Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying); Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading); - Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)); + Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(AsyncRequestBody.class)); Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploaded); Assertions.assertFalse(Files.exists(fileToUpload)); Mockito.verify(uploadLock, Mockito.times(1)).acquire();