Skip to content

Commit

Permalink
Implement async data package
Browse files Browse the repository at this point in the history
  • Loading branch information
antas-marcin committed Nov 9, 2024
1 parent 09f75ca commit e2842c0
Show file tree
Hide file tree
Showing 18 changed files with 1,439 additions and 133 deletions.
8 changes: 8 additions & 0 deletions src/main/java/io/weaviate/client/base/AsyncBaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ protected Future<Result<T>> sendPutRequest(String endpoint, Object payload, Futu
return sendRequest(endpoint, payload, "PUT", null, callback, parser);
}

protected Future<Result<T>> sendPatchRequest(String endpoint, Object payload, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, payload, "PATCH", classOfT, callback, null);
}

protected Future<Result<T>> sendPatchRequest(String endpoint, Object payload, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, payload, "PATCH", null, callback, parser);
}

protected Future<Result<T>> sendDeleteRequest(String endpoint, Object payload, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, payload, "DELETE", classOfT, callback, null);
}
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/io/weaviate/client/v1/async/WeaviateAsyncClient.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
package io.weaviate.client.v1.async;

import io.weaviate.client.Config;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.http.async.AsyncHttpClient;
import io.weaviate.client.base.util.DbVersionProvider;
import io.weaviate.client.base.util.DbVersionSupport;
import io.weaviate.client.v1.async.data.Data;
import io.weaviate.client.v1.async.misc.Misc;
import io.weaviate.client.v1.async.schema.Schema;
import io.weaviate.client.v1.misc.model.Meta;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.io.CloseMode;

public class WeaviateAsyncClient implements AutoCloseable {
private final Config config;
private final CloseableHttpAsyncClient client;
private final DbVersionSupport dbVersionSupport;

public WeaviateAsyncClient(Config config) {
this.config = config;
this.client = AsyncHttpClient.create(config);
// auto start the client
this.start();
// init the db version provider and get the version info
this.dbVersionSupport = new DbVersionSupport(initDbVersionProvider());
}

public Misc misc() {
Expand All @@ -26,6 +36,28 @@ public Schema schema() {
return new Schema(client, config);
}

public Data data() {
return new Data(client, config, dbVersionSupport);
}

private DbVersionProvider initDbVersionProvider() {
DbVersionProvider.VersionGetter getter = () ->
Optional.ofNullable(this.getMeta())
.filter(result -> !result.hasErrors())
.map(result -> result.getResult().getVersion());

return new DbVersionProvider(getter);
}

private Result<Meta> getMeta() {
try {
return new Misc(client, config).metaGetter().run().get();
} catch (InterruptedException | ExecutionException e) {
// we can't connect to Weaviate, metaResult will be null
return null;
}
}

private void start() {
this.client.start();
}
Expand Down
74 changes: 74 additions & 0 deletions src/main/java/io/weaviate/client/v1/async/data/Data.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.weaviate.client.v1.async.data;

import io.weaviate.client.Config;
import io.weaviate.client.base.util.BeaconPath;
import io.weaviate.client.base.util.DbVersionSupport;
import io.weaviate.client.v1.async.data.api.ObjectCreator;
import io.weaviate.client.v1.async.data.api.ObjectDeleter;
import io.weaviate.client.v1.async.data.api.ObjectUpdater;
import io.weaviate.client.v1.async.data.api.ObjectValidator;
import io.weaviate.client.v1.async.data.api.ObjectsChecker;
import io.weaviate.client.v1.async.data.api.ObjectsGetter;
import io.weaviate.client.v1.async.data.api.ReferenceCreator;
import io.weaviate.client.v1.async.data.api.ReferenceDeleter;
import io.weaviate.client.v1.async.data.api.ReferenceReplacer;
import io.weaviate.client.v1.data.builder.ReferencePayloadBuilder;
import io.weaviate.client.v1.data.util.ObjectsPath;
import io.weaviate.client.v1.data.util.ReferencesPath;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

public class Data {
private final CloseableHttpAsyncClient client;
private final Config config;
private final ObjectsPath objectsPath;
private final ReferencesPath referencesPath;
private final BeaconPath beaconPath;

public Data(CloseableHttpAsyncClient client, Config config, DbVersionSupport dbVersionSupport) {
this.client = client;
this.config = config;
this.objectsPath = new ObjectsPath(dbVersionSupport);
this.referencesPath = new ReferencesPath(dbVersionSupport);
this.beaconPath = new BeaconPath(dbVersionSupport);
}

public ObjectCreator creator() {
return new ObjectCreator(client, config, objectsPath);
}

public ObjectsGetter objectsGetter() {
return new ObjectsGetter(client, config, objectsPath);
}

public ObjectsChecker checker() {
return new ObjectsChecker(client, config, objectsPath);
}

public ObjectDeleter deleter() {
return new ObjectDeleter(client, config, objectsPath);
}

public ObjectUpdater updater() {
return new ObjectUpdater(client, config, objectsPath);
}

public ObjectValidator validator() {
return new ObjectValidator(client, config);
}

public ReferencePayloadBuilder referencePayloadBuilder() {
return new ReferencePayloadBuilder(beaconPath);
}

public ReferenceCreator referenceCreator() {
return new ReferenceCreator(client, config, referencesPath);
}

public ReferenceReplacer referenceReplacer() {
return new ReferenceReplacer(client, config, referencesPath);
}

public ReferenceDeleter referenceDeleter() {
return new ReferenceDeleter(client, config, referencesPath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.weaviate.client.v1.async.data.api;

import io.weaviate.client.Config;
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.data.model.WeaviateObject;
import io.weaviate.client.v1.data.util.ObjectsPath;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;

public class ObjectCreator extends AsyncBaseClient<WeaviateObject> implements AsyncClientResult<WeaviateObject> {
private final ObjectsPath objectsPath;
private String id;
private String className;
private String consistencyLevel;
private String tenant;
private Map<String, Object> properties;
private Float[] vector;
private Map<String, Float[]> vectors;

public ObjectCreator(CloseableHttpAsyncClient client, Config config, ObjectsPath objectsPath) {
super(client, config);
this.objectsPath = objectsPath;
}

public ObjectCreator withID(String id) {
this.id = id;
return this;
}

public ObjectCreator withClassName(String className) {
this.className = className;
return this;
}

public ObjectCreator withConsistencyLevel(String consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

public ObjectCreator withTenant(String tenant) {
this.tenant = tenant;
return this;
}

public ObjectCreator withProperties(Map<String, Object> properties) {
this.properties = properties;
return this;
}

public ObjectCreator withVector(Float[] vector) {
this.vector = vector;
return this;
}

public ObjectCreator withVectors(Map<String, Float[]> vectors) {
this.vectors = vectors;
return this;
}

private String getID() {
if (StringUtils.isEmpty(id)) {
return UUID.randomUUID().toString();
}
return id;
}

@Override
public Future<Result<WeaviateObject>> run() {
return run(null);
}

@Override
public Future<Result<WeaviateObject>> run(FutureCallback<Result<WeaviateObject>> callback) {
String path = objectsPath.buildCreate(ObjectsPath.Params.builder()
.consistencyLevel(consistencyLevel)
.build());
WeaviateObject obj = WeaviateObject.builder()
.className(className)
.properties(properties)
.vector(vector)
.vectors(vectors)
.id(getID())
.tenant(tenant)
.build();
return sendPostRequest(path, obj, WeaviateObject.class, callback);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.weaviate.client.v1.async.data.api;

import io.weaviate.client.Config;
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Response;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.WeaviateErrorMessage;
import io.weaviate.client.base.WeaviateErrorResponse;
import io.weaviate.client.base.http.async.ResponseParser;
import io.weaviate.client.v1.data.util.ObjectsPath;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpResponse;

public class ObjectDeleter extends AsyncBaseClient<Boolean> implements AsyncClientResult<Boolean> {
private final ObjectsPath objectsPath;
private String id;
private String className;
private String consistencyLevel;
private String tenant;

public ObjectDeleter(CloseableHttpAsyncClient client, Config config, ObjectsPath objectsPath) {
super(client, config);
this.objectsPath = objectsPath;
}

public ObjectDeleter withID(String id) {
this.id = id;
return this;
}

public ObjectDeleter withClassName(String className) {
this.className = className;
return this;
}

public ObjectDeleter withConsistencyLevel(String consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

public ObjectDeleter withTenant(String tenant) {
this.tenant = tenant;
return this;
}

@Override
public Future<Result<Boolean>> run() {
return run(null);
}

@Override
public Future<Result<Boolean>> run(FutureCallback<Result<Boolean>> callback) {
if (StringUtils.isEmpty(id)) {
WeaviateErrorMessage errorMessage = WeaviateErrorMessage.builder()
.message("id cannot be empty").build();
WeaviateErrorResponse errors = WeaviateErrorResponse.builder()
.error(Stream.of(errorMessage).collect(Collectors.toList())).build();
return CompletableFuture.completedFuture(new Result<>(500, false, errors));
}
String path = objectsPath.buildDelete(ObjectsPath.Params.builder()
.id(id)
.className(className)
.consistencyLevel(consistencyLevel)
.tenant(tenant)
.build());
return sendDeleteRequest(path, null, callback, new ResponseParser<Boolean>() {
@Override
public Result<Boolean> parse(HttpResponse response, String body, ContentType contentType) {
Response<String> resp = serializer.toResponse(response.getCode(), body, String.class);
return new Result<>(resp.getStatusCode(), resp.getStatusCode() == 204, resp.getErrors());
}
});
}
}
Loading

0 comments on commit e2842c0

Please sign in to comment.