From 535c88f86714f0beed5a3e34b86fc3f9f879cdcf Mon Sep 17 00:00:00 2001 From: Kevin_T <596823919@qq.com> Date: Mon, 27 Sep 2021 18:44:15 +0800 Subject: [PATCH] feat: configuration module --- README.md | 4 +- examples/pom.xml | 2 +- .../DemoConfigurationClient.java | 45 +++++ pom.xml | 2 +- sdk-component/pom.xml | 2 +- .../configstore/CapaConfigStore.java | 60 +++++- .../configstore/CapaConfigStoreBuilder.java | 84 +++++++++ .../configstore/ConfigurationItem.java | 80 ++++++++ .../component/configstore/GetRequest.java | 72 ++++++++ .../component/configstore/StoreConfig.java | 44 +++++ .../component/configstore/SubscribeReq.java | 53 ++++++ .../component/configstore/SubscribeResp.java | 46 +++++ sdk-infrastructure/pom.xml | 2 +- .../exceptions/CapaExceptions.java | 17 ++ sdk-spi-demo/pom.xml | 2 +- .../demo/configstore/DemoCapaConfigStore.java | 87 +++++++++ .../main/resources/capa-component.properties | 4 +- sdk-spi/pom.xml | 2 +- .../spi/configstore/CapaConfigStoreSpi.java | 128 +++++++++++++ sdk/pom.xml | 2 +- .../CapaConfigurationClientBuilder.java | 62 +++++++ .../CapaConfigurationClientStore.java | 174 ++++++++++++++++-- .../rxcloud/capa/pubsub/CapaPubSubClient.java | 23 --- .../capa/rpc/AbstractCapaRpcClient.java | 1 - .../rxcloud/capa/rpc/CapaApiProtocol.java | 3 +- .../capa/rpc/CapaRpcClientBuilder.java | 12 +- .../rxcloud/capa/rpc/CapaRpcClientHttp.java | 8 +- 27 files changed, 953 insertions(+), 68 deletions(-) create mode 100644 sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStoreBuilder.java create mode 100644 sdk-component/src/main/java/group/rxcloud/capa/component/configstore/ConfigurationItem.java create mode 100644 sdk-component/src/main/java/group/rxcloud/capa/component/configstore/GetRequest.java create mode 100644 sdk-component/src/main/java/group/rxcloud/capa/component/configstore/StoreConfig.java create mode 100644 sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeReq.java create mode 100644 sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeResp.java create mode 100644 sdk-spi-demo/src/main/java/group/rxcloud/capa/spi/demo/configstore/DemoCapaConfigStore.java create mode 100644 sdk-spi/src/main/java/group/rxcloud/capa/spi/configstore/CapaConfigStoreSpi.java create mode 100644 sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientBuilder.java diff --git a/README.md b/README.md index e2b05ad..429d7c0 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ For a Maven project, add the following to your pom.xml file: group.rxcloud capa-sdk - 1.0.1.RELEASE + 1.0.2.SNAPSHOT ... @@ -93,7 +93,7 @@ Sample implementation library: group.rxcloud capa-sdk-spi-demo - 1.0.1.RELEASE + 1.0.2.SNAPSHOT ... diff --git a/examples/pom.xml b/examples/pom.xml index a1e4636..f3637dd 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -7,7 +7,7 @@ capa-parent group.rxcloud - 1.0.1.RELEASE + 1.0.2.SNAPSHOT capa-examples diff --git a/examples/src/main/java/group/rxcloud/capa/examples/configuration/DemoConfigurationClient.java b/examples/src/main/java/group/rxcloud/capa/examples/configuration/DemoConfigurationClient.java index 823905f..5a7eb7c 100644 --- a/examples/src/main/java/group/rxcloud/capa/examples/configuration/DemoConfigurationClient.java +++ b/examples/src/main/java/group/rxcloud/capa/examples/configuration/DemoConfigurationClient.java @@ -1,4 +1,49 @@ package group.rxcloud.capa.examples.configuration; +import group.rxcloud.capa.component.configstore.StoreConfig; +import group.rxcloud.capa.configuration.CapaConfigurationClient; +import group.rxcloud.capa.configuration.CapaConfigurationClientBuilder; +import group.rxcloud.cloudruntimes.domain.core.configuration.ConfigurationItem; +import group.rxcloud.cloudruntimes.domain.core.configuration.ConfigurationRequestItem; +import group.rxcloud.cloudruntimes.domain.core.configuration.SubConfigurationResp; +import group.rxcloud.cloudruntimes.utils.TypeRef; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Collections; +import java.util.List; + public class DemoConfigurationClient { + + public static void main(String[] args) throws InterruptedException { + StoreConfig storeConfig = new StoreConfig(); + storeConfig.setStoreName("config"); + + CapaConfigurationClient capaConfigurationClient = new CapaConfigurationClientBuilder(storeConfig).build(); + + ConfigurationRequestItem configurationRequestItem = new ConfigurationRequestItem(); + configurationRequestItem.setAppId("test"); + configurationRequestItem.setStoreName("config"); + configurationRequestItem.setKeys(Collections.singletonList("test.json")); + + // get + Mono>> configuration = + capaConfigurationClient.getConfiguration(configurationRequestItem, TypeRef.STRING); + + List> block = configuration.block(); + + for (ConfigurationItem item : block) { + System.out.println(item); + } + + // subscribe + Flux> subConfigurationRespFlux = + capaConfigurationClient.subscribeConfiguration(configurationRequestItem, TypeRef.STRING); + + subConfigurationRespFlux.subscribe(resp -> { + System.out.println(resp); + }); + + Thread.sleep(1000 * 10); + } } diff --git a/pom.xml b/pom.xml index 80a8151..2fae71c 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ group.rxcloud capa-parent pom - 1.0.1.RELEASE + 1.0.2.SNAPSHOT capa-sdk-parent SDK for Capa. https://github.com/reactivegroup diff --git a/sdk-component/pom.xml b/sdk-component/pom.xml index dcaac61..945be8d 100644 --- a/sdk-component/pom.xml +++ b/sdk-component/pom.xml @@ -7,7 +7,7 @@ group.rxcloud capa-parent - 1.0.1.RELEASE + 1.0.2.SNAPSHOT capa-sdk-component diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStore.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStore.java index d9838a5..8be201b 100644 --- a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStore.java +++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStore.java @@ -2,7 +2,15 @@ import group.rxcloud.capa.infrastructure.serializer.CapaObjectSerializer; +import group.rxcloud.cloudruntimes.utils.TypeRef; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import java.util.List; + +/** + * The Abstract ConfigStore Client. Extend this and provide your specific impl. + */ public abstract class CapaConfigStore implements AutoCloseable { /** @@ -16,11 +24,61 @@ public abstract class CapaConfigStore implements AutoCloseable { protected final CapaObjectSerializer objectSerializer; /** - * Instantiates a new Capa configuration. + * Init init the configuration store. + */ + private String storeName; + + /** + * Instantiates a new Capa ConfigStore. * * @param objectSerializer Serializer for transient request/response objects. */ public CapaConfigStore(CapaObjectSerializer objectSerializer) { this.objectSerializer = objectSerializer; } + + /** + * Init init the configuration store. + */ + public void init(StoreConfig storeConfig) { + this.storeName = storeConfig.getStoreName(); + this.doInit(storeConfig); + } + + /** + * Init init the configuration store. + */ + protected abstract void doInit(StoreConfig storeConfig); + + /** + * Gets store name. + */ + public String getStoreName() { + return this.storeName; + } + + /** + * GetSpecificKeysValue get specific key value. + */ + public abstract Mono>> get(GetRequest getRequest, TypeRef type); + + /** + * Subscribe subscribe the configurations updates. + */ + public abstract Flux> subscribe(SubscribeReq subscribeReq, TypeRef type); + + /** + * StopSubscribe stop subs + */ + public abstract String stopSubscribe(); + + /** + * GetDefaultGroup returns default group.This method will be invoked if a request doesn't specify the group field + */ + public abstract String getDefaultGroup(); + + /** + * GetDefaultLabel returns default label + */ + public abstract String getDefaultLabel(); } diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStoreBuilder.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStoreBuilder.java new file mode 100644 index 0000000..e57f1d8 --- /dev/null +++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/CapaConfigStoreBuilder.java @@ -0,0 +1,84 @@ +package group.rxcloud.capa.component.configstore; + + +import group.rxcloud.capa.infrastructure.config.CapaProperties; +import group.rxcloud.capa.infrastructure.serializer.CapaObjectSerializer; +import group.rxcloud.capa.infrastructure.serializer.DefaultObjectSerializer; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Properties; + +/** + * A builder for the {@link CapaConfigStore} implementor. + */ +public class CapaConfigStoreBuilder { + + /** + * Serializer used for request and response objects in CapaClient. + */ + private CapaObjectSerializer objectSerializer; + + private final StoreConfig storeConfig; + + /** + * Creates a constructor for CapaConfigStore. + *

+ * {@link DefaultObjectSerializer} is used for object and state serializers by default but is not recommended + * for production scenarios. + */ + public CapaConfigStoreBuilder(StoreConfig storeConfig) { + this.objectSerializer = new DefaultObjectSerializer(); + this.storeConfig = storeConfig; + } + + /** + * Sets the serializer for objects to be sent and received from Capa. + * See {@link DefaultObjectSerializer} as possible serializer for non-production scenarios. + * + * @param objectSerializer Serializer for objects to be sent and received from Capa. + * @return This instance. + */ + public CapaConfigStoreBuilder withObjectSerializer(CapaObjectSerializer objectSerializer) { + if (objectSerializer == null) { + throw new IllegalArgumentException("Object serializer is required"); + } + if (objectSerializer.getContentType() == null + || objectSerializer.getContentType().isEmpty()) { + throw new IllegalArgumentException("Content Type should not be null or empty"); + } + this.objectSerializer = objectSerializer; + return this; + } + + /** + * Build an instance of the client based on the provided setup. + * + * @return an instance of {@link CapaConfigStore} + * @throws IllegalStateException if any required field is missing + */ + public CapaConfigStore build() { + CapaConfigStore capaConfigStore = buildCapaConfigStore(); + capaConfigStore.init(this.storeConfig); + return capaConfigStore; + } + + /** + * Creates an instance of the {@link CapaConfigStore} implementor. + * + * @return Instance of {@link CapaConfigStore} implementor + */ + private CapaConfigStore buildCapaConfigStore() { + // load spi capa config store impl + try { + Properties properties = CapaProperties.COMPONENT_PROPERTIES_SUPPLIER.get(); + String capaConfigStoreClassPath = properties.getProperty(CapaConfigStore.class.getName()); + Class aClass = (Class) Class.forName(capaConfigStoreClassPath); + Constructor constructor = aClass.getConstructor(CapaObjectSerializer.class); + Object newInstance = constructor.newInstance(this.objectSerializer); + return (CapaConfigStore) newInstance; + } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("No CapaConfigStore Client supported."); + } + } +} diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/ConfigurationItem.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/ConfigurationItem.java new file mode 100644 index 0000000..fe50cc5 --- /dev/null +++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/ConfigurationItem.java @@ -0,0 +1,80 @@ +package group.rxcloud.capa.component.configstore; + +import java.util.Map; + +public class ConfigurationItem { + + /** + * Required. The key of configuration item + */ + private String key; + /** + * The content of configuration item + * Empty if the configuration is not set, including the case that the configuration is changed from value-set to value-not-set. + */ + private T content; + /** + * The group of configuration item. + */ + private String group; + /** + * The label of configuration item. + */ + private String label; + /** + * The tag list of configuration item. + */ + private Map tags; + /** + * The metadata which will be passed to configuration store component. + */ + private Map metadata; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public T getContent() { + return content; + } + + public void setContent(T content) { + this.content = content; + } + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public Map getTags() { + return tags; + } + + public void setTags(Map tags) { + this.tags = tags; + } + + public Map getMetadata() { + return metadata; + } + + public void setMetadata(Map metadata) { + this.metadata = metadata; + } +} diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/GetRequest.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/GetRequest.java new file mode 100644 index 0000000..e6c0932 --- /dev/null +++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/GetRequest.java @@ -0,0 +1,72 @@ +package group.rxcloud.capa.component.configstore; + +import java.util.List; +import java.util.Map; + +/** + * GetRequest is the object describing a get configuration request + */ +public class GetRequest { + + /** + * The application id which + * Only used for admin, Ignored and reset for normal client + */ + private String appId; + /** + * The group of keys. + */ + private String group; + /** + * The label for keys. + */ + private String label; + /** + * The keys to get. + */ + private List keys; + /** + * The metadata which will be sent to configuration store components. + */ + private Map metadata; + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public List getKeys() { + return keys; + } + + public void setKeys(List keys) { + this.keys = keys; + } + + public Map getMetadata() { + return metadata; + } + + public void setMetadata(Map metadata) { + this.metadata = metadata; + } +} diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/StoreConfig.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/StoreConfig.java new file mode 100644 index 0000000..1d98979 --- /dev/null +++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/StoreConfig.java @@ -0,0 +1,44 @@ +package group.rxcloud.capa.component.configstore; + +import java.util.List; +import java.util.Map; + +public class StoreConfig { + + private String storeName; + private List address; + private String timeOut; + private Map metadata; + + public String getStoreName() { + return storeName; + } + + public void setStoreName(String storeName) { + this.storeName = storeName; + } + + public List getAddress() { + return address; + } + + public void setAddress(List address) { + this.address = address; + } + + public String getTimeOut() { + return timeOut; + } + + public void setTimeOut(String timeOut) { + this.timeOut = timeOut; + } + + public Map getMetadata() { + return metadata; + } + + public void setMetadata(Map metadata) { + this.metadata = metadata; + } +} diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeReq.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeReq.java new file mode 100644 index 0000000..25bc2a7 --- /dev/null +++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeReq.java @@ -0,0 +1,53 @@ +package group.rxcloud.capa.component.configstore; + +import java.util.List; +import java.util.Map; + +public class SubscribeReq { + + private String appId; + private String group; + private String label; + private List keys; + private Map metadata; + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public List getKeys() { + return keys; + } + + public void setKeys(List keys) { + this.keys = keys; + } + + public Map getMetadata() { + return metadata; + } + + public void setMetadata(Map metadata) { + this.metadata = metadata; + } +} diff --git a/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeResp.java b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeResp.java new file mode 100644 index 0000000..2a9c441 --- /dev/null +++ b/sdk-component/src/main/java/group/rxcloud/capa/component/configstore/SubscribeResp.java @@ -0,0 +1,46 @@ +package group.rxcloud.capa.component.configstore; + + +import java.util.List; + +public class SubscribeResp { + + /** + * The name of configuration store. + */ + private String storeName; + /** + * The application id which + * Only used for admin, Ignored and reset for normal client + */ + private String appId; + /** + * The list of configuration items to save. + * To delete a exist item, set the key (also label) and let content to be empty + */ + private List> items; + + public String getStoreName() { + return storeName; + } + + public void setStoreName(String storeName) { + this.storeName = storeName; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public List> getItems() { + return items; + } + + public void setItems(List> items) { + this.items = items; + } +} diff --git a/sdk-infrastructure/pom.xml b/sdk-infrastructure/pom.xml index a8636c8..46f5e78 100644 --- a/sdk-infrastructure/pom.xml +++ b/sdk-infrastructure/pom.xml @@ -7,7 +7,7 @@ capa-parent group.rxcloud - 1.0.1.RELEASE + 1.0.2.SNAPSHOT capa-sdk-infrastructure diff --git a/sdk-infrastructure/src/main/java/group/rxcloud/capa/infrastructure/exceptions/CapaExceptions.java b/sdk-infrastructure/src/main/java/group/rxcloud/capa/infrastructure/exceptions/CapaExceptions.java index 081746e..e608506 100644 --- a/sdk-infrastructure/src/main/java/group/rxcloud/capa/infrastructure/exceptions/CapaExceptions.java +++ b/sdk-infrastructure/src/main/java/group/rxcloud/capa/infrastructure/exceptions/CapaExceptions.java @@ -1,6 +1,7 @@ package group.rxcloud.capa.infrastructure.exceptions; import reactor.core.Exceptions; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.concurrent.Callable; @@ -73,6 +74,22 @@ public static Mono wrapMono(Exception exception) { return Mono.empty(); } + /** + * Wraps an exception into CapaException (if not already CapaException). + * + * @param exception Exception to be wrapped. + * @param Flux's response type. + * @return Flux containing CapaException. + */ + public static Flux wrapFlux(Exception exception) { + try { + wrap(exception); + } catch (Exception e) { + return Flux.error(e); + } + return Flux.empty(); + } + /** * Wraps an exception into CapaException (if not already CapaException). * diff --git a/sdk-spi-demo/pom.xml b/sdk-spi-demo/pom.xml index 066a583..a62718e 100644 --- a/sdk-spi-demo/pom.xml +++ b/sdk-spi-demo/pom.xml @@ -7,7 +7,7 @@ capa-parent group.rxcloud - 1.0.1.RELEASE + 1.0.2.SNAPSHOT capa-sdk-spi-demo diff --git a/sdk-spi-demo/src/main/java/group/rxcloud/capa/spi/demo/configstore/DemoCapaConfigStore.java b/sdk-spi-demo/src/main/java/group/rxcloud/capa/spi/demo/configstore/DemoCapaConfigStore.java new file mode 100644 index 0000000..becb5e2 --- /dev/null +++ b/sdk-spi-demo/src/main/java/group/rxcloud/capa/spi/demo/configstore/DemoCapaConfigStore.java @@ -0,0 +1,87 @@ +package group.rxcloud.capa.spi.demo.configstore; + +import group.rxcloud.capa.component.configstore.ConfigurationItem; +import group.rxcloud.capa.component.configstore.StoreConfig; +import group.rxcloud.capa.component.configstore.SubscribeResp; +import group.rxcloud.capa.infrastructure.serializer.CapaObjectSerializer; +import group.rxcloud.capa.spi.configstore.CapaConfigStoreSpi; +import group.rxcloud.cloudruntimes.utils.TypeRef; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class DemoCapaConfigStore extends CapaConfigStoreSpi { + + private static final Logger logger = LoggerFactory.getLogger(DemoCapaConfigStore.class); + + /** + * Instantiates a new Capa ConfigStore. + * + * @param objectSerializer Serializer for transient request/response objects. + */ + public DemoCapaConfigStore(CapaObjectSerializer objectSerializer) { + super(objectSerializer); + } + + @Override + protected void doInit(StoreConfig storeConfig) { + // paas + } + + @Override + public String stopSubscribe() { + // paas + return null; + } + + @Override + protected Mono>> doGet(String appId, String group, String label, String configName, List values, Map metadata, TypeRef type) { + ConfigurationItem configurationItems = new ConfigurationItem<>(); + configurationItems.setKey("test"); + configurationItems.setContent(null); + configurationItems.setGroup(getDefaultGroup()); + configurationItems.setLabel(getDefaultLabel()); + configurationItems.setTags(metadata); + configurationItems.setMetadata(metadata); + + List> items = Collections.singletonList(configurationItems); + return Mono.just(items); + } + + @Override + protected Flux> doSubscribe(String appId, String group, String label, String configName, List values, Map metadata, TypeRef type) { + return Flux.interval(Duration.ofSeconds(3)) + .map(aLong -> this.getSubscribeResp(appId, metadata, aLong)); + } + + @NotNull + private SubscribeResp getSubscribeResp(String appId, Map metadata, Long aLong) { + ConfigurationItem configurationItems = new ConfigurationItem<>(); + configurationItems.setKey("test" + aLong); + configurationItems.setContent(null); + configurationItems.setGroup(getDefaultGroup()); + configurationItems.setLabel(getDefaultLabel()); + configurationItems.setTags(metadata); + configurationItems.setMetadata(metadata); + + List> items = Collections.singletonList(configurationItems); + + SubscribeResp subscribeResp = new SubscribeResp<>(); + subscribeResp.setAppId(appId); + subscribeResp.setStoreName(getStoreName()); + subscribeResp.setItems(items); + return subscribeResp; + } + + @Override + public void close() throws Exception { + // paas + } +} diff --git a/sdk-spi-demo/src/main/resources/capa-component.properties b/sdk-spi-demo/src/main/resources/capa-component.properties index 6703aad..d1215aa 100644 --- a/sdk-spi-demo/src/main/resources/capa-component.properties +++ b/sdk-spi-demo/src/main/resources/capa-component.properties @@ -1,2 +1,4 @@ group.rxcloud.capa.component.http.CapaHttp=group.rxcloud.capa.spi.demo.http.DemoCapaHttp -group.rxcloud.capa.spi.config.CapaSpiOptionsLoader=group.rxcloud.capa.spi.demo.config.DemoSpiOptionsLoader \ No newline at end of file +group.rxcloud.capa.spi.config.CapaSpiOptionsLoader=group.rxcloud.capa.spi.demo.config.DemoSpiOptionsLoader + +group.rxcloud.capa.component.configstore.CapaConfigStore=group.rxcloud.capa.spi.demo.configstore.DemoCapaConfigStore \ No newline at end of file diff --git a/sdk-spi/pom.xml b/sdk-spi/pom.xml index da2dd2c..29650ee 100644 --- a/sdk-spi/pom.xml +++ b/sdk-spi/pom.xml @@ -7,7 +7,7 @@ capa-parent group.rxcloud - 1.0.1.RELEASE + 1.0.2.SNAPSHOT capa-sdk-spi diff --git a/sdk-spi/src/main/java/group/rxcloud/capa/spi/configstore/CapaConfigStoreSpi.java b/sdk-spi/src/main/java/group/rxcloud/capa/spi/configstore/CapaConfigStoreSpi.java new file mode 100644 index 0000000..dbdeafa --- /dev/null +++ b/sdk-spi/src/main/java/group/rxcloud/capa/spi/configstore/CapaConfigStoreSpi.java @@ -0,0 +1,128 @@ +package group.rxcloud.capa.spi.configstore; + +import group.rxcloud.capa.component.configstore.*; +import group.rxcloud.capa.infrastructure.serializer.CapaObjectSerializer; +import group.rxcloud.cloudruntimes.utils.TypeRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * The SPI Capa ConfigStore client. Templates for different implementations. + */ +public abstract class CapaConfigStoreSpi extends CapaConfigStore { + + private static final Logger logger = LoggerFactory.getLogger(CapaConfigStoreSpi.class); + + /** + * Instantiates a new Capa ConfigStore. + * + * @param objectSerializer Serializer for transient request/response objects. + */ + public CapaConfigStoreSpi(CapaObjectSerializer objectSerializer) { + super(objectSerializer); + } + + @Override + public Mono>> get(GetRequest getRequest, TypeRef type) { + if (logger.isDebugEnabled()) { + logger.debug("[CapaConfigStoreSpi] get config request[{}]", getRequest); + } + final String appId = getRequest.getAppId(); + final String group = getRequest.getGroup(); + final String label = getRequest.getLabel(); + final Map metadata = getRequest.getMetadata(); + + // [config_name, value1, value2, ...] + final List keys = getRequest.getKeys(); + Objects.requireNonNull(keys, "keys"); + final String configName = keys.get(0); + final List values = keys.subList(1, keys.size()); + if (logger.isDebugEnabled()) { + logger.debug("[CapaConfigStoreSpi] get config[config_name={}, values={}]", + configName, keys); + } + + return doGet(appId, group, label, configName, values, metadata, type); + } + + /** + * Get configuration. + * + * @param the response type parameter + * @param appId the app id + * @param group the group + * @param label the label + * @param configName the config name + * @param values the values + * @param metadata the metadata + * @param type the response type + * @return the async mono response + */ + protected abstract Mono>> doGet(String appId, + String group, + String label, + String configName, + List values, + Map metadata, + TypeRef type); + + @Override + public Flux> subscribe(SubscribeReq subscribeReq, TypeRef type) { + if (logger.isDebugEnabled()) { + logger.debug("[CapaConfigStoreSpi] subscribe config request[{}]", subscribeReq); + } + final String appId = subscribeReq.getAppId(); + final String group = subscribeReq.getGroup(); + final String label = subscribeReq.getLabel(); + final Map metadata = subscribeReq.getMetadata(); + + // [config_name, value1, value2, ...] + final List keys = subscribeReq.getKeys(); + Objects.requireNonNull(keys, "keys"); + final String configName = keys.get(0); + final List values = keys.subList(1, keys.size()); + if (logger.isDebugEnabled()) { + logger.debug("[CapaConfigStoreSpi] subscribe config[config_name={}, values={}]", + configName, keys); + } + + return doSubscribe(appId, group, label, configName, values, metadata, type); + } + + /** + * Subscribe configuration. + * + * @param the response type parameter + * @param appId the app id + * @param group the group + * @param label the label + * @param configName the config name + * @param values the values + * @param metadata the metadata + * @param type the response type + * @return the async flux of response stream + */ + protected abstract Flux> doSubscribe(String appId, + String group, + String label, + String configName, + List values, + Map metadata, + TypeRef type); + + @Override + public String getDefaultGroup() { + return "application"; + } + + @Override + public String getDefaultLabel() { + return ""; + } +} diff --git a/sdk/pom.xml b/sdk/pom.xml index 16b81e3..2b8403f 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -7,7 +7,7 @@ group.rxcloud capa-parent - 1.0.1.RELEASE + 1.0.2.SNAPSHOT capa-sdk diff --git a/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientBuilder.java b/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientBuilder.java new file mode 100644 index 0000000..76bb443 --- /dev/null +++ b/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientBuilder.java @@ -0,0 +1,62 @@ +package group.rxcloud.capa.configuration; + + +import group.rxcloud.capa.component.configstore.CapaConfigStore; +import group.rxcloud.capa.component.configstore.CapaConfigStoreBuilder; +import group.rxcloud.capa.component.configstore.StoreConfig; + +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * A builder for the {@link CapaConfigurationClient}. + */ +public class CapaConfigurationClientBuilder { + + /** + * Builder for Capa's ConfigStore Client. + */ + private final List configStoreBuilders; + + /** + * Creates a constructor for {@link CapaConfigurationClient}. + */ + public CapaConfigurationClientBuilder(StoreConfig storeConfig) { + this(Collections.singletonList(new CapaConfigStoreBuilder(storeConfig))); + } + + /** + * Creates a constructor for {@link CapaConfigurationClient} with custom {@link CapaConfigStoreBuilder}. + */ + public CapaConfigurationClientBuilder(Supplier capaConfigStoreBuilderSupplier) { + this(Collections.singletonList(capaConfigStoreBuilderSupplier.get())); + } + + /** + * Creates a constructor for {@link CapaConfigurationClient}. + */ + public CapaConfigurationClientBuilder(List configStoreBuilders) { + this.configStoreBuilders = configStoreBuilders; + } + + /** + * Build an instance of the Client based on the provided setup. + * + * @return an instance of the setup Client + */ + public CapaConfigurationClient build() { + return buildCapaClientStore(); + } + + /** + * Creates and instance of {@link CapaConfigurationClient}. + */ + private CapaConfigurationClient buildCapaClientStore() { + List capaConfigStores = this.configStoreBuilders.stream() + .map(CapaConfigStoreBuilder::build) + .collect(Collectors.toList()); + return new CapaConfigurationClientStore(capaConfigStores); + } +} diff --git a/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientStore.java b/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientStore.java index 77682fd..bafc9dd 100644 --- a/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientStore.java +++ b/sdk/src/main/java/group/rxcloud/capa/configuration/CapaConfigurationClientStore.java @@ -1,6 +1,9 @@ package group.rxcloud.capa.configuration; import group.rxcloud.capa.component.configstore.CapaConfigStore; +import group.rxcloud.capa.component.configstore.GetRequest; +import group.rxcloud.capa.component.configstore.SubscribeReq; +import group.rxcloud.capa.component.configstore.SubscribeResp; import group.rxcloud.capa.infrastructure.exceptions.CapaExceptions; import group.rxcloud.cloudruntimes.domain.core.configuration.ConfigurationItem; import group.rxcloud.cloudruntimes.domain.core.configuration.ConfigurationRequestItem; @@ -9,8 +12,9 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; /** * An adapter for the Config Store Client. @@ -18,45 +22,177 @@ public class CapaConfigurationClientStore extends AbstractCapaConfigurationClient { /** - * The Store client to be used. + * The Store clients to be used. * * @see CapaConfigStore */ - private final CapaConfigStore store; + private final Map configStores; - public CapaConfigurationClientStore(CapaConfigStore store) { - this.store = store; + public CapaConfigurationClientStore(List stores) { + if (stores == null || stores.isEmpty()) { + this.configStores = new HashMap<>(2, 1); + } else { + this.configStores = stores.stream() + .collect(Collectors.toMap( + CapaConfigStore::getStoreName, + Function.identity())); + } + } + + private CapaConfigStore getStore(String storeName) { + // check storeName + if (storeName == null || storeName.trim().isEmpty()) { + throw new IllegalArgumentException("Store Name cannot be null or empty."); + } + return Objects.requireNonNull(configStores.get(storeName), "Store Component cannot be null."); } @Override public Mono>> getConfiguration(ConfigurationRequestItem configurationRequestItem, TypeRef type) { try { - String storeName = configurationRequestItem.getStoreName(); - String appId = configurationRequestItem.getAppId(); + final String storeName = configurationRequestItem.getStoreName(); + final CapaConfigStore store = this.getStore(storeName); + + final String appId = configurationRequestItem.getAppId(); String group = configurationRequestItem.getGroup(); String label = configurationRequestItem.getLabel(); List keys = configurationRequestItem.getKeys(); Map metadata = configurationRequestItem.getMetadata(); - // check storeName - if (storeName == null || storeName.trim().isEmpty()) { - throw new IllegalArgumentException("Store Name cannot be null or empty."); + if (group == null || group.trim().isEmpty()) { + group = store.getDefaultGroup(); + } + if (label == null || label.trim().isEmpty()) { + label = store.getDefaultLabel(); } - // check appId - if (appId == null || appId.trim().isEmpty()) { - throw new IllegalArgumentException("App Id cannot be null or empty."); + if (keys == null) { + keys = new ArrayList<>(2); + } + if (metadata == null) { + metadata = new HashMap<>(2, 1); } - // TODO: 2021/8/6 + final String finalGroup = group; + final String finalLabel = label; + final List finalKeys = keys; + final Map finalMetadata = metadata; - return Mono.empty(); + return Mono.subscriberContext() + .flatMap(context -> { + GetRequest getRequest = this.getStoreRequest( + appId, + finalGroup, + finalLabel, + finalKeys, + finalMetadata); + return store.get(getRequest, type); + }) + .flatMap(configurationItems -> { + if (configurationItems == null || configurationItems.isEmpty()) { + return Mono.empty(); + } + return Flux.fromStream(configurationItems + .stream() + .map(this::getStoreResponse)) + .collectList(); + }); } catch (Exception ex) { return CapaExceptions.wrapMono(ex); } } + private GetRequest getStoreRequest(String appId, String group, String label, List keys, Map metadata) { + GetRequest getRequest = new GetRequest(); + getRequest.setAppId(appId); + getRequest.setGroup(group); + getRequest.setLabel(label); + getRequest.setKeys(keys); + getRequest.setMetadata(metadata); + return getRequest; + } + + private ConfigurationItem getStoreResponse(group.rxcloud.capa.component.configstore.ConfigurationItem tConfigurationItem) { + ConfigurationItem configurationItem = new ConfigurationItem<>(); + configurationItem.setKey(tConfigurationItem.getKey()); + configurationItem.setContent(tConfigurationItem.getContent()); + configurationItem.setGroup(tConfigurationItem.getGroup()); + configurationItem.setLabel(tConfigurationItem.getLabel()); + configurationItem.setTags(tConfigurationItem.getTags()); + configurationItem.setMetadata(tConfigurationItem.getMetadata()); + return configurationItem; + } + @Override public Flux> subscribeConfiguration(ConfigurationRequestItem configurationRequestItem, TypeRef type) { - return null; + try { + final String storeName = configurationRequestItem.getStoreName(); + final CapaConfigStore store = this.getStore(storeName); + + final String appId = configurationRequestItem.getAppId(); + String group = configurationRequestItem.getGroup(); + String label = configurationRequestItem.getLabel(); + List keys = configurationRequestItem.getKeys(); + Map metadata = configurationRequestItem.getMetadata(); + if (group == null || group.trim().isEmpty()) { + group = store.getDefaultGroup(); + } + if (label == null || label.trim().isEmpty()) { + label = store.getDefaultLabel(); + } + if (keys == null) { + keys = new ArrayList<>(2); + } + if (metadata == null) { + metadata = new HashMap<>(2, 1); + } + + final String finalGroup = group; + final String finalLabel = label; + final List finalKeys = keys; + final Map finalMetadata = metadata; + + return Flux.deferWithContext(Mono::just) + .flatMap(context -> { + SubscribeReq subscribeReq = this.getSubscribeRequest( + appId, + finalGroup, + finalLabel, + finalKeys, + finalMetadata); + return store.subscribe(subscribeReq, type); + }) + .flatMap(subscribeResp -> { + if (subscribeResp == null) { + return Flux.empty(); + } + SubConfigurationResp subscribeResponse = this.getSubscribeResponse(subscribeResp); + return Flux.just(subscribeResponse); + }); + } catch (Exception ex) { + return CapaExceptions.wrapFlux(ex); + } + } + + private SubscribeReq getSubscribeRequest(String appId, String group, String label, List keys, Map metadata) { + SubscribeReq subscribeReq = new SubscribeReq(); + subscribeReq.setAppId(appId); + subscribeReq.setGroup(group); + subscribeReq.setLabel(label); + subscribeReq.setKeys(keys); + subscribeReq.setMetadata(metadata); + return subscribeReq; + } + + private SubConfigurationResp getSubscribeResponse(SubscribeResp subscribeResp) { + SubConfigurationResp subConfigurationResp = new SubConfigurationResp<>(); + subConfigurationResp.setStoreName(subscribeResp.getStoreName()); + subConfigurationResp.setAppId(subscribeResp.getAppId()); + if (subscribeResp.getItems() != null && subscribeResp.getItems().isEmpty()) { + List> itemList = subscribeResp.getItems().stream() + .map(this::getStoreResponse) + .collect(Collectors.toList()); + subConfigurationResp.setItems(itemList); + } + return subConfigurationResp; } /** @@ -65,7 +201,9 @@ public Flux> subscribeConfiguration(ConfigurationReq @Override public void close() { try { - store.close(); + for (CapaConfigStore store : configStores.values()) { + store.close(); + } } catch (Exception e) { throw CapaExceptions.propagate(e); } diff --git a/sdk/src/main/java/group/rxcloud/capa/pubsub/CapaPubSubClient.java b/sdk/src/main/java/group/rxcloud/capa/pubsub/CapaPubSubClient.java index cdba481..da11138 100644 --- a/sdk/src/main/java/group/rxcloud/capa/pubsub/CapaPubSubClient.java +++ b/sdk/src/main/java/group/rxcloud/capa/pubsub/CapaPubSubClient.java @@ -1,36 +1,13 @@ package group.rxcloud.capa.pubsub; import group.rxcloud.cloudruntimes.client.DefaultCloudRuntimesClient; -import group.rxcloud.cloudruntimes.domain.core.binding.InvokeBindingRequest; import group.rxcloud.cloudruntimes.domain.core.pubsub.PublishEventRequest; -import group.rxcloud.cloudruntimes.utils.TypeRef; import reactor.core.publisher.Mono; import java.util.Map; public interface CapaPubSubClient extends DefaultCloudRuntimesClient { - @Override - Mono invokeBinding(String bindingName, String operation, Object data); - - @Override - Mono invokeBinding(String bindingName, String operation, byte[] data, Map metadata); - - @Override - Mono invokeBinding(String bindingName, String operation, Object data, TypeRef type); - - @Override - Mono invokeBinding(String bindingName, String operation, Object data, Class clazz); - - @Override - Mono invokeBinding(String bindingName, String operation, Object data, Map metadata, TypeRef type); - - @Override - Mono invokeBinding(String bindingName, String operation, Object data, Map metadata, Class clazz); - - @Override - Mono invokeBinding(InvokeBindingRequest request, TypeRef type); - @Override Mono publishEvent(String pubsubName, String topicName, Object data); diff --git a/sdk/src/main/java/group/rxcloud/capa/rpc/AbstractCapaRpcClient.java b/sdk/src/main/java/group/rxcloud/capa/rpc/AbstractCapaRpcClient.java index b659e0c..175f873 100644 --- a/sdk/src/main/java/group/rxcloud/capa/rpc/AbstractCapaRpcClient.java +++ b/sdk/src/main/java/group/rxcloud/capa/rpc/AbstractCapaRpcClient.java @@ -23,7 +23,6 @@ public Mono invokeMethod(String appId, String methodName, Object request, .withHttpExtension(httpExtension) .withMetadata(metadata) .build(); - return this.invokeMethod(req, type); } diff --git a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaApiProtocol.java b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaApiProtocol.java index 7bf0ec3..d3c9685 100644 --- a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaApiProtocol.java +++ b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaApiProtocol.java @@ -13,7 +13,8 @@ public enum CapaApiProtocol { /** * HTTP/1.1 */ - HTTP; + HTTP, + GRPC; /** * Parse protocol to {@link CapaApiProtocol}. diff --git a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientBuilder.java b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientBuilder.java index 8317634..6c64375 100644 --- a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientBuilder.java +++ b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientBuilder.java @@ -7,8 +7,7 @@ import java.util.function.Supplier; /** - * A builder for the {@link CapaRpcClient}, - * Currently only HTTP Client will be supported. + * A builder for the {@link CapaRpcClient}, Currently only HTTP Client will be supported. */ public class CapaRpcClientBuilder { @@ -65,13 +64,10 @@ private CapaRpcClient buildCapaClient(CapaApiProtocol protocol) { if (protocol == null) { throw new IllegalStateException("Protocol is required."); } - - switch (protocol) { - case HTTP: - return buildCapaClientHttp(); - default: - throw new IllegalStateException("Unsupported protocol: " + protocol.name()); + if (protocol == CapaApiProtocol.HTTP) { + return buildCapaClientHttp(); } + throw new IllegalStateException("Unsupported protocol: " + protocol.name()); } /** diff --git a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientHttp.java b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientHttp.java index 3439b6e..6ee8788 100644 --- a/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientHttp.java +++ b/sdk/src/main/java/group/rxcloud/capa/rpc/CapaRpcClientHttp.java @@ -1,7 +1,6 @@ package group.rxcloud.capa.rpc; import group.rxcloud.capa.component.http.CapaHttp; -import group.rxcloud.capa.component.http.HttpResponse; import group.rxcloud.capa.infrastructure.exceptions.CapaExceptions; import group.rxcloud.cloudruntimes.domain.core.invocation.HttpExtension; import group.rxcloud.cloudruntimes.domain.core.invocation.InvokeMethodRequest; @@ -63,8 +62,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef headers.putAll(metadata); } - Mono> httpResponseMono = Mono - .subscriberContext() + return Mono.subscriberContext() .flatMap(context -> this.client.invokeApi( httpMethod, @@ -73,8 +71,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef request, headers, context, - type)); - Mono responseMono = httpResponseMono + type)) .flatMap(httpResponse -> { T object = httpResponse.getBody(); if (object != null) { @@ -82,7 +79,6 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef } return Mono.empty(); }); - return responseMono; } catch (Exception ex) { return CapaExceptions.wrapMono(ex); }