From 44b7f53ed2a0813815df18e6ce6085b7671a77d5 Mon Sep 17 00:00:00 2001 From: agrgr Date: Mon, 25 Nov 2024 15:48:59 +0200 Subject: [PATCH 1/3] align reactive queryMethod execute() with sync flow --- .../query/ReactiveAerospikePartTreeQuery.java | 13 ++++++++- .../query/reactive/find/NotEqualTests.java | 29 +++++++++++++++---- .../sample/ReactiveCustomerRepository.java | 5 ++++ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java b/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java index 3cef6291..fb9de022 100644 --- a/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java +++ b/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java @@ -55,6 +55,7 @@ public ReactiveAerospikePartTreeQuery(QueryMethod queryMethod, } @Override + @SuppressWarnings({"NullableProblems"}) public Object execute(Object[] parameters) { ParametersParameterAccessor accessor = new ParametersParameterAccessor(queryMethod.getParameters(), parameters); Query query = prepareQuery(parameters, accessor); @@ -100,8 +101,18 @@ public Object execute(Object[] parameters) { } return getPage(unprocessedResults, size, pageable, query); }); + } else if (queryMethod.isStreamQuery()) { + throw new UnsupportedOperationException( + "Automatic converting of an async stream to a blocking sync Stream is not supported"); + } else if (queryMethod.isCollectionQuery()) { + return findByQuery(query, targetClass).collectList(); } - return findByQuery(query, targetClass); + else if (queryMethod.isQueryForEntity()) { + // Queries with Flux and Mono return types + return findByQuery(query, targetClass); + } + throw new UnsupportedOperationException("Query method " + queryMethod.getNamedQueryName() + " is not " + + "supported"); } protected Object runQueryWithIds(Class targetClass, List ids, Query query) { diff --git a/src/test/java/org/springframework/data/aerospike/repository/query/reactive/find/NotEqualTests.java b/src/test/java/org/springframework/data/aerospike/repository/query/reactive/find/NotEqualTests.java index 988e5176..2aea3bed 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/query/reactive/find/NotEqualTests.java +++ b/src/test/java/org/springframework/data/aerospike/repository/query/reactive/find/NotEqualTests.java @@ -2,8 +2,7 @@ import org.junit.jupiter.api.Test; import org.springframework.data.aerospike.repository.query.reactive.ReactiveCustomerRepositoryQueryTests; -import org.springframework.data.aerospike.sample.Customer; -import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; import java.util.List; @@ -15,10 +14,28 @@ public class NotEqualTests extends ReactiveCustomerRepositoryQueryTests { @Test - public void findBySimplePropertyNot() { - List results = reactiveRepository.findByLastNameNot("Simpson") - .subscribeOn(Schedulers.parallel()).collectList().block(); + public void findBySimplePropertyNotEqual_String() { + StepVerifier.create(reactiveRepository.findByLastNameNot("Simpson")) + .recordWith(List::of) + .consumeRecordedWith(customers -> { + assertThat(customers).containsExactlyInAnyOrderElementsOf(List.of(matt, leela, fry)); + }) + .expectComplete(); - assertThat(results).contains(matt); + StepVerifier.create(reactiveRepository.findByFirstNameNotIgnoreCase("SimpSon")) + // this query returns Mono + .expectNextMatches(customers -> { + assertThat(customers).containsExactlyInAnyOrderElementsOf(List.of(matt, leela, fry)); + return false; + }) + .expectComplete(); + + StepVerifier.create(reactiveRepository.findOneByLastNameNot("Simpson")) + // this query returns Mono + .expectNextMatches(customer -> { + assertThat(customer).isIn(List.of(matt, leela, fry)); + return false; + }) + .expectComplete(); } } diff --git a/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerRepository.java b/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerRepository.java index 25a3b1de..a2d483c7 100644 --- a/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerRepository.java +++ b/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerRepository.java @@ -21,6 +21,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Collection; import java.util.List; /** @@ -40,6 +41,10 @@ public interface ReactiveCustomerRepository extends ReactiveAerospikeRepository< Flux findByLastNameNot(String lastName); + Mono findOneByLastNameNot(String lastName); + + Mono> findByFirstNameNotIgnoreCase(String lastName); + Mono findOneByLastName(String lastName); Flux findByLastNameOrderByFirstNameAsc(String lastName); From 8e317a17b2459cfcaa885746a6d794885ebfeb10 Mon Sep 17 00:00:00 2001 From: agrgr Date: Tue, 26 Nov 2024 14:38:47 +0200 Subject: [PATCH 2/3] process projections --- .../repository/query/AerospikePartTreeQuery.java | 1 + .../query/BaseAerospikePartTreeQuery.java | 13 ++++++++++++- .../query/ReactiveAerospikePartTreeQuery.java | 4 ++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/springframework/data/aerospike/repository/query/AerospikePartTreeQuery.java b/src/main/java/org/springframework/data/aerospike/repository/query/AerospikePartTreeQuery.java index 606d6c40..87e90761 100644 --- a/src/main/java/org/springframework/data/aerospike/repository/query/AerospikePartTreeQuery.java +++ b/src/main/java/org/springframework/data/aerospike/repository/query/AerospikePartTreeQuery.java @@ -86,6 +86,7 @@ public Object execute(Object[] parameters) { } else if (queryMethod.isStreamQuery()) { return findByQuery(query, targetClass); } else if (queryMethod.isCollectionQuery()) { + // All queries with Collection return type including projections return findByQuery(query, targetClass).collect(Collectors.toList()); } else if (queryMethod.isQueryForEntity()) { Stream result = findByQuery(query, targetClass); diff --git a/src/main/java/org/springframework/data/aerospike/repository/query/BaseAerospikePartTreeQuery.java b/src/main/java/org/springframework/data/aerospike/repository/query/BaseAerospikePartTreeQuery.java index 7f860a33..bdda8a27 100644 --- a/src/main/java/org/springframework/data/aerospike/repository/query/BaseAerospikePartTreeQuery.java +++ b/src/main/java/org/springframework/data/aerospike/repository/query/BaseAerospikePartTreeQuery.java @@ -114,7 +114,7 @@ Class getTargetClass(ParametersParameterAccessor accessor) { return accessor.findDynamicProjection(); } // DTO projection - if (queryMethod.getReturnedObjectType() != queryMethod.getEntityInformation().getJavaType()) { + if (!isEntityAssignableFromReturnType(queryMethod)) { return queryMethod.getReturnedObjectType(); } // No projection - target class will be the entity class. @@ -170,4 +170,15 @@ protected boolean isCountQuery(QueryMethod queryMethod) { protected boolean isDeleteQuery(QueryMethod queryMethod) { return queryMethod.getName().startsWith("deleteBy") || queryMethod.getName().startsWith("removeBy"); } + + /** + * Find whether entity domain class is assignable from query method's returned object class. + * Not assignable when using a detached DTO (data transfer object, e.g., for projections). + * + * @param queryMethod QueryMethod in use + * @return true when entity is assignable from query method's return class, otherwise false + */ + protected boolean isEntityAssignableFromReturnType(QueryMethod queryMethod) { + return queryMethod.getEntityInformation().getJavaType().isAssignableFrom(queryMethod.getReturnedObjectType()); + } } diff --git a/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java b/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java index fb9de022..33340072 100644 --- a/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java +++ b/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java @@ -107,8 +107,8 @@ public Object execute(Object[] parameters) { } else if (queryMethod.isCollectionQuery()) { return findByQuery(query, targetClass).collectList(); } - else if (queryMethod.isQueryForEntity()) { - // Queries with Flux and Mono return types + else if (queryMethod.isQueryForEntity() || !isEntityAssignableFromReturnType(queryMethod)) { + // Queries with Flux and Mono return types including projection queries return findByQuery(query, targetClass); } throw new UnsupportedOperationException("Query method " + queryMethod.getNamedQueryName() + " is not " + From b9ce85577f746c20b02da281dc12716c1aa2c702 Mon Sep 17 00:00:00 2001 From: agrgr Date: Sun, 1 Dec 2024 17:44:08 +0200 Subject: [PATCH 3/3] reactive flow: add support for returning stream, add reactive Customer negative tests repository --- .../query/ReactiveAerospikePartTreeQuery.java | 5 +-- .../ReactiveCustomerRepositoryQueryTests.java | 3 ++ .../query/reactive/find/NotEqualTests.java | 10 ++++++ ...activeCustomerNegativeTestsRepository.java | 33 +++++++++++++++++++ .../sample/ReactiveCustomerRepository.java | 3 ++ 5 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerNegativeTestsRepository.java diff --git a/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java b/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java index 33340072..09745d94 100644 --- a/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java +++ b/src/main/java/org/springframework/data/aerospike/repository/query/ReactiveAerospikePartTreeQuery.java @@ -102,9 +102,10 @@ public Object execute(Object[] parameters) { return getPage(unprocessedResults, size, pageable, query); }); } else if (queryMethod.isStreamQuery()) { - throw new UnsupportedOperationException( - "Automatic converting of an async stream to a blocking sync Stream is not supported"); + return findByQuery(query, targetClass).toStream(); } else if (queryMethod.isCollectionQuery()) { + // Currently there seems to be no way to distinguish return type Collection from Mono etc., + // so a query method with return type Collection will compile but throw ClassCastException in runtime return findByQuery(query, targetClass).collectList(); } else if (queryMethod.isQueryForEntity() || !isEntityAssignableFromReturnType(queryMethod)) { diff --git a/src/test/java/org/springframework/data/aerospike/repository/query/reactive/ReactiveCustomerRepositoryQueryTests.java b/src/test/java/org/springframework/data/aerospike/repository/query/reactive/ReactiveCustomerRepositoryQueryTests.java index 70904072..6072fc97 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/query/reactive/ReactiveCustomerRepositoryQueryTests.java +++ b/src/test/java/org/springframework/data/aerospike/repository/query/reactive/ReactiveCustomerRepositoryQueryTests.java @@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.aerospike.BaseReactiveIntegrationTests; import org.springframework.data.aerospike.sample.Customer; +import org.springframework.data.aerospike.sample.ReactiveCustomerNegativeTestsRepository; import org.springframework.data.aerospike.sample.ReactiveCustomerRepository; import java.util.List; @@ -34,6 +35,8 @@ public class ReactiveCustomerRepositoryQueryTests extends BaseReactiveIntegratio @Autowired protected ReactiveCustomerRepository reactiveRepository; + @Autowired + protected ReactiveCustomerNegativeTestsRepository negativeTestsReactiveRepository; @BeforeAll void beforeAll() { diff --git a/src/test/java/org/springframework/data/aerospike/repository/query/reactive/find/NotEqualTests.java b/src/test/java/org/springframework/data/aerospike/repository/query/reactive/find/NotEqualTests.java index 2aea3bed..68bd6ec7 100644 --- a/src/test/java/org/springframework/data/aerospike/repository/query/reactive/find/NotEqualTests.java +++ b/src/test/java/org/springframework/data/aerospike/repository/query/reactive/find/NotEqualTests.java @@ -2,11 +2,14 @@ import org.junit.jupiter.api.Test; import org.springframework.data.aerospike.repository.query.reactive.ReactiveCustomerRepositoryQueryTests; +import org.springframework.data.aerospike.sample.Customer; import reactor.test.StepVerifier; import java.util.List; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for the "Is not equal" reactive repository query. Keywords: Not, IsNot. @@ -37,5 +40,12 @@ public void findBySimplePropertyNotEqual_String() { return false; }) .expectComplete(); + + Stream customersStream = reactiveRepository.findByFirstNameNot("Simpson"); + assertThat(customersStream.toList()).containsExactlyInAnyOrderElementsOf(allCustomers); + + assertThatThrownBy(() -> negativeTestsReactiveRepository.findByLastNameNotIgnoreCase("Simpson")) + .isInstanceOf(ClassCastException.class) + .hasMessageContaining("cannot be cast"); } } diff --git a/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerNegativeTestsRepository.java b/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerNegativeTestsRepository.java new file mode 100644 index 00000000..caa7ca43 --- /dev/null +++ b/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerNegativeTestsRepository.java @@ -0,0 +1,33 @@ +/* + * Copyright 2012-2024 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.aerospike.sample; + +import org.springframework.data.aerospike.repository.ReactiveAerospikeRepository; + +import java.util.List; + +/** + * This repository acts as a storage for invalid method names used for testing. For actual repository see + * {@link ReactiveCustomerRepository} + */ +public interface ReactiveCustomerNegativeTestsRepository extends ReactiveAerospikeRepository { + + /** + * ClassCastException, cannot be automatically cast to sync List using reactive Repository. + * See {@link ReactiveCustomerRepository} for examples of used query methods return types + */ + List findByLastNameNotIgnoreCase(String lastName); +} diff --git a/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerRepository.java b/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerRepository.java index a2d483c7..95c49cd5 100644 --- a/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerRepository.java +++ b/src/test/java/org/springframework/data/aerospike/sample/ReactiveCustomerRepository.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; +import java.util.stream.Stream; /** * Simple reactive repository interface managing {@link Customer}s. @@ -43,6 +44,8 @@ public interface ReactiveCustomerRepository extends ReactiveAerospikeRepository< Mono findOneByLastNameNot(String lastName); + Stream findByFirstNameNot(String lastName); + Mono> findByFirstNameNotIgnoreCase(String lastName); Mono findOneByLastName(String lastName);