Skip to content

Commit

Permalink
Merge pull request #323 from weaviate/async_cluster
Browse files Browse the repository at this point in the history
feature: async support for cluster package
  • Loading branch information
antas-marcin authored Nov 12, 2024
2 parents bb5cfe3 + 87e0e04 commit 0997183
Show file tree
Hide file tree
Showing 9 changed files with 533 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.weaviate.client.base.http.async.AsyncHttpClient;
import io.weaviate.client.base.util.DbVersionProvider;
import io.weaviate.client.base.util.DbVersionSupport;
import io.weaviate.client.v1.async.cluster.Cluster;
import io.weaviate.client.v1.async.data.Data;
import io.weaviate.client.v1.async.misc.Misc;
import io.weaviate.client.v1.async.schema.Schema;
Expand Down Expand Up @@ -40,6 +41,10 @@ public Data data() {
return new Data(client, config, dbVersionSupport);
}

public Cluster cluster() {
return new Cluster(client, config);
}

private DbVersionProvider initDbVersionProvider() {
DbVersionProvider.VersionGetter getter = () ->
Optional.ofNullable(this.getMeta())
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/weaviate/client/v1/async/cluster/Cluster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.weaviate.client.v1.async.cluster;

import io.weaviate.client.Config;
import io.weaviate.client.v1.async.cluster.api.NodesStatusGetter;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

public class Cluster {

private final CloseableHttpAsyncClient client;
private final Config config;

public Cluster(CloseableHttpAsyncClient client, Config config) {
this.client = client;
this.config = config;
}

public NodesStatusGetter nodesStatusGetter() {
return new NodesStatusGetter(client, config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.weaviate.client.v1.async.cluster.api;

import io.weaviate.client.Config;
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.cluster.model.NodesStatusResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;

import java.util.concurrent.Future;

public class NodesStatusGetter extends AsyncBaseClient<NodesStatusResponse>
implements AsyncClientResult<NodesStatusResponse> {

private String className;
private String output;

public NodesStatusGetter(CloseableHttpAsyncClient client, Config config) {
super(client, config);
}

public NodesStatusGetter withClassName(String className) {
this.className = className;
return this;
}

public NodesStatusGetter withOutput(String output) {
this.output = output;
return this;
}

@Override
public Future<Result<NodesStatusResponse>> run() {
return run(null);
}

@Override
public Future<Result<NodesStatusResponse>> run(FutureCallback<Result<NodesStatusResponse>> callback) {
return sendGetRequest(path(), NodesStatusResponse.class, callback);
}

private String path() {
String path = "/nodes";
if (StringUtils.isNotBlank(className)) {
path = String.format("%s/%s", path, UrlEncoder.encodePathParam(className));
}
if (StringUtils.isNotBlank(output)) {
path = String.format("%s?%s", path, UrlEncoder.encodeQueryParam("output", output));
}
return path;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.weaviate.integration.client.async.cluster;

import io.weaviate.client.Config;
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.async.WeaviateAsyncClient;
import io.weaviate.client.v1.async.cluster.api.NodesStatusGetter;
import io.weaviate.client.v1.cluster.model.NodeStatusOutput;
import io.weaviate.client.v1.cluster.model.NodesStatusResponse;
import io.weaviate.integration.client.WeaviateDockerCompose;
import io.weaviate.integration.client.WeaviateTestGenerics;
import io.weaviate.integration.tests.cluster.ClusterMultiTenancyTestSuite;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ClientClusterMultiTenancyTest {

private WeaviateClient client;
private final WeaviateTestGenerics testGenerics = new WeaviateTestGenerics();

@ClassRule
public static WeaviateDockerCompose compose = new WeaviateDockerCompose();

@Before
public void before() {
Config config = new Config("http", compose.getHttpHostAddress());
client = new WeaviateClient(config);
}

@After
public void after() {
testGenerics.cleanupWeaviate(client);
}


@Test
public void shouldGetNodeStatusPerClass() throws InterruptedException {
Supplier<Result<NodesStatusResponse>> resultSupplierAll = createSupplier(
nodesStatusGetter -> nodesStatusGetter
.withOutput(NodeStatusOutput.VERBOSE)
);
Supplier<Result<NodesStatusResponse>> resultSupplierPizza = createSupplier(
nodesStatusGetter -> nodesStatusGetter
.withOutput(NodeStatusOutput.VERBOSE)
.withClassName("Pizza")
);
Supplier<Result<NodesStatusResponse>> resultSupplierSoup = createSupplier(
nodesStatusGetter -> nodesStatusGetter
.withOutput(NodeStatusOutput.VERBOSE)
.withClassName("Soup")
);

ClusterMultiTenancyTestSuite.testMultiTenancyDataPerClassOutputVerbose(resultSupplierAll, resultSupplierPizza, resultSupplierSoup,
testGenerics, client);
}

private Supplier<Result<NodesStatusResponse>> createSupplier(Consumer<NodesStatusGetter> configure) {
return () -> {
try (WeaviateAsyncClient asyncClient = client.async()) {
NodesStatusGetter nodesStatusGetter = asyncClient.cluster().nodesStatusGetter();
configure.accept(nodesStatusGetter);
return nodesStatusGetter.run().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package io.weaviate.integration.client.async.cluster;

import io.weaviate.client.Config;
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.async.WeaviateAsyncClient;
import io.weaviate.client.v1.async.cluster.api.NodesStatusGetter;
import io.weaviate.client.v1.cluster.model.NodeStatusOutput;
import io.weaviate.client.v1.cluster.model.NodesStatusResponse;
import io.weaviate.integration.client.WeaviateDockerCompose;
import io.weaviate.integration.client.WeaviateTestGenerics;
import io.weaviate.integration.tests.cluster.ClusterTestSuite;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ClientClusterTest {

private WeaviateClient client;
private final WeaviateTestGenerics testGenerics = new WeaviateTestGenerics();

@ClassRule
public static WeaviateDockerCompose compose = new WeaviateDockerCompose();

@Before
public void before() {
Config config = new Config("http", compose.getHttpHostAddress());
client = new WeaviateClient(config);
}

@After
public void after() {
testGenerics.cleanupWeaviate(client);
}

@Test
public void testClusterNodesEndpointWithoutDataWithOutputVerbose() {
Supplier<Result<NodesStatusResponse>> resultSupplier = createSupplier(
nodesStatusGetter -> nodesStatusGetter
.withOutput(NodeStatusOutput.VERBOSE)
);

ClusterTestSuite.testNoDataOutputVerbose(resultSupplier);
}

@Test
public void testClusterNodesEndpointWithDataWithOutputVerbose() throws InterruptedException {
Supplier<Result<NodesStatusResponse>> resultSupplier = createSupplier(
nodesStatusGetter -> nodesStatusGetter
.withOutput(NodeStatusOutput.VERBOSE)
);

ClusterTestSuite.testDataOutputVerbose(resultSupplier, testGenerics, client);
}

@Test
public void shouldGetNodeStatusPerClassWithOutputVerbose() throws InterruptedException {
Supplier<Result<NodesStatusResponse>> resultSupplierAll = createSupplier(
nodesStatusGetter -> nodesStatusGetter
.withOutput(NodeStatusOutput.VERBOSE)
);
Supplier<Result<NodesStatusResponse>> resultSupplierPizza = createSupplier(
nodesStatusGetter -> nodesStatusGetter
.withOutput(NodeStatusOutput.VERBOSE)
.withClassName("Pizza")
);
Supplier<Result<NodesStatusResponse>> resultSupplierSoup = createSupplier(
nodesStatusGetter -> nodesStatusGetter
.withOutput(NodeStatusOutput.VERBOSE)
.withClassName("Soup")
);

ClusterTestSuite.testDataPerClassOutputVerbose(resultSupplierAll, resultSupplierPizza, resultSupplierSoup,
testGenerics, client);
}

@Test
public void testClusterNodesEndpointWithOutputMinimalImplicit() {
Supplier<Result<NodesStatusResponse>> resultSupplier = createSupplier(
nodesStatusGetter->{}
);

ClusterTestSuite.testNoDataOutputMinimalImplicit(resultSupplier);
}

private Supplier<Result<NodesStatusResponse>> createSupplier(Consumer<NodesStatusGetter> configure) {
return () -> {
try (WeaviateAsyncClient asyncClient = client.async()) {
NodesStatusGetter nodesStatusGetter = asyncClient.cluster().nodesStatusGetter();
configure.accept(nodesStatusGetter);
return nodesStatusGetter.run().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,18 @@
import io.weaviate.client.Config;
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.util.TriConsumer;
import io.weaviate.client.v1.cluster.model.NodeStatusOutput;
import io.weaviate.client.v1.cluster.model.NodesStatusResponse;
import io.weaviate.client.v1.schema.model.Tenant;
import io.weaviate.integration.client.WeaviateDockerCompose;
import io.weaviate.integration.client.WeaviateTestGenerics;
import static io.weaviate.integration.client.WeaviateVersion.EXPECTED_WEAVIATE_GIT_HASH;
import static io.weaviate.integration.client.WeaviateVersion.EXPECTED_WEAVIATE_VERSION;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.InstanceOfAssertFactories.ARRAY;
import io.weaviate.integration.tests.cluster.ClusterMultiTenancyTestSuite;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.function.Supplier;

public class ClientClusterMultiTenancyTest {

private WeaviateClient client;
Expand All @@ -31,9 +25,7 @@ public class ClientClusterMultiTenancyTest {

@Before
public void before() {
String httpHost = compose.getHttpHostAddress();
Config config = new Config("http", httpHost);

Config config = new Config("http", compose.getHttpHostAddress());
client = new WeaviateClient(config);
}

Expand All @@ -42,79 +34,21 @@ public void after() {
testGenerics.cleanupWeaviate(client);
}


@Test
public void shouldGetNodeStatusPerClass() throws InterruptedException {
Tenant[] pizzaTenants = new Tenant[] {
Tenant.builder().name("TenantPizza1").build(),
Tenant.builder().name("TenantPizza2").build(),
};
Tenant[] soupTenants = new Tenant[] {
Tenant.builder().name("TenantSoup1").build(),
Tenant.builder().name("TenantSoup2").build(),
Tenant.builder().name("TenantSoup3").build(),
};
String[] pizzaTenantNames = Arrays.stream(pizzaTenants).map(Tenant::getName).toArray(String[]::new);
String[] soupTenantNames = Arrays.stream(soupTenants).map(Tenant::getName).toArray(String[]::new);

List<String> pizzaIds = WeaviateTestGenerics.IDS_BY_CLASS.get("Pizza");
List<String> soupIds = WeaviateTestGenerics.IDS_BY_CLASS.get("Soup");
testGenerics.createSchemaPizzaForTenants(client);
testGenerics.createTenantsPizza(client, pizzaTenants);
testGenerics.createDataPizzaForTenants(client, pizzaTenantNames);
testGenerics.createSchemaSoupForTenants(client);
testGenerics.createTenantsSoup(client, soupTenants);
testGenerics.createDataSoupForTenants(client, soupTenantNames);
Thread.sleep(3000); // makes sure data are flushed so nodes endpoint returns actual object/shard count

Consumer<Result<NodesStatusResponse>> assertSingleNode = (Result<NodesStatusResponse> result) ->
assertThat(result).isNotNull()
.returns(false, Result::hasErrors)
.extracting(Result::getResult).isNotNull()
.extracting(NodesStatusResponse::getNodes).asInstanceOf(ARRAY)
.hasSize(1);

TriConsumer<NodesStatusResponse.NodeStatus, Long, Long> assertCounts = (NodesStatusResponse.NodeStatus nodeStatus, Long shardCount, Long objectCount) -> {
assertThat(nodeStatus.getName()).isNotBlank();
assertThat(nodeStatus)
.returns(EXPECTED_WEAVIATE_VERSION, NodesStatusResponse.NodeStatus::getVersion)
.returns(EXPECTED_WEAVIATE_GIT_HASH, NodesStatusResponse.NodeStatus::getGitHash)
.returns(NodesStatusResponse.Status.HEALTHY, NodesStatusResponse.NodeStatus::getStatus)
.extracting(NodesStatusResponse.NodeStatus::getStats)
.returns(shardCount, NodesStatusResponse.Stats::getShardCount)
.returns(objectCount, NodesStatusResponse.Stats::getObjectCount);
};

// ALL
Result<NodesStatusResponse> resultAll = client.cluster().nodesStatusGetter()
Supplier<Result<NodesStatusResponse>> resultSupplierAll = () -> client.cluster().nodesStatusGetter()
.withOutput(NodeStatusOutput.VERBOSE)
.run();

long expectedAllShardCount = pizzaTenants.length + soupTenants.length;
long expectedAllObjectsCount = pizzaTenants.length * pizzaIds.size() + soupTenants.length * soupIds.size();
assertSingleNode.accept(resultAll);
assertCounts.accept(resultAll.getResult().getNodes()[0], expectedAllShardCount, expectedAllObjectsCount);

// PIZZA
Result<NodesStatusResponse> resultPizza = client.cluster().nodesStatusGetter()
Supplier<Result<NodesStatusResponse>> resultSupplierPizza = () -> client.cluster().nodesStatusGetter()
.withOutput(NodeStatusOutput.VERBOSE)
.withClassName("Pizza")
.run();

long expectedPizzaShardCount = pizzaTenants.length;
long expectedPizzaObjectsCount = pizzaTenants.length * pizzaIds.size();
assertSingleNode.accept(resultPizza);
assertCounts.accept(resultPizza.getResult().getNodes()[0], expectedPizzaShardCount, expectedPizzaObjectsCount);

// SOUP
Result<NodesStatusResponse> resultSoup = client.cluster().nodesStatusGetter()
Supplier<Result<NodesStatusResponse>> resultSupplierSoup = () -> client.cluster().nodesStatusGetter()
.withOutput(NodeStatusOutput.VERBOSE)
.withClassName("Soup")
.run();

long expectedSoupShardCount = soupTenants.length;
long expectedSoupObjectsCount = soupTenants.length * soupIds.size();
assertSingleNode.accept(resultSoup);
assertCounts.accept(resultSoup.getResult().getNodes()[0], expectedSoupShardCount, expectedSoupObjectsCount);
ClusterMultiTenancyTestSuite.testMultiTenancyDataPerClassOutputVerbose(resultSupplierAll, resultSupplierPizza, resultSupplierSoup,
testGenerics, client);
}
}
Loading

0 comments on commit 0997183

Please sign in to comment.