Skip to content

Commit

Permalink
[ALS-7200] Upload large files via multipart
Browse files Browse the repository at this point in the history
  • Loading branch information
Luke Sikina committed Dec 30, 2024
1 parent c729973 commit dd50a7c
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

Expand All @@ -32,6 +36,7 @@
public class DataUploadService {

private static final Logger LOG = LoggerFactory.getLogger(DataUploadService.class);
private static final int SIXTEEN_MB = 16 * 1024 * 1024;

@Autowired
private Semaphore uploadLock;
Expand Down Expand Up @@ -109,7 +114,7 @@ protected void uploadData(Query query, DataType dataType, String site) {
uploadLock.release();
}

private static void deleteFile(Path data) {
private void deleteFile(Path data) {
try {
Files.delete(data);
} catch (IOException e) {
Expand All @@ -118,20 +123,98 @@ private static void deleteFile(Path data) {
}

private boolean uploadFileFromPath(Path p, SiteAWSInfo site, String dir) {
Optional<S3Client> maybeClient = s3ClientBuilder.buildClientForSite(site.siteName());
if (maybeClient.isEmpty()) {
LOG.info("There is no client for site {}", site);
return false;
}
S3Client s3 = maybeClient.get();
LOG.info("Starting multipart upload for file {} to site {} in dir {}", p, site, dir);

CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder()
.bucket(site.bucket())
.serverSideEncryption(ServerSideEncryption.AWS_KMS)
.ssekmsKeyId(site.kmsKeyID())
.key(Path.of(dir, home + "_" + p.getFileName().toString()).toString())
.build();
String uploadId;
try {
uploadId = s3.createMultipartUpload(createRequest).uploadId();
} catch (AwsServiceException e) {
LOG.error("Error creating multipart: ", e);
return false;
}
LOG.info("Created initial multipart request and notified S3");

LOG.info("Starting upload process...");
List<CompletedPart> completedParts = uploadAllParts(p, site, dir, uploadId, s3);
if (completedParts.isEmpty()) {
return false;
}
LOG.info("Upload complete! Uploaded {} parts", completedParts.size());

LOG.info("Notifying S3 of completed upload...");
CompletedMultipartUpload completedUpload = CompletedMultipartUpload.builder()
.parts(completedParts)
.build();

CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
.bucket(site.bucket())
.key(Path.of(dir, home + "_" + p.getFileName().toString()).toString())
.uploadId(uploadId)
.multipartUpload(completedUpload)
.build();

try {
RequestBody body = RequestBody.fromFile(p.toFile());
PutObjectRequest request = PutObjectRequest.builder()
.bucket(site.bucket())
.serverSideEncryption(ServerSideEncryption.AWS_KMS)
.ssekmsKeyId(site.kmsKeyID())
.key(Path.of(dir, home + "_" + p.getFileName().toString()).toString())
.build();
return s3ClientBuilder.buildClientForSite(site.siteName())
.map(client -> client.putObject(request, body))
.isPresent();
s3.completeMultipartUpload(completeRequest);
} catch (AwsServiceException | SdkClientException e) {
LOG.info("Error uploading file from {} to bucket {}", p, site.bucket(), e);
LOG.error("Error finishing multipart: ", e);
return false;
}
LOG.info("Done uploading {} to {}", p.getFileName(), site.siteName());
return true;
}

private List<CompletedPart> uploadAllParts(Path p, SiteAWSInfo site, String dir, String uploadId, S3Client s3) {
List<CompletedPart> completedParts = new ArrayList<>();
int part = 1;
ByteBuffer buffer = ByteBuffer.allocate(SIXTEEN_MB);

try (RandomAccessFile file = new RandomAccessFile(p.toString(), "r")) {
long fileSize = file.length();
long position = 0;

while (position < fileSize) {
file.seek(position);
int bytesRead = file.getChannel().read(buffer);

LOG.info("Uploading file {} part {}", p.getFileName(), part);
buffer.flip();
UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
.bucket(site.bucket())
.key(Path.of(dir, home + "_" + p.getFileName().toString()).toString())
.uploadId(uploadId)
.partNumber(part)
.contentLength((long) bytesRead)
.build();


UploadPartResponse response = s3.uploadPart(uploadPartRequest, RequestBody.fromByteBuffer(buffer));

completedParts.add(CompletedPart.builder()
.partNumber(part)
.eTag(response.eTag())
.build());

buffer.clear();
position += bytesRead;
part++;
}
} catch (IOException | AwsServiceException | SdkClientException e) {
LOG.error("Failed to upload file {}, part {}: ", p.getFileName(), part, e);
return List.of();
}
LOG.info("Uploaded all parts, finishing");
return completedParts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.*;

import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -102,15 +101,101 @@ void shouldNotUploadDataIfAWSUpset(@TempDir Path tempDir) throws IOException, In
Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)))
Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
.thenThrow(AwsServiceException.builder().build());

subject.uploadData(q, DataType.Phenotypic, "bch");

Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying);
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading);
Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class));
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Error);
Mockito.verify(statusService, Mockito.times(1)).
setPhenotypicStatus(q, UploadStatus.Querying);
Mockito.verify(statusService, Mockito.times(1)).
setPhenotypicStatus(q, UploadStatus.Uploading);
Mockito.verify(s3Client, Mockito.times(1))
.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class));
Mockito.verify(statusService, Mockito.times(1)).
setPhenotypicStatus(q, UploadStatus.Error);
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
Mockito.verify(uploadLock, Mockito.times(1)).release();
}

@Test
void shouldNotUploadDataIfAWSUploadFails(@TempDir Path tempDir) throws IOException, InterruptedException {
Query q = new Query();
q.setPicSureId("my-id");
q.setId("my-id");

Files.createDirectory(Path.of(tempDir.toString(), q.getPicSureId()));
Files.writeString(Path.of(tempDir.toString(), q.getPicSureId(), DataType.Phenotypic.fileName), ":)");
ReflectionTestUtils.setField(subject, "roleARNs", roleARNs);
CreateMultipartUploadResponse createResp = Mockito.mock(CreateMultipartUploadResponse.class);
Mockito.when(createResp.uploadId()).thenReturn("frank");

Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
.thenReturn(createResp);
Mockito.when(s3Client.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)))
.thenThrow(AwsServiceException.builder().build());

subject.uploadData(q, DataType.Phenotypic, "bch");

Mockito.verify(statusService, Mockito.times(1)).
setPhenotypicStatus(q, UploadStatus.Querying);
Mockito.verify(statusService, Mockito.times(1)).
setPhenotypicStatus(q, UploadStatus.Uploading);
Mockito.verify(s3Client, Mockito.times(1))
.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class));
Mockito.verify(s3Client, Mockito.times(1))
.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class));
Mockito.verify(statusService, Mockito.times(1)).
setPhenotypicStatus(q, UploadStatus.Error);
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
Mockito.verify(uploadLock, Mockito.times(1)).release();
}

@Test
void shouldNotUploadDataWhenCompleteFails(@TempDir Path tempDir) throws IOException, InterruptedException {
Query q = new Query();
q.setPicSureId("my-id");
q.setId("my-id");

Path fileToUpload = Path.of(tempDir.toString(), q.getPicSureId(), DataType.Phenotypic.fileName);
Files.createDirectory(Path.of(tempDir.toString(), q.getPicSureId()));
Files.writeString(fileToUpload, ":)");
ReflectionTestUtils.setField(subject, "roleARNs", roleARNs);

CreateMultipartUploadResponse createResp = Mockito.mock(CreateMultipartUploadResponse.class);
Mockito.when(createResp.uploadId()).thenReturn("frank");
UploadPartResponse uploadResp = Mockito.mock(UploadPartResponse.class);
Mockito.when(uploadResp.eTag()).thenReturn("gus");

Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
Mockito.when(s3Client.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)))
.thenReturn(uploadResp);
Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
.thenReturn(createResp);
Mockito.when(s3Client.completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class)))
.thenThrow(AwsServiceException.builder().build());


subject.uploadData(q, DataType.Phenotypic, "bch");

Mockito.verify(statusService, Mockito.times(1))
.setPhenotypicStatus(q, UploadStatus.Querying);
Mockito.verify(statusService, Mockito.times(1))
.setPhenotypicStatus(q, UploadStatus.Uploading);
Mockito.verify(s3Client, Mockito.times(1))
.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class));
Mockito.verify(s3Client, Mockito.times(1))
.completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class));
Mockito.verify(s3Client, Mockito.times(1))
.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class));
Mockito.verify(statusService, Mockito.times(1))
.setPhenotypicStatus(q, UploadStatus.Error);
Assertions.assertFalse(Files.exists(fileToUpload));
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
Mockito.verify(uploadLock, Mockito.times(1)).release();
}
Expand All @@ -126,18 +211,34 @@ void shouldUploadData(@TempDir Path tempDir) throws IOException, InterruptedExce
Files.writeString(fileToUpload, ":)");
ReflectionTestUtils.setField(subject, "roleARNs", roleARNs);

CreateMultipartUploadResponse createResp = Mockito.mock(CreateMultipartUploadResponse.class);
Mockito.when(createResp.uploadId()).thenReturn("frank");
UploadPartResponse uploadResp = Mockito.mock(UploadPartResponse.class);
Mockito.when(uploadResp.eTag()).thenReturn("gus");

Mockito.when(sharingRoot.toString()).thenReturn(tempDir.toString());
Mockito.when(hpds.writePhenotypicData(q)).thenReturn(true);
Mockito.when(s3.buildClientForSite("bch")).thenReturn(Optional.of(s3Client));
Mockito.when(s3Client.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)))
.thenReturn(Mockito.mock(PutObjectResponse.class));
Mockito.when(s3Client.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)))
.thenReturn(uploadResp);
Mockito.when(s3Client.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
.thenReturn(createResp);


subject.uploadData(q, DataType.Phenotypic, "bch");

Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Querying);
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploading);
Mockito.verify(s3Client, Mockito.times(1)).putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class));
Mockito.verify(statusService, Mockito.times(1)).setPhenotypicStatus(q, UploadStatus.Uploaded);
Mockito.verify(statusService, Mockito.times(1))
.setPhenotypicStatus(q, UploadStatus.Querying);
Mockito.verify(statusService, Mockito.times(1))
.setPhenotypicStatus(q, UploadStatus.Uploading);
Mockito.verify(s3Client, Mockito.times(1))
.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class));
Mockito.verify(s3Client, Mockito.times(1))
.completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class));
Mockito.verify(s3Client, Mockito.times(1))
.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class));
Mockito.verify(statusService, Mockito.times(1))
.setPhenotypicStatus(q, UploadStatus.Uploaded);
Assertions.assertFalse(Files.exists(fileToUpload));
Mockito.verify(uploadLock, Mockito.times(1)).acquire();
Mockito.verify(uploadLock, Mockito.times(1)).release();
Expand Down

0 comments on commit dd50a7c

Please sign in to comment.