Skip to content
This repository has been archived by the owner on Oct 15, 2024. It is now read-only.

Commit

Permalink
[SELC-4677] refactor: remove users APIs in QueueNotificationControlle…
Browse files Browse the repository at this point in the history
…r and related methods (#485)
  • Loading branch information
giulia-tremolada authored May 10, 2024
1 parent f48fc4c commit 9614f94
Show file tree
Hide file tree
Showing 9 changed files with 11 additions and 420 deletions.
154 changes: 0 additions & 154 deletions app/src/main/resources/swagger/api-docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -2662,135 +2662,6 @@
} ]
}
},
"/notification-event/users" : {
"post" : {
"tags" : [ "kafka" ],
"summary" : "resendUsers",
"description" : "Service to resend old user onboardings to the SCUsers kafka queue, it can send the onboardings of a single user or also retrieve all the users for a given set of products in a paged manner by passing page and size",
"operationId" : "resendUsersUsingPOST",
"parameters" : [ {
"name" : "size",
"in" : "query",
"description" : "size",
"required" : false,
"style" : "form",
"schema" : {
"type" : "integer",
"format" : "int32"
}
}, {
"name" : "page",
"in" : "query",
"description" : "page",
"required" : false,
"style" : "form",
"schema" : {
"type" : "integer",
"format" : "int32"
}
}, {
"name" : "productsFilter",
"in" : "query",
"description" : "productsFilter",
"required" : true,
"style" : "form",
"explode" : true,
"schema" : {
"type" : "string"
}
}, {
"name" : "userId",
"in" : "query",
"description" : "userId",
"required" : false,
"style" : "form",
"schema" : {
"type" : "string"
}
} ],
"responses" : {
"200" : {
"description" : "OK"
},
"400" : {
"description" : "Bad Request",
"content" : {
"application/problem+json" : {
"schema" : {
"$ref" : "#/components/schemas/Problem"
}
}
}
},
"404" : {
"description" : "Not Found",
"content" : {
"application/problem+json" : {
"schema" : {
"$ref" : "#/components/schemas/Problem"
}
}
}
},
"409" : {
"description" : "Conflict",
"content" : {
"application/problem+json" : {
"schema" : {
"$ref" : "#/components/schemas/Problem"
}
}
}
}
},
"security" : [ {
"bearerAuth" : [ "global" ]
} ]
}
},
"/notification-event/users/count" : {
"get" : {
"tags" : [ "kafka" ],
"summary" : "countUsers",
"description" : "Users' Count for single product",
"operationId" : "countUsersUsingGET",
"responses" : {
"200" : {
"description" : "OK",
"content" : {
"*/*" : {
"schema" : {
"$ref" : "#/components/schemas/ProductCountResponse"
}
}
}
},
"400" : {
"description" : "Bad Request",
"content" : {
"application/problem+json" : {
"schema" : {
"$ref" : "#/components/schemas/Problem"
}
}
}
},
"404" : {
"description" : "Not Found",
"content" : {
"application/problem+json" : {
"schema" : {
"$ref" : "#/components/schemas/Problem"
}
}
}
}
},
"security" : [ {
"bearerAuth" : [ "global" ]
} ]
}
},
"/tokens" : {
"get" : {
"tags" : [ "Token" ],
Expand Down Expand Up @@ -5589,31 +5460,6 @@
}
}
},
"ProductCount" : {
"title" : "ProductCount",
"type" : "object",
"properties" : {
"count" : {
"type" : "integer",
"format" : "int32"
},
"productId" : {
"type" : "string"
}
}
},
"ProductCountResponse" : {
"title" : "ProductCountResponse",
"type" : "object",
"properties" : {
"products" : {
"type" : "array",
"items" : {
"$ref" : "#/components/schemas/ProductCount"
}
}
}
},
"ProductInfo" : {
"title" : "ProductInfo",
"type" : "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import it.pagopa.selfcare.commons.base.security.PartyRole;
import it.pagopa.selfcare.mscore.constant.RelationshipState;
import it.pagopa.selfcare.mscore.model.aggregation.QueryCount;
import it.pagopa.selfcare.mscore.model.aggregation.UserInstitutionAggregation;
import it.pagopa.selfcare.mscore.model.aggregation.UserInstitutionFilter;
import it.pagopa.selfcare.mscore.model.onboarding.OnboardedProduct;
Expand Down Expand Up @@ -61,5 +60,4 @@ public interface UserConnector {

List<UserInstitutionAggregation> getUserInfo(String userId, String institutionId, String[] states);

List<QueryCount> countUsers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import it.pagopa.selfcare.mscore.constant.RelationshipState;
import it.pagopa.selfcare.mscore.exception.InvalidRequestException;
import it.pagopa.selfcare.mscore.exception.ResourceNotFoundException;
import it.pagopa.selfcare.mscore.model.aggregation.QueryCount;
import it.pagopa.selfcare.mscore.model.aggregation.UserInstitutionAggregation;
import it.pagopa.selfcare.mscore.model.aggregation.UserInstitutionFilter;
import it.pagopa.selfcare.mscore.model.onboarding.OnboardedProduct;
Expand All @@ -29,7 +28,10 @@
import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.core.FindAndModifyOptions;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.aggregation.*;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.GraphLookupOperation;
import org.springframework.data.mongodb.core.aggregation.MatchOperation;
import org.springframework.data.mongodb.core.aggregation.UnwindOperation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
Expand All @@ -43,7 +45,6 @@
import java.util.stream.Collectors;

import static it.pagopa.selfcare.mscore.constant.CustomError.*;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.group;

@Slf4j
@Component
Expand Down Expand Up @@ -424,14 +425,4 @@ private String constructQuery(String... variables) {
Arrays.stream(variables).forEach(s -> builder.append(".").append(s));
return builder.toString();
}

@Override
public List<QueryCount> countUsers() {
UnwindOperation unwindBindings = Aggregation.unwind("$bindings", "binding", true);
UnwindOperation unwindProducts = Aggregation.unwind("$bindings.products", "products", true);
MatchOperation matchStatusId = Aggregation.match(Criteria.where("bindings.products.status").in(VALID_USER_RELATIONSHIPS));
GroupOperation productCount = group("bindings.products.productId").count().as("count");
Aggregation aggregation = Aggregation.newAggregation(unwindBindings, unwindProducts, matchStatusId, productCount);
return mongoOperations.aggregate(aggregation, "User", QueryCount.class).getMappedResults();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import it.pagopa.selfcare.mscore.constant.Env;
import it.pagopa.selfcare.mscore.constant.RelationshipState;
import it.pagopa.selfcare.mscore.exception.ResourceNotFoundException;
import it.pagopa.selfcare.mscore.model.aggregation.QueryCount;
import it.pagopa.selfcare.mscore.model.aggregation.UserInstitutionAggregation;
import it.pagopa.selfcare.mscore.model.aggregation.UserInstitutionBinding;
import it.pagopa.selfcare.mscore.model.aggregation.UserInstitutionFilter;
Expand Down Expand Up @@ -679,29 +678,4 @@ void findUsersByInstitutionIdAndProductId(){
assertEquals(1, userIds.size());
}


@Test
void countUser() {
//Given
AggregationResults<Object> results = mock(AggregationResults.class);

when(results.getMappedResults()).thenReturn(List.of(
new QueryCount("prod1", 1),
new QueryCount("prod2", 2),
new QueryCount("prod3", 3)));

//When
when(mongoTemplate.aggregate(any(Aggregation.class), anyString(), any())).
thenReturn(results);

List<QueryCount> response = userConnectorImpl.countUsers();

//Then
assertNotNull(response);
assertFalse(response.isEmpty());
QueryCount actual = response.get(0);

assertEquals(actual.getCount(), 1);
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package it.pagopa.selfcare.mscore.core;

import it.pagopa.selfcare.mscore.model.aggregation.QueryCount;
import org.springframework.scheduling.annotation.Async;

import java.util.List;
Expand All @@ -12,8 +11,4 @@ public interface QueueNotificationService {
void sendContractsNotificationsByInstitutionIdAndTokenId(String tokenId, String institutionId);

void sendContracts(Optional<Integer> size, List<String> productsFilter);

void sendUsers(Optional<Integer> size, Optional<Integer> page, List<String> productsFilter, Optional<String> userId);

List<QueryCount> countUsers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@

import it.pagopa.selfcare.mscore.api.InstitutionConnector;
import it.pagopa.selfcare.mscore.api.TokenConnector;
import it.pagopa.selfcare.mscore.api.UserConnector;
import it.pagopa.selfcare.mscore.constant.RelationshipState;
import it.pagopa.selfcare.mscore.exception.ResourceNotFoundException;
import it.pagopa.selfcare.mscore.model.QueueEvent;
import it.pagopa.selfcare.mscore.model.aggregation.QueryCount;
import it.pagopa.selfcare.mscore.model.institution.Institution;
import it.pagopa.selfcare.mscore.model.institution.Onboarding;
import it.pagopa.selfcare.mscore.model.onboarding.OnboardedUser;
import it.pagopa.selfcare.mscore.model.onboarding.Token;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -28,25 +25,19 @@ public class QueueNotificationServiceImpl implements QueueNotificationService {
public static final int TOKEN_PAGE_SIZE = 100;
public static final int USER_PAGE_SIZE = 100;
private final ContractEventNotificationService contractService;
private final UserEventService userEventService;
private Optional<Integer> page_size_api = Optional.empty();
private Optional<Integer> page = Optional.empty();
private final TokenConnector tokenConnector;
private final InstitutionConnector institutionConnector;
private Optional<List<String>> productsFilter = Optional.empty();

private final UserConnector userConnector;

private final List<RelationshipState> statesToSend = List.of(RelationshipState.ACTIVE, RelationshipState.DELETED);


@Autowired
public QueueNotificationServiceImpl(ContractEventNotificationService contractService,
UserEventService userEventService,
TokenConnector tokenConnector,
InstitutionConnector institutionConnector, UserConnector userConnector) {
this.userEventService = userEventService;
this.userConnector = userConnector;
InstitutionConnector institutionConnector) {
log.info("Initializing {}...", QueueNotificationServiceImpl.class.getSimpleName());
this.contractService = contractService;
this.tokenConnector = tokenConnector;
Expand Down Expand Up @@ -134,40 +125,6 @@ private void sendScContractNotifications(List<Token> tokens) {
});
}

private void sendDataLakeUserNotifications(List<OnboardedUser> users, String productId){
users.forEach(onboardedUser -> {
userEventService.sendOnboardedUserNotification(onboardedUser, productId);
});
}

@Async
public void regenerateUserNotifications(Optional<String> userId){
if (productsFilter.isPresent()){
for (String productId: productsFilter.get()){
boolean nextPage = true;
int page = this.page.orElse(0);
if (userId.isPresent()){
OnboardedUser user = userConnector.findById(userId.get());
userEventService.sendOnboardedUserNotification(user, productId);
}
else {
do {
List<OnboardedUser> users = userConnector.findAllValidUsers(page, page_size_api.orElse(USER_PAGE_SIZE), productId);
sendDataLakeUserNotifications(users, productId);
page += 1;
if (users.size() < USER_PAGE_SIZE || this.page.isPresent()) {
nextPage = false;
log.debug("[KAFKA] USER TOTAL NUMBER {}", page * USER_PAGE_SIZE + users.size());
}
}while(nextPage);
}
page_size_api = Optional.empty();
}

}

}

@Async
@Override
public void sendContracts(Optional<Integer> size, List<String> productsFilter) {
Expand All @@ -176,17 +133,4 @@ public void sendContracts(Optional<Integer> size, List<String> productsFilter) {
regenerateContractsNotifications();
}

@Async
@Override
public void sendUsers(Optional<Integer> size, Optional<Integer> page, List<String> productsFilter, Optional<String> userId) {
this.page_size_api = size;
this.productsFilter = Optional.ofNullable(productsFilter);
this.page=page;
regenerateUserNotifications(userId);
}

@Override
public List<QueryCount> countUsers() {
return userConnector.countUsers();
}
}
Loading

0 comments on commit 9614f94

Please sign in to comment.