type);
+
+ /**
+ * StopSubscribe stop subs
+ */
+ public abstract String stopSubscribe();
+
+ /**
+ * GetDefaultGroup returns default group.This method will be invoked if a request doesn't specify the group field
+ */
+ public abstract String getDefaultGroup();
+
+ /**
+ * GetDefaultLabel returns default label
+ */
+ public abstract String getDefaultLabel();
}
diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStoreBuilder.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStoreBuilder.java
new file mode 100644
index 0000000..e57f1d8
--- /dev/null
+++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStoreBuilder.java
@@ -0,0 +1,84 @@
+package group.rxcloud.capa.component.configstore;
+
+
+import group.rxcloud.capa.infrastructure.config.CapaProperties;
+import group.rxcloud.capa.infrastructure.serializer.CapaObjectSerializer;
+import group.rxcloud.capa.infrastructure.serializer.DefaultObjectSerializer;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+
+/**
+ * A builder for the {@link CapaConfigStore} implementor.
+ */
+public class CapaConfigStoreBuilder {
+
+ /**
+ * Serializer used for request and response objects in CapaClient.
+ */
+ private CapaObjectSerializer objectSerializer;
+
+ private final StoreConfig storeConfig;
+
+ /**
+ * Creates a constructor for CapaConfigStore.
+ *
+ * {@link DefaultObjectSerializer} is used for object and state serializers by default but is not recommended
+ * for production scenarios.
+ */
+ public CapaConfigStoreBuilder(StoreConfig storeConfig) {
+ this.objectSerializer = new DefaultObjectSerializer();
+ this.storeConfig = storeConfig;
+ }
+
+ /**
+ * Sets the serializer for objects to be sent and received from Capa.
+ * See {@link DefaultObjectSerializer} as possible serializer for non-production scenarios.
+ *
+ * @param objectSerializer Serializer for objects to be sent and received from Capa.
+ * @return This instance.
+ */
+ public CapaConfigStoreBuilder withObjectSerializer(CapaObjectSerializer objectSerializer) {
+ if (objectSerializer == null) {
+ throw new IllegalArgumentException("Object serializer is required");
+ }
+ if (objectSerializer.getContentType() == null
+ || objectSerializer.getContentType().isEmpty()) {
+ throw new IllegalArgumentException("Content Type should not be null or empty");
+ }
+ this.objectSerializer = objectSerializer;
+ return this;
+ }
+
+ /**
+ * Build an instance of the client based on the provided setup.
+ *
+ * @return an instance of {@link CapaConfigStore}
+ * @throws IllegalStateException if any required field is missing
+ */
+ public CapaConfigStore build() {
+ CapaConfigStore capaConfigStore = buildCapaConfigStore();
+ capaConfigStore.init(this.storeConfig);
+ return capaConfigStore;
+ }
+
+ /**
+ * Creates an instance of the {@link CapaConfigStore} implementor.
+ *
+ * @return Instance of {@link CapaConfigStore} implementor
+ */
+ private CapaConfigStore buildCapaConfigStore() {
+ // load spi capa config store impl
+ try {
+ Properties properties = CapaProperties.COMPONENT_PROPERTIES_SUPPLIER.get();
+ String capaConfigStoreClassPath = properties.getProperty(CapaConfigStore.class.getName());
+ Class extends CapaConfigStore> aClass = (Class extends CapaConfigStore>) Class.forName(capaConfigStoreClassPath);
+ Constructor extends CapaConfigStore> constructor = aClass.getConstructor(CapaObjectSerializer.class);
+ Object newInstance = constructor.newInstance(this.objectSerializer);
+ return (CapaConfigStore) newInstance;
+ } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException("No CapaConfigStore Client supported.");
+ }
+ }
+}
diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/ConfigurationItem.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/ConfigurationItem.java
new file mode 100644
index 0000000..fe50cc5
--- /dev/null
+++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/ConfigurationItem.java
@@ -0,0 +1,80 @@
+package group.rxcloud.capa.component.configstore;
+
+import java.util.Map;
+
+public class ConfigurationItem {
+
+ /**
+ * Required. The key of configuration item
+ */
+ private String key;
+ /**
+ * The content of configuration item
+ * Empty if the configuration is not set, including the case that the configuration is changed from value-set to value-not-set.
+ */
+ private T content;
+ /**
+ * The group of configuration item.
+ */
+ private String group;
+ /**
+ * The label of configuration item.
+ */
+ private String label;
+ /**
+ * The tag list of configuration item.
+ */
+ private Map tags;
+ /**
+ * The metadata which will be passed to configuration store component.
+ */
+ private Map metadata;
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public T getContent() {
+ return content;
+ }
+
+ public void setContent(T content) {
+ this.content = content;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
+
+ public Map getTags() {
+ return tags;
+ }
+
+ public void setTags(Map tags) {
+ this.tags = tags;
+ }
+
+ public Map getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(Map metadata) {
+ this.metadata = metadata;
+ }
+}
diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/GetRequest.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/GetRequest.java
new file mode 100644
index 0000000..e6c0932
--- /dev/null
+++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/GetRequest.java
@@ -0,0 +1,72 @@
+package group.rxcloud.capa.component.configstore;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * GetRequest is the object describing a get configuration request
+ */
+public class GetRequest {
+
+ /**
+ * The application id which
+ * Only used for admin, Ignored and reset for normal client
+ */
+ private String appId;
+ /**
+ * The group of keys.
+ */
+ private String group;
+ /**
+ * The label for keys.
+ */
+ private String label;
+ /**
+ * The keys to get.
+ */
+ private List keys;
+ /**
+ * The metadata which will be sent to configuration store components.
+ */
+ private Map metadata;
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
+
+ public List getKeys() {
+ return keys;
+ }
+
+ public void setKeys(List keys) {
+ this.keys = keys;
+ }
+
+ public Map getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(Map metadata) {
+ this.metadata = metadata;
+ }
+}
diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/StoreConfig.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/StoreConfig.java
new file mode 100644
index 0000000..1d98979
--- /dev/null
+++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/StoreConfig.java
@@ -0,0 +1,44 @@
+package group.rxcloud.capa.component.configstore;
+
+import java.util.List;
+import java.util.Map;
+
+public class StoreConfig {
+
+ private String storeName;
+ private List address;
+ private String timeOut;
+ private Map metadata;
+
+ public String getStoreName() {
+ return storeName;
+ }
+
+ public void setStoreName(String storeName) {
+ this.storeName = storeName;
+ }
+
+ public List getAddress() {
+ return address;
+ }
+
+ public void setAddress(List address) {
+ this.address = address;
+ }
+
+ public String getTimeOut() {
+ return timeOut;
+ }
+
+ public void setTimeOut(String timeOut) {
+ this.timeOut = timeOut;
+ }
+
+ public Map getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(Map metadata) {
+ this.metadata = metadata;
+ }
+}
diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeReq.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeReq.java
new file mode 100644
index 0000000..25bc2a7
--- /dev/null
+++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeReq.java
@@ -0,0 +1,53 @@
+package group.rxcloud.capa.component.configstore;
+
+import java.util.List;
+import java.util.Map;
+
+public class SubscribeReq {
+
+ private String appId;
+ private String group;
+ private String label;
+ private List keys;
+ private Map metadata;
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
+
+ public List getKeys() {
+ return keys;
+ }
+
+ public void setKeys(List keys) {
+ this.keys = keys;
+ }
+
+ public Map getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(Map metadata) {
+ this.metadata = metadata;
+ }
+}
diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeResp.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeResp.java
new file mode 100644
index 0000000..2a9c441
--- /dev/null
+++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeResp.java
@@ -0,0 +1,46 @@
+package group.rxcloud.capa.component.configstore;
+
+
+import java.util.List;
+
+public class SubscribeResp {
+
+ /**
+ * The name of configuration store.
+ */
+ private String storeName;
+ /**
+ * The application id which
+ * Only used for admin, Ignored and reset for normal client
+ */
+ private String appId;
+ /**
+ * The list of configuration items to save.
+ * To delete a exist item, set the key (also label) and let content to be empty
+ */
+ private List> items;
+
+ public String getStoreName() {
+ return storeName;
+ }
+
+ public void setStoreName(String storeName) {
+ this.storeName = storeName;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ public List> getItems() {
+ return items;
+ }
+
+ public void setItems(List> items) {
+ this.items = items;
+ }
+}
diff --git a/sdk-infrastructure/pom.xml b/sdk-infrastructure/pom.xml
index a8636c8..46f5e78 100644
--- a/sdk-infrastructure/pom.xml
+++ b/sdk-infrastructure/pom.xml
@@ -7,7 +7,7 @@
capa-parent
group.rxcloud
- 1.0.1.RELEASE
+ 1.0.2.SNAPSHOT
capa-sdk-infrastructure
diff --git a/sdk-infrastructure/src/main/java/group/rxcloud/capa/infrastructure/exceptions/CapaExceptions.java b/sdk-infrastructure/src/main/java/group/rxcloud/capa/infrastructure/exceptions/CapaExceptions.java
index 081746e..e608506 100644
--- a/sdk-infrastructure/src/main/java/group/rxcloud/capa/infrastructure/exceptions/CapaExceptions.java
+++ b/sdk-infrastructure/src/main/java/group/rxcloud/capa/infrastructure/exceptions/CapaExceptions.java
@@ -1,6 +1,7 @@
package group.rxcloud.capa.infrastructure.exceptions;
import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.Callable;
@@ -73,6 +74,22 @@ public static Mono wrapMono(Exception exception) {
return Mono.empty();
}
+ /**
+ * Wraps an exception into CapaException (if not already CapaException).
+ *
+ * @param exception Exception to be wrapped.
+ * @param Flux's response type.
+ * @return Flux containing CapaException.
+ */
+ public static Flux wrapFlux(Exception exception) {
+ try {
+ wrap(exception);
+ } catch (Exception e) {
+ return Flux.error(e);
+ }
+ return Flux.empty();
+ }
+
/**
* Wraps an exception into CapaException (if not already CapaException).
*
diff --git a/sdk-spi-demo/pom.xml b/sdk-spi-demo/pom.xml
index 066a583..a62718e 100644
--- a/sdk-spi-demo/pom.xml
+++ b/sdk-spi-demo/pom.xml
@@ -7,7 +7,7 @@
capa-parent
group.rxcloud
- 1.0.1.RELEASE
+ 1.0.2.SNAPSHOT
capa-sdk-spi-demo
diff --git a/sdk-spi-demo/src/main/java/group/rxcloud/capa/spi/demo/configstore/DemoCapaConfigStore.java b/sdk-spi-demo/src/main/java/group/rxcloud/capa/spi/demo/configstore/DemoCapaConfigStore.java
new file mode 100644
index 0000000..becb5e2
--- /dev/null
+++ b/sdk-spi-demo/src/main/java/group/rxcloud/capa/spi/demo/configstore/DemoCapaConfigStore.java
@@ -0,0 +1,87 @@
+package group.rxcloud.capa.spi.demo.configstore;
+
+import group.rxcloud.capa.component.configstore.ConfigurationItem;
+import group.rxcloud.capa.component.configstore.StoreConfig;
+import group.rxcloud.capa.component.configstore.SubscribeResp;
+import group.rxcloud.capa.infrastructure.serializer.CapaObjectSerializer;
+import group.rxcloud.capa.spi.configstore.CapaConfigStoreSpi;
+import group.rxcloud.cloudruntimes.utils.TypeRef;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class DemoCapaConfigStore extends CapaConfigStoreSpi {
+
+ private static final Logger logger = LoggerFactory.getLogger(DemoCapaConfigStore.class);
+
+ /**
+ * Instantiates a new Capa ConfigStore.
+ *
+ * @param objectSerializer Serializer for transient request/response objects.
+ */
+ public DemoCapaConfigStore(CapaObjectSerializer objectSerializer) {
+ super(objectSerializer);
+ }
+
+ @Override
+ protected void doInit(StoreConfig storeConfig) {
+ // paas
+ }
+
+ @Override
+ public String stopSubscribe() {
+ // paas
+ return null;
+ }
+
+ @Override
+ protected Mono>> doGet(String appId, String group, String label, String configName, List values, Map metadata, TypeRef type) {
+ ConfigurationItem configurationItems = new ConfigurationItem<>();
+ configurationItems.setKey("test");
+ configurationItems.setContent(null);
+ configurationItems.setGroup(getDefaultGroup());
+ configurationItems.setLabel(getDefaultLabel());
+ configurationItems.setTags(metadata);
+ configurationItems.setMetadata(metadata);
+
+ List> items = Collections.singletonList(configurationItems);
+ return Mono.just(items);
+ }
+
+ @Override
+ protected Flux> doSubscribe(String appId, String group, String label, String configName, List values, Map metadata, TypeRef type) {
+ return Flux.interval(Duration.ofSeconds(3))
+ .map(aLong -> this.getSubscribeResp(appId, metadata, aLong));
+ }
+
+ @NotNull
+ private SubscribeResp getSubscribeResp(String appId, Map metadata, Long aLong) {
+ ConfigurationItem configurationItems = new ConfigurationItem<>();
+ configurationItems.setKey("test" + aLong);
+ configurationItems.setContent(null);
+ configurationItems.setGroup(getDefaultGroup());
+ configurationItems.setLabel(getDefaultLabel());
+ configurationItems.setTags(metadata);
+ configurationItems.setMetadata(metadata);
+
+ List> items = Collections.singletonList(configurationItems);
+
+ SubscribeResp subscribeResp = new SubscribeResp<>();
+ subscribeResp.setAppId(appId);
+ subscribeResp.setStoreName(getStoreName());
+ subscribeResp.setItems(items);
+ return subscribeResp;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // paas
+ }
+}
diff --git a/sdk-spi-demo/src/main/resources/capa-component.properties b/sdk-spi-demo/src/main/resources/capa-component.properties
index 6703aad..d1215aa 100644
--- a/sdk-spi-demo/src/main/resources/capa-component.properties
+++ b/sdk-spi-demo/src/main/resources/capa-component.properties
@@ -1,2 +1,4 @@
group.rxcloud.capa.component.http.CapaHttp=group.rxcloud.capa.spi.demo.http.DemoCapaHttp
-group.rxcloud.capa.spi.config.CapaSpiOptionsLoader=group.rxcloud.capa.spi.demo.config.DemoSpiOptionsLoader
\ No newline at end of file
+group.rxcloud.capa.spi.config.CapaSpiOptionsLoader=group.rxcloud.capa.spi.demo.config.DemoSpiOptionsLoader
+
+group.rxcloud.capa.component.configstore.CapaConfigStore=group.rxcloud.capa.spi.demo.configstore.DemoCapaConfigStore
\ No newline at end of file
diff --git a/sdk-spi/pom.xml b/sdk-spi/pom.xml
index da2dd2c..29650ee 100644
--- a/sdk-spi/pom.xml
+++ b/sdk-spi/pom.xml
@@ -7,7 +7,7 @@
capa-parent
group.rxcloud
- 1.0.1.RELEASE
+ 1.0.2.SNAPSHOT
capa-sdk-spi
diff --git a/sdk-spi/src/main/java/group/rxcloud/capa/spi/configstore/CapaConfigStoreSpi.java b/sdk-spi/src/main/java/group/rxcloud/capa/spi/configstore/CapaConfigStoreSpi.java
new file mode 100644
index 0000000..dbdeafa
--- /dev/null
+++ b/sdk-spi/src/main/java/group/rxcloud/capa/spi/configstore/CapaConfigStoreSpi.java
@@ -0,0 +1,128 @@
+package group.rxcloud.capa.spi.configstore;
+
+import group.rxcloud.capa.component.configstore.*;
+import group.rxcloud.capa.infrastructure.serializer.CapaObjectSerializer;
+import group.rxcloud.cloudruntimes.utils.TypeRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The SPI Capa ConfigStore client. Templates for different implementations.
+ */
+public abstract class CapaConfigStoreSpi extends CapaConfigStore {
+
+ private static final Logger logger = LoggerFactory.getLogger(CapaConfigStoreSpi.class);
+
+ /**
+ * Instantiates a new Capa ConfigStore.
+ *
+ * @param objectSerializer Serializer for transient request/response objects.
+ */
+ public CapaConfigStoreSpi(CapaObjectSerializer objectSerializer) {
+ super(objectSerializer);
+ }
+
+ @Override
+ public Mono>> get(GetRequest getRequest, TypeRef type) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("[CapaConfigStoreSpi] get config request[{}]", getRequest);
+ }
+ final String appId = getRequest.getAppId();
+ final String group = getRequest.getGroup();
+ final String label = getRequest.getLabel();
+ final Map metadata = getRequest.getMetadata();
+
+ // [config_name, value1, value2, ...]
+ final List keys = getRequest.getKeys();
+ Objects.requireNonNull(keys, "keys");
+ final String configName = keys.get(0);
+ final List values = keys.subList(1, keys.size());
+ if (logger.isDebugEnabled()) {
+ logger.debug("[CapaConfigStoreSpi] get config[config_name={}, values={}]",
+ configName, keys);
+ }
+
+ return doGet(appId, group, label, configName, values, metadata, type);
+ }
+
+ /**
+ * Get configuration.
+ *
+ * @param the response type parameter
+ * @param appId the app id
+ * @param group the group
+ * @param label the label
+ * @param configName the config name
+ * @param values the values
+ * @param metadata the metadata
+ * @param type the response type
+ * @return the async mono response
+ */
+ protected abstract Mono>> doGet(String appId,
+ String group,
+ String label,
+ String configName,
+ List values,
+ Map metadata,
+ TypeRef type);
+
+ @Override
+ public Flux> subscribe(SubscribeReq subscribeReq, TypeRef type) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("[CapaConfigStoreSpi] subscribe config request[{}]", subscribeReq);
+ }
+ final String appId = subscribeReq.getAppId();
+ final String group = subscribeReq.getGroup();
+ final String label = subscribeReq.getLabel();
+ final Map metadata = subscribeReq.getMetadata();
+
+ // [config_name, value1, value2, ...]
+ final List keys = subscribeReq.getKeys();
+ Objects.requireNonNull(keys, "keys");
+ final String configName = keys.get(0);
+ final List values = keys.subList(1, keys.size());
+ if (logger.isDebugEnabled()) {
+ logger.debug("[CapaConfigStoreSpi] subscribe config[config_name={}, values={}]",
+ configName, keys);
+ }
+
+ return doSubscribe(appId, group, label, configName, values, metadata, type);
+ }
+
+ /**
+ * Subscribe configuration.
+ *
+ * @param the response type parameter
+ * @param appId the app id
+ * @param group the group
+ * @param label the label
+ * @param configName the config name
+ * @param values the values
+ * @param metadata the metadata
+ * @param type the response type
+ * @return the async flux of response stream
+ */
+ protected abstract Flux> doSubscribe(String appId,
+ String group,
+ String label,
+ String configName,
+ List values,
+ Map metadata,
+ TypeRef type);
+
+ @Override
+ public String getDefaultGroup() {
+ return "application";
+ }
+
+ @Override
+ public String getDefaultLabel() {
+ return "";
+ }
+}
diff --git a/sdk/pom.xml b/sdk/pom.xml
index 16b81e3..2b8403f 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -7,7 +7,7 @@
group.rxcloud
capa-parent
- 1.0.1.RELEASE
+ 1.0.2.SNAPSHOT
capa-sdk
diff --git a/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientBuilder.java b/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientBuilder.java
new file mode 100644
index 0000000..76bb443
--- /dev/null
+++ b/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientBuilder.java
@@ -0,0 +1,62 @@
+package group.rxcloud.capa.configuration;
+
+
+import group.rxcloud.capa.component.configstore.CapaConfigStore;
+import group.rxcloud.capa.component.configstore.CapaConfigStoreBuilder;
+import group.rxcloud.capa.component.configstore.StoreConfig;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * A builder for the {@link CapaConfigurationClient}.
+ */
+public class CapaConfigurationClientBuilder {
+
+ /**
+ * Builder for Capa's ConfigStore Client.
+ */
+ private final List configStoreBuilders;
+
+ /**
+ * Creates a constructor for {@link CapaConfigurationClient}.
+ */
+ public CapaConfigurationClientBuilder(StoreConfig storeConfig) {
+ this(Collections.singletonList(new CapaConfigStoreBuilder(storeConfig)));
+ }
+
+ /**
+ * Creates a constructor for {@link CapaConfigurationClient} with custom {@link CapaConfigStoreBuilder}.
+ */
+ public CapaConfigurationClientBuilder(Supplier capaConfigStoreBuilderSupplier) {
+ this(Collections.singletonList(capaConfigStoreBuilderSupplier.get()));
+ }
+
+ /**
+ * Creates a constructor for {@link CapaConfigurationClient}.
+ */
+ public CapaConfigurationClientBuilder(List configStoreBuilders) {
+ this.configStoreBuilders = configStoreBuilders;
+ }
+
+ /**
+ * Build an instance of the Client based on the provided setup.
+ *
+ * @return an instance of the setup Client
+ */
+ public CapaConfigurationClient build() {
+ return buildCapaClientStore();
+ }
+
+ /**
+ * Creates and instance of {@link CapaConfigurationClient}.
+ */
+ private CapaConfigurationClient buildCapaClientStore() {
+ List capaConfigStores = this.configStoreBuilders.stream()
+ .map(CapaConfigStoreBuilder::build)
+ .collect(Collectors.toList());
+ return new CapaConfigurationClientStore(capaConfigStores);
+ }
+}
diff --git a/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientStore.java b/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientStore.java
index 77682fd..bafc9dd 100644
--- a/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientStore.java
+++ b/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientStore.java
@@ -1,6 +1,9 @@
package group.rxcloud.capa.configuration;
import group.rxcloud.capa.component.configstore.CapaConfigStore;
+import group.rxcloud.capa.component.configstore.GetRequest;
+import group.rxcloud.capa.component.configstore.SubscribeReq;
+import group.rxcloud.capa.component.configstore.SubscribeResp;
import group.rxcloud.capa.infrastructure.exceptions.CapaExceptions;
import group.rxcloud.cloudruntimes.domain.core.configuration.ConfigurationItem;
import group.rxcloud.cloudruntimes.domain.core.configuration.ConfigurationRequestItem;
@@ -9,8 +12,9 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* An adapter for the Config Store Client.
@@ -18,45 +22,177 @@
public class CapaConfigurationClientStore extends AbstractCapaConfigurationClient {
/**
- * The Store client to be used.
+ * The Store clients to be used.
*
* @see CapaConfigStore
*/
- private final CapaConfigStore store;
+ private final Map configStores;
- public CapaConfigurationClientStore(CapaConfigStore store) {
- this.store = store;
+ public CapaConfigurationClientStore(List stores) {
+ if (stores == null || stores.isEmpty()) {
+ this.configStores = new HashMap<>(2, 1);
+ } else {
+ this.configStores = stores.stream()
+ .collect(Collectors.toMap(
+ CapaConfigStore::getStoreName,
+ Function.identity()));
+ }
+ }
+
+ private CapaConfigStore getStore(String storeName) {
+ // check storeName
+ if (storeName == null || storeName.trim().isEmpty()) {
+ throw new IllegalArgumentException("Store Name cannot be null or empty.");
+ }
+ return Objects.requireNonNull(configStores.get(storeName), "Store Component cannot be null.");
}
@Override
public Mono>> getConfiguration(ConfigurationRequestItem configurationRequestItem, TypeRef type) {
try {
- String storeName = configurationRequestItem.getStoreName();
- String appId = configurationRequestItem.getAppId();
+ final String storeName = configurationRequestItem.getStoreName();
+ final CapaConfigStore store = this.getStore(storeName);
+
+ final String appId = configurationRequestItem.getAppId();
String group = configurationRequestItem.getGroup();
String label = configurationRequestItem.getLabel();
List keys = configurationRequestItem.getKeys();
Map metadata = configurationRequestItem.getMetadata();
- // check storeName
- if (storeName == null || storeName.trim().isEmpty()) {
- throw new IllegalArgumentException("Store Name cannot be null or empty.");
+ if (group == null || group.trim().isEmpty()) {
+ group = store.getDefaultGroup();
+ }
+ if (label == null || label.trim().isEmpty()) {
+ label = store.getDefaultLabel();
}
- // check appId
- if (appId == null || appId.trim().isEmpty()) {
- throw new IllegalArgumentException("App Id cannot be null or empty.");
+ if (keys == null) {
+ keys = new ArrayList<>(2);
+ }
+ if (metadata == null) {
+ metadata = new HashMap<>(2, 1);
}
- // TODO: 2021/8/6
+ final String finalGroup = group;
+ final String finalLabel = label;
+ final List finalKeys = keys;
+ final Map finalMetadata = metadata;
- return Mono.empty();
+ return Mono.subscriberContext()
+ .flatMap(context -> {
+ GetRequest getRequest = this.getStoreRequest(
+ appId,
+ finalGroup,
+ finalLabel,
+ finalKeys,
+ finalMetadata);
+ return store.get(getRequest, type);
+ })
+ .flatMap(configurationItems -> {
+ if (configurationItems == null || configurationItems.isEmpty()) {
+ return Mono.empty();
+ }
+ return Flux.fromStream(configurationItems
+ .stream()
+ .map(this::getStoreResponse))
+ .collectList();
+ });
} catch (Exception ex) {
return CapaExceptions.wrapMono(ex);
}
}
+ private GetRequest getStoreRequest(String appId, String group, String label, List keys, Map metadata) {
+ GetRequest getRequest = new GetRequest();
+ getRequest.setAppId(appId);
+ getRequest.setGroup(group);
+ getRequest.setLabel(label);
+ getRequest.setKeys(keys);
+ getRequest.setMetadata(metadata);
+ return getRequest;
+ }
+
+ private ConfigurationItem getStoreResponse(group.rxcloud.capa.component.configstore.ConfigurationItem tConfigurationItem) {
+ ConfigurationItem configurationItem = new ConfigurationItem<>();
+ configurationItem.setKey(tConfigurationItem.getKey());
+ configurationItem.setContent(tConfigurationItem.getContent());
+ configurationItem.setGroup(tConfigurationItem.getGroup());
+ configurationItem.setLabel(tConfigurationItem.getLabel());
+ configurationItem.setTags(tConfigurationItem.getTags());
+ configurationItem.setMetadata(tConfigurationItem.getMetadata());
+ return configurationItem;
+ }
+
@Override
public Flux> subscribeConfiguration(ConfigurationRequestItem configurationRequestItem, TypeRef type) {
- return null;
+ try {
+ final String storeName = configurationRequestItem.getStoreName();
+ final CapaConfigStore store = this.getStore(storeName);
+
+ final String appId = configurationRequestItem.getAppId();
+ String group = configurationRequestItem.getGroup();
+ String label = configurationRequestItem.getLabel();
+ List keys = configurationRequestItem.getKeys();
+ Map metadata = configurationRequestItem.getMetadata();
+ if (group == null || group.trim().isEmpty()) {
+ group = store.getDefaultGroup();
+ }
+ if (label == null || label.trim().isEmpty()) {
+ label = store.getDefaultLabel();
+ }
+ if (keys == null) {
+ keys = new ArrayList<>(2);
+ }
+ if (metadata == null) {
+ metadata = new HashMap<>(2, 1);
+ }
+
+ final String finalGroup = group;
+ final String finalLabel = label;
+ final List finalKeys = keys;
+ final Map finalMetadata = metadata;
+
+ return Flux.deferWithContext(Mono::just)
+ .flatMap(context -> {
+ SubscribeReq subscribeReq = this.getSubscribeRequest(
+ appId,
+ finalGroup,
+ finalLabel,
+ finalKeys,
+ finalMetadata);
+ return store.subscribe(subscribeReq, type);
+ })
+ .flatMap(subscribeResp -> {
+ if (subscribeResp == null) {
+ return Flux.empty();
+ }
+ SubConfigurationResp subscribeResponse = this.getSubscribeResponse(subscribeResp);
+ return Flux.just(subscribeResponse);
+ });
+ } catch (Exception ex) {
+ return CapaExceptions.wrapFlux(ex);
+ }
+ }
+
+ private SubscribeReq getSubscribeRequest(String appId, String group, String label, List keys, Map metadata) {
+ SubscribeReq subscribeReq = new SubscribeReq();
+ subscribeReq.setAppId(appId);
+ subscribeReq.setGroup(group);
+ subscribeReq.setLabel(label);
+ subscribeReq.setKeys(keys);
+ subscribeReq.setMetadata(metadata);
+ return subscribeReq;
+ }
+
+ private SubConfigurationResp getSubscribeResponse(SubscribeResp subscribeResp) {
+ SubConfigurationResp subConfigurationResp = new SubConfigurationResp<>();
+ subConfigurationResp.setStoreName(subscribeResp.getStoreName());
+ subConfigurationResp.setAppId(subscribeResp.getAppId());
+ if (subscribeResp.getItems() != null && subscribeResp.getItems().isEmpty()) {
+ List> itemList = subscribeResp.getItems().stream()
+ .map(this::getStoreResponse)
+ .collect(Collectors.toList());
+ subConfigurationResp.setItems(itemList);
+ }
+ return subConfigurationResp;
}
/**
@@ -65,7 +201,9 @@ public Flux> subscribeConfiguration(ConfigurationReq
@Override
public void close() {
try {
- store.close();
+ for (CapaConfigStore store : configStores.values()) {
+ store.close();
+ }
} catch (Exception e) {
throw CapaExceptions.propagate(e);
}
diff --git a/sdk/src/main/java/group/rxcloud/capa/pubsub/CapaPubSubClient.java b/sdk/src/main/java/group/rxcloud/capa/pubsub/CapaPubSubClient.java
index cdba481..da11138 100644
--- a/sdk/src/main/java/group/rxcloud/capa/pubsub/CapaPubSubClient.java
+++ b/sdk/src/main/java/group/rxcloud/capa/pubsub/CapaPubSubClient.java
@@ -1,36 +1,13 @@
package group.rxcloud.capa.pubsub;
import group.rxcloud.cloudruntimes.client.DefaultCloudRuntimesClient;
-import group.rxcloud.cloudruntimes.domain.core.binding.InvokeBindingRequest;
import group.rxcloud.cloudruntimes.domain.core.pubsub.PublishEventRequest;
-import group.rxcloud.cloudruntimes.utils.TypeRef;
import reactor.core.publisher.Mono;
import java.util.Map;
public interface CapaPubSubClient extends DefaultCloudRuntimesClient {
- @Override
- Mono invokeBinding(String bindingName, String operation, Object data);
-
- @Override
- Mono invokeBinding(String bindingName, String operation, byte[] data, Map metadata);
-
- @Override
- Mono invokeBinding(String bindingName, String operation, Object data, TypeRef type);
-
- @Override
- Mono invokeBinding(String bindingName, String operation, Object data, Class clazz);
-
- @Override
- Mono invokeBinding(String bindingName, String operation, Object data, Map metadata, TypeRef type);
-
- @Override
- Mono invokeBinding(String bindingName, String operation, Object data, Map metadata, Class clazz);
-
- @Override
- Mono invokeBinding(InvokeBindingRequest request, TypeRef type);
-
@Override
Mono publishEvent(String pubsubName, String topicName, Object data);
diff --git a/sdk/src/main/java/group/rxcloud/capa/rpc/AbstractCapaRpcClient.java b/sdk/src/main/java/group/rxcloud/capa/rpc/AbstractCapaRpcClient.java
index b659e0c..175f873 100644
--- a/sdk/src/main/java/group/rxcloud/capa/rpc/AbstractCapaRpcClient.java
+++ b/sdk/src/main/java/group/rxcloud/capa/rpc/AbstractCapaRpcClient.java
@@ -23,7 +23,6 @@ public Mono invokeMethod(String appId, String methodName, Object request,
.withHttpExtension(httpExtension)
.withMetadata(metadata)
.build();
-
return this.invokeMethod(req, type);
}
diff --git a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaApiProtocol.java b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaApiProtocol.java
index 7bf0ec3..d3c9685 100644
--- a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaApiProtocol.java
+++ b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaApiProtocol.java
@@ -13,7 +13,8 @@ public enum CapaApiProtocol {
/**
* HTTP/1.1
*/
- HTTP;
+ HTTP,
+ GRPC;
/**
* Parse protocol to {@link CapaApiProtocol}.
diff --git a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientBuilder.java b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientBuilder.java
index 8317634..6c64375 100644
--- a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientBuilder.java
+++ b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientBuilder.java
@@ -7,8 +7,7 @@
import java.util.function.Supplier;
/**
- * A builder for the {@link CapaRpcClient},
- * Currently only HTTP Client will be supported.
+ * A builder for the {@link CapaRpcClient}, Currently only HTTP Client will be supported.
*/
public class CapaRpcClientBuilder {
@@ -65,13 +64,10 @@ private CapaRpcClient buildCapaClient(CapaApiProtocol protocol) {
if (protocol == null) {
throw new IllegalStateException("Protocol is required.");
}
-
- switch (protocol) {
- case HTTP:
- return buildCapaClientHttp();
- default:
- throw new IllegalStateException("Unsupported protocol: " + protocol.name());
+ if (protocol == CapaApiProtocol.HTTP) {
+ return buildCapaClientHttp();
}
+ throw new IllegalStateException("Unsupported protocol: " + protocol.name());
}
/**
diff --git a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientHttp.java b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientHttp.java
index 3439b6e..6ee8788 100644
--- a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientHttp.java
+++ b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientHttp.java
@@ -1,7 +1,6 @@
package group.rxcloud.capa.rpc;
import group.rxcloud.capa.component.http.CapaHttp;
-import group.rxcloud.capa.component.http.HttpResponse;
import group.rxcloud.capa.infrastructure.exceptions.CapaExceptions;
import group.rxcloud.cloudruntimes.domain.core.invocation.HttpExtension;
import group.rxcloud.cloudruntimes.domain.core.invocation.InvokeMethodRequest;
@@ -63,8 +62,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
headers.putAll(metadata);
}
- Mono> httpResponseMono = Mono
- .subscriberContext()
+ return Mono.subscriberContext()
.flatMap(context ->
this.client.invokeApi(
httpMethod,
@@ -73,8 +71,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
request,
headers,
context,
- type));
- Mono responseMono = httpResponseMono
+ type))
.flatMap(httpResponse -> {
T object = httpResponse.getBody();
if (object != null) {
@@ -82,7 +79,6 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
}
return Mono.empty();
});
- return responseMono;
} catch (Exception ex) {
return CapaExceptions.wrapMono(ex);
}