Skip to content
This repository has been archived by the owner on Jan 22, 2023. It is now read-only.

Commit

Permalink
添加默认值,完善文档
Browse files Browse the repository at this point in the history
  • Loading branch information
TeslaCN committed Apr 10, 2020
1 parent e670b61 commit 3c2a1e9
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 30 deletions.
59 changes: 57 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,62 @@
# Camel RocketMQ component

Usage:
## 快速入门

### 基本使用
```
from("rocketmq:from_topic?namesrvAddr=localhost:9876&consumerGroup=consumer")
.to("rocketmq:to_topic?namesrvAddr=localhost:9876&producerGroup=producer");
```
```

### InOut 模式

InOut 模式的实现借助了 Message Key,Producer 在发送消息的时候,会生成一个 messageKey 追加到消息的 key 部分。

Producer 消息发送后,启动一个 Consumer 监听 `ReplyToTopic` 参数配置的 Topic。

`ReplyToTpic` 中的消息包含发送消息时生成的 Key,则对应消息的 Reply 已收到,继续执行后续路由。

如果超过 `requestTimeout` 毫秒后仍然没有收到 Reply,则抛出异常。

```
from("rocketmq:{{inout.rocketmq.topic.from}}?namesrvAddr={{rocketmq.namesrv.addr}}" +
"&consumerGroup={{inout.rocketmq.consumer.group}}" +
"&requestTimeout=10000")
.inOut("rocketmq:{{inout.rocketmq.topic.to}}?namesrvAddr={{rocketmq.namesrv.addr}}" +
"&producerGroup={{inout.rocketmq.producer.group}}" +
"&replyToTopic={{inout.rocketmq.reply.to.topic}}" +
"&requestTimeout={{inout.request.timeout}}" +
"&replyToConsumerGroup={{inout.rocketmq.reply.to.consumer}}"
)
.to("log:InOutRoute?showAll=true")
```



注意:**在 InOut 模式下,只有收到 Reply 才会继续路由**

## 组件参数

### InOnly 模式

| 参数 | 类型 | 含义 | 默认值 |
|---|---|---|---|
| topicName | common | (必须)消费或生产消息的 topic 名称 | |
| namesrvAddr | common | NameServer 地址,英文逗号分隔 | localhost:9876 |
| consumerGroup | consumer | 消费者组名称 | |
| subscribeTags | consumer | 订阅消息 Tag 表达式 | * |
| producerGroup | producer | 生产者组名称 | |
| sendTag | producer | 发送消息 Tag | |
| waitForSendResult | producer | 是否阻塞发送消息 | false |


### InOut 模式

| 参数 | 类型 | 含义 | 默认值 |
|---|---|---|---|
| replyToTopic | producer | 监听回复的 Topic ||
| replyToConsumerGroup | producer | 监听回复的消费者组 ||
| requestTimeout | producer | 等待回复时间 | 10000 |
| requestTimeoutCheckerInterval | advance | 回复超时检查间隔 | 1000 |
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ publishing {
def snapshotsRepoUrl = "https://oss.sonatype.org/content/repositories/snapshots/"
url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl
credentials {
username NEXUS_USERNAME
password NEXUS_PASSWORD
username System.getenv("NEXUS_USERNAME")
password System.getenv("NEXUS_PASSWORD")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,29 @@ public class RocketMQComponent extends DefaultComponent {
@Metadata(label = "consumer")
private String consumerGroup;

@Metadata(label = "consumer")
private String subscribeTags;
@Metadata(label = "consumer", defaultValue = "*")
private String subscribeTags = "*";

@Metadata(label = "common")
private String sendTag;
@Metadata(label = "common", defaultValue = "")
private String sendTag = "";

@Metadata(label = "common")
private String namesrvAddr;
@Metadata(label = "common", defaultValue = "localhost:9876")
private String namesrvAddr = "localhost:9876";

@Metadata(label = "producer")
private String replyToTopic;

@Metadata(label = "producer")
private String replyToConsumerGroup;

@Metadata(label = "advance")
private Long requestTimeout;
@Metadata(label = "advance", defaultValue = "10000")
private Long requestTimeout = 10000L;

@Metadata(label = "advance")
private Long requestTimeoutCheckerInterval;
@Metadata(label = "advance", defaultValue = "1000")
private Long requestTimeoutCheckerInterval = 1000L;

@Metadata(label = "producer")
private Boolean waitForSendResult;
@Metadata(label = "producer", defaultValue = "false")
private Boolean waitForSendResult = false;

@Override
protected RocketMQEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,31 @@ public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
@UriPath
@Metadata(required = "true")
private String topicName;

@UriParam(label = "producer")
private String producerGroup;

@UriParam(label = "consumer")
private String consumerGroup;

private final RocketMQMessageConverter messageConverter = new RocketMQMessageConverter();
@UriParam(label = "consumer")
private String subscribeTags;
@UriParam(label = "producer")
private String sendTag;
@UriParam(label = "common")
private String namesrvAddr = "localhost:9876";
@UriParam(label = "consumer", defaultValue = "*")
private String subscribeTags = "*";
@UriParam(label = "producer", defaultValue = "")
private String sendTag = "";
@UriParam(label = "producer")
private String replyToTopic;
@UriParam(label = "producer")
private String replyToConsumerGroup;
@UriParam(label = "advance")
@UriParam(label = "common", defaultValue = "localhost:9876")
private String namesrvAddr = "localhost:9876";
@UriParam(label = "advance", defaultValue = "10000")
private Long requestTimeout = 10000L;
@UriParam(label = "advance")
@UriParam(label = "advance", defaultValue = "1000")
private Long requestTimeoutCheckerInterval = 1000L;
@UriParam(label = "producer", defaultValue = "false")
private Boolean waitForSendResult = false;

public RocketMQEndpoint() {
}

@UriParam(label = "producer")
private Boolean waitForSendResult = false;

public RocketMQEndpoint(String endpointUri, RocketMQComponent component) {
super(endpointUri, component);
}
Expand Down

0 comments on commit 3c2a1e9

Please sign in to comment.