Skip to content

Commit

Permalink
Merge pull request #317 from weaviate/java-async-client-poc
Browse files Browse the repository at this point in the history
Async client PoC
  • Loading branch information
antas-marcin authored Nov 8, 2024
2 parents 5ce3193 + c0fc6d1 commit 09f75ca
Show file tree
Hide file tree
Showing 39 changed files with 2,116 additions and 162 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/weaviate/client/WeaviateClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.weaviate.client.base.util.DbVersionProvider;
import io.weaviate.client.base.util.DbVersionSupport;
import io.weaviate.client.base.util.GrpcVersionSupport;
import io.weaviate.client.v1.async.WeaviateAsyncClient;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.backup.Backup;
import io.weaviate.client.v1.batch.Batch;
Expand Down Expand Up @@ -44,6 +45,10 @@ public WeaviateClient(Config config, HttpClient httpClient, AccessTokenProvider
this.tokenProvider = tokenProvider;
}

public WeaviateAsyncClient async() {
return new WeaviateAsyncClient(config);
}

public Misc misc() {
return new Misc(httpClient, config, dbVersionProvider);
}
Expand Down
75 changes: 75 additions & 0 deletions src/main/java/io/weaviate/client/base/AsyncBaseClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.weaviate.client.base;

import io.weaviate.client.Config;
import io.weaviate.client.base.http.async.ResponseParser;
import io.weaviate.client.base.http.async.WeaviateResponseConsumer;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHeaders;

public abstract class AsyncBaseClient<T> {
private final CloseableHttpAsyncClient client;
private final Config config;
private final Serializer serializer;

public AsyncBaseClient(CloseableHttpAsyncClient client, Config config) {
this.client = client;
this.config = config;
this.serializer = new Serializer();
}

protected Future<Result<T>> sendGetRequest(String endpoint, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, null, "GET", classOfT, callback, null);
}

protected Future<Result<T>> sendGetRequest(String endpoint, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, null, "GET", null, callback, parser);
}

protected Future<Result<T>> sendPostRequest(String endpoint, Object payload, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, payload, "POST", classOfT, callback, null);
}

protected Future<Result<T>> sendPostRequest(String endpoint, Object payload, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, payload, "POST", null, callback, parser);
}

protected Future<Result<T>> sendPutRequest(String endpoint, Object payload, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, payload, "PUT", classOfT, callback, null);
}

protected Future<Result<T>> sendPutRequest(String endpoint, Object payload, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, payload, "PUT", null, callback, parser);
}

protected Future<Result<T>> sendDeleteRequest(String endpoint, Object payload, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, payload, "DELETE", classOfT, callback, null);
}

protected Future<Result<T>> sendDeleteRequest(String endpoint, Object payload, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, payload, "DELETE", null, callback, parser);
}

protected Future<Result<T>> sendHeadRequest(String endpoint, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, null, "HEAD", classOfT, callback, null);
}

protected Future<Result<T>> sendHeadRequest(String endpoint, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, null, "HEAD", null, callback, parser);
}

private Future<Result<T>> sendRequest(String endpoint, Object payload, String method, Class<T> classOfT, FutureCallback<Result<T>> callback,
ResponseParser<T> parser) {
SimpleHttpRequest req = new SimpleHttpRequest(method, String.format("%s%s", config.getBaseURL(), endpoint));
req.addHeader(HttpHeaders.ACCEPT, "*/*");
req.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
if (payload != null) {
req.setBody(serializer.toJsonString(payload), ContentType.APPLICATION_JSON);
}
return client.execute(SimpleRequestProducer.create(req), new WeaviateResponseConsumer<>(classOfT, parser), callback);
}
}
9 changes: 9 additions & 0 deletions src/main/java/io/weaviate/client/base/AsyncClientResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.weaviate.client.base;

import java.util.concurrent.Future;
import org.apache.hc.core5.concurrent.FutureCallback;

public interface AsyncClientResult<T> {
Future<Result<T>> run();
Future<Result<T>> run(FutureCallback<Result<T>> callback);
}
27 changes: 2 additions & 25 deletions src/main/java/io/weaviate/client/base/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import io.weaviate.client.Config;
import io.weaviate.client.base.http.HttpClient;
import io.weaviate.client.base.http.HttpResponse;
import io.weaviate.client.v1.graphql.GraphQL;
import io.weaviate.client.v1.graphql.model.GraphQLResponse;
import java.util.Collections;
import java.util.List;

public abstract class BaseClient<T> {
private final HttpClient client;
Expand Down Expand Up @@ -53,12 +50,7 @@ private Response<T> sendRequest(String endpoint, Object payload, String method,

if (statusCode < 399) {
T body = toResponse(responseBody, classOfT);
WeaviateErrorResponse errors = null;

if (body != null && classOfT.equals(GraphQL.class)) {
errors = getWeaviateGraphQLErrorResponse((GraphQLResponse) body, statusCode);
}
return new Response<>(statusCode, body, errors);
return new Response<>(statusCode, body, null);
}

WeaviateErrorResponse error = toResponse(responseBody, WeaviateErrorResponse.class);
Expand Down Expand Up @@ -89,7 +81,7 @@ private HttpResponse sendHttpRequest(String address, String json, String method)
}

private <C> C toResponse(String response, Class<C> classOfT) {
return serializer.toResponse(response, classOfT);
return serializer.toObject(response, classOfT);
}

private String toJsonString(Object object) {
Expand All @@ -100,19 +92,4 @@ private WeaviateErrorResponse getWeaviateErrorResponse(Exception e) {
WeaviateErrorMessage error = WeaviateErrorMessage.builder().message(e.getMessage()).throwable(e).build();
return WeaviateErrorResponse.builder().error(Collections.singletonList(error)).build();
}

/**
* Extract errors from {@link WeaviateErrorResponse} from a GraphQL response body.
*
* @param gql GraphQL response body.
* @param code HTTP status code to pass in the {@link WeaviateErrorResponse}.
* @return Error response to be returned to the caller.
*/
private WeaviateErrorResponse getWeaviateGraphQLErrorResponse(GraphQLResponse gql, int code) {
List<WeaviateErrorMessage> messages = gql.errorMessages();
if (messages == null || messages.isEmpty()) {
return null;
}
return WeaviateErrorResponse.builder().code(code).error(gql.errorMessages()).build();
}
}
29 changes: 27 additions & 2 deletions src/main/java/io/weaviate/client/base/Response.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
package io.weaviate.client.base;

import io.weaviate.client.v1.graphql.model.GraphQLResponse;
import java.util.List;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.FieldDefaults;

@Getter
@AllArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class Response<T> {
int statusCode;
T body;
WeaviateErrorResponse errors;

public Response(int statusCode, T body, WeaviateErrorResponse errors) {
this.statusCode = statusCode;
this.body = body;
if (body instanceof GraphQLResponse) {
this.errors = getWeaviateGraphQLErrorResponse((GraphQLResponse) body, statusCode);;
} else {
this.errors = errors;
}
}

/**
* Extract errors from {@link WeaviateErrorResponse} from a GraphQL response body.
*
* @param gql GraphQL response body.
* @param code HTTP status code to pass in the {@link WeaviateErrorResponse}.
* @return Error response to be returned to the caller.
*/
private WeaviateErrorResponse getWeaviateGraphQLErrorResponse(GraphQLResponse gql, int code) {
List<WeaviateErrorMessage> messages = gql.errorMessages();
if (messages == null || messages.isEmpty()) {
return null;
}
return WeaviateErrorResponse.builder().code(code).error(gql.errorMessages()).build();
}
}
21 changes: 20 additions & 1 deletion src/main/java/io/weaviate/client/base/Serializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,30 @@ public Serializer() {
this.gson = new GsonBuilder().disableHtmlEscaping().create();
}

public <C> C toResponse(String response, Class<C> classOfT) {
public <T> T toObject(String response, Class<T> classOfT) {
return gson.fromJson(response, classOfT);
}

public String toJsonString(Object object) {
return (object != null) ? gson.toJson(object) : null;
}

public <T> Result<T> toResult(int statusCode, String body, Class<T> classOfT) {
if (statusCode < 399) {
return new Result<>(toResponse(statusCode, body, classOfT));
}
return new Result<>(statusCode, null, toWeaviateError(body));
}

public <T> Response<T> toResponse(int statusCode, String body, Class<T> classOfT) {
if (statusCode < 399) {
T obj = toObject(body, classOfT);
return new Response<>(statusCode, obj, null);
}
return new Response<>(statusCode, null, toWeaviateError(body));
}

public WeaviateErrorResponse toWeaviateError(String body) {
return toObject(body, WeaviateErrorResponse.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.weaviate.client.base.http.async;

import io.weaviate.client.Config;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;

public class AsyncHttpClient {

public static CloseableHttpAsyncClient create(Config config) {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(Timeout.ofSeconds(config.getSocketTimeout()))
.build();

return HttpAsyncClients.custom()
.setIOReactorConfig(ioReactorConfig)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.weaviate.client.base.http.async;

import io.weaviate.client.base.Result;
import io.weaviate.client.base.Serializer;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpResponse;

public abstract class ResponseParser<T> {
protected final Serializer serializer;

public ResponseParser() {
this.serializer = new Serializer();
}

public abstract Result<T> parse(HttpResponse response, String body, ContentType contentType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.weaviate.client.base.http.async;

import io.weaviate.client.base.Result;
import io.weaviate.client.base.Serializer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.support.AbstractAsyncResponseConsumer;
import org.apache.hc.core5.http.protocol.HttpContext;

public class WeaviateResponseConsumer<T> extends AbstractAsyncResponseConsumer<Result<T>, byte[]> {
private final Serializer serializer;
private final Class<T> classOfT;
private final ResponseParser<T> parser;

public WeaviateResponseConsumer(Class<T> classOfT, ResponseParser<T> parser) {
super(new BasicAsyncEntityConsumer());
this.serializer = new Serializer();
this.classOfT = classOfT;
this.parser = parser;
}

@Override
protected Result<T> buildResult(HttpResponse response, byte[] entity, ContentType contentType) {
String body = new String(entity, StandardCharsets.UTF_8);
if (this.parser != null) {
return this.parser.parse(response, body, contentType);
}
return serializer.toResult(response.getCode(), body, classOfT);
}

@Override
public void informationResponse(HttpResponse response, HttpContext context) throws HttpException, IOException {
}
}
37 changes: 37 additions & 0 deletions src/main/java/io/weaviate/client/v1/async/WeaviateAsyncClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.weaviate.client.v1.async;

import io.weaviate.client.Config;
import io.weaviate.client.base.http.async.AsyncHttpClient;
import io.weaviate.client.v1.async.misc.Misc;
import io.weaviate.client.v1.async.schema.Schema;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.io.CloseMode;

public class WeaviateAsyncClient implements AutoCloseable {
private final Config config;
private final CloseableHttpAsyncClient client;

public WeaviateAsyncClient(Config config) {
this.config = config;
this.client = AsyncHttpClient.create(config);
// auto start the client
this.start();
}

public Misc misc() {
return new Misc(client, config);
}

public Schema schema() {
return new Schema(client, config);
}

private void start() {
this.client.start();
}

@Override
public void close() {
this.client.close(CloseMode.GRACEFUL);
}
}
34 changes: 34 additions & 0 deletions src/main/java/io/weaviate/client/v1/async/misc/Misc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.weaviate.client.v1.async.misc;

import io.weaviate.client.Config;
import io.weaviate.client.v1.async.misc.api.LiveChecker;
import io.weaviate.client.v1.async.misc.api.MetaGetter;
import io.weaviate.client.v1.async.misc.api.OpenIDConfigGetter;
import io.weaviate.client.v1.async.misc.api.ReadyChecker;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

public class Misc {
private final CloseableHttpAsyncClient client;
private final Config config;

public Misc(CloseableHttpAsyncClient client, Config config) {
this.client = client;
this.config = config;
}

public MetaGetter metaGetter() {
return new MetaGetter(client, config);
}

public OpenIDConfigGetter openIDConfigGetter() {
return new OpenIDConfigGetter(client, config);
}

public LiveChecker liveChecker() {
return new LiveChecker(client, config);
}

public ReadyChecker readyChecker() {
return new ReadyChecker(client, config);
}
}
Loading

0 comments on commit 09f75ca

Please sign in to comment.