Skip to content

Commit

Permalink
add support for returning stream in reactive flow, add reactive Custo…
Browse files Browse the repository at this point in the history
…mer negative tests repository, code format
  • Loading branch information
agrgr committed Dec 1, 2024
1 parent 3a39a66 commit 82f6d75
Show file tree
Hide file tree
Showing 19 changed files with 164 additions and 154 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.springframework.data.aerospike.query.QueryEngine;
import org.springframework.data.aerospike.query.qualifier.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.transactions.reactive.AerospikeReactiveTransactionResourceHolder;
import org.springframework.data.aerospike.transactions.sync.AerospikeTransactionResourceHolder;
import org.springframework.data.aerospike.transaction.reactive.AerospikeReactiveTransactionResourceHolder;
import org.springframework.data.aerospike.transaction.sync.AerospikeTransactionResourceHolder;
import org.springframework.data.mapping.PropertyHandler;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.transaction.NoTransactionException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package org.springframework.data.aerospike.transactions.reactive;
package org.springframework.data.aerospike.transaction.reactive;

import org.springframework.data.aerospike.transactions.sync.AerospikeTransactionResourceHolder;
import org.springframework.data.aerospike.transaction.sync.AerospikeTransactionResourceHolder;
import org.springframework.lang.Nullable;
import org.springframework.transaction.support.SmartTransactionObject;
import org.springframework.util.Assert;

/**
* A {@link SmartTransactionObject} implementation that has reactive transaction resource holder
* and basic transaction API
*/
public class AerospikeReactiveTransaction implements SmartTransactionObject {

@Nullable
Expand All @@ -15,7 +19,7 @@ public class AerospikeReactiveTransaction implements SmartTransactionObject {
}

/**
* @return {@literal true} if {@link AerospikeTransactionResourceHolder} is set
* @return {@literal true} if {@link AerospikeReactiveTransactionResourceHolder} is set
*/
final boolean hasResourceHolder() {
return resourceHolder != null;
Expand All @@ -27,7 +31,7 @@ AerospikeReactiveTransactionResourceHolder getRequiredResourceHolder() {
}

/**
* Set corresponding {@link AerospikeTransactionResourceHolder}
* Set corresponding {@link AerospikeReactiveTransactionResourceHolder}
*
* @param resourceHolder can be {@literal null}.
*/
Expand All @@ -42,15 +46,15 @@ private void failIfNoTransaction() {
}

/**
* Commit the transaction.
* Commit the transaction
*/
public void commitTransaction() {
failIfNoTransaction();
resourceHolder.getClient().getAerospikeClient().commit(resourceHolder.getTransaction());
}

/**
* Rollback (abort) the transaction.
* Rollback (abort) the transaction
*/
public void abortTransaction() {
failIfNoTransaction();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.springframework.data.aerospike.transactions.reactive;
package org.springframework.data.aerospike.transaction.reactive;

import com.aerospike.client.reactor.IAerospikeReactorClient;
import lombok.Getter;
Expand All @@ -12,7 +12,7 @@
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

import static org.springframework.data.aerospike.transactions.reactive.AerospikeReactiveTransactionResourceHolder.determineTimeout;
import static org.springframework.data.aerospike.transaction.reactive.AerospikeReactiveTransactionResourceHolder.determineTimeout;

/**
* A {@link org.springframework.transaction.ReactiveTransactionManager} implementation for managing transactions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package org.springframework.data.aerospike.transactions.reactive;
package org.springframework.data.aerospike.transaction.reactive;

import com.aerospike.client.Txn;
import com.aerospike.client.reactor.IAerospikeReactorClient;
import lombok.Getter;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.ResourceHolderSupport;

/**
* Aerospike reactive transaction resource holder for managing transaction resources,
* extends {@link ResourceHolderSupport}
*/
@Getter
public class AerospikeReactiveTransactionResourceHolder extends ResourceHolderSupport {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.springframework.data.aerospike.transactions.sync;
package org.springframework.data.aerospike.transaction.sync;

import org.springframework.lang.Nullable;
import org.springframework.transaction.support.SmartTransactionObject;
import org.springframework.transaction.support.TransactionSynchronizationUtils;
import org.springframework.util.Assert;

/**
* A {@link SmartTransactionObject} implementation that has transaction resource holder and basic transaction API
*/
public class AerospikeTransaction implements SmartTransactionObject {

@Nullable
Expand Down Expand Up @@ -42,15 +45,15 @@ private void failIfNoTransaction() {
}

/**
* Commit the transaction.
* Commit the transaction
*/
public void commitTransaction() {
failIfNoTransaction();
resourceHolder.getClient().commit(resourceHolder.getTransaction());
}

/**
* Rollback (abort) the transaction.
* Rollback (abort) the transaction
*/
public void abortTransaction() {
failIfNoTransaction();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.springframework.data.aerospike.transactions.sync;
package org.springframework.data.aerospike.transaction.sync;

import com.aerospike.client.IAerospikeClient;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.springframework.data.aerospike.transactions.sync;
package org.springframework.data.aerospike.transaction.sync;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Txn;
import lombok.Getter;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.ResourceHolderSupport;

/**
* Aerospike transaction resource holder for managing transaction resources, extends {@link ResourceHolderSupport}
*/
@Getter
public class AerospikeTransactionResourceHolder extends ResourceHolderSupport {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.springframework.data.aerospike.sample.CustomerRepository;
import org.springframework.data.aerospike.sample.SampleClasses;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import org.springframework.data.aerospike.transactions.sync.AerospikeTransactionManager;
import org.springframework.data.aerospike.transaction.sync.AerospikeTransactionManager;
import org.springframework.data.aerospike.util.AdditionalAerospikeTestOperations;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.support.TransactionTemplate;
Expand Down Expand Up @@ -51,10 +51,6 @@ protected ClientPolicy getClientPolicy() {
clientPolicy.batchPolicyDefault.totalTimeout = totalTimeout;
clientPolicy.infoPolicyDefault.timeout = totalTimeout;
clientPolicy.readPolicyDefault.maxRetries = 3;
// Durable delete is supported by Enterprise edition
// clientPolicy.writePolicyDefault.durableDelete = true;
// clientPolicy.batchWritePolicyDefault.durableDelete = true;
// clientPolicy.batchDeletePolicyDefault.durableDelete = true;
return clientPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.springframework.data.aerospike.sample.ReactiveCustomerRepository;
import org.springframework.data.aerospike.sample.SampleClasses;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import org.springframework.data.aerospike.transactions.reactive.AerospikeReactiveTransactionManager;
import org.springframework.data.aerospike.transaction.reactive.AerospikeReactiveTransactionManager;
import org.springframework.data.aerospike.util.AdditionalAerospikeTestOperations;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
Expand Down Expand Up @@ -61,10 +61,6 @@ protected ClientPolicy getClientPolicy() {
clientPolicy.batchPolicyDefault.totalTimeout = totalTimeout;
clientPolicy.infoPolicyDefault.timeout = totalTimeout;
clientPolicy.readPolicyDefault.maxRetries = 3;
// Durable delete is supported by Enterprise edition
// clientPolicy.writePolicyDefault.durableDelete = true;
// clientPolicy.batchWritePolicyDefault.durableDelete = true;
// clientPolicy.batchDeletePolicyDefault.durableDelete = true;
return clientPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.aerospike.transactions.reactive;
package org.springframework.data.aerospike.transaction.reactive;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -31,23 +31,19 @@
import org.springframework.data.aerospike.sample.SampleClasses;
import org.springframework.data.aerospike.sample.SampleClasses.DocumentWithPrimitiveIntId;
import org.springframework.data.aerospike.util.AsyncUtils;
import org.springframework.data.aerospike.util.AwaitilityUtils;
import org.springframework.data.aerospike.util.TestUtils;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.InstanceOfAssertFactories.throwable;
import static org.mockito.Mockito.mock;

@Slf4j
Expand Down Expand Up @@ -89,8 +85,7 @@ void verifyTransactionResourcesReleased() {

@AfterAll
public void afterAll() {
deleteAll(Person.class, DocumentWithPrimitiveIntId.class,
SampleClasses.DocumentWithIntegerId.class);
deleteAll(Person.class, DocumentWithPrimitiveIntId.class, SampleClasses.DocumentWithIntegerId.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.aerospike.transactions.reactive;
package org.springframework.data.aerospike.transaction.reactive;

import com.aerospike.client.policy.WritePolicy;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.springframework.data.aerospike.transactions.reactive;
package org.springframework.data.aerospike.transaction.reactive;

import com.aerospike.client.Txn;
import com.aerospike.client.reactor.IAerospikeReactorClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.aerospike.transactions.sync;
package org.springframework.data.aerospike.transaction.sync;

import com.aerospike.client.AerospikeException;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.aerospike.transactions.sync;
package org.springframework.data.aerospike.transaction.sync;

import com.aerospike.client.Txn;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -41,7 +41,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.springframework.data.aerospike.transactions.sync.AerospikeTransactionTestUtils.getTransaction;
import static org.springframework.data.aerospike.transaction.sync.AerospikeTransactionTestUtils.getTransaction;
import static org.springframework.transaction.TransactionDefinition.PROPAGATION_MANDATORY;
import static org.springframework.transaction.TransactionDefinition.PROPAGATION_NESTED;
import static org.springframework.transaction.TransactionDefinition.PROPAGATION_NEVER;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.springframework.data.aerospike.transactions.sync;
package org.springframework.data.aerospike.transaction.sync;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Txn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.aerospike.transactions.sync;
package org.springframework.data.aerospike.transaction.sync;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Txn;
Expand All @@ -38,9 +38,9 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.springframework.data.aerospike.transactions.sync.AerospikeTransactionTestUtils.callGetTransaction;
import static org.springframework.data.aerospike.transactions.sync.AerospikeTransactionTestUtils.getTransaction;
import static org.springframework.data.aerospike.transactions.sync.AerospikeTransactionTestUtils.getTransaction2;
import static org.springframework.data.aerospike.transaction.sync.AerospikeTransactionTestUtils.callGetTransaction;
import static org.springframework.data.aerospike.transaction.sync.AerospikeTransactionTestUtils.getTransaction;
import static org.springframework.data.aerospike.transaction.sync.AerospikeTransactionTestUtils.getTransaction2;

@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.springframework.data.aerospike.transactions.sync;
package org.springframework.data.aerospike.transaction.sync;

import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
Expand Down

0 comments on commit 82f6d75

Please sign in to comment.