diff --git a/pom.xml b/pom.xml index 6020e381a..0e7c54e3b 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,7 @@ 2.10.5 1.33 4.0.1 + 1.0.2.RELEASE @@ -247,6 +248,14 @@ test + + + io.projectreactor.tools + blockhound + ${blockhound.version} + test + + diff --git a/src/test/java/org/springframework/data/aerospike/BaseReactiveIntegrationTests.java b/src/test/java/org/springframework/data/aerospike/BaseReactiveIntegrationTests.java index fff4e59ea..48190eec9 100644 --- a/src/test/java/org/springframework/data/aerospike/BaseReactiveIntegrationTests.java +++ b/src/test/java/org/springframework/data/aerospike/BaseReactiveIntegrationTests.java @@ -1,10 +1,16 @@ package org.springframework.data.aerospike; +import com.aerospike.client.lua.LuaAerospikeLib; import com.aerospike.client.reactor.AerospikeReactorClient; +import org.junit.BeforeClass; +import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; +import reactor.blockhound.BlockHound; +import reactor.core.publisher.Mono; import java.io.Serializable; +import java.time.Duration; public abstract class BaseReactiveIntegrationTests extends BaseIntegrationTests { @@ -17,4 +23,23 @@ protected T findById(Serializable id, Class type) { return reactiveTemplate.findById(id, type).block(); } + @BeforeClass + public static void installBlockHound() { + BlockHound.install(); + } + + @Test(expected = RuntimeException.class) + public void shouldFailAsBlocking(){ + Mono.delay(Duration.ofSeconds(1)) + .doOnNext(it -> { + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .block(); + } + } \ No newline at end of file diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateCountRelatedTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateCountRelatedTests.java index c43bf8c0c..1fe992bf1 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateCountRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateCountRelatedTests.java @@ -8,6 +8,7 @@ import org.springframework.data.aerospike.repository.query.Query; import org.springframework.data.aerospike.sample.Person; import org.springframework.data.repository.query.parser.Part; +import reactor.core.scheduler.Schedulers; import static org.assertj.core.api.Assertions.assertThat; @@ -32,13 +33,19 @@ public void count_shouldFindAllItemsByGivenCriteria() { reactiveTemplate.insert(new Person(nextId(), "vasili", 52)).block(); reactiveTemplate.insert(new Person(nextId(), "petya", 52)).block(); - Long vasyaCount = reactiveTemplate.count(new Query(new Criteria().is("vasili", "firstName")), Person.class).block(); + Long vasyaCount = reactiveTemplate.count(new Query(new Criteria().is("vasili", "firstName")), Person.class) + .subscribeOn(Schedulers.parallel()) + .block(); assertThat(vasyaCount).isEqualTo(3); - Long vasya51Count = reactiveTemplate.count(new Query(new Criteria().is("vasili", "firstName").and("age").is(51, "age")), Person.class).block(); + Long vasya51Count = reactiveTemplate.count(new Query(new Criteria().is("vasili", "firstName").and("age").is(51, "age")), Person.class) + .subscribeOn(Schedulers.parallel()) + .block(); assertThat(vasya51Count).isEqualTo(1); - Long petyaCount = reactiveTemplate.count(new Query(new Criteria().is("petya", "firstName")), Person.class).block(); + Long petyaCount = reactiveTemplate.count(new Query(new Criteria().is("petya", "firstName")), Person.class) + .subscribeOn(Schedulers.parallel()) + .block(); assertThat(petyaCount).isEqualTo(1); } @@ -52,12 +59,16 @@ public void count_shouldFindAllItemsByGivenCriteriaAndRespectsIgnoreCase() { assertThat(reactiveTemplate.count(query1, Person.class).block()).isEqualTo(3); Query query2 = new Query(new Criteria().startingWith("VaS", "firstName", Part.IgnoreCaseType.NEVER)); - assertThat(reactiveTemplate.count(query2, Person.class).block()).isEqualTo(1); + assertThat(reactiveTemplate.count(query2, Person.class) + .subscribeOn(Schedulers.parallel()) + .block()).isEqualTo(1); } @Test public void count_shouldReturnZeroIfNoDocumentsByProvidedCriteriaIsFound() { - Long count = reactiveTemplate.count(new Query(new Criteria().is("nastyushka", "firstName")), Person.class).block(); + Long count = reactiveTemplate.count(new Query(new Criteria().is("nastyushka", "firstName")), Person.class) + .subscribeOn(Schedulers.parallel()) + .block(); assertThat(count).isZero(); } diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java index c25784882..661188e3c 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateDeleteRelatedTests.java @@ -1,13 +1,13 @@ package org.springframework.data.aerospike.core.reactive; -import com.aerospike.client.AerospikeClient; import com.aerospike.client.policy.GenerationPolicy; +import com.aerospike.client.policy.WritePolicy; import org.junit.Test; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.aerospike.BaseReactiveIntegrationTests; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; import org.springframework.data.aerospike.sample.Person; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import static org.springframework.data.aerospike.SampleClasses.VersionedClass; @@ -19,39 +19,37 @@ */ public class ReactiveAerospikeTemplateDeleteRelatedTests extends BaseReactiveIntegrationTests { - //TODO: remove me as soon as reactorClient has writePolicyDefault - @Autowired - AerospikeClient client; - @Test public void deleteByObject_ignoresDocumentVersionEvenIfDefaultGenerationPolicyIsSet() { - GenerationPolicy initialGenerationPolicy = client.writePolicyDefault.generationPolicy; - client.writePolicyDefault.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; + WritePolicy writePolicyDefault = reactorClient.getWritePolicyDefault(); + GenerationPolicy initialGenerationPolicy = writePolicyDefault.generationPolicy; + writePolicyDefault.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; try { VersionedClass initialDocument = new VersionedClass(id, "a"); reactiveTemplate.insert(initialDocument).block(); reactiveTemplate.update(new VersionedClass(id, "b", initialDocument.version)).block(); - Mono deleted = reactiveTemplate.delete(initialDocument); + Mono deleted = reactiveTemplate.delete(initialDocument).subscribeOn(Schedulers.parallel()); StepVerifier.create(deleted).expectNext(true).verifyComplete(); } finally { - client.writePolicyDefault.generationPolicy = initialGenerationPolicy; + writePolicyDefault.generationPolicy = initialGenerationPolicy; } } @Test public void deleteByObject_ignoresVersionEvenIfDefaultGenerationPolicyIsSet() { - GenerationPolicy initialGenerationPolicy = client.writePolicyDefault.generationPolicy; - client.writePolicyDefault.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; + WritePolicy writePolicyDefault = reactorClient.getWritePolicyDefault(); + GenerationPolicy initialGenerationPolicy = writePolicyDefault.generationPolicy; + writePolicyDefault.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; try { Person initialDocument = new Person(id, "a"); reactiveTemplate.insert(initialDocument).block(); reactiveTemplate.update(new Person(id, "b")).block(); - Mono deleted = reactiveTemplate.delete(initialDocument); + Mono deleted = reactiveTemplate.delete(initialDocument).subscribeOn(Schedulers.parallel()); StepVerifier.create(deleted).expectNext(true).verifyComplete(); } finally { - client.writePolicyDefault.generationPolicy = initialGenerationPolicy; + writePolicyDefault.generationPolicy = initialGenerationPolicy; } } @@ -60,15 +58,15 @@ public void testSimpleDeleteById() { // given Person person = new Person(id, "QLastName", 21); - Mono created = reactiveTemplate.insert(person); + Mono created = reactiveTemplate.insert(person).subscribeOn(Schedulers.parallel()); StepVerifier.create(created).expectNext(person).verifyComplete(); // when - Mono deleted = reactiveTemplate.delete(id, Person.class); + Mono deleted = reactiveTemplate.delete(id, Person.class).subscribeOn(Schedulers.parallel()); StepVerifier.create(deleted).expectNext(true).verifyComplete(); // then - Mono result = reactiveTemplate.findById(id, Person.class); + Mono result = reactiveTemplate.findById(id, Person.class).subscribeOn(Schedulers.parallel()); StepVerifier.create(result).expectComplete().verify(); ; } @@ -78,22 +76,22 @@ public void testSimpleDeleteByObject() { // given Person person = new Person(id, "QLastName", 21); - Mono created = reactiveTemplate.insert(person); + Mono created = reactiveTemplate.insert(person).subscribeOn(Schedulers.parallel()); StepVerifier.create(created).expectNext(person).verifyComplete(); // when - Mono deleted = reactiveTemplate.delete(person); + Mono deleted = reactiveTemplate.delete(person).subscribeOn(Schedulers.parallel()); StepVerifier.create(deleted).expectNext(true).verifyComplete(); // then - Mono result = reactiveTemplate.findById(id, Person.class); + Mono result = reactiveTemplate.findById(id, Person.class).subscribeOn(Schedulers.parallel()); StepVerifier.create(result).expectComplete().verify(); } @Test public void deleteById_shouldReturnFalseIfValueIsAbsent() { // when - Mono deleted = reactiveTemplate.delete(id, Person.class); + Mono deleted = reactiveTemplate.delete(id, Person.class).subscribeOn(Schedulers.parallel()); // then StepVerifier.create(deleted).expectComplete().verify(); @@ -105,7 +103,7 @@ public void deleteByObject_shouldReturnFalseIfValueIsAbsent() { Person person = Person.builder().id(id).firstName("tya").emailAddress("gmail.com").build(); // when - Mono deleted = reactiveTemplate.delete(person); + Mono deleted = reactiveTemplate.delete(person).subscribeOn(Schedulers.parallel()); // then StepVerifier.create(deleted).expectComplete().verify(); diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindByQueryTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindByQueryTests.java index 6a68974ab..03c7f0e05 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindByQueryTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindByQueryTests.java @@ -7,6 +7,7 @@ import org.springframework.data.aerospike.repository.query.Query; import org.springframework.data.aerospike.sample.Person; import org.springframework.data.domain.Sort; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.util.Collections; @@ -38,13 +39,16 @@ public void findAll_findsAllExistingDocuments() { .collect(Collectors.toList()); reactiveTemplate.insertAll(persons).blockLast(); - List result = reactiveTemplate.findAll(Person.class).collectList().block(); + List result = reactiveTemplate.findAll(Person.class) + .subscribeOn(Schedulers.parallel()) + .collectList().block(); assertThat(result).containsOnlyElementsOf(persons); } @Test public void findAll_findsNothing() { - StepVerifier.create(reactiveTemplate.findAll(Person.class)) + StepVerifier.create(reactiveTemplate.findAll(Person.class) + .subscribeOn(Schedulers.parallel())) .expectNextCount(0) .verifyComplete(); } @@ -55,7 +59,9 @@ public void findInRange_shouldFindLimitedNumberOfDocuments() { .mapToObj(id -> new Person(nextId(), "Firstname", "Lastname")).collect(Collectors.toList()); reactiveTemplate.insertAll(allUsers).blockLast(); - List actual = reactiveTemplate.findInRange(0, 5, Sort.unsorted(), Person.class).collectList().block(); + List actual = reactiveTemplate.findInRange(0, 5, Sort.unsorted(), Person.class) + .subscribeOn(Schedulers.parallel()) + .collectList().block(); assertThat(actual) .hasSize(5) .containsAnyElementsOf(allUsers); @@ -67,7 +73,9 @@ public void findInRange_shouldFindLimitedNumberOfDocumentsAndSkip() { .mapToObj(id -> new Person(nextId(), "Firstname", "Lastname")).collect(Collectors.toList()); reactiveTemplate.insertAll(allUsers).blockLast(); - List actual = reactiveTemplate.findInRange(0, 5, Sort.unsorted(), Person.class).collectList().block(); + List actual = reactiveTemplate.findInRange(0, 5, Sort.unsorted(), Person.class) + .subscribeOn(Schedulers.parallel()) + .collectList().block(); assertThat(actual) .hasSize(5) @@ -79,7 +87,9 @@ public void find_throwsExceptionForUnsortedQueryWithSpecifiedOffsetValue() { Query query = new Query((Sort) null); query.setOffset(1); - assertThatThrownBy(() -> reactiveTemplate.find(query, Person.class).collectList().block()) + assertThatThrownBy(() -> reactiveTemplate.find(query, Person.class) + .subscribeOn(Schedulers.parallel()) + .collectList().block()) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Unsorted query must not have offset value. For retrieving paged results use sorted query."); } @@ -93,7 +103,9 @@ public void find_shouldWorkWithFilterEqual() { Query query = createQueryForMethodWithArgs("findPersonByFirstName", "Dave"); - List actual = reactiveTemplate.find(query, Person.class).collectList().block(); + List actual = reactiveTemplate.find(query, Person.class) + .subscribeOn(Schedulers.parallel()) + .collectList().block(); assertThat(actual) .hasSize(10) .containsExactlyInAnyOrderElementsOf(allUsers); @@ -109,7 +121,9 @@ public void find_shouldWorkWithFilterEqualOrderBy() { Query query = createQueryForMethodWithArgs("findByLastNameOrderByFirstNameAsc", "Matthews"); - List actual = reactiveTemplate.find(query, Person.class).collectList().block(); + List actual = reactiveTemplate.find(query, Person.class) + .subscribeOn(Schedulers.parallel()) + .collectList().block(); assertThat(actual) .hasSize(10) .containsExactlyElementsOf(allUsers); @@ -125,7 +139,9 @@ public void find_shouldWorkWithFilterEqualOrderByDesc() { Query query = createQueryForMethodWithArgs("findByLastNameOrderByFirstNameDesc", "Matthews"); - List actual = reactiveTemplate.find(query, Person.class).collectList().block(); + List actual = reactiveTemplate.find(query, Person.class) + .subscribeOn(Schedulers.parallel()) + .collectList().block(); assertThat(actual) .hasSize(10) .containsExactlyElementsOf(allUsers); @@ -140,7 +156,9 @@ public void find_shouldWorkWithFilterRange() { Query query = createQueryForMethodWithArgs("findCustomerByAgeBetween", 25, 30); - List actual = reactiveTemplate.find(query, Person.class).collectList().block(); + List actual = reactiveTemplate.find(query, Person.class) + .subscribeOn(Schedulers.parallel()) + .collectList().block(); assertThat(actual) .hasSize(6) diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindRelatedTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindRelatedTests.java index fd716c78e..f61aa25a2 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindRelatedTests.java @@ -6,6 +6,7 @@ import org.springframework.data.aerospike.SampleClasses.DocumentWithTouchOnReadAndExpirationProperty; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; import org.springframework.data.aerospike.sample.Person; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.util.Arrays; @@ -27,7 +28,9 @@ public void findById_shouldReturnValueForExistingKey() { Person person = new Person(id, "Dave", "Matthews"); StepVerifier.create(reactiveTemplate.save(person)).expectNext(person).verifyComplete(); - StepVerifier.create(reactiveTemplate.findById(id, Person.class)).consumeNextWith(actual -> { + StepVerifier.create(reactiveTemplate.findById(id, Person.class) + .subscribeOn(Schedulers.parallel()) + ).consumeNextWith(actual -> { assertThat(actual.getFirstName()).isEqualTo(person.getFirstName()); assertThat(actual.getLastName()).isEqualTo(person.getLastName()); }).verifyComplete(); @@ -35,13 +38,16 @@ public void findById_shouldReturnValueForExistingKey() { @Test public void findById_shouldReturnNullForNonExistingKey() { - StepVerifier.create(reactiveTemplate.findById("dave-is-absent", Person.class)) + StepVerifier.create(reactiveTemplate.findById("dave-is-absent", Person.class) + .subscribeOn(Schedulers.parallel()) + ) .expectNextCount(0).verifyComplete(); } @Test public void findById_shouldReturnNullForNonExistingKeyIfTouchOnReadSetToTrue() { - StepVerifier.create(reactiveTemplate.findById("foo-is-absent", DocumentWithTouchOnRead.class)) + StepVerifier.create(reactiveTemplate.findById("foo-is-absent", DocumentWithTouchOnRead.class) + .subscribeOn(Schedulers.parallel())) .expectNextCount(0).verifyComplete(); } @@ -51,9 +57,9 @@ public void findById_shouldIncreaseVersionIfTouchOnReadSetToTrue() { DocumentWithTouchOnRead document = new DocumentWithTouchOnRead(id, 1); StepVerifier.create(reactiveTemplate.save(document)).expectNext(document).verifyComplete(); - StepVerifier.create(reactiveTemplate.findById(document.getId(), DocumentWithTouchOnRead.class)).consumeNextWith(actual -> { - assertThat(actual.getVersion()).isEqualTo(document.getVersion() + 1); - }).verifyComplete(); + StepVerifier.create(reactiveTemplate.findById(document.getId(), DocumentWithTouchOnRead.class) + .subscribeOn(Schedulers.parallel())) + .consumeNextWith(actual -> assertThat(actual.getVersion()).isEqualTo(document.getVersion() + 1)).verifyComplete(); } @Test @@ -61,14 +67,16 @@ public void findById_shouldFailOnTouchOnReadWithExpirationProperty() { DocumentWithTouchOnReadAndExpirationProperty document = new DocumentWithTouchOnReadAndExpirationProperty(id, EXPIRATION_ONE_MINUTE); reactiveTemplate.insert(document).block(); - assertThatThrownBy(() -> reactiveTemplate.findById(document.getId(), DocumentWithTouchOnReadAndExpirationProperty.class)) + assertThatThrownBy(() -> reactiveTemplate.findById(document.getId(), DocumentWithTouchOnReadAndExpirationProperty.class) + .subscribeOn(Schedulers.parallel())) .isInstanceOf(IllegalStateException.class) .hasMessage("Touch on read is not supported for entity without expiration property"); } @Test public void findByIds_shouldReturnEmptyList() { - StepVerifier.create(reactiveTemplate.findByIds(Collections.emptyList(), Person.class)) + StepVerifier.create(reactiveTemplate.findByIds(Collections.emptyList(), Person.class) + .subscribeOn(Schedulers.parallel())) .expectNextCount(0) .verifyComplete(); } @@ -81,7 +89,9 @@ public void findByIds_shouldFindExisting() { reactiveTemplate.insertAll(Arrays.asList(customer1, customer2, customer3)).blockLast(); List ids = Arrays.asList("unknown", customer1.getId(), customer2.getId()); - List actual = reactiveTemplate.findByIds(ids, Person.class).collectList().block(); + List actual = reactiveTemplate.findByIds(ids, Person.class) + .subscribeOn(Schedulers.parallel()) + .collectList().block(); assertThat(actual).containsExactlyInAnyOrder(customer1, customer2); } diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateInsertTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateInsertTests.java index bd11864e0..cba894ff0 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateInsertTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateInsertTests.java @@ -11,6 +11,7 @@ import org.springframework.data.aerospike.SampleClasses.VersionedClass; import org.springframework.data.aerospike.sample.Person; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.util.Arrays; @@ -63,7 +64,7 @@ public void insertsDocumentWithListMapDateStringLongValues() { public void insertsAndFindsDocumentWithByteArrayField() { DocumentWithByteArray document = new DocumentWithByteArray(id, new byte[]{1, 0, 0, 1, 1, 1, 0, 0}); - reactiveTemplate.insert(document).block(); + reactiveTemplate.insert(document).subscribeOn(Schedulers.parallel()).block(); DocumentWithByteArray result = findById(id, DocumentWithByteArray.class); assertThat(result).isEqualTo(document); @@ -73,7 +74,7 @@ public void insertsAndFindsDocumentWithByteArrayField() { public void insertsDocumentWithNullFields() { VersionedClass document = new VersionedClass(id, null); - reactiveTemplate.insert(document).block(); + reactiveTemplate.insert(document).subscribeOn(Schedulers.parallel()).block(); assertThat(document.getField()).isNull(); } @@ -82,7 +83,7 @@ public void insertsDocumentWithNullFields() { public void insertsDocumentWithZeroVersionIfThereIsNoDocumentWithSameKey() { VersionedClass document = new VersionedClass(id, "any"); - reactiveTemplate.insert(document).block(); + reactiveTemplate.insert(document).subscribeOn(Schedulers.parallel()).block(); assertThat(document.getVersion()).isEqualTo(1); } @@ -91,7 +92,7 @@ public void insertsDocumentWithZeroVersionIfThereIsNoDocumentWithSameKey() { public void insertsDocumentWithVersionGreaterThanZeroIfThereIsNoDocumentWithSameKey() { VersionedClass document = new VersionedClass(id, "any", 5L); - reactiveTemplate.insert(document).block(); + reactiveTemplate.insert(document).subscribeOn(Schedulers.parallel()).block(); assertThat(document.getVersion()).isEqualTo(1); } @@ -100,8 +101,8 @@ public void insertsDocumentWithVersionGreaterThanZeroIfThereIsNoDocumentWithSame public void throwsExceptionForDuplicateId() { Person person = new Person(id, "Amol", 28); - reactiveTemplate.insert(person).block(); - StepVerifier.create(reactiveTemplate.insert(person)) + reactiveTemplate.insert(person).subscribeOn(Schedulers.parallel()).block(); + StepVerifier.create(reactiveTemplate.insert(person).subscribeOn(Schedulers.parallel())) .expectError(DuplicateKeyException.class) .verify(); } @@ -110,8 +111,8 @@ public void throwsExceptionForDuplicateId() { public void throwsExceptionForDuplicateIdForVersionedDocument() { VersionedClass document = new VersionedClass(id, "any", 5L); - reactiveTemplate.insert(document).block(); - StepVerifier.create(reactiveTemplate.insert(document)) + reactiveTemplate.insert(document).subscribeOn(Schedulers.parallel()).block(); + StepVerifier.create(reactiveTemplate.insert(document).subscribeOn(Schedulers.parallel())) .expectError(DuplicateKeyException.class) .verify(); } @@ -126,6 +127,7 @@ public void insertsOnlyFirstDocumentAndNextAttemptsShouldFailWithDuplicateKeyExc long counterValue = counter.incrementAndGet(); String data = "value-" + counterValue; reactiveTemplate.insert(new VersionedClass(id, data)) + .subscribeOn(Schedulers.parallel()) .onErrorResume(DuplicateKeyException.class, e -> { duplicateKeyCounter.incrementAndGet(); return Mono.empty(); @@ -146,6 +148,7 @@ public void insertsOnlyFirstDocumentAndNextAttemptsShouldFailWithDuplicateKeyExc long counterValue = counter.incrementAndGet(); String data = "value-" + counterValue; reactiveTemplate.insert(new Person(id, data, 28)) + .subscribeOn(Schedulers.parallel()) .onErrorResume(DuplicateKeyException.class, e -> { duplicateKeyCounter.incrementAndGet(); return Mono.empty(); diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateMiscTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateMiscTests.java index 5b667cdc1..7e0de5fd4 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateMiscTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateMiscTests.java @@ -9,6 +9,7 @@ import org.springframework.data.aerospike.BaseReactiveIntegrationTests; import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; import org.springframework.data.aerospike.sample.Person; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; /** @@ -29,7 +30,7 @@ public void execute_shouldTranslateException() { StepVerifier.create(reactiveTemplate.execute(() -> { WritePolicy writePolicy = new WritePolicy(); writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY; - return reactorClient.add(writePolicy, key, bin).block(); + return reactorClient.add(writePolicy, key, bin).subscribeOn(Schedulers.parallel()).block(); })) .expectError(DuplicateKeyException.class) .verify(); @@ -37,16 +38,16 @@ public void execute_shouldTranslateException() { public void exists_shouldReturnTrueIfValueIsPresent() { Person one = Person.builder().id(id).firstName("tya").emailAddress("gmail.com").build(); - reactiveTemplate.insert(one).block(); + reactiveTemplate.insert(one).subscribeOn(Schedulers.parallel()).block(); - StepVerifier.create(reactiveTemplate.exists(id, Person.class)) + StepVerifier.create(reactiveTemplate.exists(id, Person.class).subscribeOn(Schedulers.parallel())) .expectNext(true) .verifyComplete(); } @Test public void exists_shouldReturnFalseIfValueIsAbsent() { - StepVerifier.create(reactiveTemplate.exists(id, Person.class)) + StepVerifier.create(reactiveTemplate.exists(id, Person.class).subscribeOn(Schedulers.parallel())) .expectNext(false) .verifyComplete(); } diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateModificationRelatedTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateModificationRelatedTests.java index ad68ce37f..e11a67d4a 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateModificationRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateModificationRelatedTests.java @@ -5,6 +5,7 @@ import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; import org.springframework.data.aerospike.sample.Person; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.util.HashMap; @@ -26,7 +27,7 @@ public void shouldAdd() { StepVerifier.create(created).expectNext(one).verifyComplete(); // when - Mono updated = reactiveTemplate.add(one, "age", 1); + Mono updated = reactiveTemplate.add(one, "age", 1).subscribeOn(Schedulers.parallel()); // then StepVerifier.create(updated) @@ -38,11 +39,11 @@ public void shouldAdd() { public void shouldAppend() { // given Person one = Person.builder().id(id).firstName("Nas").build(); - Mono created = reactiveTemplate.insert(one); + Mono created = reactiveTemplate.insert(one).subscribeOn(Schedulers.parallel()); StepVerifier.create(created).expectNext(one).verifyComplete(); // when - Mono appended = reactiveTemplate.append(one, "firstName", "tya"); + Mono appended = reactiveTemplate.append(one, "firstName", "tya").subscribeOn(Schedulers.parallel()); // then Person expected = Person.builder().id(id).firstName("Nastya").build(); @@ -56,7 +57,7 @@ public void shouldAppend() { public void shouldAppendMultipleFields() { // given Person one = Person.builder().id(id).firstName("Nas").emailAddress("nastya@").build(); - Mono created = reactiveTemplate.insert(one); + Mono created = reactiveTemplate.insert(one).subscribeOn(Schedulers.parallel()); StepVerifier.create(created).expectNext(one).verifyComplete(); Map toBeUpdated = new HashMap<>(); @@ -64,13 +65,13 @@ public void shouldAppendMultipleFields() { toBeUpdated.put("email", "gmail.com"); // when - Mono appended = reactiveTemplate.append(one, toBeUpdated); + Mono appended = reactiveTemplate.append(one, toBeUpdated).subscribeOn(Schedulers.parallel()); // then Person expected = Person.builder().id(id).firstName("Nastya").emailAddress("nastya@gmail.com").build(); StepVerifier.create(appended).expectNext(expected).verifyComplete(); - Mono storedPerson = reactiveTemplate.findById(id, Person.class); + Mono storedPerson = reactiveTemplate.findById(id, Person.class).subscribeOn(Schedulers.parallel()); StepVerifier.create(storedPerson).expectNext(expected).verifyComplete(); } @@ -78,11 +79,12 @@ public void shouldAppendMultipleFields() { public void shouldPrepend() { // given Person one = Person.builder().id(id).firstName("tya").build(); - Mono created = reactiveTemplate.insert(one); + Mono created = reactiveTemplate.insert(one).subscribeOn(Schedulers.parallel()); StepVerifier.create(created).expectNext(one).verifyComplete(); // when - Mono appended = reactiveTemplate.prepend(one, "firstName", "Nas"); + Mono appended = reactiveTemplate.prepend(one, "firstName", "Nas") + .subscribeOn(Schedulers.parallel()); // then Person expected = Person.builder().id(id).firstName("Nastya").build(); @@ -96,7 +98,7 @@ public void shouldPrepend() { public void shouldPrependMultipleFields() { // given Person one = Person.builder().id(id).firstName("tya").emailAddress("gmail.com").build(); - Mono created = reactiveTemplate.insert(one); + Mono created = reactiveTemplate.insert(one).subscribeOn(Schedulers.parallel()); StepVerifier.create(created).expectNext(one).verifyComplete(); Map toBeUpdated = new HashMap<>(); @@ -104,13 +106,13 @@ public void shouldPrependMultipleFields() { toBeUpdated.put("email", "nastya@"); // when - Mono appended = reactiveTemplate.prepend(one, toBeUpdated); + Mono appended = reactiveTemplate.prepend(one, toBeUpdated).subscribeOn(Schedulers.parallel()); // then Person expected = Person.builder().id(id).firstName("Nastya").emailAddress("nastya@gmail.com").build(); StepVerifier.create(appended).expectNext(expected).verifyComplete(); - Mono storedPerson = reactiveTemplate.findById(id, Person.class); + Mono storedPerson = reactiveTemplate.findById(id, Person.class).subscribeOn(Schedulers.parallel()); StepVerifier.create(storedPerson).expectNext(expected).verifyComplete(); } } \ No newline at end of file diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java index f5da0d843..60399b685 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateSaveRelatedTests.java @@ -12,6 +12,7 @@ import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate; import org.springframework.data.aerospike.sample.Person; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.util.concurrent.atomic.AtomicLong; @@ -29,7 +30,7 @@ public class ReactiveAerospikeTemplateSaveRelatedTests extends BaseReactiveInteg @Test public void save_shouldSaveAndSetVersion() { VersionedClass first = new VersionedClass(id, "foo"); - reactiveTemplate.save(first).block(); + reactiveTemplate.save(first).subscribeOn(Schedulers.parallel()).block(); assertThat(first.version).isEqualTo(1); assertThat(findById(id, VersionedClass.class).version).isEqualTo(1); @@ -37,39 +38,39 @@ public void save_shouldSaveAndSetVersion() { @Test(expected = OptimisticLockingFailureException.class) public void save_shouldNotSaveDocumentIfItAlreadyExistsWithZeroVersion() { - reactiveTemplate.save(new VersionedClass(id, "foo", 0L)).block(); - reactiveTemplate.save(new VersionedClass(id, "foo", 0L)).block(); + reactiveTemplate.save(new VersionedClass(id, "foo", 0L)).subscribeOn(Schedulers.parallel()).block(); + reactiveTemplate.save(new VersionedClass(id, "foo", 0L)).subscribeOn(Schedulers.parallel()).block(); } @Test public void save_shouldSaveDocumentWithEqualVersion() { - reactiveTemplate.save(new VersionedClass(id, "foo")).block(); + reactiveTemplate.save(new VersionedClass(id, "foo")).subscribeOn(Schedulers.parallel()).block(); - reactiveTemplate.save(new VersionedClass(id, "foo", 1L)).block(); - reactiveTemplate.save(new VersionedClass(id, "foo", 2L)).block(); + reactiveTemplate.save(new VersionedClass(id, "foo", 1L)).subscribeOn(Schedulers.parallel()).block(); + reactiveTemplate.save(new VersionedClass(id, "foo", 2L)).subscribeOn(Schedulers.parallel()).block(); } @Test(expected = DataRetrievalFailureException.class) public void save_shouldFailSaveNewDocumentWithVersionGreaterThanZero() { - reactiveTemplate.save(new VersionedClass(id, "foo", 5L)).block(); + reactiveTemplate.save(new VersionedClass(id, "foo", 5L)).subscribeOn(Schedulers.parallel()).block(); } @Test public void save_shouldUpdateNullField() { VersionedClass versionedClass = new VersionedClass(id, null); - VersionedClass saved = reactiveTemplate.save(versionedClass).block(); - reactiveTemplate.save(saved).block(); + VersionedClass saved = reactiveTemplate.save(versionedClass).subscribeOn(Schedulers.parallel()).block(); + reactiveTemplate.save(saved).subscribeOn(Schedulers.parallel()).block(); } @Test public void save_shouldUpdateNullFieldForClassWithVersionField() { VersionedClass versionedClass = new VersionedClass(id, "field"); - reactiveTemplate.save(versionedClass).block(); + reactiveTemplate.save(versionedClass).subscribeOn(Schedulers.parallel()).block(); assertThat(findById(id, VersionedClass.class).getField()).isEqualTo("field"); versionedClass.setField(null); - reactiveTemplate.save(versionedClass).block(); + reactiveTemplate.save(versionedClass).subscribeOn(Schedulers.parallel()).block(); assertThat(findById(id, VersionedClass.class).getField()).isNull(); } @@ -77,12 +78,12 @@ public void save_shouldUpdateNullFieldForClassWithVersionField() { @Test public void save_shouldUpdateNullFieldForClassWithoutVersionField() { Person person = new Person(id, "Oliver"); - reactiveTemplate.save(person).block(); + reactiveTemplate.save(person).subscribeOn(Schedulers.parallel()).block(); assertThat(findById(id, Person.class).getFirstName()).isEqualTo("Oliver"); person.setFirstName(null); - reactiveTemplate.save(person).block(); + reactiveTemplate.save(person).subscribeOn(Schedulers.parallel()).block(); assertThat(findById(id, Person.class).getFirstName()).isNull(); } @@ -90,9 +91,9 @@ public void save_shouldUpdateNullFieldForClassWithoutVersionField() { @Test public void save_shouldUpdateExistingDocument() { VersionedClass one = new VersionedClass(id, "foo"); - reactiveTemplate.save(one).block(); + reactiveTemplate.save(one).subscribeOn(Schedulers.parallel()).block(); - reactiveTemplate.save(new VersionedClass(id, "foo1", one.version)).block(); + reactiveTemplate.save(new VersionedClass(id, "foo1", one.version)).subscribeOn(Schedulers.parallel()).block(); VersionedClass value = findById(id, VersionedClass.class); assertThat(value.version).isEqualTo(2); @@ -102,9 +103,9 @@ public void save_shouldUpdateExistingDocument() { @Test public void save_shouldSetVersionWhenSavingTheSameDocument() { VersionedClass one = new VersionedClass(id, "foo"); - reactiveTemplate.save(one).block(); - reactiveTemplate.save(one).block(); - reactiveTemplate.save(one).block(); + reactiveTemplate.save(one).subscribeOn(Schedulers.parallel()).block(); + reactiveTemplate.save(one).subscribeOn(Schedulers.parallel()).block(); + reactiveTemplate.save(one).subscribeOn(Schedulers.parallel()).block(); assertThat(one.version).isEqualTo(3); } @@ -115,7 +116,7 @@ public void save_shouldUpdateAlreadyExistingDocument() throws Exception { int numberOfConcurrentSaves = 5; VersionedClass initial = new VersionedClass(id, "value-0"); - reactiveTemplate.save(initial).block(); + reactiveTemplate.save(initial).subscribeOn(Schedulers.parallel()).block(); assertThat(initial.version).isEqualTo(1); AsyncUtils.executeConcurrently(numberOfConcurrentSaves, () -> { @@ -125,7 +126,7 @@ public void save_shouldUpdateAlreadyExistingDocument() throws Exception { VersionedClass messageData = findById(id, VersionedClass.class); messageData.field = "value-" + counterValue; try { - reactiveTemplate.save(messageData).block(); + reactiveTemplate.save(messageData).subscribeOn(Schedulers.parallel()).block(); saved = true; } catch (OptimisticLockingFailureException ignore) { } @@ -150,6 +151,7 @@ public void save_shouldSaveOnlyFirstDocumentAndNextAttemptsShouldFailWithOptimis String data = "value-" + counterValue; VersionedClass messageData = new VersionedClass(id, data); reactiveTemplate.save(messageData) + .subscribeOn(Schedulers.parallel()) .onErrorResume(OptimisticLockingFailureException.class, (e) -> { optimisticLockCounter.incrementAndGet(); return Mono.empty(); @@ -164,8 +166,8 @@ public void save_shouldSaveOnlyFirstDocumentAndNextAttemptsShouldFailWithOptimis public void save_shouldSaveMultipleTimeDocumentWithoutVersion() { CustomCollectionClass one = new CustomCollectionClass(id, "numbers"); - reactiveTemplate.save(one).block(); - reactiveTemplate.save(one).block(); + reactiveTemplate.save(one).subscribeOn(Schedulers.parallel()).block(); + reactiveTemplate.save(one).subscribeOn(Schedulers.parallel()).block(); assertThat(findById(id, CustomCollectionClass.class)).isEqualTo(one); } @@ -175,8 +177,8 @@ public void save_shouldUpdateDocumentDataWithoutVersion() { CustomCollectionClass first = new CustomCollectionClass(id, "numbers"); CustomCollectionClass second = new CustomCollectionClass(id, "hot dog"); - reactiveTemplate.save(first).block(); - reactiveTemplate.save(second).block(); + reactiveTemplate.save(first).subscribeOn(Schedulers.parallel()).block(); + reactiveTemplate.save(second).subscribeOn(Schedulers.parallel()).block(); assertThat(findById(id, CustomCollectionClass.class)).isEqualTo(second); } @@ -185,10 +187,10 @@ public void save_shouldUpdateDocumentDataWithoutVersion() { public void save_shouldReplaceAllBinsPresentInAerospikeWhenSavingDocument() { Key key = new Key(getNameSpace(), "versioned-set", id); VersionedClass first = new VersionedClass(id, "foo"); - reactiveTemplate.save(first).block(); + reactiveTemplate.save(first).subscribeOn(Schedulers.parallel()).block(); blockingAerospikeTestOperations.addNewFieldToSavedDataInAerospike(key); - reactiveTemplate.save(new VersionedClass(id, "foo2", 2L)).block(); + reactiveTemplate.save(new VersionedClass(id, "foo2", 2L)).subscribeOn(Schedulers.parallel()).block(); StepVerifier.create(reactorClient.get(new Policy(), key)) .assertNext(keyRecord -> { diff --git a/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryDeleteRelatedTests.java b/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryDeleteRelatedTests.java index 466bb3857..457dce89a 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryDeleteRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryDeleteRelatedTests.java @@ -8,6 +8,7 @@ import org.springframework.data.aerospike.sample.Customer; import org.springframework.data.aerospike.sample.ReactiveCustomerRepository; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.util.List; @@ -37,14 +38,14 @@ public void setUp() { @Test public void deleteById_ShouldDeleteExistent() { - StepVerifier.create(customerRepo.deleteById(customer2.getId())).verifyComplete(); + StepVerifier.create(customerRepo.deleteById(customer2.getId()).subscribeOn(Schedulers.parallel())).verifyComplete(); StepVerifier.create(customerRepo.findById(customer2.getId())).expectNextCount(0).verifyComplete(); } @Test public void deleteById_ShouldSkipNonexistent() { - StepVerifier.create(customerRepo.deleteById("non-existent-id")).verifyComplete(); + StepVerifier.create(customerRepo.deleteById("non-existent-id").subscribeOn(Schedulers.parallel())).verifyComplete(); } @Test(expected = IllegalArgumentException.class) @@ -54,7 +55,9 @@ public void deleteById_ShouldRejectsNullObject() { @Test public void deleteByIdPublisher_ShouldDeleteOnlyFirstElement() { - StepVerifier.create(customerRepo.deleteById(Flux.just(customer1.getId(), customer2.getId()))).verifyComplete(); + StepVerifier.create(customerRepo.deleteById(Flux.just(customer1.getId(), customer2.getId())) + .subscribeOn(Schedulers.parallel())) + .verifyComplete(); StepVerifier.create(customerRepo.findById(customer1.getId())).expectNextCount(0).verifyComplete(); StepVerifier.create(customerRepo.findById(customer2.getId())).expectNext(customer2).verifyComplete(); @@ -62,7 +65,9 @@ public void deleteByIdPublisher_ShouldDeleteOnlyFirstElement() { @Test public void deleteByIdPublisher_ShouldSkipNonexistent() { - StepVerifier.create(customerRepo.deleteById(Flux.just("non-existent-id"))).verifyComplete(); + StepVerifier.create(customerRepo.deleteById(Flux.just("non-existent-id")) + .subscribeOn(Schedulers.parallel())) + .verifyComplete(); } @Test(expected = IllegalArgumentException.class) @@ -72,7 +77,7 @@ public void deleteByIdPublisher_ShouldRejectsNullObject() { @Test public void delete_ShouldDeleteExistent() { - StepVerifier.create(customerRepo.delete(customer2)).verifyComplete(); + StepVerifier.create(customerRepo.delete(customer2).subscribeOn(Schedulers.parallel())).verifyComplete(); StepVerifier.create(customerRepo.findById(customer2.getId())).expectNextCount(0).verifyComplete(); } @@ -81,7 +86,8 @@ public void delete_ShouldDeleteExistent() { public void delete_ShouldSkipNonexistent() { Customer nonExistentCustomer = Customer.builder().id(nextId()).firstname("Bart").lastname("Simpson").age(15).build(); - StepVerifier.create(customerRepo.delete(nonExistentCustomer)).verifyComplete(); + StepVerifier.create(customerRepo.delete(nonExistentCustomer).subscribeOn(Schedulers.parallel())) + .verifyComplete(); } @Test(expected = IllegalArgumentException.class) @@ -91,7 +97,7 @@ public void delete_ShouldRejectsNullObject() { @Test public void deleteAllIterable_ShouldDeleteExistent() { - customerRepo.deleteAll(asList(customer1, customer2)).block(); + customerRepo.deleteAll(asList(customer1, customer2)).subscribeOn(Schedulers.parallel()).block(); StepVerifier.create(customerRepo.findById(customer1.getId())).expectNextCount(0).verifyComplete(); StepVerifier.create(customerRepo.findById(customer2.getId())).expectNextCount(0).verifyComplete(); @@ -101,7 +107,7 @@ public void deleteAllIterable_ShouldDeleteExistent() { public void deleteAllIterable_ShouldSkipNonexistent() { Customer nonExistentCustomer = Customer.builder().id(nextId()).firstname("Bart").lastname("Simpson").age(15).build(); - customerRepo.deleteAll(asList(customer1, nonExistentCustomer, customer2)).block(); + customerRepo.deleteAll(asList(customer1, nonExistentCustomer, customer2)).subscribeOn(Schedulers.parallel()).block(); StepVerifier.create(customerRepo.findById(customer1.getId())).expectNextCount(0).verifyComplete(); StepVerifier.create(customerRepo.findById(customer2.getId())).expectNextCount(0).verifyComplete(); @@ -110,13 +116,13 @@ public void deleteAllIterable_ShouldSkipNonexistent() { @Test(expected = IllegalArgumentException.class) public void deleteAllIterable_ShouldRejectsNullObject() { List entities = asList(customer1, null, customer2); - customerRepo.deleteAll(entities).block(); + customerRepo.deleteAll(entities).subscribeOn(Schedulers.parallel()).block(); } @Test public void deleteAllPublisher_ShouldDeleteExistent() { - customerRepo.deleteAll(Flux.just(customer1, customer2)).block(); + customerRepo.deleteAll(Flux.just(customer1, customer2)).subscribeOn(Schedulers.parallel()).block(); StepVerifier.create(customerRepo.findById(customer1.getId())).expectNextCount(0).verifyComplete(); StepVerifier.create(customerRepo.findById(customer2.getId())).expectNextCount(0).verifyComplete(); @@ -126,7 +132,7 @@ public void deleteAllPublisher_ShouldDeleteExistent() { public void deleteAllPublisher_ShouldSkipNonexistent() { Customer nonExistentCustomer = Customer.builder().id(nextId()).firstname("Bart").lastname("Simpson").age(15).build(); - customerRepo.deleteAll(Flux.just(customer1, nonExistentCustomer, customer2)).block(); + customerRepo.deleteAll(Flux.just(customer1, nonExistentCustomer, customer2)).subscribeOn(Schedulers.parallel()).block(); StepVerifier.create(customerRepo.findById(customer1.getId())).expectNextCount(0).verifyComplete(); StepVerifier.create(customerRepo.findById(customer2.getId())).expectNextCount(0).verifyComplete(); diff --git a/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryExistRelatedTests.java b/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryExistRelatedTests.java index b4f2f8514..98941d5b5 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryExistRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryExistRelatedTests.java @@ -8,6 +8,7 @@ import org.springframework.data.aerospike.sample.Customer; import org.springframework.data.aerospike.sample.ReactiveCustomerRepository; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; @@ -31,27 +32,33 @@ public void setUp() { @Test public void existsById_ShouldReturnTrueWhenExists() { - StepVerifier.create(customerRepo.existsById(customer2.getId())).expectNext(true).verifyComplete(); + StepVerifier.create(customerRepo.existsById(customer2.getId()).subscribeOn(Schedulers.parallel())) + .expectNext(true).verifyComplete(); } @Test public void existsById_ShouldReturnFalseWhenNotExists() { - StepVerifier.create(customerRepo.existsById("non-existent-id")).expectNext(false).verifyComplete(); + StepVerifier.create(customerRepo.existsById("non-existent-id").subscribeOn(Schedulers.parallel())) + .expectNext(false).verifyComplete(); } @Test public void existsByIdPublisher_ShouldReturnTrueWhenExists() { - StepVerifier.create(customerRepo.existsById(Flux.just(customer1.getId()))).expectNext(true).verifyComplete(); + StepVerifier.create(customerRepo.existsById(Flux.just(customer1.getId())).subscribeOn(Schedulers.parallel())) + .expectNext(true).verifyComplete(); } @Test public void existsByIdPublisher_ShouldReturnFalseWhenNotExists() { - StepVerifier.create(customerRepo.existsById(Flux.just("non-existent-id"))).expectNext(false).verifyComplete(); + StepVerifier.create(customerRepo.existsById(Flux.just("non-existent-id")).subscribeOn(Schedulers.parallel())) + .expectNext(false).verifyComplete(); } @Test public void existsByIdPublisher_ShouldCheckOnlyFirstElement() { - StepVerifier.create(customerRepo.existsById(Flux.just(customer1.getId(), "non-existent-id"))).expectNext(true).verifyComplete(); + StepVerifier.create(customerRepo.existsById(Flux.just(customer1.getId(), "non-existent-id")) + .subscribeOn(Schedulers.parallel())) + .expectNext(true).verifyComplete(); } } diff --git a/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryFindRelatedTests.java b/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryFindRelatedTests.java index f72db97fc..c2abf5d9d 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryFindRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositoryFindRelatedTests.java @@ -12,6 +12,7 @@ import org.springframework.data.domain.Sort; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.util.ArrayList; @@ -51,36 +52,44 @@ public void setUp() { @Test public void findById_ShouldReturnExistent() { - StepVerifier.create(customerRepo.findById(customer2.getId())).consumeNextWith(actual -> + StepVerifier.create(customerRepo.findById(customer2.getId()) + .subscribeOn(Schedulers.parallel())).consumeNextWith(actual -> assertThat(actual).isEqualTo(customer2) ).verifyComplete(); } @Test public void findById_ShouldNotReturnNotExistent() { - StepVerifier.create(customerRepo.findById("non-existent-id")).expectNextCount(0).verifyComplete(); + StepVerifier.create(customerRepo.findById("non-existent-id") + .subscribeOn(Schedulers.parallel())) + .expectNextCount(0).verifyComplete(); } @Test public void findByIdPublisher_ShouldReturnFirst() { Publisher ids = Flux.just(customer2.getId(), customer4.getId()); - StepVerifier.create(customerRepo.findById(ids)).consumeNextWith(actual -> - assertThat(actual).isEqualTo(customer2) - ).verifyComplete(); + StepVerifier.create(customerRepo.findById(ids) + .subscribeOn(Schedulers.parallel())) + .consumeNextWith(actual -> + assertThat(actual).isEqualTo(customer2) + ).verifyComplete(); } @Test public void findByIdPublisher_NotReturnFirstNotExistent() { Publisher ids = Flux.just("non-existent-id", customer2.getId(), customer4.getId()); - StepVerifier.create(customerRepo.findById(ids)).expectNextCount(0).verifyComplete(); + StepVerifier.create(customerRepo.findById(ids) + .subscribeOn(Schedulers.parallel())) + .expectNextCount(0).verifyComplete(); } @Test public void findAll_ShouldReturnAll() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findAll()), + StepVerifier.create(customerRepo.findAll() + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer1, customer2, customer3, customer4)); } @@ -88,7 +97,8 @@ public void findAll_ShouldReturnAll() { public void findAllByIDsIterable_ShouldReturnAllExistent() { Iterable ids = asList(customer2.getId(), "non-existent-id", customer4.getId()); assertConsumedCustomers( - StepVerifier.create(customerRepo.findAllById(ids)), + StepVerifier.create(customerRepo.findAllById(ids) + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer2, customer4)); } @@ -97,63 +107,72 @@ public void findAllByIDsIterable_ShouldReturnAllExistent() { public void findAllByIDsPublisher_ShouldReturnAllExistent() { Publisher ids = Flux.just(customer1.getId(), customer2.getId(), customer4.getId(), "non-existent-id"); assertConsumedCustomers( - StepVerifier.create(customerRepo.findAllById(ids)), + StepVerifier.create(customerRepo.findAllById(ids) + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer1, customer2, customer4)); } @Test public void findByLastname_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByLastname("Simpson")), + StepVerifier.create(customerRepo.findByLastname("Simpson") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer1, customer2, customer3)); } @Test public void findByLastnameName_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByLastnameNot("Simpson")), + StepVerifier.create(customerRepo.findByLastnameNot("Simpson") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer4)); } @Test public void findOneByLastname_ShouldWorkProperly() { - StepVerifier.create(customerRepo.findOneByLastname("Groening")).consumeNextWith(actual -> - assertThat(actual).isEqualTo(customer4) - ).verifyComplete(); + StepVerifier.create(customerRepo.findOneByLastname("Groening") + .subscribeOn(Schedulers.parallel())) + .consumeNextWith(actual -> assertThat(actual).isEqualTo(customer4) + ).verifyComplete(); } @Test public void findByLastnameOrderByFirstnameAsc_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByLastnameOrderByFirstnameAsc("Simpson")), + StepVerifier.create(customerRepo.findByLastnameOrderByFirstnameAsc("Simpson") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsExactly(customer3, customer1, customer2)); } @Test public void findByLastnameOrderByFirstnameDesc_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByLastnameOrderByFirstnameDesc("Simpson")), + StepVerifier.create(customerRepo.findByLastnameOrderByFirstnameDesc("Simpson") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsExactly(customer2, customer1, customer3)); } @Test public void findByFirstnameEndsWith_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByFirstnameEndsWith("t")), + StepVerifier.create(customerRepo.findByFirstnameEndsWith("t") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer3, customer4)); } @Test public void findByFirstnameStartsWithOrderByAgeAsc_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByFirstnameStartsWithOrderByAgeAsc("Ma")), + StepVerifier.create(customerRepo.findByFirstnameStartsWithOrderByAgeAsc("Ma") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsExactly(customer2, customer4)); } @Test public void findByAgeLessThan_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByAgeLessThan(40, Sort.by(asc("firstname")))), + StepVerifier.create(customerRepo.findByAgeLessThan(40, Sort.by(asc("firstname"))) + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsExactly(customer3, customer2) ); } @@ -161,68 +180,75 @@ public void findByAgeLessThan_ShouldWorkProperly() { @Test public void findByFirstnameIn_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByFirstnameIn(asList("Matt", "Homer"))), + StepVerifier.create(customerRepo.findByFirstnameIn(asList("Matt", "Homer")) + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer1, customer4)); } @Test public void findByFirstnameAndLastname_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByFirstnameAndLastname("Bart", "Simpson")), + StepVerifier.create(customerRepo.findByFirstnameAndLastname("Bart", "Simpson") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer3)); } @Test public void findOneByFirstnameAndLastname_ShouldWorkProperly() { - StepVerifier.create(customerRepo.findOneByFirstnameAndLastname("Bart", "Simpson")).consumeNextWith(actual -> - assertThat(actual).isEqualTo(customer3) - ).verifyComplete(); + StepVerifier.create(customerRepo.findOneByFirstnameAndLastname("Bart", "Simpson") + .subscribeOn(Schedulers.parallel())) + .consumeNextWith(actual ->assertThat(actual).isEqualTo(customer3) + ).verifyComplete(); } @Test public void findByLastnameAndAge_ShouldWorkProperly() { - StepVerifier.create(customerRepo.findByLastnameAndAge("Simpson", 15)).consumeNextWith(actual -> - assertThat(actual).isEqualTo(customer3) - ).verifyComplete(); + StepVerifier.create(customerRepo.findByLastnameAndAge("Simpson", 15) + .subscribeOn(Schedulers.parallel())) + .consumeNextWith(actual -> assertThat(actual).isEqualTo(customer3) + ).verifyComplete(); } @Test public void findByAgeBetween_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByAgeBetween(10, 40)), + StepVerifier.create(customerRepo.findByAgeBetween(10, 40) + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer2, customer3)); } @Test public void findByFirstnameContains_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByFirstnameContains("ar")), + StepVerifier.create(customerRepo.findByFirstnameContains("ar") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer2, customer3)); } @Test public void findByFirstnameContainingIgnoreCase_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByFirstnameContainingIgnoreCase("m")), + StepVerifier.create(customerRepo.findByFirstnameContainingIgnoreCase("m") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer1, customer2, customer4)); } @Test public void findByAgeBetweenAndLastname_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByAgeBetweenAndLastname(30, 70,"Simpson")), + StepVerifier.create(customerRepo.findByAgeBetweenAndLastname(30, 70,"Simpson") + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsOnly(customer1, customer2)); } @Test public void findByAgeBetweenOrderByFirstnameDesc_ShouldWorkProperly() { assertConsumedCustomers( - StepVerifier.create(customerRepo.findByAgeBetweenOrderByFirstnameDesc(30, 70)), + StepVerifier.create(customerRepo.findByAgeBetweenOrderByFirstnameDesc(30, 70) + .subscribeOn(Schedulers.parallel())), customers -> assertThat(customers).containsExactly(customer4, customer2, customer1)); } - - private void assertConsumedCustomers(StepVerifier.FirstStep step, Consumer> assertion) { step.recordWith(ArrayList::new) .thenConsumeWhile(customer -> true) diff --git a/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositorySaveRelatedTests.java b/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositorySaveRelatedTests.java index 9d3f546c4..5a4c82057 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositorySaveRelatedTests.java +++ b/src/test/java/org/springframework/data/aerospike/repository/reactive/ReactiveAerospikeRepositorySaveRelatedTests.java @@ -11,6 +11,7 @@ import org.springframework.data.aerospike.sample.ReactiveCustomerRepository; import org.springframework.data.aerospike.sample.SimpleObject; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import java.util.ArrayList; @@ -41,26 +42,26 @@ public void setUp() { @Test public void saveEntityShouldInsertNewEntity() { - StepVerifier.create(customerRepo.save(customer1)).expectNext(customer1).verifyComplete(); + StepVerifier.create(customerRepo.save(customer1).subscribeOn(Schedulers.parallel())).expectNext(customer1).verifyComplete(); assertCustomerExistsInRepo(customer1); } @Test public void saveEntityShouldUpdateExistingEntity() { - StepVerifier.create(customerRepo.save(customer1)).expectNext(customer1).verifyComplete(); + StepVerifier.create(customerRepo.save(customer1).subscribeOn(Schedulers.parallel())).expectNext(customer1).verifyComplete(); customer1.setFirstname("Matt"); customer1.setLastname("Groening"); - StepVerifier.create(customerRepo.save(customer1)).expectNext(customer1).verifyComplete(); + StepVerifier.create(customerRepo.save(customer1).subscribeOn(Schedulers.parallel())).expectNext(customer1).verifyComplete(); assertCustomerExistsInRepo(customer1); } @Test public void saveIterableOfNewEntitiesShouldInsertEntity() { - StepVerifier.create(customerRepo.saveAll(Arrays.asList(customer1, customer2, customer3))) + StepVerifier.create(customerRepo.saveAll(Arrays.asList(customer1, customer2, customer3)).subscribeOn(Schedulers.parallel())) .recordWith(ArrayList::new) .thenConsumeWhile(customer -> true) .consumeRecordedWith(actual -> @@ -74,12 +75,14 @@ public void saveIterableOfNewEntitiesShouldInsertEntity() { @Test public void saveIterableOfMixedEntitiesShouldInsertNewAndUpdateOld() { - StepVerifier.create(customerRepo.save(customer1)).expectNext(customer1).verifyComplete(); + StepVerifier.create(customerRepo.save(customer1).subscribeOn(Schedulers.parallel())) + .expectNext(customer1).verifyComplete(); customer1.setFirstname("Matt"); customer1.setLastname("Groening"); - StepVerifier.create(customerRepo.saveAll(Arrays.asList(customer1, customer2, customer3))).expectNextCount(3).verifyComplete(); + StepVerifier.create(customerRepo.saveAll(Arrays.asList(customer1, customer2, customer3)).subscribeOn(Schedulers.parallel())) + .expectNextCount(3).verifyComplete(); assertCustomerExistsInRepo(customer1); assertCustomerExistsInRepo(customer2); @@ -88,7 +91,8 @@ public void saveIterableOfMixedEntitiesShouldInsertNewAndUpdateOld() { @Test public void savePublisherOfEntitiesShouldInsertEntity() { - StepVerifier.create(customerRepo.saveAll(Flux.just(customer1, customer2, customer3))).expectNextCount(3).verifyComplete(); + StepVerifier.create(customerRepo.saveAll(Flux.just(customer1, customer2, customer3)).subscribeOn(Schedulers.parallel())) + .expectNextCount(3).verifyComplete(); assertCustomerExistsInRepo(customer1); assertCustomerExistsInRepo(customer2); @@ -97,7 +101,8 @@ public void savePublisherOfEntitiesShouldInsertEntity() { @Test public void savePublisherOfMixedEntitiesShouldInsertNewAndUpdateOld() { - StepVerifier.create(customerRepo.save(customer1)).expectNext(customer1).verifyComplete(); + StepVerifier.create(customerRepo.save(customer1).subscribeOn(Schedulers.parallel())) + .expectNext(customer1).verifyComplete(); customer1.setFirstname("Matt"); customer1.setLastname("Groening"); @@ -117,7 +122,8 @@ public void shouldSaveObjectWithPersistenceConstructorThatHasAllFields() { .simpleObject(SimpleObject.builder().property1("prop1").property2(555).build()) .build(); - StepVerifier.create(compositeRepo.save(expected)).expectNext(expected).verifyComplete(); + StepVerifier.create(compositeRepo.save(expected).subscribeOn(Schedulers.parallel())) + .expectNext(expected).verifyComplete(); StepVerifier.create(compositeRepo.findById(expected.getId())).consumeNextWith(actual -> { assertThat(actual.getIntValue()).isEqualTo(expected.getIntValue());