diff --git a/pom.xml b/pom.xml
index 4577ac91..92615759 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,7 @@
3.3.0
1.6
9.0.2
- 8.1.2
+ 9.0.2
3.7.0
3.1.9
2.13.0
diff --git a/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransaction.java b/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransaction.java
index 31ce1f4f..6eb16046 100644
--- a/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransaction.java
+++ b/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransaction.java
@@ -1,9 +1,11 @@
package org.springframework.data.aerospike.transaction.reactive;
-import org.springframework.data.aerospike.transaction.sync.AerospikeTransactionResourceHolder;
+import com.aerospike.client.AbortStatus;
+import com.aerospike.client.CommitStatus;
import org.springframework.lang.Nullable;
import org.springframework.transaction.support.SmartTransactionObject;
import org.springframework.util.Assert;
+import reactor.core.publisher.Mono;
/**
* A {@link SmartTransactionObject} implementation that has reactive transaction resource holder
@@ -39,26 +41,29 @@ void setResourceHolder(@Nullable AerospikeReactiveTransactionResourceHolder reso
this.resourceHolder = resourceHolder;
}
- private void failIfNoTransaction() {
- if (!hasResourceHolder()) {
- throw new IllegalStateException("Error: expecting transaction to exist");
- }
+ private Mono getResourceHolder() {
+ return Mono.fromCallable(() -> {
+ if (!hasResourceHolder()) {
+ throw new IllegalStateException("Error: expecting transaction to exist");
+ }
+ return resourceHolder;
+ });
}
/**
* Commit the transaction
*/
- public void commitTransaction() {
- failIfNoTransaction();
- resourceHolder.getClient().getAerospikeClient().commit(resourceHolder.getTransaction());
+ public Mono commitTransaction() {
+ return getResourceHolder()
+ .flatMap(h -> h.getClient().commit(h.getTransaction()));
}
/**
* Rollback (abort) the transaction
*/
- public void abortTransaction() {
- failIfNoTransaction();
- resourceHolder.getClient().getAerospikeClient().abort(resourceHolder.getTransaction());
+ public Mono abortTransaction() {
+ return getResourceHolder()
+ .flatMap(h -> h.getClient().abort(h.getTransaction()));
}
@Override
diff --git a/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransactionManager.java b/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransactionManager.java
index c8db4f52..f1edef76 100644
--- a/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransactionManager.java
+++ b/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransactionManager.java
@@ -25,7 +25,6 @@ public class AerospikeReactiveTransactionManager extends AbstractReactiveTransac
/**
* Create a new instance of {@link AerospikeReactiveTransactionManager}
*/
-
public AerospikeReactiveTransactionManager(IAerospikeReactorClient client) {
this.client = client;
}
@@ -73,7 +72,12 @@ protected Mono doBegin(TransactionSynchronizationManager synchronizationMa
rHolder.setSynchronizedWithTransaction(true);
synchronizationManager.bindResource(client, rHolder);
})
- .onErrorMap(e -> new TransactionSystemException("Could not bind transaction resource", e))
+ .onErrorMap(e -> {
+ if (e instanceof TransactionSystemException) {
+ return e;
+ }
+ return new TransactionSystemException("Could not bind transaction resource", e);
+ })
.then();
});
}
@@ -89,10 +93,8 @@ private Mono createResourceHolder(IA
@Override
protected Mono doCommit(TransactionSynchronizationManager synchronizationManager,
GenericReactiveTransaction status) {
- return Mono.fromRunnable(() -> {
- AerospikeReactiveTransaction transaction = getTransaction(status);
- transaction.commitTransaction();
- })
+ return Mono.fromSupplier(() -> getTransaction(status))
+ .flatMap(AerospikeReactiveTransaction::commitTransaction)
.onErrorMap(e -> new TransactionSystemException("Could not commit transaction", e))
.then();
}
@@ -100,17 +102,15 @@ protected Mono doCommit(TransactionSynchronizationManager synchronizationM
@Override
protected Mono doRollback(TransactionSynchronizationManager synchronizationManager,
GenericReactiveTransaction status) {
- return Mono.fromRunnable(() -> {
- AerospikeReactiveTransaction transaction = getTransaction(status);
- transaction.abortTransaction();
- })
+ return Mono.fromSupplier(() -> getTransaction(status))
+ .flatMap(AerospikeReactiveTransaction::abortTransaction)
.onErrorMap(e -> new TransactionSystemException("Could not abort transaction", e))
.then();
}
@Override
- protected Mono