From e9de35dcd4d7a2c095aa984c4f67059c98036e61 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 24 Jul 2024 15:49:55 -0400 Subject: [PATCH] PROTON-2841 Adds support for enabling WS compression to the transport Optional (default disabled) WS compression added to Transport implementations and the test peer. Peers can check is WS compression is enabled after a connection is established. --- .../protonj2/client/TransportOptions.java | 28 +++++++++ .../transport/netty4/WebSocketTransport.java | 4 ++ .../transport/netty5/WebSocketTransport.java | 5 +- .../client/impl/WsConnectionTest.java | 63 +++++++++++++++++++ .../client/transport/netty4/NettyServer.java | 12 ++++ .../transport/netty4/TcpTransportTest.java | 4 ++ .../netty4/WebSocketTransportTest.java | 50 +++++++++++++++ .../client/transport/netty5/NettyServer.java | 12 ++++ .../transport/netty5/TcpTransportTest.java | 4 ++ .../netty5/WebSocketTransportTest.java | 50 +++++++++++++++ .../test/driver/ProtonTestClient.java | 4 ++ .../test/driver/ProtonTestClientOptions.java | 11 ++++ .../test/driver/ProtonTestServer.java | 6 ++ .../test/driver/ProtonTestServerOptions.java | 12 ++++ .../test/driver/netty/NettyClient.java | 5 ++ .../test/driver/netty/NettyServer.java | 5 ++ .../driver/netty/netty4/Netty4Client.java | 54 ++++++++++++++++ .../driver/netty/netty4/Netty4Server.java | 54 ++++++++++++++++ .../driver/netty/netty5/Netty5Client.java | 52 +++++++++++++++ .../driver/netty/netty5/Netty5Server.java | 49 +++++++++++++++ .../test/driver/ProtonTestWSClientTest.java | 61 ++++++++++++++++++ 21 files changed, 544 insertions(+), 1 deletion(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/TransportOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/TransportOptions.java index f681da612..47e4f3c7c 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/TransportOptions.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/TransportOptions.java @@ -40,6 +40,7 @@ public class TransportOptions implements Cloneable { public static final boolean DEFAULT_TRACE_BYTES = false; public static final int DEFAULT_LOCAL_PORT = 0; public static final boolean DEFAULT_USE_WEBSOCKETS = false; + public static final boolean DEFAULT_WEBSOCKET_COMPRESSION = false; public static final int DEFAULT_WEBSOCKET_MAX_FRAME_SIZE = 65535; private static final String[] DEFAULT_NATIVEIO_PREFERENCES_ARRAY = { "EPOLL", "KQUEUE" }; public static final List DEFAULT_NATIVEIO_PREFERENCES = @@ -62,6 +63,7 @@ public class TransportOptions implements Cloneable { private boolean useWebSockets = DEFAULT_USE_WEBSOCKETS; private String webSocketPath; private int webSocketMaxFrameSize = DEFAULT_WEBSOCKET_MAX_FRAME_SIZE; + private boolean webSocketCompression = DEFAULT_WEBSOCKET_COMPRESSION; private final Map webSocketHeaders = new HashMap<>(); @@ -454,6 +456,31 @@ public TransportOptions webSocketMaxFrameSize(int maxFrameSize) { return this; } + /** + * @return the configured value for the WebSocket compression support enabled flag. + */ + public boolean webSocketCompression() { + return webSocketCompression; + } + + /** + * Set to true to configure the transport layer as a WebSocket based connection that + * support compression of the WebSocket packets. This option simply allows the client + * to support compression if the server offers support but does not influence the server + * side, if the server does not offer support for compression of WS packets then this + * value has no affect on the WS packets and they remain uncompressed as if not enabled. + * (default is disabled). + * + * @param enabled + * should the transport support WebSocket compression if server offers it. + * + * @return this {@link TransportOptions} instance. + */ + public TransportOptions webSocketCompression(boolean enabled) { + this.webSocketCompression = enabled; + return this; + } + /** * Copy all configuration into the given {@link TransportOptions} from this instance. * @@ -481,6 +508,7 @@ public TransportOptions copyInto(TransportOptions other) { other.webSocketPath(webSocketPath()); other.webSocketHeaders().putAll(webSocketHeaders); other.webSocketMaxFrameSize(webSocketMaxFrameSize()); + other.webSocketCompression(webSocketCompression()); return other; } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java index f150a2c96..5d3aae6cf 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java @@ -47,6 +47,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.ScheduledFuture; @@ -151,6 +152,9 @@ protected ChannelInboundHandlerAdapter createChannelHandler() { protected void addAdditionalHandlers(ChannelPipeline pipeline) { pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(8192)); + if (options.webSocketCompression()) { + pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE); + } } @Override diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java index 0c7fb5998..62c0097de 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java @@ -49,6 +49,7 @@ import io.netty5.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty5.handler.codec.http.websocketx.WebSocketFrame; import io.netty5.handler.codec.http.websocketx.WebSocketVersion; +import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; import io.netty5.util.concurrent.Future; import io.netty5.util.concurrent.FutureListener; @@ -98,6 +99,9 @@ protected ChannelHandler createChannelHandler() { protected void addAdditionalHandlers(ChannelPipeline pipeline) { pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(8192)); + if (options.webSocketCompression()) { + pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE); + } } @Override @@ -169,7 +173,6 @@ public void operationComplete(Future future) throws Exception { super.channelActive(context); } - @SuppressWarnings("resource") @Override protected void messageReceived(ChannelHandlerContext ctx, Object message) throws Exception { LOG.trace("New data read: incoming: {}", message); diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/WsConnectionTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/WsConnectionTest.java index caa1546cf..92aed1137 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/WsConnectionTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/WsConnectionTest.java @@ -16,15 +16,23 @@ */ package org.apache.qpid.protonj2.client.impl; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.net.URI; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.qpid.protonj2.client.Client; import org.apache.qpid.protonj2.client.Connection; import org.apache.qpid.protonj2.client.ConnectionOptions; +import org.apache.qpid.protonj2.client.DeliveryMode; +import org.apache.qpid.protonj2.client.Message; +import org.apache.qpid.protonj2.client.Sender; +import org.apache.qpid.protonj2.client.SenderOptions; +import org.apache.qpid.protonj2.client.Session; +import org.apache.qpid.protonj2.client.Tracker; import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.apache.qpid.protonj2.test.driver.ProtonTestServer; import org.apache.qpid.protonj2.test.driver.ProtonTestServerOptions; @@ -88,4 +96,59 @@ public void testWSConnectFailsDueToServerListeningOverTCP() throws Exception { peer.waitForScriptToCompleteIgnoreErrors(); } } + + @Test + public void testSendMessageWithLargeStringBodyWithCompressionEnabled() throws Exception { + final int BODY_SIZE = 16384; + + final String payload = new String("A").repeat(BODY_SIZE); + + try (ProtonTestServer peer = new ProtonTestServer(testServerOptions().setWebSocketCompression(true))) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); + peer.remoteFlow().withLinkCredit(10).queue(); + peer.expectAttach().respond(); // Open a receiver to ensure sender link has processed + peer.expectFlow(); // the inbound flow frame we sent previously before send. + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Sender test started, peer listening on: {}", remoteURI); + + ConnectionOptions connectOptions = new ConnectionOptions(); + connectOptions.transportOptions().webSocketCompression(true); + connectOptions.transportOptions().useWebSockets(true); + connectOptions.traceFrames(true); + + Client container = Client.create(); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectOptions).openFuture().get(); + + Session session = connection.openSession().openFuture().get(); + SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE); + Sender sender = session.openSender("test-qos", options); + + // Gates send on remote flow having been sent and received + session.openReceiver("dummy").openFuture().get(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().withMessage().withValue(payload); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + final Message message = Message.create(payload); + final Tracker tracker = sender.send(message); + + assertNotNull(tracker); + assertNotNull(tracker.settlementFuture().isDone()); + assertNotNull(tracker.settlementFuture().get().settled()); + + sender.closeAsync().get(10, TimeUnit.SECONDS); + + connection.closeAsync().get(10, TimeUnit.SECONDS); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java index c02f890ab..6164acb9c 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java @@ -61,6 +61,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslHandler; @@ -112,6 +113,10 @@ public boolean isWebSocketServer() { return options.useWebSockets(); } + public boolean isUseWebSocketCompression() { + return options.webSocketCompression(); + } + public String getWebSocketPath() { return webSocketPath; } @@ -210,9 +215,16 @@ public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(sslHandler); } + if (options.traceBytes()) { + ch.pipeline().addLast("logger", new LoggingHandler(getClass())); + } + if (isWebSocketServer()) { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536)); + if (isUseWebSocketCompression()) { + ch.pipeline().addLast(new WebSocketServerCompressionHandler()); + } ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize)); } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java index 3d17e55f1..229e553ad 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java @@ -1167,6 +1167,10 @@ protected final NettyEchoServer createEchoServer(SslOptions options, boolean nee return createEchoServer(createServerTransportOptions(), options, needClientAuth); } + protected final NettyEchoServer createEchoServer(TransportOptions options) { + return new NettyEchoServer(options, createServerSSLOptions(), false); + } + protected final NettyEchoServer createEchoServer(TransportOptions options, SslOptions sslOptions, boolean needClientAuth) { return new NettyEchoServer(options, sslOptions, needClientAuth); } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransportTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransportTest.java index 7927d11cd..899aa85c1 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransportTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransportTest.java @@ -351,6 +351,56 @@ public boolean isSatisfied() throws Exception { } } + @Test + public void testTransportConnectionDoesNotDropWhenServerAndClientUseCompressionWithLargePayloads() throws Exception { + final int FRAME_SIZE = 16384; // This value would exceed the set max frame size without compression. + + final ProtonBuffer sendBuffer = allocator.allocate(FRAME_SIZE); + for (int i = 0; i < FRAME_SIZE; ++i) { + sendBuffer.writeByte((byte) 'A'); + } + + try (NettyEchoServer server = createEchoServer(createServerTransportOptions().webSocketCompression(true).traceBytes(true))) { + // Server won't accept the data as it's to large and will close the connection. + server.setMaxFrameSize(FRAME_SIZE / 2); + server.start(); + + final int port = server.getServerPort(); + + List transports = new ArrayList<>(); + + final Transport transport = createTransport(createTransportOptions().webSocketMaxFrameSize(FRAME_SIZE) + .webSocketCompression(true), createSSLOptions()); + + assertTrue(transport instanceof WebSocketTransport); + + try { + // Transport allows bigger frames in so that server is the one causing the failure. + transport.connect(HOSTNAME, port, testListener).awaitConnect(); + transports.add(transport); + transport.writeAndFlush(sendBuffer.copy()); + } catch (Exception e) { + fail("Should have connected to the server at " + HOSTNAME + ":" + port + " but got exception: " + e); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + try { + transport.writeAndFlush(sendBuffer.copy()); + } catch (IOException e) { + LOG.info("Transport send caught error:", e); + return false; + } + + return true; + } + }, 10000, 10), "Transport should not have lost connection"); + + transport.close(); + } + } + @Test public void testConfiguredHttpHeadersArriveAtServer() throws Exception { try (NettyEchoServer server = createEchoServer()) { diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/NettyServer.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/NettyServer.java index 2a3bbd851..ce3a75bee 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/NettyServer.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/NettyServer.java @@ -61,6 +61,7 @@ import io.netty5.handler.codec.http.websocketx.WebSocketFrame; import io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakeCompletionEvent; import io.netty5.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty5.handler.logging.LogLevel; import io.netty5.handler.logging.LoggingHandler; import io.netty5.handler.ssl.SslHandler; @@ -112,6 +113,10 @@ public boolean isWebSocketServer() { return options.useWebSockets(); } + public boolean isUseWebSocketCompression() { + return options.webSocketCompression(); + } + public String getWebSocketPath() { return webSocketPath; } @@ -210,9 +215,16 @@ public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(sslHandler); } + if (options.traceBytes()) { + ch.pipeline().addLast("logger", new LoggingHandler(getClass())); + } + if (isWebSocketServer()) { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536)); + if (isUseWebSocketCompression()) { + ch.pipeline().addLast(new WebSocketServerCompressionHandler()); + } ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize)); } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/TcpTransportTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/TcpTransportTest.java index 62f33bfd3..3999bcb38 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/TcpTransportTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/TcpTransportTest.java @@ -1095,6 +1095,10 @@ protected final NettyEchoServer createEchoServer(SslOptions options, boolean nee return createEchoServer(createServerTransportOptions(), options, needClientAuth); } + protected final NettyEchoServer createEchoServer(TransportOptions options) { + return new NettyEchoServer(options, createServerSSLOptions(), false); + } + protected final NettyEchoServer createEchoServer(TransportOptions options, SslOptions sslOptions, boolean needClientAuth) { return new NettyEchoServer(options, sslOptions, needClientAuth); } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransportTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransportTest.java index a8afb0d2c..8b850b280 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransportTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransportTest.java @@ -350,6 +350,56 @@ public boolean isSatisfied() throws Exception { } } + @Test + public void testTransportConnectionDoesNotDropWhenServerAndClientUseCompressionWithLargePayloads() throws Exception { + final int FRAME_SIZE = 16384; // This value would exceed the set max frame size without compression. + + final ProtonBuffer sendBuffer = allocator.allocate(FRAME_SIZE); + for (int i = 0; i < FRAME_SIZE; ++i) { + sendBuffer.writeByte((byte) 'A'); + } + + try (NettyEchoServer server = createEchoServer(createServerTransportOptions().webSocketCompression(true).traceBytes(true))) { + // Server won't accept the data as it's to large and will close the connection. + server.setMaxFrameSize(FRAME_SIZE / 2); + server.start(); + + final int port = server.getServerPort(); + + List transports = new ArrayList<>(); + + final Transport transport = createTransport(createTransportOptions().webSocketMaxFrameSize(FRAME_SIZE) + .webSocketCompression(true), createSSLOptions()); + + assertTrue(transport instanceof WebSocketTransport); + + try { + // Transport allows bigger frames in so that server is the one causing the failure. + transport.connect(HOSTNAME, port, testListener).awaitConnect(); + transports.add(transport); + transport.writeAndFlush(sendBuffer.copy()); + } catch (Exception e) { + fail("Should have connected to the server at " + HOSTNAME + ":" + port + " but got exception: " + e); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + try { + transport.writeAndFlush(sendBuffer.copy()); + } catch (IOException e) { + LOG.info("Transport send caught error:", e); + return false; + } + + return true; + } + }, 10000, 10), "Transport should not have lost connection"); + + transport.close(); + } + } + @Test public void testConfiguredHttpHeadersArriveAtServer() throws Exception { try (NettyEchoServer server = createEchoServer()) { diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java index 97c59e286..e4249afc8 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java @@ -76,6 +76,10 @@ public AMQPTestDriver getDriver() { return driver; } + public boolean isWSCompressionActive() { + return client.isWSCompressionActive(); + } + @Override protected void processConnectionEstablished() { LOG.trace("AMQP Client connected to remote."); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientOptions.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientOptions.java index c40cf207e..bac99f04e 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientOptions.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientOptions.java @@ -49,6 +49,7 @@ public class ProtonTestClientOptions implements Cloneable { public static final boolean DEFAULT_USE_WEBSOCKETS = false; public static final boolean DEFAULT_FRAGMENT_WEBSOCKET_WRITES = false; public static final String DEFAULT_WEBSOCKET_PATH = "/"; + public static final boolean DEFAULT_WEBSOCKET_COMPRESSION = false; public static final int DEFAULT_WEBSOCKET_MAX_FRAME_SIZE = 65535; public static final boolean DEFAULT_SECURE_SERVER = false; public static final boolean DEFAULT_NEEDS_CLIENT_AUTH = false; @@ -75,6 +76,7 @@ public class ProtonTestClientOptions implements Cloneable { private boolean fragmentWebSocketWrites = DEFAULT_FRAGMENT_WEBSOCKET_WRITES; private String webSocketPath = DEFAULT_WEBSOCKET_PATH; private int webSocketMaxFrameSize = DEFAULT_WEBSOCKET_MAX_FRAME_SIZE; + private boolean webSocketCompression = DEFAULT_WEBSOCKET_COMPRESSION; private boolean secure = DEFAULT_SECURE_SERVER; private boolean needClientAuth = DEFAULT_NEEDS_CLIENT_AUTH; @@ -554,6 +556,14 @@ public void setWebSocketMaxFrameSize(int webSocketMaxFrameSize) { this.webSocketMaxFrameSize = webSocketMaxFrameSize; } + public boolean isWebSocketCompression() { + return webSocketCompression; + } + + public void setWebSocketCompression(boolean enabled) { + this.webSocketCompression = enabled; + } + protected ProtonTestClientOptions copyOptions(ProtonTestClientOptions copy) { copy.setReceiveBufferSize(getReceiveBufferSize()); copy.setSendBufferSize(getSendBufferSize()); @@ -587,6 +597,7 @@ protected ProtonTestClientOptions copyOptions(ProtonTestClientOptions copy) { copy.setFragmentWrites(isFragmentWrites()); copy.setWebSocketPath(getWebSocketPath()); copy.setWebSocketMaxFrameSize(getWebSocketMaxFrameSize()); + copy.setWebSocketCompression(isWebSocketCompression()); return copy; } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java index b4ab29ced..9b7d6420b 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java @@ -131,6 +131,7 @@ public URI getServerURI(String query) { * * @return this test peer instance. */ + @Override public ProtonTestPeer dropAfterLastHandler() { getDriver().addScriptedElement(new ConnectionDropAction(this)); return this; @@ -147,6 +148,7 @@ public ProtonTestPeer dropAfterLastHandler() { * * @return this test peer instance. */ + @Override public ProtonTestPeer dropAfterLastHandler(int delay) { getDriver().addScriptedElement(new ConnectionDropAction(this).afterDelay(delay)); return this; @@ -176,6 +178,10 @@ public int getConnectionRemotePort() { return server.getClientPort(); } + public boolean isWSCompressionActive() { + return server.isWSCompressionActive(); + } + @Override public AMQPTestDriver getDriver() { return driver; diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServerOptions.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServerOptions.java index 06b85e755..23ebff321 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServerOptions.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServerOptions.java @@ -48,6 +48,7 @@ public class ProtonTestServerOptions implements Cloneable { public static final int DEFAULT_LOCAL_PORT = 0; public static final boolean DEFAULT_USE_WEBSOCKETS = false; public static final boolean DEFAULT_FRAGMENT_WEBSOCKET_WRITES = false; + public static final boolean DEFAULT_WEBSOCKET_COMPRESSION = false; public static final boolean DEFAULT_SECURE_SERVER = false; public static final boolean DEFAULT_NEEDS_CLIENT_AUTH = false; @@ -70,6 +71,7 @@ public class ProtonTestServerOptions implements Cloneable { private int localPort = DEFAULT_LOCAL_PORT; private boolean traceBytes = DEFAULT_TRACE_BYTES; private boolean useWebSockets = DEFAULT_USE_WEBSOCKETS; + private boolean webSocketCompression = DEFAULT_WEBSOCKET_COMPRESSION; private boolean fragmentWebSocketWrites = DEFAULT_FRAGMENT_WEBSOCKET_WRITES; private boolean secure = DEFAULT_SECURE_SERVER; @@ -612,6 +614,15 @@ public ProtonTestServerOptions setFragmentWrites(boolean fragmentWrites) { return this; } + public boolean isWebSocketCompression() { + return webSocketCompression; + } + + public ProtonTestServerOptions setWebSocketCompression(boolean enabled) { + this.webSocketCompression = enabled; + return this; + } + protected ProtonTestServerOptions copyOptions(ProtonTestServerOptions copy) { copy.setReceiveBufferSize(getReceiveBufferSize()); copy.setSendBufferSize(getSendBufferSize()); @@ -643,6 +654,7 @@ protected ProtonTestServerOptions copyOptions(ProtonTestServerOptions copy) { copy.setNeedClientAuth(isNeedClientAuth()); copy.setUseWebSockets(isUseWebSockets()); copy.setFragmentWrites(isFragmentWrites()); + copy.setWebSocketCompression(isWebSocketCompression()); return copy; } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyClient.java index b90af803a..550943dab 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyClient.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyClient.java @@ -66,4 +66,9 @@ public interface NettyClient extends AutoCloseable { */ URI getRemoteURI(); + /** + * @return true if the connected client has WS compression activated by the server. + */ + boolean isWSCompressionActive(); + } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java index 99d775c8c..b5b91c512 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java @@ -92,6 +92,11 @@ public interface NettyServer extends AutoCloseable { */ int getClientPort(); + /** + * @return true if a connected client has negotiated WS compression. + */ + boolean isWSCompressionActive(); + /** * @return has the SSL handshake for a client completed successfully. */ diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java index 3bb61d508..b31c6e90b 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java @@ -38,6 +38,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -52,8 +53,10 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; @@ -65,6 +68,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; @@ -88,6 +92,8 @@ public final class Netty4Client implements NettyClient { private Channel channel; private String host; private int port; + private boolean wsCompressionRequest; + private boolean wsCompressionResponse; protected volatile IOException failureCause; private final ProtonTestClientOptions options; private volatile SslHandler sslHandler; @@ -223,6 +229,15 @@ public boolean isSecure() { return options.isSecure(); } + @Override + public boolean isWSCompressionActive() { + if (channel == null || !channel.isActive()) { + throw new IllegalStateException("Channel is not connected or has closed"); + } + + return wsCompressionRequest && wsCompressionResponse; + } + @Override public URI getRemoteURI() { if (host != null) { @@ -400,6 +415,41 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } + private class ClientWSCompressionObserver extends ChannelDuplexHandler { + + final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions"; + final String WS_PERMESSAGE_DEFLATE = "permessage-deflate"; + final String WS_UPGRADE = "upgrade"; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object message) { + if (message instanceof FullHttpResponse) { + FullHttpResponse request = (FullHttpResponse) message; + HttpHeaders headers = request.headers(); + + if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) { + wsCompressionRequest = headers.get(WS_EXTENSIONS_SECTION).contains(WS_PERMESSAGE_DEFLATE); + } + } + + ctx.fireChannelRead(message); + } + + @Override + public void write(ChannelHandlerContext context, Object message, ChannelPromise promise) throws Exception { + if (message instanceof FullHttpRequest) { + FullHttpRequest response = (FullHttpRequest) message; + HttpHeaders headers = response.headers(); + + if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) { + wsCompressionResponse = headers.get(WS_EXTENSIONS_SECTION).contains(WS_PERMESSAGE_DEFLATE); + } + } + + context.write(message, promise); + } + } + //----- Internal Client implementation API protected ChannelHandler getClientHandler() { @@ -455,6 +505,10 @@ private void configureChannel(final Channel channel) throws Exception { if (options.isUseWebSockets()) { channel.pipeline().addLast(new HttpClientCodec()); channel.pipeline().addLast(new HttpObjectAggregator(8192)); + if (options.isWebSocketCompression()) { + channel.pipeline().addLast(new ClientWSCompressionObserver()); + channel.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE); + } } channel.pipeline().addLast(new NettyClientOutboundHandler()); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java index 64df5b0a1..94a891dc8 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java @@ -43,6 +43,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -59,6 +60,7 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpUtil; @@ -67,6 +69,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslHandler; @@ -93,6 +96,8 @@ public final class Netty4Server implements NettyServer { private final ProtonTestServerOptions options; private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private String webSocketPath = WEBSOCKET_PATH; + private boolean wsCompressionRequest; + private boolean wsCompressionResponse; private volatile SslHandler sslHandler; private volatile HandshakeComplete handshakeComplete; private final CountDownLatch handshakeCompletion = new CountDownLatch(1); @@ -141,6 +146,12 @@ public int getClientPort() { return (((InetSocketAddress) clientChannel.remoteAddress()).getPort()); } + @Override + public boolean isWSCompressionActive() { + Objects.requireNonNull(clientChannel); + return wsCompressionRequest && wsCompressionResponse; + } + @Override public boolean isPeerVerified() { try { @@ -264,9 +275,17 @@ public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(sslHandler = SslSupport.createServerSslHandler(null, options)); } + if (options.isTraceBytes()) { + ch.pipeline().addLast(new LoggingHandler(getClass())); + } + if (options.isUseWebSockets()) { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536)); + if (options.isWebSocketCompression()) { + ch.pipeline().addLast(new ServerWSCompressionObserver()); + ch.pipeline().addLast(new WebSocketServerCompressionHandler()); + } ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize)); } @@ -513,6 +532,41 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } } + private class ServerWSCompressionObserver extends ChannelDuplexHandler { + + final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions"; + final String WS_PERMESSAGE_DEFLATE = "permessage-deflate"; + final String WS_UPGRADE = "upgrade"; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object message) { + if (message instanceof FullHttpRequest) { + FullHttpRequest request = (FullHttpRequest) message; + HttpHeaders headers = request.headers(); + + if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) { + wsCompressionRequest = headers.get(WS_EXTENSIONS_SECTION).contains(WS_PERMESSAGE_DEFLATE); + } + } + + ctx.fireChannelRead(message); + } + + @Override + public void write(ChannelHandlerContext context, Object message, ChannelPromise promise) throws Exception { + if (message instanceof FullHttpResponse) { + FullHttpResponse response = (FullHttpResponse) message; + HttpHeaders headers = response.headers(); + + if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) { + wsCompressionResponse = headers.get(WS_EXTENSIONS_SECTION).contains(WS_PERMESSAGE_DEFLATE); + } + } + + context.write(message, promise); + } + } + private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) { // Generate an error page if response getStatus code is not OK (200). if (response.status().code() != 200) { diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Client.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Client.java index 643374c8a..b17209e27 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Client.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Client.java @@ -51,6 +51,7 @@ import io.netty5.channel.nio.NioHandler; import io.netty5.channel.socket.nio.NioSocketChannel; import io.netty5.handler.codec.http.DefaultHttpContent; +import io.netty5.handler.codec.http.FullHttpRequest; import io.netty5.handler.codec.http.FullHttpResponse; import io.netty5.handler.codec.http.HttpClientCodec; import io.netty5.handler.codec.http.HttpObjectAggregator; @@ -65,6 +66,7 @@ import io.netty5.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty5.handler.codec.http.websocketx.WebSocketFrame; import io.netty5.handler.codec.http.websocketx.WebSocketVersion; +import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; import io.netty5.handler.logging.LoggingHandler; import io.netty5.handler.ssl.SslHandler; import io.netty5.util.concurrent.Future; @@ -87,6 +89,8 @@ public final class Netty5Client implements NettyClient { private Channel channel; private String host; private int port; + private boolean wsCompressionRequest; + private boolean wsCompressionResponse; protected volatile IOException failureCause; private final ProtonTestClientOptions options; private volatile SslHandler sslHandler; @@ -222,6 +226,15 @@ public boolean isSecure() { return options.isSecure(); } + @Override + public boolean isWSCompressionActive() { + if (channel == null || !channel.isActive()) { + throw new IllegalStateException("Channel is not connected or has closed"); + } + + return wsCompressionRequest && wsCompressionResponse; + } + @Override public URI getRemoteURI() { if (host != null) { @@ -399,6 +412,41 @@ public Future write(ChannelHandlerContext ctx, Object msg) { } } + private class ClientWSCompressionObserverHandler extends ChannelHandlerAdapter { + + final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions"; + final String WS_PERMESSAGE_DEFLATE = "permessage-deflate"; + final String WS_UPGRADE = "upgrade"; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object message) { + if (message instanceof FullHttpResponse) { + FullHttpResponse response = (FullHttpResponse) message; + HttpHeaders headers = response.headers(); + + if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) { + wsCompressionRequest = headers.get(WS_EXTENSIONS_SECTION).toString().contains(WS_PERMESSAGE_DEFLATE); + } + } + + ctx.fireChannelRead(message); + } + + @Override + public Future write(ChannelHandlerContext context, Object message) { + if (message instanceof FullHttpRequest) { + FullHttpRequest request = (FullHttpRequest) message; + HttpHeaders headers = request.headers(); + + if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) { + wsCompressionRequest = headers.get(WS_EXTENSIONS_SECTION).toString().contains(WS_PERMESSAGE_DEFLATE); + } + } + + return context.write(message); + } + } + //----- Internal Client implementation API protected ChannelHandler getClientHandler() { @@ -454,6 +502,10 @@ private void configureChannel(final Channel channel) throws Exception { if (options.isUseWebSockets()) { channel.pipeline().addLast(new HttpClientCodec()); channel.pipeline().addLast(new HttpObjectAggregator(8192)); + if (options.isWebSocketCompression()) { + channel.pipeline().addLast(new ClientWSCompressionObserverHandler()); + channel.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE); + } } channel.pipeline().addLast(new NettyClientOutboundHandler()); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java index 0148b0917..4e08ba491 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java @@ -61,11 +61,13 @@ import io.netty5.handler.codec.http.HttpServerCodec; import io.netty5.handler.codec.http.HttpUtil; import io.netty5.handler.codec.http.HttpVersion; +import io.netty5.handler.codec.http.headers.HttpHeaders; import io.netty5.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty5.handler.codec.http.websocketx.ContinuationWebSocketFrame; import io.netty5.handler.codec.http.websocketx.WebSocketFrame; import io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakeCompletionEvent; import io.netty5.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty5.handler.logging.LogLevel; import io.netty5.handler.logging.LoggingHandler; import io.netty5.handler.ssl.SslHandler; @@ -92,6 +94,8 @@ public final class Netty5Server implements NettyServer { private final ProtonTestServerOptions options; private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private String webSocketPath = WEBSOCKET_PATH; + private boolean wsCompressionRequest; + private boolean wsCompressionResponse; private volatile SslHandler sslHandler; private volatile WebSocketServerHandshakeCompletionEvent handshakeComplete; private final CountDownLatch handshakeCompletion = new CountDownLatch(1); @@ -139,6 +143,12 @@ public int getClientPort() { return (((InetSocketAddress) clientChannel.remoteAddress()).getPort()); } + @Override + public boolean isWSCompressionActive() { + Objects.requireNonNull(clientChannel); + return wsCompressionRequest && wsCompressionResponse; + } + @Override public boolean isPeerVerified() { try { @@ -261,6 +271,10 @@ public void initChannel(Channel ch) throws Exception { if (options.isUseWebSockets()) { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536)); + if (options.isWebSocketCompression()) { + ch.pipeline().addLast(new ServerWSCompressionObserverHandler()); + ch.pipeline().addLast(new WebSocketServerCompressionHandler()); + } ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize)); } @@ -510,6 +524,41 @@ public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } } + private class ServerWSCompressionObserverHandler extends ChannelHandlerAdapter { + + final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions"; + final String WS_PERMESSAGE_DEFLATE = "permessage-deflate"; + final String WS_UPGRADE = "upgrade"; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object message) { + if (message instanceof FullHttpRequest) { + FullHttpRequest request = (FullHttpRequest) message; + HttpHeaders headers = request.headers(); + + if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) { + wsCompressionRequest = headers.get(WS_EXTENSIONS_SECTION).toString().contains(WS_PERMESSAGE_DEFLATE); + } + } + + ctx.fireChannelRead(message); + } + + @Override + public Future write(ChannelHandlerContext context, Object message) { + if (message instanceof FullHttpResponse) { + FullHttpResponse response = (FullHttpResponse) message; + HttpHeaders headers = response.headers(); + + if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) { + wsCompressionResponse = headers.get(WS_EXTENSIONS_SECTION).toString().contains(WS_PERMESSAGE_DEFLATE); + } + } + + return context.write(message); + } + } + private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) { // Generate an error page if response getStatus code is not OK (200). if (response.status().code() != 200) { diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestWSClientTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestWSClientTest.java index 458b47614..cc9942b5f 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestWSClientTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestWSClientTest.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.protonj2.test.driver; +import static org.junit.jupiter.api.Assertions.assertEquals; + import java.net.URI; import java.util.concurrent.TimeUnit; @@ -95,4 +97,63 @@ public void testClientCanConnectAndOpenExchanged() throws Exception { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testClientAndServerEnabledWSCompressionCanConnect() throws Exception { + doTestClientAndServerWSCompressionNegotiation(true, true); + } + + @Test + public void testClientAndServerDisableWSCompressionCanConnect() throws Exception { + doTestClientAndServerWSCompressionNegotiation(false, false); + } + + @Test + public void testClientEnablesAndServerDisableWSCompressionCanConnect() throws Exception { + doTestClientAndServerWSCompressionNegotiation(true, false); + } + + @Test + public void testClientDisablesAndServerEnablesWSCompressionCanConnect() throws Exception { + doTestClientAndServerWSCompressionNegotiation(false, true); + } + + private void doTestClientAndServerWSCompressionNegotiation(boolean serverWSCompression, boolean clientWSCompression) throws Exception { + ProtonTestServerOptions serverOpts = new ProtonTestServerOptions(); + serverOpts.setUseWebSockets(true); + serverOpts.setWebSocketCompression(serverWSCompression); + + ProtonTestClientOptions clientOpts = new ProtonTestClientOptions(); + clientOpts.setUseWebSockets(true); + clientOpts.setWebSocketCompression(clientWSCompression); + + try (ProtonTestServer peer = new ProtonTestServer(serverOpts)) { + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectClose().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + ProtonTestClient client = new ProtonTestClient(clientOpts); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectClose(); + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().now(); + client.remoteClose().now(); + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + + assertEquals(serverWSCompression && clientWSCompression, peer.isWSCompressionActive()); + assertEquals(serverWSCompression && clientWSCompression, client.isWSCompressionActive()); + + client.close(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } }