Skip to content

Commit

Permalink
⚡ (interservice communication) switch to amqp rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias-Pe committed Aug 9, 2024
1 parent 7278b14 commit fb30d36
Show file tree
Hide file tree
Showing 23 changed files with 399 additions and 63 deletions.
8 changes: 0 additions & 8 deletions feedservice/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-micrometer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableFeignClients
@EnableCaching
public class FeedserviceApplication {

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,21 +1,57 @@
package edu.hm.peslalz.thesis.feedservice.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.hm.peslalz.thesis.feedservice.entity.PagedPostResponse;
import edu.hm.peslalz.thesis.feedservice.entity.PostDTO;
import edu.hm.peslalz.thesis.feedservice.entity.PostsRequestDTO;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.data.domain.Page;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ResponseStatusException;

@Component
public class PostClient {
private final DirectExchange postExchange;
private final RabbitTemplate template;

public PostClient(DirectExchange postExchange, RabbitTemplate template) {
this.postExchange = postExchange;
this.template = template;
}

@FeignClient("postservice")
public interface PostClient {
@Cacheable("posts")
@GetMapping("posts")
PagedPostResponse getPosts(@RequestParam(required = false) String category, @RequestParam(required = false) Integer userId, @RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "50") int size);
public PagedPostResponse getPosts(String category, Integer userId, int page, int size) {
PostsRequestDTO postsRequestDTO = new PostsRequestDTO(category, userId, page, size);
ObjectMapper mapper = new ObjectMapper();
String message;
try {
message = mapper.writeValueAsString(postsRequestDTO);
} catch (JsonProcessingException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Could not process request to json", e);
}
String response = (String) this.template.convertSendAndReceive(postExchange.getName(), "rpc-posts", message);
PagedPostResponse pagedPostResponse;
try {
pagedPostResponse = mapper.readValue(response, PagedPostResponse.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return pagedPostResponse;
}

@Cacheable("post")
@GetMapping(value = "posts/{id}")
PostDTO getPost(@PathVariable int id);
public PostDTO getPost(int id) {
String response = (String) this.template.convertSendAndReceive(postExchange.getName(), "rpc-post", id);
ObjectMapper mapper = new ObjectMapper();
PostDTO postDTO;
try {
postDTO = mapper.readValue(response, PostDTO.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return postDTO;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package edu.hm.peslalz.thesis.feedservice.client;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Configuration
public class RabbitmqQueuesConfig {
@Bean
Expand All @@ -15,4 +19,21 @@ public Queue postFeedQueue() {
public Queue postActionFeedQueue() {
return new Queue("post-action-feed");
}

@Bean
public DirectExchange postExchange() {
return new DirectExchange("postservice.rpc");
}

@Bean
public DirectExchange trendExchange() {
return new DirectExchange("statisticservice.rpc");
}

@Bean
public SimpleMessageConverter converter() {
SimpleMessageConverter converter = new SimpleMessageConverter();
converter.setAllowedListPatterns(List.of("java.*"));
return converter;
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,47 @@
package edu.hm.peslalz.thesis.feedservice.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.hm.peslalz.thesis.feedservice.entity.PagedTrendResponse;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.stereotype.Component;

@Component
public class TrendClient {
private final DirectExchange trendExchange;
private final RabbitTemplate template;

public TrendClient(DirectExchange trendExchange, RabbitTemplate template) {
this.trendExchange = trendExchange;
this.template = template;
}

@FeignClient("statisticservice")
public interface TrendClient {
@Cacheable("trends/categories")
@GetMapping("trends/categories")
PagedTrendResponse getTrendingCategories(@RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "50") int size);
public PagedTrendResponse getTrendingCategories(int page) {
return getPagedTrendResponse("rpc-categories", page);
}

@Cacheable("trends/posts")
@GetMapping("trends/posts")
PagedTrendResponse getTrendingPosts(@RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "50") int size);
public PagedTrendResponse getTrendingPosts(int page){
return getPagedTrendResponse("rpc-posts", page);
}

@Cacheable("trends/users")
@GetMapping("trends/users")
PagedTrendResponse getTrendingUsers(@RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "50") int size);
public PagedTrendResponse getTrendingUsers(int page){
return getPagedTrendResponse("rpc-users", page);
}

private PagedTrendResponse getPagedTrendResponse(String routingKey, int page) {
String response = (String) this.template.convertSendAndReceive(trendExchange.getName(), routingKey, page);
ObjectMapper mapper = new ObjectMapper();
PagedTrendResponse pagedTrendResponse;
try {
pagedTrendResponse = mapper.readValue(response, PagedTrendResponse.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return pagedTrendResponse;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package edu.hm.peslalz.thesis.feedservice.entity;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.RequiredArgsConstructor;

Expand All @@ -9,6 +10,7 @@

@RequiredArgsConstructor
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class PagedPostResponse implements Serializable {
public PagedPostResponse(List<PostDTO> content) {
this.content = content;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package edu.hm.peslalz.thesis.feedservice.entity;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.RequiredArgsConstructor;

Expand All @@ -9,6 +10,7 @@

@RequiredArgsConstructor
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class PagedTrendResponse implements Serializable {
public PagedTrendResponse(List<Trend> content) {
this.content = content;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package edu.hm.peslalz.thesis.feedservice.entity;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;

import java.io.Serial;
import java.io.Serializable;
import java.util.List;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class PostDTO implements Serializable {
@Serial
private static final long serialVersionUID = 3L;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package edu.hm.peslalz.thesis.feedservice.entity;

import java.io.Serial;
import java.io.Serializable;

public record PostsRequestDTO(String category, Integer userId, int page, int size) implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ public FeedService(TrendClient trendClient, PostClient postClient, UserPreferenc
public Slice<PostDTO> getPersonalizedFeed(int userId, int page) {
// Fetching trending categories, users, and posts in parallel
CompletableFuture<List<String>> trendingCategoriesFuture = CompletableFuture.supplyAsync(() ->
trendClient.getTrendingCategories(0, 5).getContent()
trendClient.getTrendingCategories(0).getContent()
.stream()
.map(Trend::identifier)
.toList()
);

CompletableFuture<List<String>> trendingUsersFuture = CompletableFuture.supplyAsync(() ->
trendClient.getTrendingUsers(0, 5).getContent()
trendClient.getTrendingUsers(0).getContent()
.stream()
.map(Trend::identifier)
.toList()
);

CompletableFuture<PagedTrendResponse> trendingPostsFuture = CompletableFuture.supplyAsync(() ->
trendClient.getTrendingPosts(0, 10)
trendClient.getTrendingPosts(0)
);

// load preferences from repository
Expand Down
5 changes: 1 addition & 4 deletions feedservice/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ spring:
rabbitmq:
host: ${RABBITMQ_HOST:localhost}
port: 5672
channel-rpc-timeout: 5000
listener:
simple:
consumer-batch-enabled: true
Expand All @@ -37,10 +38,6 @@ spring:
threads:
virtual:
enabled: true
cloud:
openfeign:
httpclient:
connection-timeout: 500
mvc:
async:
request-timeout: 5000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ void receivePreferences() {

@Test
void testFeed() throws Exception {
Mockito.when(trendClient.getTrendingCategories(anyInt(), anyInt())).thenReturn(new PagedTrendResponse(List.of(new Trend("Fishing", 12), new Trend("Outdoor", 6))));
Mockito.when(trendClient.getTrendingPosts(anyInt(), anyInt())).thenReturn(new PagedTrendResponse(List.of(new Trend("1", 12), new Trend("2", 6))));
Mockito.when(trendClient.getTrendingUsers(anyInt(), anyInt())).thenReturn(new PagedTrendResponse(List.of(new Trend("1", 12), new Trend("2", 6))));
Mockito.when(trendClient.getTrendingCategories(anyInt())).thenReturn(new PagedTrendResponse(List.of(new Trend("Fishing", 12), new Trend("Outdoor", 6))));
Mockito.when(trendClient.getTrendingPosts(anyInt())).thenReturn(new PagedTrendResponse(List.of(new Trend("1", 12), new Trend("2", 6))));
Mockito.when(trendClient.getTrendingUsers(anyInt())).thenReturn(new PagedTrendResponse(List.of(new Trend("1", 12), new Trend("2", 6))));
Mockito.when(postClient.getPosts(any(), any(),anyInt(),anyInt())).thenReturn(new PagedPostResponse(List.of(new PostDTO(), new PostDTO())));
Mockito.when(postClient.getPost(anyInt())).thenReturn(new PostDTO());
Slice<PostDTO> personalizedFeed = feedController.getPersonalizedFeed(1, 0).call();
Expand Down
1 change: 1 addition & 0 deletions notificationservice/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ spring:
rabbitmq:
host: ${RABBITMQ_HOST:localhost}
port: 5672
channel-rpc-timeout: 5000
lifecycle:
timeout-per-shutdown-phase: 20s
threads:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,35 @@ public SimpleMessageConverter converter() {
converter.setAllowedListPatterns(List.of("java.lang.*"));
return converter;
}

@Bean
public Queue postsRpcQueue() {
return new Queue("postservice.rpc.rpc-posts");
}

@Bean
public Queue singlePostRpcQueue() {
return new Queue("postservice.rpc.rpc-post");
}

@Bean
public DirectExchange postserviceRpcExchange() {
return new DirectExchange("postservice.rpc");
}

@Bean
public Binding bindingSinglePostToRpcExchange(DirectExchange postserviceRpcExchange,
Queue singlePostRpcQueue) {
return BindingBuilder.bind(singlePostRpcQueue)
.to(postserviceRpcExchange)
.with("rpc-post");
}

@Bean
public Binding bindingPostsToRpcExchange(DirectExchange postserviceRpcExchange,
Queue postsRpcQueue) {
return BindingBuilder.bind(postsRpcQueue)
.to(postserviceRpcExchange)
.with("rpc-posts");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package edu.hm.peslalz.thesis.postservice.entity;

import java.io.Serial;
import java.io.Serializable;

public record PostsRequestDTO(String category, Integer userId, int page, int size) implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
}
Loading

0 comments on commit fb30d36

Please sign in to comment.