Skip to content

Commit

Permalink
PROTON-2841 Adds support for enabling WS compression to the transport
Browse files Browse the repository at this point in the history
Optional (default disabled) WS compression added to Transport implementations
and the test peer. Peers can check is WS compression is enabled after a connection
is established.
  • Loading branch information
tabish121 committed Jul 24, 2024
1 parent d13544d commit e9de35d
Show file tree
Hide file tree
Showing 21 changed files with 544 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class TransportOptions implements Cloneable {
public static final boolean DEFAULT_TRACE_BYTES = false;
public static final int DEFAULT_LOCAL_PORT = 0;
public static final boolean DEFAULT_USE_WEBSOCKETS = false;
public static final boolean DEFAULT_WEBSOCKET_COMPRESSION = false;
public static final int DEFAULT_WEBSOCKET_MAX_FRAME_SIZE = 65535;
private static final String[] DEFAULT_NATIVEIO_PREFERENCES_ARRAY = { "EPOLL", "KQUEUE" };
public static final List<String> DEFAULT_NATIVEIO_PREFERENCES =
Expand All @@ -62,6 +63,7 @@ public class TransportOptions implements Cloneable {
private boolean useWebSockets = DEFAULT_USE_WEBSOCKETS;
private String webSocketPath;
private int webSocketMaxFrameSize = DEFAULT_WEBSOCKET_MAX_FRAME_SIZE;
private boolean webSocketCompression = DEFAULT_WEBSOCKET_COMPRESSION;

private final Map<String, String> webSocketHeaders = new HashMap<>();

Expand Down Expand Up @@ -454,6 +456,31 @@ public TransportOptions webSocketMaxFrameSize(int maxFrameSize) {
return this;
}

/**
* @return the configured value for the WebSocket compression support enabled flag.
*/
public boolean webSocketCompression() {
return webSocketCompression;
}

/**
* Set to true to configure the transport layer as a WebSocket based connection that
* support compression of the WebSocket packets. This option simply allows the client
* to support compression if the server offers support but does not influence the server
* side, if the server does not offer support for compression of WS packets then this
* value has no affect on the WS packets and they remain uncompressed as if not enabled.
* (default is disabled).
*
* @param enabled
* should the transport support WebSocket compression if server offers it.
*
* @return this {@link TransportOptions} instance.
*/
public TransportOptions webSocketCompression(boolean enabled) {
this.webSocketCompression = enabled;
return this;
}

/**
* Copy all configuration into the given {@link TransportOptions} from this instance.
*
Expand Down Expand Up @@ -481,6 +508,7 @@ public TransportOptions copyInto(TransportOptions other) {
other.webSocketPath(webSocketPath());
other.webSocketHeaders().putAll(webSocketHeaders);
other.webSocketMaxFrameSize(webSocketMaxFrameSize());
other.webSocketCompression(webSocketCompression());

return other;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -151,6 +152,9 @@ protected ChannelInboundHandlerAdapter createChannelHandler() {
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(8192));
if (options.webSocketCompression()) {
pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.netty5.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
import io.netty5.handler.codec.http.websocketx.WebSocketVersion;
import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureListener;

Expand Down Expand Up @@ -98,6 +99,9 @@ protected ChannelHandler createChannelHandler() {
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator<DefaultHttpContent>(8192));
if (options.webSocketCompression()) {
pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
}
}

@Override
Expand Down Expand Up @@ -169,7 +173,6 @@ public void operationComplete(Future<? extends Void> future) throws Exception {
super.channelActive(context);
}

@SuppressWarnings("resource")
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object message) throws Exception {
LOG.trace("New data read: incoming: {}", message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@
*/
package org.apache.qpid.protonj2.client.impl;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.DeliveryMode;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.Sender;
import org.apache.qpid.protonj2.client.SenderOptions;
import org.apache.qpid.protonj2.client.Session;
import org.apache.qpid.protonj2.client.Tracker;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.ProtonTestServerOptions;
Expand Down Expand Up @@ -88,4 +96,59 @@ public void testWSConnectFailsDueToServerListeningOverTCP() throws Exception {
peer.waitForScriptToCompleteIgnoreErrors();
}
}

@Test
public void testSendMessageWithLargeStringBodyWithCompressionEnabled() throws Exception {
final int BODY_SIZE = 16384;

final String payload = new String("A").repeat(BODY_SIZE);

try (ProtonTestServer peer = new ProtonTestServer(testServerOptions().setWebSocketCompression(true))) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond();
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectAttach().respond(); // Open a receiver to ensure sender link has processed
peer.expectFlow(); // the inbound flow frame we sent previously before send.
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Sender test started, peer listening on: {}", remoteURI);

ConnectionOptions connectOptions = new ConnectionOptions();
connectOptions.transportOptions().webSocketCompression(true);
connectOptions.transportOptions().useWebSockets(true);
connectOptions.traceFrames(true);

Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectOptions).openFuture().get();

Session session = connection.openSession().openFuture().get();
SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE);
Sender sender = session.openSender("test-qos", options);

// Gates send on remote flow having been sent and received
session.openReceiver("dummy").openFuture().get();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withMessage().withValue(payload);
peer.expectDetach().respond();
peer.expectClose().respond();

final Message<String> message = Message.create(payload);
final Tracker tracker = sender.send(message);

assertNotNull(tracker);
assertNotNull(tracker.settlementFuture().isDone());
assertNotNull(tracker.settlementFuture().get().settled());

sender.closeAsync().get(10, TimeUnit.SECONDS);

connection.closeAsync().get(10, TimeUnit.SECONDS);

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
Expand Down Expand Up @@ -112,6 +113,10 @@ public boolean isWebSocketServer() {
return options.useWebSockets();
}

public boolean isUseWebSocketCompression() {
return options.webSocketCompression();
}

public String getWebSocketPath() {
return webSocketPath;
}
Expand Down Expand Up @@ -210,9 +215,16 @@ public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(sslHandler);
}

if (options.traceBytes()) {
ch.pipeline().addLast("logger", new LoggingHandler(getClass()));
}

if (isWebSocketServer()) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(65536));
if (isUseWebSocketCompression()) {
ch.pipeline().addLast(new WebSocketServerCompressionHandler());
}
ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,10 @@ protected final NettyEchoServer createEchoServer(SslOptions options, boolean nee
return createEchoServer(createServerTransportOptions(), options, needClientAuth);
}

protected final NettyEchoServer createEchoServer(TransportOptions options) {
return new NettyEchoServer(options, createServerSSLOptions(), false);
}

protected final NettyEchoServer createEchoServer(TransportOptions options, SslOptions sslOptions, boolean needClientAuth) {
return new NettyEchoServer(options, sslOptions, needClientAuth);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,56 @@ public boolean isSatisfied() throws Exception {
}
}

@Test
public void testTransportConnectionDoesNotDropWhenServerAndClientUseCompressionWithLargePayloads() throws Exception {
final int FRAME_SIZE = 16384; // This value would exceed the set max frame size without compression.

final ProtonBuffer sendBuffer = allocator.allocate(FRAME_SIZE);
for (int i = 0; i < FRAME_SIZE; ++i) {
sendBuffer.writeByte((byte) 'A');
}

try (NettyEchoServer server = createEchoServer(createServerTransportOptions().webSocketCompression(true).traceBytes(true))) {
// Server won't accept the data as it's to large and will close the connection.
server.setMaxFrameSize(FRAME_SIZE / 2);
server.start();

final int port = server.getServerPort();

List<Transport> transports = new ArrayList<>();

final Transport transport = createTransport(createTransportOptions().webSocketMaxFrameSize(FRAME_SIZE)
.webSocketCompression(true), createSSLOptions());

assertTrue(transport instanceof WebSocketTransport);

try {
// Transport allows bigger frames in so that server is the one causing the failure.
transport.connect(HOSTNAME, port, testListener).awaitConnect();
transports.add(transport);
transport.writeAndFlush(sendBuffer.copy());
} catch (Exception e) {
fail("Should have connected to the server at " + HOSTNAME + ":" + port + " but got exception: " + e);
}

assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
try {
transport.writeAndFlush(sendBuffer.copy());
} catch (IOException e) {
LOG.info("Transport send caught error:", e);
return false;
}

return true;
}
}, 10000, 10), "Transport should not have lost connection");

transport.close();
}
}

@Test
public void testConfiguredHttpHeadersArriveAtServer() throws Exception {
try (NettyEchoServer server = createEchoServer()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
import io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakeCompletionEvent;
import io.netty5.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty5.handler.logging.LogLevel;
import io.netty5.handler.logging.LoggingHandler;
import io.netty5.handler.ssl.SslHandler;
Expand Down Expand Up @@ -112,6 +113,10 @@ public boolean isWebSocketServer() {
return options.useWebSockets();
}

public boolean isUseWebSocketCompression() {
return options.webSocketCompression();
}

public String getWebSocketPath() {
return webSocketPath;
}
Expand Down Expand Up @@ -210,9 +215,16 @@ public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(sslHandler);
}

if (options.traceBytes()) {
ch.pipeline().addLast("logger", new LoggingHandler(getClass()));
}

if (isWebSocketServer()) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator<DefaultHttpContent>(65536));
if (isUseWebSocketCompression()) {
ch.pipeline().addLast(new WebSocketServerCompressionHandler());
}
ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,10 @@ protected final NettyEchoServer createEchoServer(SslOptions options, boolean nee
return createEchoServer(createServerTransportOptions(), options, needClientAuth);
}

protected final NettyEchoServer createEchoServer(TransportOptions options) {
return new NettyEchoServer(options, createServerSSLOptions(), false);
}

protected final NettyEchoServer createEchoServer(TransportOptions options, SslOptions sslOptions, boolean needClientAuth) {
return new NettyEchoServer(options, sslOptions, needClientAuth);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,56 @@ public boolean isSatisfied() throws Exception {
}
}

@Test
public void testTransportConnectionDoesNotDropWhenServerAndClientUseCompressionWithLargePayloads() throws Exception {
final int FRAME_SIZE = 16384; // This value would exceed the set max frame size without compression.

final ProtonBuffer sendBuffer = allocator.allocate(FRAME_SIZE);
for (int i = 0; i < FRAME_SIZE; ++i) {
sendBuffer.writeByte((byte) 'A');
}

try (NettyEchoServer server = createEchoServer(createServerTransportOptions().webSocketCompression(true).traceBytes(true))) {
// Server won't accept the data as it's to large and will close the connection.
server.setMaxFrameSize(FRAME_SIZE / 2);
server.start();

final int port = server.getServerPort();

List<Transport> transports = new ArrayList<>();

final Transport transport = createTransport(createTransportOptions().webSocketMaxFrameSize(FRAME_SIZE)
.webSocketCompression(true), createSSLOptions());

assertTrue(transport instanceof WebSocketTransport);

try {
// Transport allows bigger frames in so that server is the one causing the failure.
transport.connect(HOSTNAME, port, testListener).awaitConnect();
transports.add(transport);
transport.writeAndFlush(sendBuffer.copy());
} catch (Exception e) {
fail("Should have connected to the server at " + HOSTNAME + ":" + port + " but got exception: " + e);
}

assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
try {
transport.writeAndFlush(sendBuffer.copy());
} catch (IOException e) {
LOG.info("Transport send caught error:", e);
return false;
}

return true;
}
}, 10000, 10), "Transport should not have lost connection");

transport.close();
}
}

@Test
public void testConfiguredHttpHeadersArriveAtServer() throws Exception {
try (NettyEchoServer server = createEchoServer()) {
Expand Down
Loading

0 comments on commit e9de35d

Please sign in to comment.