Skip to content

Commit

Permalink
Regenerate Quarkus classes
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed May 6, 2024
1 parent a66b9d8 commit b83c9aa
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package io.quarkiverse.cxf.transport.generated;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.ArrayDeque;
import java.util.Objects;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.internal.PlatformDependent;
import io.vertx.core.buffer.impl.VertxByteBufAllocator;

/**
* Adapted by sync-quarkus-classes.groovy from
Expand All @@ -19,7 +24,26 @@
*/
final class AppendBuffer {

private final ByteBufAllocator allocator;
private static final MethodHandle virtualMh = PlatformDependent.javaVersion() >= 21 ? findVirtualMH() : null;

private static MethodHandle findVirtualMH() {
try {
return MethodHandles.publicLookup().findVirtual(Thread.class, "isVirtual", MethodType.methodType(boolean.class));
} catch (Exception e) {
return null;
}
}

private static boolean isVirtualThread() {
if (virtualMh == null) {
return false;
}
try {
return (boolean) virtualMh.invokeExact(Thread.currentThread());
} catch (Throwable t) {
return false;
}
}

private final int minChunkSize;

Expand All @@ -31,34 +55,36 @@ final class AppendBuffer {

private int size;

private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) {
this.allocator = allocator;
private boolean anyHeap;

private AppendBuffer(int minChunkSize, int capacity) {
this.minChunkSize = Math.min(minChunkSize, capacity);
this.capacity = capacity;
this.anyHeap = false;
}

/**
* This buffer append data in a single eagerly allocated {@link ByteBuf}.
*/
public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, capacity, capacity);
public static AppendBuffer eager(int capacity) {
return new AppendBuffer(capacity, capacity);
}

/**
* This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.<br>
* The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}.
*/
public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, 0, capacity);
public static AppendBuffer exact(int capacity) {
return new AppendBuffer(0, capacity);
}

/**
* This buffer append data in multiples {@link ByteBuf}s which minimum capacity is {@code minChunkSize} or
* as each {@code len}, if greater than it.<br>
* The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}.
*/
public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) {
return new AppendBuffer(allocator, minChunkSize, capacity);
public static AppendBuffer withMinChunks(int minChunkSize, int capacity) {
return new AppendBuffer(minChunkSize, capacity);
}

private ByteBuf lastBuffer() {
Expand Down Expand Up @@ -111,7 +137,14 @@ public int append(byte[] bytes, int off, int len) {
} else {
chunkCapacity = toWrite;
}
var tmpBuf = allocator.directBuffer(chunkCapacity);
boolean isVirtualThread = isVirtualThread();
final ByteBuf tmpBuf;
if (isVirtualThread) {
// VertxByteBufAllocator allocates cheaper heap buffers ie which doesn't use reference counting
tmpBuf = VertxByteBufAllocator.DEFAULT.heapBuffer(chunkCapacity);
} else {
tmpBuf = PooledByteBufAllocator.DEFAULT.directBuffer(chunkCapacity);
}
try {
tmpBuf.writeBytes(bytes, off, toWrite);
} catch (Throwable t) {
Expand All @@ -133,6 +166,9 @@ public int append(byte[] bytes, int off, int len) {
throw t;
}
}
if (isVirtualThread) {
anyHeap = true;
}
size += toWrite;
return toWrite + alreadyWritten;
}
Expand All @@ -159,6 +195,7 @@ public ByteBuf clear() {
if (others == null || others.isEmpty()) {
size = 0;
buffer = null;
anyHeap = false;
// super fast-path
return firstBuf;
}
Expand All @@ -168,10 +205,17 @@ public ByteBuf clear() {
private CompositeByteBuf clearBuffers() {
var firstBuf = buffer;
var others = otherBuffers;
var batch = allocator.compositeDirectBuffer(1 + others.size());
CompositeByteBuf batch;
if (anyHeap) {
batch = new CompositeByteBuf(VertxByteBufAllocator.UNPOOLED_ALLOCATOR, false, 1 + others.size());
} else {
// This should be the allocator picked by Netty
batch = PooledByteBufAllocator.DEFAULT.compositeBuffer(1 + others.size());
}
try {
buffer = null;
size = 0;
anyHeap = false;
batch.addComponent(true, 0, firstBuf);
for (int i = 0, othersCount = others.size(); i < othersCount; i++) {
// if addComponent fail, it takes care of releasing curr and throwing the exception:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import jakarta.servlet.WriteListener;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.quarkiverse.cxf.transport.VertxReactiveRequestContext;
import io.quarkus.vertx.core.runtime.VertxBufferImpl;
Expand Down Expand Up @@ -50,7 +49,7 @@ public class VertxServletOutputStream extends ServletOutputStream {
public VertxServletOutputStream(VertxReactiveRequestContext context) {
this.context = context;
this.request = context.getContext().request();
this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize());
this.appendBuffer = AppendBuffer.withMinChunks(context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize());
request.response().exceptionHandler(new Handler<Throwable>() {

@Override
Expand Down

0 comments on commit b83c9aa

Please sign in to comment.