Skip to content

Commit

Permalink
NIFI-12411 Update PublishAMQP with configurable Header Source Property
Browse files Browse the repository at this point in the history
This closes #8105

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
umarhussain15 authored and exceptionfactory committed Aug 16, 2024
1 parent 7348740 commit abe41ff
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/**
* Generic publisher of messages to AMQP-based messaging system. It is based on
* RabbitMQ client API (https://www.rabbitmq.com/api-guide.html)
* RabbitMQ client API (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>)
*/
final class AMQPPublisher extends AMQPWorker {

Expand Down Expand Up @@ -63,7 +63,7 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String
exchange = exchange == null ? "" : exchange.trim();

if (processorLog.isDebugEnabled()) {
if (exchange.length() == 0) {
if (exchange.isEmpty()) {
processorLog.debug("The 'exchangeName' is not specified. Messages will be sent to default exchange");
}
processorLog.debug("Successfully connected AMQPPublisher to {} and '{}' exchange with '{}' as a routing key.", this.connectionString, exchange, routingKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -49,14 +48,28 @@

/**
* Base processor that uses RabbitMQ client API
* (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
* (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>) to rendezvous with AMQP-based
* messaging systems version 0.9.1
*
* @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
* and {@link AMQPConsumer}
*/
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {

public static final String AMQP_APPID_ATTRIBUTE = "amqp$appId";
public static final String AMQP_CONTENT_ENCODING_ATTRIBUTE = "amqp$contentEncoding";
public static final String AMQP_CONTENT_TYPE_ATTRIBUTE = "amqp$contentType";
public static final String AMQP_HEADERS_ATTRIBUTE = "amqp$headers";
public static final String AMQP_DELIVERY_MODE_ATTRIBUTE = "amqp$deliveryMode";
public static final String AMQP_PRIORITY_ATTRIBUTE = "amqp$priority";
public static final String AMQP_CORRELATION_ID_ATTRIBUTE = "amqp$correlationId";
public static final String AMQP_REPLY_TO_ATTRIBUTE = "amqp$replyTo";
public static final String AMQP_EXPIRATION_ATTRIBUTE = "amqp$expiration";
public static final String AMQP_MESSAGE_ID_ATTRIBUTE = "amqp$messageId";
public static final String AMQP_TIMESTAMP_ATTRIBUTE = "amqp$timestamp";
public static final String AMQP_TYPE_ATTRIBUTE = "amqp$type";
public static final String AMQP_USER_ID_ATTRIBUTE = "amqp$userId";
public static final String AMQP_CLUSTER_ID_ATTRIBUTE = "amqp$clusterId";
public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder()
.name("Brokers")
.description("A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is " +
Expand Down Expand Up @@ -129,17 +142,15 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
private static final List<PropertyDescriptor> propertyDescriptors;

static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(BROKERS);
properties.add(HOST);
properties.add(PORT);
properties.add(V_HOST);
properties.add(USER);
properties.add(PASSWORD);
properties.add(AMQP_VERSION);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(USE_CERT_AUTHENTICATION);
propertyDescriptors = Collections.unmodifiableList(properties);
propertyDescriptors = List.of(
BROKERS,
HOST, PORT,
V_HOST,
USER,
PASSWORD,
AMQP_VERSION,
SSL_CONTEXT_SERVICE,
USE_CERT_AUTHENTICATION);
}

protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
Expand Down
Loading

0 comments on commit abe41ff

Please sign in to comment.