Skip to content

Commit

Permalink
Update logic for extractByteBuffer to handle proper byte extraction for upload operations (Azure#40451)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibrahimrabab authored Jun 4, 2024
1 parent 376d523 commit 745273f
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -361,6 +362,66 @@ public void uploadStreamAccessTierCold() {
assertEquals(AccessTier.COLD, properties.getAccessTier());
}

@LiveOnly
@Test
public void uploadAndDownloadAndUploadAgain() {
    // Round-trip test: upload 20 MB of random data in 1 MB chunks, stream the blob
    // back down, and re-upload the (unknown-length) download stream to a second blob.
    byte[] randomData = getRandomByteArray(20 * Constants.MB);
    ByteArrayInputStream input = new ByteArrayInputStream(randomData);

    String blobName = generateBlobName();
    BlobClient blobClient = cc.getBlobClient(blobName);

    ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
        .setBlockSizeLong((long) Constants.MB)
        .setMaxSingleUploadSizeLong(2L * Constants.MB)
        .setMaxConcurrency(5);
    BlobParallelUploadOptions parallelUploadOptions = new BlobParallelUploadOptions(input)
        .setParallelTransferOptions(parallelTransferOptions);

    blobClient.uploadWithResponse(parallelUploadOptions, null, null);

    // try-with-resources: the download stream was previously never closed (resource leak).
    try (InputStream inputStream = blobClient.openInputStream()) {
        // Upload the downloaded content to a different location
        String blobName2 = generateBlobName();

        parallelUploadOptions = new BlobParallelUploadOptions(inputStream)
            .setParallelTransferOptions(parallelTransferOptions);

        BlobClient blobClient2 = cc.getBlobClient(blobName2);
        blobClient2.uploadWithResponse(parallelUploadOptions, null, null);
    } catch (IOException e) {
        // close() is the only checked-IOException source here; surface it unchecked.
        throw new UncheckedIOException(e);
    }
}

@LiveOnly
@Test
public void uploadAndDownloadAndUploadAgainWithSize() {
    // Same round-trip as uploadAndDownloadAndUploadAgain, but the second upload
    // declares the stream length up front (legacy length-specified code path).
    byte[] randomData = getRandomByteArray(20 * Constants.MB);
    ByteArrayInputStream input = new ByteArrayInputStream(randomData);

    String blobName = generateBlobName();
    BlobClient blobClient = cc.getBlobClient(blobName);

    ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
        .setBlockSizeLong((long) Constants.MB)
        .setMaxSingleUploadSizeLong(2L * Constants.MB)
        .setMaxConcurrency(5);
    BlobParallelUploadOptions parallelUploadOptions = new BlobParallelUploadOptions(input)
        .setParallelTransferOptions(parallelTransferOptions);

    blobClient.uploadWithResponse(parallelUploadOptions, null, null);

    // try-with-resources: the download stream was previously never closed (resource leak).
    try (InputStream inputStream = blobClient.openInputStream()) {
        // Upload the downloaded content to a different location
        String blobName2 = generateBlobName();

        parallelUploadOptions = new BlobParallelUploadOptions(inputStream, 20 * Constants.MB)
            .setParallelTransferOptions(parallelTransferOptions);

        BlobClient blobClient2 = cc.getBlobClient(blobName2);
        blobClient2.uploadWithResponse(parallelUploadOptions, null, null);
    } catch (IOException e) {
        // close() is the only checked-IOException source here; surface it unchecked.
        throw new UncheckedIOException(e);
    }
}

@RequiredServiceVersion(clazz = BlobServiceVersion.class, min = "2019-12-12")
@Test
public void downloadAllNull() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.storage.common.Utility;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -210,7 +211,7 @@ public static Flux<ByteBuffer> extractByteBuffer(Flux<ByteBuffer> data, Long opt
if (data == null && optionalLength == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH, blockSize);
data = FluxUtil.toFluxByteBuffer(dataStream, chunkSize);
data = FluxUtil.toFluxByteBuffer(dataStream, chunkSize).subscribeOn(Schedulers.boundedElastic());
// specified length (legacy requirement): use custom converter. no marking because we buffer anyway.
} else if (data == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2584,6 +2584,36 @@ private void uploadSmallJson(int numCopies) {
fc.flush(b.length(), true);
}

@LiveOnly
@Test
public void uploadAndDownloadAndUploadAgain() {
    // Round-trip test: upload 20 MB of random data, stream the file back down via
    // openInputStream(), and re-upload the download stream to a second path.
    byte[] randomData = getRandomByteArray(20 * Constants.MB);
    ByteArrayInputStream input = new ByteArrayInputStream(randomData);

    String pathName = generatePathName();
    DataLakeFileClient fileClient = dataLakeFileSystemClient.getFileClient(pathName);
    fileClient.createIfNotExists();

    ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
        .setBlockSizeLong((long) Constants.MB)
        .setMaxSingleUploadSizeLong(2L * Constants.MB)
        .setMaxConcurrency(5);
    FileParallelUploadOptions parallelUploadOptions = new FileParallelUploadOptions(input)
        .setParallelTransferOptions(parallelTransferOptions);

    fileClient.uploadWithResponse(parallelUploadOptions, null, null);

    DataLakeFileOpenInputStreamResult inputStreamResult = fileClient.openInputStream();

    // try-with-resources: the download stream was previously never closed (resource leak).
    try (InputStream inputStream = inputStreamResult.getInputStream()) {
        // Upload the downloaded content to a different location
        String pathName2 = generatePathName();

        parallelUploadOptions = new FileParallelUploadOptions(inputStream)
            .setParallelTransferOptions(parallelTransferOptions);

        DataLakeFileClient fileClient2 = dataLakeFileSystemClient.getFileClient(pathName2);
        fileClient2.uploadWithResponse(parallelUploadOptions, null, null);
    } catch (java.io.IOException e) {
        // Fully qualified: cannot confirm IOException/UncheckedIOException imports in this file.
        throw new java.io.UncheckedIOException(e);
    }
}

private static byte[] readFromInputStream(InputStream stream, int numBytesToRead) {
byte[] queryData = new byte[numBytesToRead];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,38 @@ public void uploadRangeAndDownloadDataWithArgs() {
assertArrayEquals(DATA.getDefaultBytes(), stream.toByteArray());
}

@LiveOnly
@Test
public void uploadAndDownloadAndUploadAgain() {
    // Round-trip test: upload 20 MB of random data to a share file, stream it back
    // down, and re-upload the download stream to a second (pre-created) file.
    byte[] randomData = getRandomByteArray(20 * Constants.MB);
    ByteArrayInputStream input = new ByteArrayInputStream(randomData);

    String pathName = generatePathName();
    ShareFileClient fileClient = shareClient.getFileClient(pathName);
    fileClient.create(20 * Constants.MB);

    ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
        .setBlockSizeLong((long) Constants.MB)
        .setMaxSingleUploadSizeLong(2L * Constants.MB)
        .setMaxConcurrency(5);
    ShareFileUploadOptions parallelUploadOptions = new ShareFileUploadOptions(input)
        .setParallelTransferOptions(parallelTransferOptions);

    fileClient.uploadWithResponse(parallelUploadOptions, null, null);

    // try-with-resources: the download stream was previously never closed (resource leak).
    try (StorageFileInputStream inputStreamResult = fileClient.openInputStream()) {
        // Upload the downloaded content to a different location
        String pathName2 = generatePathName();

        parallelUploadOptions = new ShareFileUploadOptions(inputStreamResult)
            .setParallelTransferOptions(parallelTransferOptions);

        ShareFileClient fileClient2 = shareClient.getFileClient(pathName2);
        // Share files require a fixed size before range uploads.
        fileClient2.create(20 * Constants.MB);
        fileClient2.uploadWithResponse(parallelUploadOptions, null, null);
    } catch (java.io.IOException e) {
        // Fully qualified: cannot confirm IOException/UncheckedIOException imports in this file.
        throw new java.io.UncheckedIOException(e);
    }
}

@Test
public void downloadAllNull() {
primaryFileClient.create(DATA.getDefaultDataSizeLong());
Expand Down

0 comments on commit 745273f

Please sign in to comment.