diff --git a/extensions/core/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java b/extensions/core/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java index c90dbb24c..49b68cc89 100644 --- a/extensions/core/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java +++ b/extensions/core/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java @@ -1,10 +1,15 @@ package io.quarkiverse.cxf.transport.generated; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.util.ArrayDeque; import java.util.Objects; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; +import io.vertx.core.buffer.impl.VertxByteBufAllocator; /** * Adapted by sync-quarkus-classes.groovy from @@ -19,7 +24,26 @@ */ final class AppendBuffer { - private final ByteBufAllocator allocator; + private static final MethodHandle virtualMh = PlatformDependent.javaVersion() >= 21 ? findVirtualMH() : null; + + private static MethodHandle findVirtualMH() { + try { + return MethodHandles.publicLookup().findVirtual(Thread.class, "isVirtual", MethodType.methodType(boolean.class)); + } catch (Exception e) { + return null; + } + } + + private static boolean isVirtualThread() { + if (virtualMh == null) { + return false; + } + try { + return (boolean) virtualMh.invokeExact(Thread.currentThread()); + } catch (Throwable t) { + return false; + } + } private final int minChunkSize; @@ -31,25 +55,27 @@ final class AppendBuffer { private int size; - private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) { - this.allocator = allocator; + private boolean anyHeap; + + private AppendBuffer(int minChunkSize, int capacity) { this.minChunkSize = Math.min(minChunkSize, capacity); this.capacity = capacity; + this.anyHeap = false; } /** * This buffer append data in a single eagerly allocated {@link ByteBuf}. */ - public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) { - return new AppendBuffer(allocator, capacity, capacity); + public static AppendBuffer eager(int capacity) { + return new AppendBuffer(capacity, capacity); } /** * This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.
* The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. */ - public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) { - return new AppendBuffer(allocator, 0, capacity); + public static AppendBuffer exact(int capacity) { + return new AppendBuffer(0, capacity); } /** @@ -57,8 +83,8 @@ public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) { * as each {@code len}, if greater than it.
* The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. */ - public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) { - return new AppendBuffer(allocator, minChunkSize, capacity); + public static AppendBuffer withMinChunks(int minChunkSize, int capacity) { + return new AppendBuffer(minChunkSize, capacity); } private ByteBuf lastBuffer() { @@ -111,7 +137,14 @@ public int append(byte[] bytes, int off, int len) { } else { chunkCapacity = toWrite; } - var tmpBuf = allocator.directBuffer(chunkCapacity); + boolean isVirtualThread = isVirtualThread(); + final ByteBuf tmpBuf; + if (isVirtualThread) { + // VertxByteBufAllocator allocates cheaper heap buffers ie which doesn't use reference counting + tmpBuf = VertxByteBufAllocator.DEFAULT.heapBuffer(chunkCapacity); + } else { + tmpBuf = PooledByteBufAllocator.DEFAULT.directBuffer(chunkCapacity); + } try { tmpBuf.writeBytes(bytes, off, toWrite); } catch (Throwable t) { @@ -133,6 +166,9 @@ public int append(byte[] bytes, int off, int len) { throw t; } } + if (isVirtualThread) { + anyHeap = true; + } size += toWrite; return toWrite + alreadyWritten; } @@ -159,6 +195,7 @@ public ByteBuf clear() { if (others == null || others.isEmpty()) { size = 0; buffer = null; + anyHeap = false; // super fast-path return firstBuf; } @@ -168,10 +205,17 @@ public ByteBuf clear() { private CompositeByteBuf clearBuffers() { var firstBuf = buffer; var others = otherBuffers; - var batch = allocator.compositeDirectBuffer(1 + others.size()); + CompositeByteBuf batch; + if (anyHeap) { + batch = new CompositeByteBuf(VertxByteBufAllocator.UNPOOLED_ALLOCATOR, false, 1 + others.size()); + } else { + // This should be the allocator picked by Netty + batch = PooledByteBufAllocator.DEFAULT.compositeBuffer(1 + others.size()); + } try { buffer = null; size = 0; + anyHeap = false; batch.addComponent(true, 0, firstBuf); for (int i = 0, othersCount = others.size(); i < othersCount; i++) { // if addComponent fail, it takes care of releasing curr and throwing the exception: diff --git a/extensions/core/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java b/extensions/core/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java index d5d58c2a2..29d4d1f7f 100644 --- a/extensions/core/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java +++ b/extensions/core/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java @@ -7,7 +7,6 @@ import jakarta.servlet.WriteListener; import org.jboss.logging.Logger; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderNames; import io.quarkiverse.cxf.transport.VertxReactiveRequestContext; import io.quarkus.vertx.core.runtime.VertxBufferImpl; @@ -50,7 +49,7 @@ public class VertxServletOutputStream extends ServletOutputStream { public VertxServletOutputStream(VertxReactiveRequestContext context) { this.context = context; this.request = context.getContext().request(); - this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize()); + this.appendBuffer = AppendBuffer.withMinChunks(context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize()); request.response().exceptionHandler(new Handler() { @Override