Skip to content

Commit

Permalink
Merge pull request #42 from SolaceLabs/partitioned-queues
Browse files Browse the repository at this point in the history
Partitioned queues
  • Loading branch information
SravanThotakura05 authored Feb 1, 2024
2 parents 1eb077b + b5b95cd commit 345151d
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
@MessageLogger(projectCode = "SRMSG", length = 5)
public interface SolaceLogging extends BasicLogger {

SolaceLogging log = Logger.getMessageLogger(SolaceLogging.class, "com.solace.quarkus");
SolaceLogging log = Logger.getMessageLogger(SolaceLogging.class, "com.solace.quarkus.messaging");

@Once
@LogMessage(level = Logger.Level.INFO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Map;

import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.util.Converter;
import com.solace.messaging.util.InteroperabilitySupport;
Expand Down Expand Up @@ -129,4 +130,8 @@ public Map<String, String> getProperties() {
return msg.getProperties();
}

public String getPartitionKey() {
return msg.getProperties().get(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.solace.messaging.config.MissingResourcesCreationConfiguration.MissingResourcesCreationStrategy;
import com.solace.messaging.config.ReceiverActivationPassivationConfiguration;
import com.solace.messaging.config.ReplayStrategy;
import com.solace.messaging.receiver.DirectMessageReceiver;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.resources.Queue;
Expand Down Expand Up @@ -63,7 +62,6 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.gracefulShutdown = ic.getClientGracefulShutdown();
this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout();
DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build();
Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED };
if (ic.getConsumerQueueSupportsNacks()) {
outcomes = new Outcome[] { Outcome.ACCEPTED, Outcome.FAILED, Outcome.REJECTED };
Expand Down Expand Up @@ -193,13 +191,13 @@ public Flow.Publisher<? extends Message<?>> getStream() {
public void waitForUnAcknowledgedMessages() {
try {
receiver.pause();
SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged");
SolaceLogging.log.infof("Waiting for incoming channel %s messages to be acknowledged", channel);
if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to be acknowledged."));
SolaceLogging.log.infof("Timed out while waiting for the" +
" remaining messages to be acknowledged on channel %s.", channel);
}
} catch (InterruptedException e) {
SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged"));
SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel);
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -246,6 +244,7 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {

@Override
public void onStateChange(ReceiverState receiverState, ReceiverState receiverState1, long l) {

SolaceLogging.log.infof("Consumer state changed from %s to %s on channel %s", receiverState.name(),
receiverState1.name(), channel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public class SolaceOutboundMetadata {
private final Integer classOfService;
private final String dynamicDestination;

private final String partitionKey;

public static PubSubOutboundMetadataBuilder builder() {
return new PubSubOutboundMetadataBuilder();
}
Expand All @@ -27,7 +29,7 @@ public SolaceOutboundMetadata(Map<String, String> httpContentHeaders,
String applicationMessageType,
Long timeToLive,
String applicationMessageId,
Integer classOfService, String dynamicDestination) {
Integer classOfService, String dynamicDestination, String partitionKey) {
this.httpContentHeaders = httpContentHeaders;
this.expiration = expiration;
this.priority = priority;
Expand All @@ -38,6 +40,7 @@ public SolaceOutboundMetadata(Map<String, String> httpContentHeaders,
this.applicationMessageId = applicationMessageId;
this.classOfService = classOfService;
this.dynamicDestination = dynamicDestination;
this.partitionKey = partitionKey;
}

public Map<String, String> getHttpContentHeaders() {
Expand Down Expand Up @@ -80,6 +83,10 @@ public String getDynamicDestination() {
return dynamicDestination;
}

public String getPartitionKey() {
return partitionKey;
}

public static class PubSubOutboundMetadataBuilder {
private Map<String, String> httpContentHeaders;
private Long expiration;
Expand All @@ -92,6 +99,8 @@ public static class PubSubOutboundMetadataBuilder {
private Integer classOfService;
private String dynamicDestination;

private String partitionKey;

public PubSubOutboundMetadataBuilder setHttpContentHeaders(Map<String, String> httpContentHeader) {
this.httpContentHeaders = httpContentHeaders;
return this;
Expand Down Expand Up @@ -142,9 +151,14 @@ public PubSubOutboundMetadataBuilder setDynamicDestination(String dynamicDestina
return this;
}

public PubSubOutboundMetadataBuilder setPartitionKey(String partitionKey) {
this.partitionKey = partitionKey;
return this;
}

public SolaceOutboundMetadata createPubSubOutboundMetadata() {
return new SolaceOutboundMetadata(httpContentHeaders, expiration, priority, senderId, properties,
applicationMessageType, timeToLive, applicationMessageId, classOfService, dynamicDestination);
applicationMessageType, timeToLive, applicationMessageId, classOfService, dynamicDestination, partitionKey);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.solace.messaging.MessagingService;
import com.solace.messaging.PersistentMessagePublisherBuilder;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.publisher.OutboundMessage;
import com.solace.messaging.publisher.OutboundMessageBuilder;
import com.solace.messaging.publisher.PersistentMessagePublisher;
Expand Down Expand Up @@ -145,6 +146,10 @@ private Uni<PublishReceipt> publishMessage(PersistentMessagePublisher publisher,
if (metadata.getClassOfService() != null) {
msgBuilder.withClassOfService(metadata.getClassOfService());
}
if (metadata.getPartitionKey() != null) {
msgBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY,
metadata.getPartitionKey());
}

if (metadata.getDynamicDestination() != null) {
topic.set(Topic.of(metadata.getDynamicDestination()));
Expand Down Expand Up @@ -196,13 +201,13 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber() {

public void waitForPublishedMessages() {
try {
SolaceLogging.log.info("Waiting for outgoing messages to be published");
SolaceLogging.log.infof("Waiting for outgoing channel %s messages to be published", channel);
if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to get publish acknowledgment."));
SolaceLogging.log.infof("Timed out while waiting for the" +
" remaining messages to be acknowledged on channel %s.", channel);
}
} catch (InterruptedException e) {
SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged"));
SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.*;

import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -18,6 +17,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.config.SolaceProperties;
import com.solace.messaging.publisher.OutboundMessage;
import com.solace.messaging.publisher.OutboundMessageBuilder;
Expand All @@ -36,7 +36,7 @@

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SolaceConsumerTest extends WeldTestBase {
private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("com.solace.quarkus");
private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("com.solace.quarkus.messaging");
private SolaceTestAppender solaceTestAppender = new SolaceTestAppender();

private SolaceConsumerTest() {
Expand Down Expand Up @@ -192,6 +192,75 @@ void consumerFailedProcessingMoveToDMQ() {

@Test
@Order(6)
void partitionedQueue() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.consumer-1.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-1.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-1.consumer.queue.type", "durable-non-exclusive")
.with("mp.messaging.incoming.consumer-2.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-2.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-2.consumer.queue.type", "durable-non-exclusive")
.with("mp.messaging.incoming.consumer-3.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-3.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-3.consumer.queue.type", "durable-non-exclusive")
.with("mp.messaging.incoming.consumer-4.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-4.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-4.consumer.queue.type", "durable-non-exclusive");

// Run app that consumes messages
MyPartitionedQueueConsumer app = runApplication(config, MyPartitionedQueueConsumer.class);

CopyOnWriteArrayList<String> partitionKeys = new CopyOnWriteArrayList<>() {
{
add("Group-1");
add("Group-2");
add("Group-3");
add("Group-4");
}
};
Map<String, Integer> partitionMessages = new HashMap<>() {
{
put(partitionKeys.get(0), 0);
put(partitionKeys.get(1), 0);
put(partitionKeys.get(2), 0);
put(partitionKeys.get(3), 0);
}
};

Random random = new Random();
// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION);
for (int i = 0; i < 1000; i++) {
int partitionIndex = random.nextInt(4);
String partitionKey = partitionKeys.get(partitionIndex);
int count = partitionMessages.get(partitionKey);
partitionMessages.put(partitionKey, (count + 1));
OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
messageBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, partitionKey);
OutboundMessage outboundMessage = messageBuilder.build(Integer.toString(i));
publisher.publish(outboundMessage, tp);
}

// Assert on published and consumed messages
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(0)))
.isEqualTo(partitionMessages.get(partitionKeys.get(0))));
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(1)))
.isEqualTo(partitionMessages.get(partitionKeys.get(1))));
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(2)))
.isEqualTo(partitionMessages.get(partitionKeys.get(2))));
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(3)))
.isEqualTo(partitionMessages.get(partitionKeys.get(3))));
}

@Test
@Order(7)
void consumerPublishToErrorTopicPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
Expand Down Expand Up @@ -224,7 +293,7 @@ void consumerPublishToErrorTopicPermissionException() {
}

@Test
@Order(7)
@Order(8)
void consumerGracefulCloseTest() {
MapBasedConfig config = new MapBasedConfig()
.with("channel-name", "in")
Expand Down Expand Up @@ -269,7 +338,7 @@ void consumerGracefulCloseTest() {
}

@Test
@Order(8)
@Order(9)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
Expand Down Expand Up @@ -353,4 +422,51 @@ public List<String> getReceivedFailedMessages() {
return receivedFailedMessages;
}
}

@ApplicationScoped
static class MyPartitionedQueueConsumer {
Map<String, Integer> partitionMessages = new HashMap<>() {
{
put("Group-1", 0);
put("Group-2", 0);
put("Group-3", 0);
put("Group-4", 0);
}
};

@Incoming("consumer-1")
CompletionStage<Void> consumer1(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

@Incoming("consumer-2")
CompletionStage<Void> consumer2(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

@Incoming("consumer-3")
CompletionStage<Void> consumer3(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

@Incoming("consumer-4")
CompletionStage<Void> consumer4(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

private void updatePartitionMessages(SolaceInboundMessage<?> msg) {
String partitionKey = msg.getMessage()
.getProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY);
int count = partitionMessages.get(partitionKey);
partitionMessages.put(partitionKey, (count + 1));
}

public Map<String, Integer> getPartitionMessages() {
return partitionMessages;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void startSolaceBroker() {
.withExposedPorts(SolaceContainer.Service.SMF.getPort())
.withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/queue/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/dynamic/>", SolaceContainer.Service.SMF);
solace.start();
LOGGER.info("Solace broker started: " + solace.getOrigin(SolaceContainer.Service.SMF));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class SolaceContainer extends GenericContainer<SolaceContainer> {
public static final String INTEGRATION_TEST_ERROR_QUEUE_NAME = "integration-test-error-queue";
public static final String INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION = "quarkus/integration/test/provisioned/queue/error/topic";

public static final String INTEGRATION_TEST_PARTITION_QUEUE_NAME = "integration-test-partition-queue";
public static final String INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION = "quarkus/integration/test/provisioned/partition/queue/topic";

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("solace/solace-pubsub-standard");

private static final String DEFAULT_VPN = "default";
Expand Down Expand Up @@ -137,6 +140,20 @@ private Transferable createConfigurationScript() {
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "exit");

// Partitioned Queue
updateConfigScript(scriptBuilder, "message-spool message-vpn default");
updateConfigScript(scriptBuilder, "create queue " + INTEGRATION_TEST_PARTITION_QUEUE_NAME);
updateConfigScript(scriptBuilder, "access-type non-exclusive");
updateConfigScript(scriptBuilder, "subscription topic " + INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION);
updateConfigScript(scriptBuilder, "max-spool-usage 300");
updateConfigScript(scriptBuilder, "permission all consume");
updateConfigScript(scriptBuilder, "partition");
updateConfigScript(scriptBuilder, "count 4");
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "no shutdown");
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "exit");

// Create VPN if not default
if (!vpn.equals(DEFAULT_VPN)) {
updateConfigScript(scriptBuilder, "create message-vpn " + vpn);
Expand Down

0 comments on commit 345151d

Please sign in to comment.