Skip to content

Commit

Permalink
VertxHttpClientHTTPConduit asynchronous mode, fix #1447
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed Nov 16, 2024
1 parent cf28c8c commit 7984ff1
Show file tree
Hide file tree
Showing 30 changed files with 2,190 additions and 122 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testModule: ['client', 'client-server', 'fastinfoset', 'hc5', 'metrics', 'mtls', 'mtls -Djks', 'mtom', 'mtom-awt', 'opentelemetry', 'saaj', 'santuario-xmlsec', 'server', 'ws-rm-client', 'ws-security', 'ws-security -Djks', 'ws-security-policy', 'ws-security-policy -Djks', 'ws-trust', 'wsdl2java', 'wsdl2java-no-config']
testModule: ['async-vertx-client', 'client', 'client-server', 'fastinfoset', 'hc5', 'metrics', 'mtls', 'mtls -Djks', 'mtom', 'mtom-awt', 'opentelemetry', 'saaj', 'santuario-xmlsec', 'server', 'ws-rm-client', 'ws-security', 'ws-security -Djks', 'ws-security-policy', 'ws-security-policy -Djks', 'ws-trust', 'wsdl2java', 'wsdl2java-no-config']
name: ${{matrix.testModule}} native tests
needs: build-and-run-jvm-tests
runs-on: ubuntu-latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.apache.cxf.endpoint.Client;
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.apache.cxf.wsdl11.CatalogWSDLLocator;
import org.apache.cxf.wsdl11.WSDLManagerImpl;
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget;
import org.jboss.jandex.AnnotationValue;
Expand Down Expand Up @@ -507,6 +509,19 @@ void customizers(
}
}

@BuildStep
@Record(ExecutionTime.STATIC_INIT)
void workaroundAsyncWsdlInit(
CXFRecorder recorder,
BuildProducer<RuntimeBusCustomizerBuildItem> customizers) {
customizers.produce(new RuntimeBusCustomizerBuildItem(recorder.setQuarkusWSDLManager()));
}

@BuildStep
ReflectiveClassBuildItem workaroundAsyncWsdlInit() {
return ReflectiveClassBuildItem.builder(WSDLManagerImpl.class, CatalogWSDLLocator.class).fields().build();
}

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
void workaroundBadForceURLConnectionInit(CXFRecorder recorder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,7 @@ void verticeDeploy() throws IOException, InterruptedException, ExecutionExceptio
vertx.undeploy(deplId).toCompletionStage().toCompletableFuture().get();
}
/* Make sure the server is down */
Assertions.assertThatThrownBy(() -> helloVertice.hello("Doe"))
.rootCause()
.hasMessageContaining("Connection refused")
.isInstanceOf(java.net.ConnectException.class);
assertServerDown();

/* Put the valid stores aside */
Files.move(localHostKs, localHostKsCp, StandardCopyOption.REPLACE_EXISTING);
Expand Down Expand Up @@ -168,10 +165,7 @@ void verticeDeploy() throws IOException, InterruptedException, ExecutionExceptio

}
/* Make sure the server is down */
Assertions.assertThatThrownBy(() -> helloVertice.hello("Doe"))
.rootCause()
.hasMessageContaining("Connection refused")
.isInstanceOf(java.net.ConnectException.class);
assertServerDown();

/* Revert everything back */
Files.move(localHostKsCp, localHostKs, StandardCopyOption.REPLACE_EXISTING);
Expand All @@ -194,13 +188,21 @@ void verticeDeploy() throws IOException, InterruptedException, ExecutionExceptio

}
/* Make sure the server is down */
Assertions.assertThatThrownBy(() -> helloVertice.hello("Doe"))
.rootCause()
.hasMessageContaining("Connection refused")
.isInstanceOf(java.net.ConnectException.class);
assertServerDown();

}

private void assertServerDown() {
Awaitility.await().atMost(3000, TimeUnit.SECONDS).until(() -> {
try {
helloVertice.hello("Doe");
return false;
} catch (Exception e) {
return rootCause(e).getMessage().startsWith("Connection refused"); // There is some suffix on Windows
}
});
}

@Test
void simple() throws IOException, InterruptedException, ExecutionException {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package io.quarkiverse.cxf.deployment.test;

import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.jws.WebMethod;
import jakarta.jws.WebService;

import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
import org.assertj.core.api.Assertions;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkiverse.cxf.annotation.CXFClient;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;

public class Client3xx4xx5xxTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HelloService.class, HelloServiceImpl.class))

/* Service */
.overrideConfigKey("quarkus.cxf.endpoint.\"/hello\".implementor",
HelloServiceImpl.class.getName())
.overrideConfigKey("quarkus.cxf.endpoint.\"/hello\".logging.enabled", "true")

/* Clients */
.overrideConfigKey("quarkus.cxf.client.wsdlUri200.client-endpoint-url", "http://localhost:8081/services/hello")
.overrideConfigKey("quarkus.cxf.client.wsdlUri200.wsdl", "http://localhost:8081/services/hello?wsdl")
// Not needed when the WSDL is set and HelloService has both serviceName and targetNamespace set
//.overrideConfigKey("quarkus.cxf.client.wsdlUri404.service-interface", HelloService.class.getName())
.overrideConfigKey("quarkus.cxf.client.wsdlUri200.logging.enabled", "true")

/* Bad WSDL URI */
.overrideConfigKey("quarkus.cxf.client.wsdlUri404.client-endpoint-url", "http://localhost:8081/services/hello")
.overrideConfigKey("quarkus.cxf.client.wsdlUri404.wsdl", "http://localhost:8081/services/no-such-service?wsdl")
.overrideConfigKey("quarkus.cxf.client.wsdlUri404.logging.enabled", "true")

/* Bad service endpoint URI */
.overrideConfigKey("quarkus.cxf.client.endpointUri404.client-endpoint-url",
"http://localhost:8081/services/no-such-service")
.overrideConfigKey("quarkus.cxf.client.endpointUri404.service-interface", HelloService.class.getName())
.overrideConfigKey("quarkus.cxf.client.endpointUri404.logging.enabled", "true");

@CXFClient("wsdlUri200")
// Use Instance to avoid greedy initialization
Instance<HelloService> wsdlUri200;

@CXFClient("wsdlUri404")
Instance<HelloService> wsdlUri404;

@CXFClient("endpointUri404")
Instance<HelloService> endpointUri404;

Instance<HelloService> getClient(String clientName) {
switch (clientName) {
case "wsdlUri200": {
return wsdlUri200;
}
case "wsdlUri404": {
return wsdlUri404;
}
case "endpointUri404": {
return endpointUri404;
}
default:
throw new IllegalArgumentException("Unexpected client name: " + clientName);
}
}

@Test
void wsdlUri200() {
Assertions.assertThat(wsdlUri200.get().hello("foo")).isEqualTo("Hello foo");
}

@Test
void wsdlUri404() {
Assertions.assertThatThrownBy(() -> wsdlUri404.get().hello("foo"))
.hasRootCauseInstanceOf(org.apache.cxf.transport.http.HTTPException.class)
.hasRootCauseMessage(
"HTTP response '404: Not Found' when communicating with http://localhost:8081/services/no-such-service?wsdl");
}

@Test
void endpointUri404() {
Assertions.assertThatThrownBy(() -> endpointUri404.get().hello("foo")).hasRootCauseMessage(
"HTTP response '404: Not Found' when communicating with http://localhost:8081/services/no-such-service");
}

public void init(@Observes Router router) {
router.route().handler(BodyHandler.create());
router.post("/vertx-blocking/:client").blockingHandler(ctx -> {
final String person = ctx.body().asString();
final String resp = getClient(ctx.pathParam("client")).get().hello(person);
ctx.response().end(resp);
});
router.post("/vertx/:client").handler(ctx -> {
final String person = ctx.body().asString();
try {
final String resp = getClient(ctx.pathParam("client")).get().hello(person);
ctx.response().end(resp);
} catch (Exception e) {
Throwable r = rootCause(e);
ctx.response().setStatusCode(500).end(r.getClass().getName() + " " + r.getMessage());
}
});
}

@Test
void wsdlUri200OnWorkerThread() {
RestAssured.given()
.body("Joe")
.post("http://localhost:8081/vertx-blocking/wsdlUri200")
.then()
.statusCode(200)
.body(Matchers.is("Hello Joe"));
}

@Test
void wsdlUri200OnEventLoop() throws InterruptedException {
final Client client = ClientProxy.getClient(wsdlUri200.get());
if (client.getConduit() instanceof URLConnectionHTTPConduit) {
/* URLConnectionHTTPConduit is not as picky as VertxHttpClientHTTPConduit */
RestAssured.given()
.body("Joe")
.post("http://localhost:8081/vertx/wsdlUri200")
.then()
.statusCode(200)
.body(Matchers.is("Hello Joe"));
} else {
/* VertxHttpClientHTTPConduit */
RestAssured.given()
.body("Joe")
.post("http://localhost:8081/vertx/wsdlUri200")
.then()
.statusCode(500)
.body(CoreMatchers.containsString(
"java.lang.IllegalStateException You have attempted to perform a blocking operation on an IO thread."));
}

}

@Test
void endpointUri404OnWorkerThread() {
RestAssured.given()
.body("Joe")
.post("http://localhost:8081/vertx-blocking/endpointUri404")
.then()
.statusCode(500)
.body(CoreMatchers.containsString(
"org.apache.cxf.transport.http.HTTPException: HTTP response '404: Not Found' when communicating with http://localhost:8081/services/no-such-service"));
}

@Test
void endpointUri404OnEventLoop() throws InterruptedException {
final Client client = ClientProxy.getClient(endpointUri404.get());
if (client.getConduit() instanceof URLConnectionHTTPConduit) {
/* URLConnectionHTTPConduit is not as picky as VertxHttpClientHTTPConduit */
RestAssured.given()
.body("Joe")
.post("http://localhost:8081/vertx/endpointUri404")
.then()
.statusCode(500)
.body(CoreMatchers.containsString(
"org.apache.cxf.transport.http.HTTPException HTTP response '404: Not Found' when communicating with http://localhost:8081/services/no-such-service"));
} else {
/* VertxHttpClientHTTPConduit */
RestAssured.given()
.body("Joe")
.post("http://localhost:8081/vertx/endpointUri404")
.then()
.statusCode(500)
.body(CoreMatchers.containsString(
"java.lang.IllegalStateException You have attempted to perform a blocking operation on an IO thread."));

}

}

private static Throwable rootCause(Exception e) {
e.printStackTrace();
Throwable result = e;
while (result.getCause() != null) {
result = result.getCause();
}
return result;
}

@WebService(serviceName = "HelloService", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/")
public interface HelloService {

@WebMethod
String hello(String person);

}

@WebService(serviceName = "HelloService")
public static class HelloServiceImpl implements HelloService {

@Override
public String hello(String person) {
return "Hello " + person;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@

import org.apache.cxf.Bus;
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.apache.cxf.wsdl.WSDLManager;
import org.jboss.logging.Logger;

import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue.Supplier;
import io.quarkiverse.cxf.annotation.CXFEndpoint;
import io.quarkiverse.cxf.transport.CxfHandler;
import io.quarkiverse.cxf.transport.VertxDestinationFactory;
import io.quarkiverse.cxf.wsdl.QuarkusWSDLManager;
import io.quarkus.arc.Arc;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.runtime.RuntimeValue;
Expand Down Expand Up @@ -285,6 +287,10 @@ public RuntimeValue<Consumer<Bus>> setBusHTTPConduitFactory(HTTPConduitImpl fact
return new RuntimeValue<>(bus -> bus.setExtension(factory, HTTPConduitSpec.class));
}

public RuntimeValue<Consumer<Bus>> setQuarkusWSDLManager() {
return new RuntimeValue<>(bus -> bus.setExtension(QuarkusWSDLManager.newInstance(bus), WSDLManager.class));
}

public void workaroundBadForceURLConnectionInit() {
// A workaround for the bad initialization of HTTPTransportFactory.forceURLConnectionConduit
// in the downstream CXF 4.0.5.fuse-redhat-00012:
Expand Down
Loading

0 comments on commit 7984ff1

Please sign in to comment.