Skip to content
This repository has been archived by the owner on Jan 22, 2023. It is now read-only.

Commit

Permalink
增加rocketmq Acl 账号验证 (#3)
Browse files Browse the repository at this point in the history
* Add acl code.

* Add acl and update doc.
  • Loading branch information
kuaile-zc authored Jun 6, 2022
1 parent 1e9a7e0 commit 78f213d
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 14 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ Notice: **In InOut pattern, the message won't be routed until reply received.**
|---|---|---|---|
| topicName | common | (Required) consumer/producer's topic | |
| namesrvAddr | common | NameServer (Separate by comma) | localhost:9876 |
| accessKey | common | Rocketmq acl accessKey | |
| secretKey | common | Rocketmq acl secretKey | |
| consumerGroup | consumer | Consumer group name | |
| subscribeTags | consumer | Subscribe tags expression | * |
| producerGroup | producer | Producer group name | |
Expand Down
2 changes: 2 additions & 0 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ from("rocketmq:{{inout.rocketmq.topic.from}}?namesrvAddr={{rocketmq.namesrv.addr
|---|---|---|---|
| topicName | common | (必须)消费或生产消息的 topic 名称 | |
| namesrvAddr | common | NameServer 地址,英文逗号分隔 | localhost:9876 |
| accessKey | common | Rocketmq acl accessKey | |
| secretKey | common | Rocketmq acl secretKey | |
| consumerGroup | consumer | 消费者组名称 | |
| subscribeTags | consumer | 订阅消息 Tag 表达式 | * |
| producerGroup | producer | 生产者组名称 | |
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ plugins {
}

group 'icu.wwj.camel'
version '3.2.0-RC1'
version '3.2.0-RC2'

sourceCompatibility = 1.8

Expand All @@ -40,6 +40,7 @@ repositories {
dependencies {
implementation "org.apache.camel:camel-core:${camelCoreVersion}"
implementation "org.apache.rocketmq:rocketmq-client:4.7.0"
implementation "org.apache.rocketmq:rocketmq-acl:4.7.0"
testImplementation 'junit:junit:4.13'
testImplementation "org.apache.camel.springboot:camel-spring-boot-starter:${camelCoreVersion}"
testImplementation "io.hawt:hawtio-springboot:2.9.1"
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/icu/wwj/camel/component/rocketmq/AclUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package icu.wwj.camel.component.rocketmq;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;

/**
* @author zhangchen
*/
public class AclUtils {

public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package icu.wwj.camel.component.rocketmq;

import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.support.DefaultComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,6 +65,12 @@ public class RocketMQComponent extends DefaultComponent {
@Metadata(label = "producer", defaultValue = "false")
private Boolean waitForSendResult = false;

@Metadata(label = "accessKey")
private String accessKey;

@Metadata(label = "secretKey")
private String secretKey;

@Override
protected RocketMQEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {

Expand Down Expand Up @@ -163,4 +170,20 @@ public Boolean getWaitForSendResult() {
public void setWaitForSendResult(Boolean waitForSendResult) {
this.waitForSendResult = waitForSendResult;
}

public String getAccessKey() {
return accessKey;
}

public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}

public String getSecretKey() {
return secretKey;
}

public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private void startConsumer() throws MQClientException {
if (mqPushConsumer != null) {
logger.warn("Overriding RocketMQ Consumer! {}", mqPushConsumer);
}
mqPushConsumer = new DefaultMQPushConsumer(endpoint.getConsumerGroup());
mqPushConsumer = new DefaultMQPushConsumer(null, endpoint.getConsumerGroup(), AclUtils.getAclRPCHook(getEndpoint().getAccessKey(), getEndpoint().getSecretKey()));
mqPushConsumer.setNamesrvAddr(endpoint.getNamesrvAddr());
mqPushConsumer.subscribe(endpoint.getTopicName(), "*");
mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
Expand Down Expand Up @@ -86,7 +86,7 @@ private void stopConsumer() {
public RocketMQEndpoint getEndpoint() {
return (RocketMQEndpoint) super.getEndpoint();
}

@Override
protected void doSuspend() {
stopConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
private Long requestTimeoutCheckerInterval = 1000L;
@UriParam(label = "producer", defaultValue = "false")
private Boolean waitForSendResult = false;
@UriParam(label = "accessKey")
private String accessKey;
@UriParam(label = "secretKey")
private String secretKey;

public RocketMQEndpoint() {
}
Expand Down Expand Up @@ -188,4 +192,20 @@ public Boolean getWaitForSendResult() {
public void setWaitForSendResult(Boolean waitForSendResult) {
this.waitForSendResult = waitForSendResult;
}

public String getAccessKey() {
return accessKey;
}

public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}

public String getSecretKey() {
return secretKey;
}

public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,21 +50,21 @@
* @author wuweijie
*/
public class RocketMQProducer extends DefaultAsyncProducer {

public static final String GENERATE_MESSAGE_KEY_PREFIX = "camel-rocketmq-";

private final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);

private final AtomicBoolean started = new AtomicBoolean(false);

private DefaultMQProducer mqProducer;

private ReplyManager replyManager;

public RocketMQProducer(RocketMQEndpoint endpoint) {
super(endpoint);
}

@Override
public RocketMQEndpoint getEndpoint() {
return (RocketMQEndpoint) super.getEndpoint();
Expand Down Expand Up @@ -169,7 +173,7 @@ protected void unInitReplyManager() {
started.set(false);
}
}

private ReplyManager createReplyManager() {
RocketMQReplyManagerSupport replyManager = new RocketMQReplyManagerSupport(getEndpoint().getCamelContext());
replyManager.setEndpoint(getEndpoint());
Expand All @@ -180,7 +184,7 @@ private ReplyManager createReplyManager() {
ServiceHelper.startService(replyManager);
return replyManager;
}

protected boolean processInOnly(Exchange exchange, AsyncCallback callback) throws NoTypeConversionAvailableException, InterruptedException, RemotingException, MQClientException {
org.apache.camel.Message in = exchange.getIn();
Message message = new Message();
Expand Down Expand Up @@ -212,11 +216,11 @@ public void onException(Throwable e) {

@Override
protected void doStart() throws Exception {
this.mqProducer = new DefaultMQProducer(getEndpoint().getProducerGroup());
this.mqProducer = new DefaultMQProducer(null, getEndpoint().getProducerGroup(), AclUtils.getAclRPCHook(getEndpoint().getAccessKey(), getEndpoint().getSecretKey()));
this.mqProducer.setNamesrvAddr(getEndpoint().getNamesrvAddr());
this.mqProducer.start();
}

@Override
protected void doStop() {
unInitReplyManager();
Expand Down

0 comments on commit 78f213d

Please sign in to comment.