Skip to content

Commit

Permalink
BlockHound used to prove reactivity (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
kptfh authored Mar 4, 2020
1 parent 9119f27 commit fbc21c5
Show file tree
Hide file tree
Showing 14 changed files with 277 additions and 153 deletions.
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<joda-time.version>2.10.5</joda-time.version>
<embedded-aerospike.version>1.33</embedded-aerospike.version>
<awaitility.version>4.0.1</awaitility.version>
<blockhound.version>1.0.2.RELEASE</blockhound.version>
</properties>

<licenses>
Expand Down Expand Up @@ -247,6 +248,14 @@
<scope>test</scope>
</dependency>

<!--Needed for tracking blocking calls in reactive application https://github.com/reactor/BlockHound -->
<dependency>
<groupId>io.projectreactor.tools</groupId>
<artifactId>blockhound</artifactId>
<version>${blockhound.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package org.springframework.data.aerospike;

import com.aerospike.client.lua.LuaAerospikeLib;
import com.aerospike.client.reactor.AerospikeReactorClient;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate;
import reactor.blockhound.BlockHound;
import reactor.core.publisher.Mono;

import java.io.Serializable;
import java.time.Duration;

public abstract class BaseReactiveIntegrationTests extends BaseIntegrationTests {

Expand All @@ -17,4 +23,23 @@ protected <T> T findById(Serializable id, Class<T> type) {
return reactiveTemplate.findById(id, type).block();
}

@BeforeClass
public static void installBlockHound() {
BlockHound.install();
}

@Test(expected = RuntimeException.class)
public void shouldFailAsBlocking(){
Mono.delay(Duration.ofSeconds(1))
.doOnNext(it -> {
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.block();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.sample.Person;
import org.springframework.data.repository.query.parser.Part;
import reactor.core.scheduler.Schedulers;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -32,13 +33,19 @@ public void count_shouldFindAllItemsByGivenCriteria() {
reactiveTemplate.insert(new Person(nextId(), "vasili", 52)).block();
reactiveTemplate.insert(new Person(nextId(), "petya", 52)).block();

Long vasyaCount = reactiveTemplate.count(new Query(new Criteria().is("vasili", "firstName")), Person.class).block();
Long vasyaCount = reactiveTemplate.count(new Query(new Criteria().is("vasili", "firstName")), Person.class)
.subscribeOn(Schedulers.parallel())
.block();
assertThat(vasyaCount).isEqualTo(3);

Long vasya51Count = reactiveTemplate.count(new Query(new Criteria().is("vasili", "firstName").and("age").is(51, "age")), Person.class).block();
Long vasya51Count = reactiveTemplate.count(new Query(new Criteria().is("vasili", "firstName").and("age").is(51, "age")), Person.class)
.subscribeOn(Schedulers.parallel())
.block();
assertThat(vasya51Count).isEqualTo(1);

Long petyaCount = reactiveTemplate.count(new Query(new Criteria().is("petya", "firstName")), Person.class).block();
Long petyaCount = reactiveTemplate.count(new Query(new Criteria().is("petya", "firstName")), Person.class)
.subscribeOn(Schedulers.parallel())
.block();
assertThat(petyaCount).isEqualTo(1);
}

Expand All @@ -52,12 +59,16 @@ public void count_shouldFindAllItemsByGivenCriteriaAndRespectsIgnoreCase() {
assertThat(reactiveTemplate.count(query1, Person.class).block()).isEqualTo(3);

Query query2 = new Query(new Criteria().startingWith("VaS", "firstName", Part.IgnoreCaseType.NEVER));
assertThat(reactiveTemplate.count(query2, Person.class).block()).isEqualTo(1);
assertThat(reactiveTemplate.count(query2, Person.class)
.subscribeOn(Schedulers.parallel())
.block()).isEqualTo(1);
}

@Test
public void count_shouldReturnZeroIfNoDocumentsByProvidedCriteriaIsFound() {
Long count = reactiveTemplate.count(new Query(new Criteria().is("nastyushka", "firstName")), Person.class).block();
Long count = reactiveTemplate.count(new Query(new Criteria().is("nastyushka", "firstName")), Person.class)
.subscribeOn(Schedulers.parallel())
.block();

assertThat(count).isZero();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package org.springframework.data.aerospike.core.reactive;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.policy.GenerationPolicy;
import com.aerospike.client.policy.WritePolicy;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.aerospike.BaseReactiveIntegrationTests;
import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate;
import org.springframework.data.aerospike.sample.Person;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import static org.springframework.data.aerospike.SampleClasses.VersionedClass;
Expand All @@ -19,39 +19,37 @@
*/
public class ReactiveAerospikeTemplateDeleteRelatedTests extends BaseReactiveIntegrationTests {

//TODO: remove me as soon as reactorClient has writePolicyDefault
@Autowired
AerospikeClient client;

@Test
public void deleteByObject_ignoresDocumentVersionEvenIfDefaultGenerationPolicyIsSet() {
GenerationPolicy initialGenerationPolicy = client.writePolicyDefault.generationPolicy;
client.writePolicyDefault.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
WritePolicy writePolicyDefault = reactorClient.getWritePolicyDefault();
GenerationPolicy initialGenerationPolicy = writePolicyDefault.generationPolicy;
writePolicyDefault.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
try {
VersionedClass initialDocument = new VersionedClass(id, "a");
reactiveTemplate.insert(initialDocument).block();
reactiveTemplate.update(new VersionedClass(id, "b", initialDocument.version)).block();

Mono<Boolean> deleted = reactiveTemplate.delete(initialDocument);
Mono<Boolean> deleted = reactiveTemplate.delete(initialDocument).subscribeOn(Schedulers.parallel());
StepVerifier.create(deleted).expectNext(true).verifyComplete();
} finally {
client.writePolicyDefault.generationPolicy = initialGenerationPolicy;
writePolicyDefault.generationPolicy = initialGenerationPolicy;
}
}

@Test
public void deleteByObject_ignoresVersionEvenIfDefaultGenerationPolicyIsSet() {
GenerationPolicy initialGenerationPolicy = client.writePolicyDefault.generationPolicy;
client.writePolicyDefault.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
WritePolicy writePolicyDefault = reactorClient.getWritePolicyDefault();
GenerationPolicy initialGenerationPolicy = writePolicyDefault.generationPolicy;
writePolicyDefault.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
try {
Person initialDocument = new Person(id, "a");
reactiveTemplate.insert(initialDocument).block();
reactiveTemplate.update(new Person(id, "b")).block();

Mono<Boolean> deleted = reactiveTemplate.delete(initialDocument);
Mono<Boolean> deleted = reactiveTemplate.delete(initialDocument).subscribeOn(Schedulers.parallel());
StepVerifier.create(deleted).expectNext(true).verifyComplete();
} finally {
client.writePolicyDefault.generationPolicy = initialGenerationPolicy;
writePolicyDefault.generationPolicy = initialGenerationPolicy;
}
}

Expand All @@ -60,15 +58,15 @@ public void testSimpleDeleteById() {
// given
Person person = new Person(id, "QLastName", 21);

Mono<Person> created = reactiveTemplate.insert(person);
Mono<Person> created = reactiveTemplate.insert(person).subscribeOn(Schedulers.parallel());
StepVerifier.create(created).expectNext(person).verifyComplete();

// when
Mono<Boolean> deleted = reactiveTemplate.delete(id, Person.class);
Mono<Boolean> deleted = reactiveTemplate.delete(id, Person.class).subscribeOn(Schedulers.parallel());
StepVerifier.create(deleted).expectNext(true).verifyComplete();

// then
Mono<Person> result = reactiveTemplate.findById(id, Person.class);
Mono<Person> result = reactiveTemplate.findById(id, Person.class).subscribeOn(Schedulers.parallel());
StepVerifier.create(result).expectComplete().verify();
;
}
Expand All @@ -78,22 +76,22 @@ public void testSimpleDeleteByObject() {
// given
Person person = new Person(id, "QLastName", 21);

Mono<Person> created = reactiveTemplate.insert(person);
Mono<Person> created = reactiveTemplate.insert(person).subscribeOn(Schedulers.parallel());
StepVerifier.create(created).expectNext(person).verifyComplete();

// when
Mono<Boolean> deleted = reactiveTemplate.delete(person);
Mono<Boolean> deleted = reactiveTemplate.delete(person).subscribeOn(Schedulers.parallel());
StepVerifier.create(deleted).expectNext(true).verifyComplete();

// then
Mono<Person> result = reactiveTemplate.findById(id, Person.class);
Mono<Person> result = reactiveTemplate.findById(id, Person.class).subscribeOn(Schedulers.parallel());
StepVerifier.create(result).expectComplete().verify();
}

@Test
public void deleteById_shouldReturnFalseIfValueIsAbsent() {
// when
Mono<Boolean> deleted = reactiveTemplate.delete(id, Person.class);
Mono<Boolean> deleted = reactiveTemplate.delete(id, Person.class).subscribeOn(Schedulers.parallel());

// then
StepVerifier.create(deleted).expectComplete().verify();
Expand All @@ -105,7 +103,7 @@ public void deleteByObject_shouldReturnFalseIfValueIsAbsent() {
Person person = Person.builder().id(id).firstName("tya").emailAddress("gmail.com").build();

// when
Mono<Boolean> deleted = reactiveTemplate.delete(person);
Mono<Boolean> deleted = reactiveTemplate.delete(person).subscribeOn(Schedulers.parallel());

// then
StepVerifier.create(deleted).expectComplete().verify();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.sample.Person;
import org.springframework.data.domain.Sort;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.util.Collections;
Expand Down Expand Up @@ -38,13 +39,16 @@ public void findAll_findsAllExistingDocuments() {
.collect(Collectors.toList());
reactiveTemplate.insertAll(persons).blockLast();

List<Person> result = reactiveTemplate.findAll(Person.class).collectList().block();
List<Person> result = reactiveTemplate.findAll(Person.class)
.subscribeOn(Schedulers.parallel())
.collectList().block();
assertThat(result).containsOnlyElementsOf(persons);
}

@Test
public void findAll_findsNothing() {
StepVerifier.create(reactiveTemplate.findAll(Person.class))
StepVerifier.create(reactiveTemplate.findAll(Person.class)
.subscribeOn(Schedulers.parallel()))
.expectNextCount(0)
.verifyComplete();
}
Expand All @@ -55,7 +59,9 @@ public void findInRange_shouldFindLimitedNumberOfDocuments() {
.mapToObj(id -> new Person(nextId(), "Firstname", "Lastname")).collect(Collectors.toList());
reactiveTemplate.insertAll(allUsers).blockLast();

List<Person> actual = reactiveTemplate.findInRange(0, 5, Sort.unsorted(), Person.class).collectList().block();
List<Person> actual = reactiveTemplate.findInRange(0, 5, Sort.unsorted(), Person.class)
.subscribeOn(Schedulers.parallel())
.collectList().block();
assertThat(actual)
.hasSize(5)
.containsAnyElementsOf(allUsers);
Expand All @@ -67,7 +73,9 @@ public void findInRange_shouldFindLimitedNumberOfDocumentsAndSkip() {
.mapToObj(id -> new Person(nextId(), "Firstname", "Lastname")).collect(Collectors.toList());
reactiveTemplate.insertAll(allUsers).blockLast();

List<Person> actual = reactiveTemplate.findInRange(0, 5, Sort.unsorted(), Person.class).collectList().block();
List<Person> actual = reactiveTemplate.findInRange(0, 5, Sort.unsorted(), Person.class)
.subscribeOn(Schedulers.parallel())
.collectList().block();

assertThat(actual)
.hasSize(5)
Expand All @@ -79,7 +87,9 @@ public void find_throwsExceptionForUnsortedQueryWithSpecifiedOffsetValue() {
Query query = new Query((Sort) null);
query.setOffset(1);

assertThatThrownBy(() -> reactiveTemplate.find(query, Person.class).collectList().block())
assertThatThrownBy(() -> reactiveTemplate.find(query, Person.class)
.subscribeOn(Schedulers.parallel())
.collectList().block())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Unsorted query must not have offset value. For retrieving paged results use sorted query.");
}
Expand All @@ -93,7 +103,9 @@ public void find_shouldWorkWithFilterEqual() {

Query query = createQueryForMethodWithArgs("findPersonByFirstName", "Dave");

List<Person> actual = reactiveTemplate.find(query, Person.class).collectList().block();
List<Person> actual = reactiveTemplate.find(query, Person.class)
.subscribeOn(Schedulers.parallel())
.collectList().block();
assertThat(actual)
.hasSize(10)
.containsExactlyInAnyOrderElementsOf(allUsers);
Expand All @@ -109,7 +121,9 @@ public void find_shouldWorkWithFilterEqualOrderBy() {

Query query = createQueryForMethodWithArgs("findByLastNameOrderByFirstNameAsc", "Matthews");

List<Person> actual = reactiveTemplate.find(query, Person.class).collectList().block();
List<Person> actual = reactiveTemplate.find(query, Person.class)
.subscribeOn(Schedulers.parallel())
.collectList().block();
assertThat(actual)
.hasSize(10)
.containsExactlyElementsOf(allUsers);
Expand All @@ -125,7 +139,9 @@ public void find_shouldWorkWithFilterEqualOrderByDesc() {

Query query = createQueryForMethodWithArgs("findByLastNameOrderByFirstNameDesc", "Matthews");

List<Person> actual = reactiveTemplate.find(query, Person.class).collectList().block();
List<Person> actual = reactiveTemplate.find(query, Person.class)
.subscribeOn(Schedulers.parallel())
.collectList().block();
assertThat(actual)
.hasSize(10)
.containsExactlyElementsOf(allUsers);
Expand All @@ -140,7 +156,9 @@ public void find_shouldWorkWithFilterRange() {

Query query = createQueryForMethodWithArgs("findCustomerByAgeBetween", 25, 30);

List<Person> actual = reactiveTemplate.find(query, Person.class).collectList().block();
List<Person> actual = reactiveTemplate.find(query, Person.class)
.subscribeOn(Schedulers.parallel())
.collectList().block();

assertThat(actual)
.hasSize(6)
Expand Down
Loading

0 comments on commit fbc21c5

Please sign in to comment.