Skip to content

Commit

Permalink
Solace Direct Messaging capability
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Jul 12, 2024
1 parent 6e4c350 commit c04db81
Show file tree
Hide file tree
Showing 13 changed files with 1,645 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import com.solace.messaging.MessagingService;
import com.solace.quarkus.messaging.incoming.SolaceDirectMessageIncomingChannel;
import com.solace.quarkus.messaging.incoming.SolaceIncomingChannel;
import com.solace.quarkus.messaging.outgoing.SolaceDirectMessageOutgoingChannel;
import com.solace.quarkus.messaging.outgoing.SolaceOutgoingChannel;

import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
Expand All @@ -34,25 +36,28 @@
@Connector(SolaceConnector.CONNECTOR_NAME)

// TODO only persisted is implemented
//@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted")
@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted")
@ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false")
@ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true")
@ConnectorAttribute(name = "client.tracing-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to enable tracing for incoming and outgoing messages", defaultValue = "false")
@ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000")

@ConnectorAttribute(name = "client.type.direct.back-pressure.strategy", type = "string", direction = INCOMING, description = "It is possible for the client application to consume messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "elastic")
@ConnectorAttribute(name = "client.type.direct.back-pressure.buffer-capacity", type = "int", direction = INCOMING, description = "It is possible for the client application to consume messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "1024")
@ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver.")
@ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver. Supported values `durable-exclusive`, `durable-non-exclusive`, `non-durable-exclusive`", defaultValue = "durable-exclusive")
@ConnectorAttribute(name = "consumer.queue.missing-resource-creation-strategy", type = "string", direction = INCOMING, description = "Missing resource creation strategy", defaultValue = "do-not-create")
@ConnectorAttribute(name = "consumer.queue.add-additional-subscriptions", type = "boolean", direction = INCOMING, description = "Whether to add configured subscriptions to queue. Will fail if permissions to configure subscriptions is not allowed on broker", defaultValue = "false")
@ConnectorAttribute(name = "consumer.queue.subscriptions", type = "string", direction = INCOMING, description = "The comma separated list of subscriptions, the channel name if empty")
@ConnectorAttribute(name = "consumer.subscriptions", type = "string", direction = INCOMING, description = "The comma separated list of subscriptions, the channel name if empty")
@ConnectorAttribute(name = "consumer.queue.selector-query", type = "string", direction = INCOMING, description = "The receiver selector query")
@ConnectorAttribute(name = "consumer.queue.replay.strategy", type = "string", direction = INCOMING, description = "The receiver replay strategy. Supported values all-messages, time-based, replication-group-message-id")
@ConnectorAttribute(name = "consumer.queue.replay.timebased-start-time", type = "string", direction = INCOMING, description = "The receiver replay timebased start time")
@ConnectorAttribute(name = "consumer.queue.replay.replication-group-message-id", type = "string", direction = INCOMING, description = "The receiver replay replication group message id")
@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore")
@ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error")
@ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false")
@ConnectorAttribute(name = "consumer.queue.error.message.ttl", type = "long", direction = INCOMING, description = "TTL for Error message before moving to dead message queue.")
@ConnectorAttribute(name = "consumer.queue.error.message.max-delivery-attempts", type = "int", direction = INCOMING, description = "Maximum number of attempts to send a failed message to the error topic in case of failure. Each attempt will have a backoff interval of 1 second. When all delivery attempts have been exhausted, the failed message will be requeued on the queue for redelivery.", defaultValue = "3")
@ConnectorAttribute(name = "consumer.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore")
@ConnectorAttribute(name = "consumer.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error")
@ConnectorAttribute(name = "consumer.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false")
@ConnectorAttribute(name = "consumer.error.message.ttl", type = "long", direction = INCOMING, description = "TTL for Error message before moving to dead message queue.")
@ConnectorAttribute(name = "consumer.error.message.max-delivery-attempts", type = "int", direction = INCOMING, description = "Maximum number of attempts to send a failed message to the error topic in case of failure. Each attempt will have a backoff interval of 1 second. When all delivery attempts have been exhausted, the failed message will be requeued on the queue for redelivery.", defaultValue = "3")
@ConnectorAttribute(name = "consumer.queue.supports-nacks", type = "boolean", direction = INCOMING, description = "Whether to enable negative acknowledgments on failed messages. Nacks are supported on event brokers 10.2.1 and later. If an event broker does not support Nacks, an exception is thrown", defaultValue = "false")

@ConnectorAttribute(name = "producer.topic", type = "string", direction = OUTGOING, description = "The topic to publish messages, by default the channel name")
Expand All @@ -75,12 +80,16 @@ public class SolaceConnector implements InboundConnector, OutboundConnector, Hea
Vertx vertx;

List<SolaceIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
List<SolaceDirectMessageIncomingChannel> directMessageIncomingChannels = new CopyOnWriteArrayList<>();
List<SolaceOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();
List<SolaceDirectMessageOutgoingChannel> directMessageOutgoingChannels = new CopyOnWriteArrayList<>();

public void terminate(
@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object event) {
incomingChannels.forEach(SolaceIncomingChannel::close);
directMessageIncomingChannels.forEach(SolaceDirectMessageIncomingChannel::close);
outgoingChannels.forEach(SolaceOutgoingChannel::close);
directMessageOutgoingChannels.forEach(SolaceDirectMessageOutgoingChannel::close);
}

@PostConstruct
Expand All @@ -91,17 +100,29 @@ void init() {
@Override
public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
var ic = new SolaceConnectorIncomingConfiguration(config);
SolaceIncomingChannel channel = new SolaceIncomingChannel(vertx, ic, solace);
incomingChannels.add(channel);
return channel.getStream();
if (ic.getClientType().equals("direct")) {
SolaceDirectMessageIncomingChannel channel = new SolaceDirectMessageIncomingChannel(vertx, ic, solace);
directMessageIncomingChannels.add(channel);
return channel.getStream();
} else {
SolaceIncomingChannel channel = new SolaceIncomingChannel(vertx, ic, solace);
incomingChannels.add(channel);
return channel.getStream();
}
}

@Override
public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
var oc = new SolaceConnectorOutgoingConfiguration(config);
SolaceOutgoingChannel channel = new SolaceOutgoingChannel(vertx, oc, solace);
outgoingChannels.add(channel);
return channel.getSubscriber();
if (oc.getClientType().equals("direct")) {
SolaceDirectMessageOutgoingChannel channel = new SolaceDirectMessageOutgoingChannel(vertx, oc, solace);
directMessageOutgoingChannels.add(channel);
return channel.getSubscriber();
} else {
SolaceOutgoingChannel channel = new SolaceOutgoingChannel(vertx, oc, solace);
outgoingChannels.add(channel);
return channel.getSubscriber();
}
}

@Override
Expand All @@ -110,9 +131,15 @@ public HealthReport getStartup() {
for (SolaceIncomingChannel in : incomingChannels) {
in.isStarted(builder);
}
for (SolaceDirectMessageIncomingChannel in : directMessageIncomingChannels) {
in.isStarted(builder);
}
for (SolaceOutgoingChannel sink : outgoingChannels) {
sink.isStarted(builder);
}
for (SolaceDirectMessageOutgoingChannel sink : directMessageOutgoingChannels) {
sink.isStarted(builder);
}
return builder.build();
}

Expand All @@ -122,9 +149,15 @@ public HealthReport getReadiness() {
for (SolaceIncomingChannel in : incomingChannels) {
in.isReady(builder);
}
for (SolaceDirectMessageIncomingChannel in : directMessageIncomingChannels) {
in.isReady(builder);
}
for (SolaceOutgoingChannel sink : outgoingChannels) {
sink.isReady(builder);
}
for (SolaceDirectMessageOutgoingChannel sink : directMessageOutgoingChannels) {
sink.isReady(builder);
}
return builder.build();

}
Expand All @@ -135,9 +168,15 @@ public HealthReport getLiveness() {
for (SolaceIncomingChannel in : incomingChannels) {
in.isAlive(builder);
}
for (SolaceDirectMessageIncomingChannel in : directMessageIncomingChannels) {
in.isAlive(builder);
}
for (SolaceOutgoingChannel out : outgoingChannels) {
out.isAlive(builder);
}
for (SolaceDirectMessageOutgoingChannel out : directMessageOutgoingChannels) {
out.isAlive(builder);
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reaso
SolaceLogging.log.messageSettled(channel,
MessageAcknowledgementConfiguration.Outcome.ACCEPTED.toString().toLowerCase(),
"Message is published to error topic and acknowledged on queue.");
ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED);
if (ackSupport != null) {
ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED);
}
})
.replaceWithVoid()
.onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reaso

SolaceLogging.log.messageSettled(channel, outcome.toString().toLowerCase(), reason.getMessage());
return Uni.createFrom().voidItem()
.invoke(() -> ackSupport.settle(msg.getMessage(), outcome))
.invoke(() -> {
if (ackSupport != null) {
ackSupport.settle(msg.getMessage(), outcome);
}
})
.runSubscriptionOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
Expand Down
Loading

0 comments on commit c04db81

Please sign in to comment.