Skip to content

Commit

Permalink
Use Websockets.next (#1232)
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Deandrea <edeandrea@redhat.com>
  • Loading branch information
edeandrea authored Oct 18, 2024
1 parent d8ad49f commit d30c71a
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 279 deletions.
11 changes: 6 additions & 5 deletions event-statistics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-websockets</artifactId>
<artifactId>quarkus-websockets-next</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down Expand Up @@ -93,10 +93,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-info</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jacoco</artifactId>
Expand Down Expand Up @@ -133,6 +129,11 @@
<artifactId>quarkus-test-kafka-companion</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-websockets</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,117 +1,50 @@
package io.quarkus.sample.superheroes.statistics.endpoint;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.websocket.CloseReason;
import jakarta.websocket.CloseReason.CloseCodes;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.PongMessage;
import jakarta.websocket.Session;

import org.jboss.logging.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnPongMessage;
import io.quarkus.websockets.next.WebSocketConnection;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.Cancellable;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.vertx.core.buffer.Buffer;

/**
* Base WebSocket endpoint which handles caching the stream and replaying
* the last event from a stream upon subscription by new socket clients
* @param <V> The object type inside each event
*/
public abstract class EventStatsWebSocket<V> {
private final ConcurrentMap<Session, Cancellable> sessions = new ConcurrentHashMap<>();
private Multi<String> cachedStream;
private Logger logger;

@Inject
ObjectMapper mapper;

protected abstract Multi<V> getStream();

@OnOpen
public void onOpen(Session session) {
this.logger.debugf("Opening session with id %s", session.getId());
this.sessions.put(session, createSubscription(session));
public Multi<V> onOpen(WebSocketConnection connection) {
this.logger.debugf("Opening connection with id %s", connection.id());

return Multi.createBy()
.replaying()
.upTo(1)
.ofMulti(getStream())
.invoke(v -> this.logger.infof("[Connection %s] - Writing message %s", connection.id(), v));
}

@OnMessage
public void onPongMessage(Session session, PongMessage pongMessage) {
this.logger.debugf("Got pong message (%s) from session %s", new String(pongMessage.getApplicationData().array(), StandardCharsets.UTF_8), session.getId());
@OnPongMessage
public void onPongMessage(WebSocketConnection connection, Buffer pongMessage) {
this.logger.debugf("Got pong message (%s) on %s from connection %s", pongMessage.toString(), connection.handshakeRequest().path(), connection.id());
}

@OnClose
public void onClose(Session session) {
this.logger.debugf("Closing session with id %s", session.getId());
Optional.ofNullable(this.sessions.remove(session))
.ifPresent(Cancellable::cancel);
public void onClose(WebSocketConnection connection) {
this.logger.debugf("Closing connection with id %s", connection.id());
}

@PostConstruct
public void initialize() {
this.logger = Logger.getLogger(getClass());
this.cachedStream = Multi.createBy().replaying().upTo(1).ofMulti(getStream())
.map(Unchecked.function(this.mapper::writeValueAsString));
}

@PreDestroy
public void cleanup() {
this.sessions.forEach((session, subscription) -> {
subscription.cancel();

if (session.isOpen()) {
try {
session.close(new CloseReason(CloseCodes.GOING_AWAY, "Server shutting down"));
}
catch (IOException ex) {
this.logger.errorf(ex, "Got exception (%s) while closing session", ex.getClass().getName());
}
}
});
}

void sendPings() {
this.sessions.keySet()
.stream()
.filter(Session::isOpen)
.forEach(this::sendPing);
}

private void sendPing(Session session) {
this.logger.debugf("Sending ping to session %s", session.getId());

try {
session.getAsyncRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
}
catch (IOException e) {
this.logger.errorf(e, "Got error sending ping: %s", e.getMessage());
}
}

private Cancellable createSubscription(Session session) {
return Optional.ofNullable(this.cachedStream)
.orElseThrow(() -> new IllegalArgumentException("Cached stream (Multi<String>) has not been created. Please initialize it inside an @PostConstruct method."))
.subscribe().with(serialized -> write(session, serialized));
}

private void write(Session session, String text) {
this.logger.infof("[Session %s] - Writing message %s", session.getId(), text);

session.getAsyncRemote().sendText(text, result -> {
if (result.getException() != null) {
this.logger.error("Unable to write message to web socket", result.getException());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package io.quarkus.sample.superheroes.statistics.endpoint;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.websocket.server.ServerEndpoint;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.websockets.next.WebSocket;

import io.quarkus.sample.superheroes.statistics.domain.TeamScore;

Expand All @@ -17,8 +15,7 @@
* </p>
* @see TeamStatsChannelHolder
*/
@ServerEndpoint("/stats/team")
@ApplicationScoped
@WebSocket(path = "/stats/team")
public class TeamStatsWebSocket extends EventStatsWebSocket<TeamScore> {
@Inject
TeamStatsChannelHolder teamStatsChannelHolder;
Expand All @@ -27,11 +24,4 @@ public class TeamStatsWebSocket extends EventStatsWebSocket<TeamScore> {
protected Multi<TeamScore> getStream() {
return this.teamStatsChannelHolder.getTeamStats();
}

@Scheduled(every = "${pingInterval.teamStats:1m}", delayed = "${pingInterval.teamStats:1m}")
@Override
void sendPings() {
// This is overridden here because of https://github.com/quarkusio/quarkus/issues/38781
super.sendPings();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.quarkus.sample.superheroes.statistics.endpoint;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.websocket.server.ServerEndpoint;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.websockets.next.WebSocket;

import io.quarkus.sample.superheroes.statistics.domain.Score;

Expand All @@ -16,8 +13,7 @@
* </p>
* @see TopWinnerStatsChannelHolder
*/
@ServerEndpoint("/stats/winners")
@ApplicationScoped
@WebSocket(path = "/stats/winners")
public class TopWinnerWebSocket extends EventStatsWebSocket<Iterable<Score>> {
private final TopWinnerStatsChannelHolder topWinnerStatsChannelHolder;

Expand All @@ -29,11 +25,4 @@ public TopWinnerWebSocket(TopWinnerStatsChannelHolder topWinnerStatsChannelHolde
protected Multi<Iterable<Score>> getStream() {
return this.topWinnerStatsChannelHolder.getWinners();
}

@Scheduled(every = "${pingInterval.topWinners:1m}", delayed = "${pingInterval.topWinners:1m}")
@Override
void sendPings() {
// This is overridden here because of https://github.com/quarkusio/quarkus/issues/38781
super.sendPings();
}
}
1 change: 1 addition & 0 deletions event-statistics/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ quarkus.banner.path=banner.txt
quarkus.http.port=8085
quarkus.http.test-port=0
quarkus.jackson.serialization-inclusion=non-empty
quarkus.websockets-next.server.auto-ping-interval=1m

## Kafka configuration
mp.messaging.incoming.fights.connector=smallrye-kafka
Expand Down
Loading

0 comments on commit d30c71a

Please sign in to comment.