Skip to content

Commit

Permalink
Retrying on a new BulkWriter when commit task gets stuck (Azure#41553)
Browse files Browse the repository at this point in the history
* Retrying on a new BulkWriter when commit task gets stuck

* Fixing build errors

* Update CosmosBulkWriter.java

* Update BulkWriter.scala

* Update SparkE2EBulkWriteITest.scala

* Adding change log

* Update cosmos-emulator-matrix.json

* Fixed scheduleWrite path

* Implement E2E timeout for batch

* Update CosmosConfig.scala

* Including BulkWriter retries on close path

* Update CosmosWriterBase.scala

* Update BulkWriter.scala

* Update BulkWriter.scala

* Update BulkWriterNoProgressException.scala

* Update BulkWriter.scala

* Update BulkWriter.scala
  • Loading branch information
FabianMeiswinkel authored Aug 23, 2024
1 parent 30e6467 commit fc2be76
Show file tree
Hide file tree
Showing 45 changed files with 1,061 additions and 439 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.azure.cosmos.encryption.implementation.EncryptionUtils;
import com.azure.cosmos.encryption.implementation.mdesrc.cryptography.MicrosoftDataEncryptionException;
import com.azure.cosmos.encryption.models.SqlQuerySpecWithEncryption;
import com.azure.cosmos.implementation.CosmosBulkExecutionOptionsImpl;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
Expand Down Expand Up @@ -1683,8 +1684,9 @@ private void setRequestHeaders(CosmosBatchRequestOptions requestOptions) {
}

private void setRequestHeaders(CosmosBulkExecutionOptions requestOptions) {
cosmosBulkExecutionOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
cosmosBulkExecutionOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());
CosmosBulkExecutionOptionsImpl requestOptionsImpl = cosmosBulkExecutionOptionsAccessor.getImpl(requestOptions);
requestOptionsImpl.setHeader(Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
requestOptionsImpl.setHeader(Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());
}

boolean isIncorrectContainerRid(CosmosException cosmosException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ private CosmosBulkExecutionOptions getBulkExecutionOperations() {
ImplementationBridgeHelpers
.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.setMaxConcurrentCosmosPartitions(bulkExecutionOptions, this.writeConfig.getBulkMaxConcurrentCosmosPartitions());
.getImpl(bulkExecutionOptions)
.setMaxConcurrentCosmosPartitions(this.writeConfig.getBulkMaxConcurrentCosmosPartitions());
}

CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(bulkExecutionOptions, this.throughputControlConfig);
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fixed an issue to avoid transient `IllegalArgumentException` due to duplicate json properties for the `uniqueKeyPolicy` property. - See [PR 41608](https://github.com/Azure/azure-sdk-for-java/pull/41608)

#### Other Changes
* Added retries on a new `BulkWriter` instance when first attempt to commit times out for bulk write jobs. - See [PR 41553](https://github.com/Azure/azure-sdk-for-java/pull/41553)

### 4.33.0 (2024-06-22)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestFaultInjectionClientInterceptor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestWriteOnRetryCommitInterceptor
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fixed an issue to avoid transient `IllegalArgumentException` due to duplicate json properties for the `uniqueKeyPolicy` property. - See [PR 41608](https://github.com/Azure/azure-sdk-for-java/pull/41608)

#### Other Changes
* Added retries on a new `BulkWriter` instance when first attempt to commit times out for bulk write jobs. - See [PR 41553](https://github.com/Azure/azure-sdk-for-java/pull/41553)

### 4.33.0 (2024-06-22)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestFaultInjectionClientInterceptor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestWriteOnRetryCommitInterceptor
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fixed an issue to avoid transient `IllegalArgumentException` due to duplicate json properties for the `uniqueKeyPolicy` property. - See [PR 41608](https://github.com/Azure/azure-sdk-for-java/pull/41608)

#### Other Changes
* Added retries on a new `BulkWriter` instance when first attempt to commit times out for bulk write jobs. - See [PR 41553](https://github.com/Azure/azure-sdk-for-java/pull/41553)

### 4.33.0 (2024-06-22)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestFaultInjectionClientInterceptor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestWriteOnRetryCommitInterceptor
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fixed an issue to avoid transient `IllegalArgumentException` due to duplicate json properties for the `uniqueKeyPolicy` property. - See [PR 41608](https://github.com/Azure/azure-sdk-for-java/pull/41608)

#### Other Changes
* Added retries on a new `BulkWriter` instance when first attempt to commit times out for bulk write jobs. - See [PR 41553](https://github.com/Azure/azure-sdk-for-java/pull/41553)

### 4.33.0 (2024-06-22)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestFaultInjectionClientInterceptor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestWriteOnRetryCommitInterceptor
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fixed an issue to avoid transient `IllegalArgumentException` due to duplicate json properties for the `uniqueKeyPolicy` property. - See [PR 41608](https://github.com/Azure/azure-sdk-for-java/pull/41608)

#### Other Changes
* Added retries on a new `BulkWriter` instance when first attempt to commit times out for bulk write jobs. - See [PR 41553](https://github.com/Azure/azure-sdk-for-java/pull/41553)

### 4.33.0 (2024-06-22)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestFaultInjectionClientInterceptor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.azure.cosmos.spark.TestWriteOnRetryCommitInterceptor
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private trait AsyncItemWriter {
* Don't wait for any remaining work but signal to the writer the ungraceful close
* Should not throw any exceptions
*/
def abort(): Unit
def abort(shouldThrow: Boolean): Unit

private[spark] def getETag(objectNode: ObjectNode) = {
val eTagField = objectNode.get(CosmosConstants.Properties.ETag)
Expand Down
Loading

0 comments on commit fc2be76

Please sign in to comment.