Skip to content

Commit

Permalink
[ALS-7200]
Browse files Browse the repository at this point in the history
- Switch to async upload
- Allows for 5G+ uploads to S3
  • Loading branch information
Luke Sikina committed Dec 27, 2024
1 parent c729973 commit 78fba94
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 37 deletions.
2 changes: 1 addition & 1 deletion uploader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<artifactId>netty-nio-client</artifactId>
<version>${aws.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
Expand All @@ -25,23 +28,23 @@ public class AWSClientBuilder {

private final Map<String, SiteAWSInfo> sites;
private final StsClientProvider stsClientProvider;
private final S3ClientBuilder s3ClientBuilder;
private final SdkHttpClient sdkHttpClient;
private final S3AsyncClientBuilder s3ClientBuilder;
private final SdkAsyncHttpClient sdkHttpClient;

@Autowired
public AWSClientBuilder(
Map<String, SiteAWSInfo> sites,
StsClientProvider stsClientProvider,
S3ClientBuilder s3ClientBuilder,
@Autowired(required = false) SdkHttpClient sdkHttpClient
S3AsyncClientBuilder s3ClientBuilder,
@Autowired(required = false) SdkAsyncHttpClient sdkHttpClient
) {
this.sites = sites;
this.stsClientProvider = stsClientProvider;
this.s3ClientBuilder = s3ClientBuilder;
this.sdkHttpClient = sdkHttpClient;
}

public Optional<S3Client> buildClientForSite(String siteName) {
public Optional<S3AsyncClient> buildClientForSite(String siteName) {
log.info("Building client for site {}", siteName);
if (!sites.containsKey(siteName)) {
log.warn("Could not find site {}", siteName);
Expand Down Expand Up @@ -78,7 +81,7 @@ public Optional<S3Client> buildClientForSite(String siteName) {
return Optional.of(buildFromProvider(provider));
}

private S3Client buildFromProvider(StaticCredentialsProvider provider) {
private S3AsyncClient buildFromProvider(StaticCredentialsProvider provider) {
if (sdkHttpClient == null) {
return s3ClientBuilder.credentialsProvider(provider).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.*;

Expand All @@ -17,6 +18,7 @@
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

@ConditionalOnProperty(name = "production", havingValue = "true")
@Component
Expand Down Expand Up @@ -59,7 +61,6 @@ private Optional<String> deleteFileFromBucket(String s, SiteAWSInfo info) {
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(info.bucket()).key(s).build();
return clientBuilder.buildClientForSite(info.siteName())
.map(c -> c.deleteObject(request))
.map(DeleteObjectResponse::deleteMarker)
.map((ignored) -> s);
}

Expand All @@ -74,7 +75,8 @@ private String waitABit(String s) {

private Optional<String> uploadFileFromPath(Path p, SiteAWSInfo info) {
LOG.info("Verifying upload capabilities");
RequestBody body = RequestBody.fromFile(p.toFile());
AsyncRequestBody body = AsyncRequestBody.fromFile(p.toFile());

PutObjectRequest request = PutObjectRequest.builder()
.bucket(info.bucket())
.serverSideEncryption(ServerSideEncryption.AWS_KMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;

@Configuration
public class HttpClientConfig {
Expand Down Expand Up @@ -45,17 +45,17 @@ public HttpClient getHttpClient() {


@Bean
public SdkHttpClient getSdkClient() {
public SdkAsyncHttpClient getSdkClient() {
if (!StringUtils.hasLength(proxyUser)) {
return null;
}
LOG.info("Found proxy user {}, will configure sdk proxy", proxyUser);
ProxyConfiguration proxy = ProxyConfiguration.builder()
var proxy = ProxyConfiguration.builder()
.useSystemPropertyValues(true)
.username(proxyUser)
.password(proxyPassword)
.build();
return ApacheHttpClient.builder()
return NettyNioAsyncHttpClient.builder()
.proxyConfiguration(proxy)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
Expand Down Expand Up @@ -119,7 +120,7 @@ private static void deleteFile(Path data) {

private boolean uploadFileFromPath(Path p, SiteAWSInfo site, String dir) {
try {
RequestBody body = RequestBody.fromFile(p.toFile());
AsyncRequestBody body = AsyncRequestBody.fromFile(p.toFile());
PutObjectRequest request = PutObjectRequest.builder()
.bucket(site.bucket())
.serverSideEncryption(ServerSideEncryption.AWS_KMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
Expand All @@ -38,7 +38,7 @@ class AWSClientBuilderTest {
StsClientProvider stsClientProvider;

@MockBean
S3ClientBuilder s3ClientBuilder;
S3AsyncClientBuilder S3AsyncClientBuilder;

@Autowired
AWSClientBuilder subject;
Expand All @@ -48,8 +48,8 @@ void shouldNotBuildClientIfSiteDNE() {
Mockito.when(sites.get("Narnia"))
.thenReturn(null);

Optional<S3Client> actual = subject.buildClientForSite("Narnia");
Optional<S3Client> expected = Optional.empty();
Optional<S3AsyncClient> actual = subject.buildClientForSite("Narnia");
Optional<S3AsyncClient> expected = Optional.empty();

Assertions.assertEquals(expected, actual);
}
Expand All @@ -72,8 +72,8 @@ void shouldNotBuildClientIfRoleRequestFails() {
Mockito.when(stsClientProvider.createClient())
.thenReturn(Optional.of(stsClient));

Optional<S3Client> actual = subject.buildClientForSite("bch");
Optional<S3Client> expected = Optional.empty();
Optional<S3AsyncClient> actual = subject.buildClientForSite("bch");
Optional<S3AsyncClient> expected = Optional.empty();

Assertions.assertEquals(expected, actual);
}
Expand Down Expand Up @@ -111,14 +111,14 @@ void shouldBuildClient() {

StaticCredentialsProvider provider = StaticCredentialsProvider.create(sessionCredentials);
ArgumentMatcher<AwsCredentialsProvider> credsMatcher = (AwsCredentialsProvider p) -> p.toString().equals(provider.toString());
S3Client s3Client = Mockito.mock(S3Client.class);
Mockito.when(s3ClientBuilder.credentialsProvider(Mockito.argThat(credsMatcher)))
.thenReturn(s3ClientBuilder);
Mockito.when(s3ClientBuilder.build())
.thenReturn(s3Client);

Optional<S3Client> actual = subject.buildClientForSite("bch");
Optional<S3Client> expected = Optional.of(s3Client);
S3AsyncClient S3AsyncClient = Mockito.mock(S3AsyncClient.class);
Mockito.when(S3AsyncClientBuilder.credentialsProvider(Mockito.argThat(credsMatcher)))
.thenReturn(S3AsyncClientBuilder);
Mockito.when(S3AsyncClientBuilder.build())
.thenReturn(S3AsyncClient);

Optional<S3AsyncClient> actual = subject.buildClientForSite("bch");
Optional<S3AsyncClient> expected = Optional.of(S3AsyncClient);

Assertions.assertEquals(expected, actual);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
Expand All @@ -26,6 +28,7 @@
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

@SpringBootTest
Expand All @@ -44,7 +47,7 @@ class DataUploadServiceTest {
private Path sharingRoot;

@Mock
S3Client s3Client;
S3AsyncClient s3Client;

@Mock
AWSClientBuilder s3;
Expand Down Expand Up @@ -102,14 +105,14 @@ void shouldNotUploadDataIfAWSUpset(@TempDir Path tempDir) throws IOException, In
Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)))
Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(AsyncRequestBody.class)))
.thenThrow(AwsServiceException.builder().build());

subject.uploadData(q, DataType.Phenotypic, "bch");

Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying);
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading);
Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class));
Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(AsyncRequestBody.class));
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Error);
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
Mockito.verify(uploadLock, Mockito.times(1)).release();
Expand All @@ -129,14 +132,14 @@ void shouldUploadData(@TempDir Path tempDir) throws IOException, InterruptedExce
Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)))
.thenReturn(Mockito.mock(PutObjectResponse.class));
Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(AsyncRequestBody.class)))
.thenReturn(CompletableFuture.completedFuture(Mockito.mock(PutObjectResponse.class)));

subject.uploadData(q, DataType.Phenotypic, "bch");

Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying);
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading);
Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class));
Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(AsyncRequestBody.class));
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploaded);
Assertions.assertFalse(Files.exists(fileToUpload));
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
Expand Down

0 comments on commit 78fba94

Please sign in to comment.