Skip to content
This repository has been archived by the owner on Jan 22, 2023. It is now read-only.

Commit

Permalink
Implement InOut Exchange Pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
TeslaCN committed Apr 9, 2020
1 parent b5e0f06 commit e670b61
Show file tree
Hide file tree
Showing 18 changed files with 1,139 additions and 39 deletions.
66 changes: 66 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
plugins {
id 'java'
id 'maven'
id 'maven-publish'
id 'signing'
}

group 'vip.wuweijie.camel'
Expand All @@ -43,3 +45,67 @@ dependencies {
testImplementation "io.hawt:hawtio-springboot:2.5.0"
testImplementation "org.apache.camel:camel-test-spring:${camelCoreVersion}"
}

task sourcesJar(type: Jar) {
from sourceSets.main.allJava
classifier = 'sources'
}
task javadocJar(type: Jar) {
from javadoc
classifier = 'javadoc'
}

publishing {
publications {
mavenJava(MavenPublication) {
artifactId = 'camel-rocketmq'
from components.java
artifact sourcesJar
artifact javadocJar
pom {
name = 'camel-rocketmq'
description = 'Camel RocketMQ Component'
url = 'https://github.com/TeslaCN/camel-rocketmq'
licenses {
license {
name = 'The Apache License, Version 2.0'
url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
developers {
developer {
id = 'TeslaCN'
name = 'wuweijie'
email = 'wuweijie.io@qq.com'
}
}
scm {
connection = 'scm:git:git://github.com/TeslaCN/camel-rocketmq.git'
developerConnection = 'scm:git:ssh://github.com/TeslaCN/camel-rocketmq.git'
url = 'https://github.com/TeslaCN/camel-rocketmq'
}
}
}
}
repositories {
maven {
def releasesRepoUrl = "https://oss.sonatype.org/service/local/staging/deploy/maven2/"
def snapshotsRepoUrl = "https://oss.sonatype.org/content/repositories/snapshots/"
url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl
credentials {
username NEXUS_USERNAME
password NEXUS_PASSWORD
}
}
}
}

jar {
manifest {
attributes 'Implementation-Title': 'Camel RocketMQ',
'Implementation-Version': version
}
}
signing {
sign publishing.publications.mavenJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,30 @@ public class RocketMQComponent extends DefaultComponent {
@Metadata(label = "consumer")
private String consumerGroup;

@Metadata(label = "common")
private String tag;
@Metadata(label = "consumer")
private String subscribeTags;

@Metadata(label = "common")
private String key;
private String sendTag;

@Metadata(label = "common")
private String namesrvAddr;

@Metadata(label = "producer")
private String replyToTopic;

@Metadata(label = "producer")
private String replyToConsumerGroup;

@Metadata(label = "advance")
private Long requestTimeout;

@Metadata(label = "advance")
private Long requestTimeoutCheckerInterval;

@Metadata(label = "producer")
private Boolean waitForSendResult;

@Override
protected RocketMQEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {

Expand All @@ -69,20 +84,20 @@ public void setTopicName(String topicName) {
this.topicName = topicName;
}

public String getTag() {
return tag;
public String getSubscribeTags() {
return subscribeTags;
}

public void setTag(String tag) {
this.tag = tag;
public void setSubscribeTags(String subscribeTags) {
this.subscribeTags = subscribeTags;
}

public String getKey() {
return key;
public String getSendTag() {
return sendTag;
}

public void setKey(String key) {
this.key = key;
public void setSendTag(String sendTag) {
this.sendTag = sendTag;
}

public String getNamesrvAddr() {
Expand All @@ -108,4 +123,44 @@ public String getConsumerGroup() {
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}

public String getReplyToTopic() {
return replyToTopic;
}

public void setReplyToTopic(String replyToTopic) {
this.replyToTopic = replyToTopic;
}

public String getReplyToConsumerGroup() {
return replyToConsumerGroup;
}

public void setReplyToConsumerGroup(String replyToConsumerGroup) {
this.replyToConsumerGroup = replyToConsumerGroup;
}

public Long getRequestTimeout() {
return requestTimeout;
}

public void setRequestTimeout(Long requestTimeout) {
this.requestTimeout = requestTimeout;
}

public Long getRequestTimeoutCheckerInterval() {
return requestTimeoutCheckerInterval;
}

public void setRequestTimeoutCheckerInterval(Long requestTimeoutCheckerInterval) {
this.requestTimeoutCheckerInterval = requestTimeoutCheckerInterval;
}

public Boolean getWaitForSendResult() {
return waitForSendResult;
}

public void setWaitForSendResult(Boolean waitForSendResult) {
this.waitForSendResult = waitForSendResult;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Wu Weijie
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package vip.wuweijie.camel.component.rocketmq;

/**
* @author wuweijie
*/
public class RocketMQConstants {

public static final String BROKER_NAME = "rocketmq.BROKER_NAME";
public static final String QUEUE_ID = "rocketmq.QUEUE_ID";
public static final String STORE_SIZE = "rocketmq.STORE_SIZE";
public static final String QUEUE_OFFSET = "rocketmq.QUEUE_OFFSET";
public static final String SYS_FLAG = "rocketmq.SYS_FLAG";
public static final String BORN_TIMESTAMP = "rocketmq.BORN_TIMESTAMP";
public static final String BORN_HOST = "rocketmq.BORN_HOST";
public static final String STORE_TIMESTAMP = "rocketmq.STORE_TIMESTAMP";
public static final String STORE_HOST = "rocketmq.STORE_HOST";
public static final String MSG_ID = "rocketmq.MSG_ID";
public static final String COMMIT_LOG_OFFSET = "rocketmq.COMMIT_LOG_OFFSET";
public static final String BODY_CRC = "rocketmq.BODY_CRC";
public static final String RECONSUME_TIMES = "rocketmq.RECONSUME_TIMES";
public static final String PREPARED_TRANSACTION_OFFSET = "rocketmq.PREPARED_TRANSACTION_OFFSET";

public static final String OVERRIDE_TOPIC_NAME = "rocketmq.OVERRIDE_TOPIC_NAME";
public static final String OVERRIDE_TAG = "rocketmq.OVERRIDE_TAG";
public static final String OVERRIDE_MESSAGE_KEY = "rocketmq.OVERRIDE_MESSAGE_KEY";

public static final String TAG = "rocketmq.TAG";
public static final String TOPIC = "rocketmq.TOPIC";
public static final String KEY = "rocketmq.KEY";

private RocketMQConstants() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ private void startConsumer() throws MQClientException {
mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
Exchange exchange = endpoint.createRocketExchange(msgs.get(0).getBody());
MessageExt messageExt = msgs.get(0);
Exchange exchange = endpoint.createRocketExchange(messageExt.getBody());
getEndpoint().getMessageConverter().setExchangeHeadersByMessageExt(exchange, messageExt);
try {
getProcessor().process(exchange);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,28 @@ public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
@UriParam(label = "consumer")
private String consumerGroup;

private final RocketMQMessageConverter messageConverter = new RocketMQMessageConverter();
@UriParam(label = "consumer")
private String subscribeTags;
@UriParam(label = "producer")
private String sendTag;
@UriParam(label = "common")
private String tag;

@UriParam(label = "common")
private String key;

@UriParam(label = "common")
private String namesrvAddr;
private String namesrvAddr = "localhost:9876";
@UriParam(label = "producer")
private String replyToTopic;
@UriParam(label = "producer")
private String replyToConsumerGroup;
@UriParam(label = "advance")
private Long requestTimeout = 10000L;
@UriParam(label = "advance")
private Long requestTimeoutCheckerInterval = 1000L;

public RocketMQEndpoint() {
}

@UriParam(label = "producer")
private Boolean waitForSendResult = false;

public RocketMQEndpoint(String endpointUri, RocketMQComponent component) {
super(endpointUri, component);
}
Expand Down Expand Up @@ -95,20 +105,20 @@ public void setTopicName(String topicName) {
this.topicName = topicName;
}

public String getTag() {
return tag;
public String getSubscribeTags() {
return subscribeTags;
}

public void setTag(String tag) {
this.tag = tag;
public void setSubscribeTags(String subscribeTags) {
this.subscribeTags = subscribeTags;
}

public String getKey() {
return key;
public String getSendTag() {
return sendTag;
}

public void setKey(String key) {
this.key = key;
public void setSendTag(String sendTag) {
this.sendTag = sendTag;
}

public String getNamesrvAddr() {
Expand All @@ -134,4 +144,48 @@ public String getConsumerGroup() {
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}

public String getReplyToTopic() {
return replyToTopic;
}

public void setReplyToTopic(String replyToTopic) {
this.replyToTopic = replyToTopic;
}

public String getReplyToConsumerGroup() {
return replyToConsumerGroup;
}

public void setReplyToConsumerGroup(String replyToConsumerGroup) {
this.replyToConsumerGroup = replyToConsumerGroup;
}

public RocketMQMessageConverter getMessageConverter() {
return messageConverter;
}

public Long getRequestTimeout() {
return requestTimeout;
}

public void setRequestTimeout(Long requestTimeout) {
this.requestTimeout = requestTimeout;
}

public Long getRequestTimeoutCheckerInterval() {
return requestTimeoutCheckerInterval;
}

public void setRequestTimeoutCheckerInterval(Long requestTimeoutCheckerInterval) {
this.requestTimeoutCheckerInterval = requestTimeoutCheckerInterval;
}

public Boolean getWaitForSendResult() {
return waitForSendResult;
}

public void setWaitForSendResult(Boolean waitForSendResult) {
this.waitForSendResult = waitForSendResult;
}
}
Loading

0 comments on commit e670b61

Please sign in to comment.