Skip to content
This repository has been archived by the owner on Oct 15, 2024. It is now read-only.

Commit

Permalink
fix query for mongoRepository
Browse files Browse the repository at this point in the history
  • Loading branch information
KevinSi96 committed Dec 20, 2023
1 parent 4fec28a commit 465f579
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 22 deletions.
8 changes: 4 additions & 4 deletions app/src/main/resources/swagger/api-docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -4559,7 +4559,7 @@
"post" : {
"tags" : [ "scheduler" ],
"summary" : "startUsers",
"description" : "${swagger.ms-core.scheduler.api.start.users}",
"description" : "Service to resend old user onboardings to DL",
"operationId" : "startUsersUsingPOST",
"parameters" : [ {
"name" : "size",
Expand All @@ -4583,10 +4583,10 @@
}
}, {
"name" : "userId",
"in" : "path",
"in" : "query",
"description" : "userId",
"required" : true,
"style" : "simple",
"required" : false,
"style" : "form",
"schema" : {
"type" : "string"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface UserConnector {
OnboardedUser save(OnboardedUser example);

List<OnboardedUser> findAll();
List<OnboardedUser> findAll(Integer page, Integer size);
List<OnboardedUser> findAll(Integer page, Integer size, String productId);

void deleteById(String id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ public List<OnboardedUser> findAll() {
}

@Override
public List<OnboardedUser> findAll(Integer page, Integer size) {
public List<OnboardedUser> findAll(Integer page, Integer size, String productId) {
Pageable pageable = PageRequest.of(page, size);
return repository.find(null, pageable, UserEntity.class)
Query queryMatch = Query.query(Criteria.where(UserEntity.Fields.bindings.name())
.elemMatch(Criteria.where(UserBinding.Fields.products.name())
.elemMatch(Criteria.where(OnboardedProductEntity.Fields.productId.name()).is(productId)
.and(OnboardedProductEntity.Fields.status.name()).in(List.of(RelationshipState.ACTIVE.name(), RelationshipState.DELETED.name(), RelationshipState.SUSPENDED.name())))));
return repository.find(queryMatch, pageable, UserEntity.class)
.stream()
.map(userMapper::toOnboardedUser)
.collect(Collectors.toList());
Expand Down Expand Up @@ -112,7 +116,7 @@ public List<OnboardedUser> findAllByExistingIds(List<String> users) {
return findAll(users, true);
}

private List<OnboardedUser> findAll(List<String> userIds, boolean existingOnly){
private List<OnboardedUser> findAll(List<String> userIds, boolean existingOnly) {
List<OnboardedUser> userList = new ArrayList<>();
Set<String> remainingUserIds = new HashSet<>(userIds);

Expand All @@ -136,7 +140,7 @@ public void findAndUpdateState(String userId, @Nullable String relationshipId, @
.set(constructQuery(CURRENT_ANY, UserBinding.Fields.products.name(), CURRENT_PRODUCT_REF, OnboardedProduct.Fields.status.name()), state)
.set(constructQuery(CURRENT_ANY, UserBinding.Fields.products.name(), CURRENT_PRODUCT_REF, OnboardedProduct.Fields.updatedAt.name()), OffsetDateTime.now());
if (relationshipId != null) {
update.filterArray(Criteria.where(CURRENT_PRODUCT + OnboardedProduct.Fields.relationshipId.name()).is(relationshipId));
update.filterArray(Criteria.where(CURRENT_PRODUCT + OnboardedProduct.Fields.relationshipId.name()).is(relationshipId));
}
if (token != null) {
update.filterArray(Criteria.where(CURRENT_PRODUCT + OnboardedProduct.Fields.tokenId.name()).is(token.getId()));
Expand All @@ -156,8 +160,8 @@ public void findAndUpdateStateByInstitutionAndProduct(String userId, String inst
.set(constructQuery(CURRENT_USER_BINDING_REF, UserBinding.Fields.products.name(), CURRENT_PRODUCT_REF, OnboardedProduct.Fields.status.name()), state)
.set(constructQuery(CURRENT_USER_BINDING_REF, UserBinding.Fields.products.name(), CURRENT_PRODUCT_REF, OnboardedProduct.Fields.updatedAt.name()), OffsetDateTime.now());

update.filterArray(Criteria.where(CURRENT_PRODUCT + OnboardedProduct.Fields.productId.name()).is(productId)
.and(CURRENT_PRODUCT + OnboardedProduct.Fields.status.name()).is(RelationshipState.ACTIVE.name()));
update.filterArray(Criteria.where(CURRENT_PRODUCT + OnboardedProduct.Fields.productId.name()).is(productId)
.and(CURRENT_PRODUCT + OnboardedProduct.Fields.status.name()).is(RelationshipState.ACTIVE.name()));
update.filterArray(Criteria.where(CURRENT_USER_BINDING + UserBinding.Fields.institutionId.name()).is(institutionId));

FindAndModifyOptions findAndModifyOptions = FindAndModifyOptions.options().upsert(false).returnNew(false);
Expand Down Expand Up @@ -309,11 +313,11 @@ public List<UserInstitutionAggregation> getUserInfo(String userId, String instit
UnwindOperation unwindBindings = Aggregation.unwind("$bindings");
UnwindOperation unwindProducts = Aggregation.unwind("$bindings.products");

if(Objects.nonNull(states) && states.length > 0) {
if (Objects.nonNull(states) && states.length > 0) {
criterias.add(Criteria.where("bindings.products.status").in(states));
}

if(Objects.nonNull(institutionId)) {
if (Objects.nonNull(institutionId)) {
criterias.add(Criteria.where("bindings.institutionId").is(institutionId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class SchedulerServiceImpl implements SchedulerService{
private final ContractService contractService;
private final UserEventService userEventService;
private Optional<Integer> page_size_api = Optional.empty();

private Optional<Integer> page = Optional.empty();
private final TokenConnector tokenConnector;
private final InstitutionConnector institutionConnector;
private Optional<List<String>> productsFilter;
Expand Down Expand Up @@ -109,17 +109,17 @@ private void regenerateUserNotifications(Optional<String> userId){
if (productsFilter.isPresent()){
for (String productId: productsFilter.get()){
boolean nextPage = true;
int page = 0;
int page = this.page.orElse(0);
if (userId.isPresent()){
OnboardedUser user = userConnector.findById(userId.get());
userEventService.sendOnboardedUserNotification(user, productId);
}
else {
do {
List<OnboardedUser> users = userConnector.findAll(page, page_size_api.orElse(USER_PAGE_SIZE));
List<OnboardedUser> users = userConnector.findAll(page, page_size_api.orElse(USER_PAGE_SIZE), productId);
sendDataLakeUserNotifications(users, productId);
page += 1;
if (users.size() < USER_PAGE_SIZE) {
if (users.size() < USER_PAGE_SIZE || this.page.isPresent()) {
nextPage = false;
log.debug("[KAFKA] USER TOTAL NUMBER {}", page * USER_PAGE_SIZE + users.size());
}
Expand All @@ -142,9 +142,10 @@ public void startScheduler(Optional<Integer> size, List<String> productsFilter)
}

@Override
public void startUsersScheduler(Optional<Integer> size, List<String> productsFilter, Optional<String> userId) {
public void startUsersScheduler(Optional<Integer> size, Optional<Integer> page, List<String> productsFilter, Optional<String> userId) {
this.page_size_api = size;
this.productsFilter = Optional.of(productsFilter);
this.page=page;
regenerateUserNotifications(userId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void sendLegalTokenUserNotification(Token token) {
List<UserToNotify> usersToNotify = toUserToNotify(tokenUser.getUserId(), token.getInstitutionId(), token.getProductId(), Optional.empty(), Optional.of(token.getId()));
usersToNotify.forEach(user -> {
UserNotificationToSend notification = notificationMapper.setNotificationDetailsFromToken(token, user, QueueEvent.ADD);
String id = user.getUserId().concat(notification.getInstitutionId()).concat(notification.getProductId()).concat(user.getProductRole());
String id = user.getUserId().concat("_"+notification.getInstitutionId()).concat("_"+notification.getProductId()).concat("_"+user.getProductRole());
notification.setId(id);
try {
String msg = mapper.writeValueAsString(notification);
Expand All @@ -101,7 +101,7 @@ public void sendOnboardedUserNotification(OnboardedUser onboardedUser, String pr
for (OnboardedProduct onboardedProduct : userBinding.getProducts()) {
if (productId.equals(onboardedProduct.getProductId()) && ALLOWED_RELATIONSHIP_STATUSES.contains(onboardedProduct.getStatus())) {
UserNotificationToSend notification = notificationMapper.setNotificationDetailsFromOnboardedProduct(toUserToNotify(user.getId(), userBinding.getInstitutionId(), user, onboardedProduct), onboardedProduct, userBinding.getInstitutionId());
String id = user.getId().concat(notification.getInstitutionId()).concat(notification.getProductId()).concat(onboardedProduct.getProductRole());
String id = user.getId().concat("_"+notification.getInstitutionId()).concat("_"+notification.getProductId()).concat("_"+onboardedProduct.getProductRole());
notification.setId(id);
try {
String msg = mapper.writeValueAsString(notification);
Expand Down Expand Up @@ -171,7 +171,7 @@ public void sendOperatorUserNotification(RelationshipInfo relationshipInfo, Queu
log.debug(LogUtils.CONFIDENTIAL_MARKER, "Notification to send to the data lake, notification: {}", relationshipInfo);
usersToNotify.forEach(user -> {
UserNotificationToSend notification = notificationMapper.setNotificationDetailsFromRelationship(relationshipInfo, user, eventType);
String id = user.getUserId().concat(notification.getInstitutionId()).concat(notification.getProductId()).concat(user.getProductRole());
String id = user.getUserId().concat("_"+notification.getInstitutionId()).concat("_"+notification.getProductId()).concat("_"+user.getProductRole());
notification.setId(id);
try {
String msg = mapper.writeValueAsString(notification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ public void start(@RequestParam(name = "size", required = false) Optional<Intege
@PostMapping(value = "/users")
@ResponseStatus(HttpStatus.OK)
public void startUsers(@RequestParam(name = "size", required = false)Optional<Integer> size,
@RequestParam(name = "page", required = false)Optional<Integer> page,
@RequestParam(name = "productsFilter")List<String> productsFilter,
@RequestParam(name = "userId", required = false)Optional<String> userId){
log.trace("Scheduler started for Users");
schedulerService.startUsersScheduler(size, productsFilter, userId);
schedulerService.startUsersScheduler(size, page, productsFilter, userId);
}
}
1 change: 1 addition & 0 deletions web/src/main/resources/swagger/swagger_en.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ swagger.mscore.external.geotaxonomies=retrieves the geographic taxonomies relate
swagger.mscore.external.institution.relationships=returns the relationships related to the institution
swagger.mscore.institutions=Gets institutions filtering by taxCode and/or subunitCode
swagger.ms-core.scheduler.api.start=Service to start scheduler to resend old messages to DL
swagger.ms-core.scheduler.api.start.users=Service to resend old user onboardings to DL
swagger.mscore.institution.create.from-ipa=create an institution from ipa registry
swagger.mscore.institution.create.from-ivass=create an institution from ivass CSV
swagger.mscore.institution.create.from-infocamere=create an institution from infocamere registry
Expand Down

0 comments on commit 465f579

Please sign in to comment.