diff --git a/README.md b/README.md index 1d1bc7c..31ebbdf 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,62 @@ # Camel RocketMQ component -Usage: +## 快速入门 + +### 基本使用 ``` from("rocketmq:from_topic?namesrvAddr=localhost:9876&consumerGroup=consumer") .to("rocketmq:to_topic?namesrvAddr=localhost:9876&producerGroup=producer"); -``` \ No newline at end of file +``` + +### InOut 模式 + +InOut 模式的实现借助了 Message Key,Producer 在发送消息的时候,会生成一个 messageKey 追加到消息的 key 部分。 + +Producer 消息发送后,启动一个 Consumer 监听 `ReplyToTopic` 参数配置的 Topic。 + +当 `ReplyToTpic` 中的消息包含发送消息时生成的 Key,则对应消息的 Reply 已收到,继续执行后续路由。 + +如果超过 `requestTimeout` 毫秒后仍然没有收到 Reply,则抛出异常。 + +``` +from("rocketmq:{{inout.rocketmq.topic.from}}?namesrvAddr={{rocketmq.namesrv.addr}}" + + "&consumerGroup={{inout.rocketmq.consumer.group}}" + + "&requestTimeout=10000") + +.inOut("rocketmq:{{inout.rocketmq.topic.to}}?namesrvAddr={{rocketmq.namesrv.addr}}" + + "&producerGroup={{inout.rocketmq.producer.group}}" + + "&replyToTopic={{inout.rocketmq.reply.to.topic}}" + + "&requestTimeout={{inout.request.timeout}}" + + "&replyToConsumerGroup={{inout.rocketmq.reply.to.consumer}}" +) + +.to("log:InOutRoute?showAll=true") +``` + + + +注意:**在 InOut 模式下,只有收到 Reply 才会继续路由** + +## 组件参数 + +### InOnly 模式 + +| 参数 | 类型 | 含义 | 默认值 | +|---|---|---|---| +| topicName | common | (必须)消费或生产消息的 topic 名称 | | +| namesrvAddr | common | NameServer 地址,英文逗号分隔 | localhost:9876 | +| consumerGroup | consumer | 消费者组名称 | | +| subscribeTags | consumer | 订阅消息 Tag 表达式 | * | +| producerGroup | producer | 生产者组名称 | | +| sendTag | producer | 发送消息 Tag | | +| waitForSendResult | producer | 是否阻塞发送消息 | false | + + +### InOut 模式 + +| 参数 | 类型 | 含义 | 默认值 | +|---|---|---|---| +| replyToTopic | producer | 监听回复的 Topic || +| replyToConsumerGroup | producer | 监听回复的消费者组 || +| requestTimeout | producer | 等待回复时间 | 10000 | +| requestTimeoutCheckerInterval | advance | 回复超时检查间隔 | 1000 | diff --git a/build.gradle b/build.gradle index c888b1a..6dde5d3 100644 --- a/build.gradle +++ b/build.gradle @@ -93,8 +93,8 @@ publishing { def snapshotsRepoUrl = "https://oss.sonatype.org/content/repositories/snapshots/" url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl credentials { - username NEXUS_USERNAME - password NEXUS_PASSWORD + username System.getenv("NEXUS_USERNAME") + password System.getenv("NEXUS_PASSWORD") } } } diff --git a/src/main/java/vip/wuweijie/camel/component/rocketmq/RocketMQComponent.java b/src/main/java/vip/wuweijie/camel/component/rocketmq/RocketMQComponent.java index 426c90e..d0e8745 100644 --- a/src/main/java/vip/wuweijie/camel/component/rocketmq/RocketMQComponent.java +++ b/src/main/java/vip/wuweijie/camel/component/rocketmq/RocketMQComponent.java @@ -40,14 +40,14 @@ public class RocketMQComponent extends DefaultComponent { @Metadata(label = "consumer") private String consumerGroup; - @Metadata(label = "consumer") - private String subscribeTags; + @Metadata(label = "consumer", defaultValue = "*") + private String subscribeTags = "*"; - @Metadata(label = "common") - private String sendTag; + @Metadata(label = "common", defaultValue = "") + private String sendTag = ""; - @Metadata(label = "common") - private String namesrvAddr; + @Metadata(label = "common", defaultValue = "localhost:9876") + private String namesrvAddr = "localhost:9876"; @Metadata(label = "producer") private String replyToTopic; @@ -55,14 +55,14 @@ public class RocketMQComponent extends DefaultComponent { @Metadata(label = "producer") private String replyToConsumerGroup; - @Metadata(label = "advance") - private Long requestTimeout; + @Metadata(label = "advance", defaultValue = "10000") + private Long requestTimeout = 10000L; - @Metadata(label = "advance") - private Long requestTimeoutCheckerInterval; + @Metadata(label = "advance", defaultValue = "1000") + private Long requestTimeoutCheckerInterval = 1000L; - @Metadata(label = "producer") - private Boolean waitForSendResult; + @Metadata(label = "producer", defaultValue = "false") + private Boolean waitForSendResult = false; @Override protected RocketMQEndpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { diff --git a/src/main/java/vip/wuweijie/camel/component/rocketmq/RocketMQEndpoint.java b/src/main/java/vip/wuweijie/camel/component/rocketmq/RocketMQEndpoint.java index c1d5764..2e6512f 100644 --- a/src/main/java/vip/wuweijie/camel/component/rocketmq/RocketMQEndpoint.java +++ b/src/main/java/vip/wuweijie/camel/component/rocketmq/RocketMQEndpoint.java @@ -38,35 +38,31 @@ public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { @UriPath @Metadata(required = "true") private String topicName; - @UriParam(label = "producer") private String producerGroup; - @UriParam(label = "consumer") private String consumerGroup; - private final RocketMQMessageConverter messageConverter = new RocketMQMessageConverter(); - @UriParam(label = "consumer") - private String subscribeTags; - @UriParam(label = "producer") - private String sendTag; - @UriParam(label = "common") - private String namesrvAddr = "localhost:9876"; + @UriParam(label = "consumer", defaultValue = "*") + private String subscribeTags = "*"; + @UriParam(label = "producer", defaultValue = "") + private String sendTag = ""; @UriParam(label = "producer") private String replyToTopic; @UriParam(label = "producer") private String replyToConsumerGroup; - @UriParam(label = "advance") + @UriParam(label = "common", defaultValue = "localhost:9876") + private String namesrvAddr = "localhost:9876"; + @UriParam(label = "advance", defaultValue = "10000") private Long requestTimeout = 10000L; - @UriParam(label = "advance") + @UriParam(label = "advance", defaultValue = "1000") private Long requestTimeoutCheckerInterval = 1000L; + @UriParam(label = "producer", defaultValue = "false") + private Boolean waitForSendResult = false; public RocketMQEndpoint() { } - @UriParam(label = "producer") - private Boolean waitForSendResult = false; - public RocketMQEndpoint(String endpointUri, RocketMQComponent component) { super(endpointUri, component); }