Skip to content

Commit

Permalink
NIFI-11995 Added Header Format configuration to ConsumeAMQP
Browse files Browse the repository at this point in the history
This closes #7652

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
umarhussain15 authored and exceptionfactory committed Nov 10, 2023
1 parent eb7d49c commit 4f4e990
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.nifi.amqp.processors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
Expand All @@ -26,6 +28,7 @@
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
Expand All @@ -39,7 +42,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -53,7 +55,9 @@
@WritesAttribute(attribute = "amqp$appId", description = "The App ID field from the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentType", description = "The Content Type reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message"),
@WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message. Added only if processor is configured to output this attribute."),
@WritesAttribute(attribute = "<Header Key Prefix>.<attribute>",
description = "Each message header will be inserted with this attribute name, if processor is configured to output headers as attribute"),
@WritesAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"),
@WritesAttribute(attribute = "amqp$priority", description = "The Message priority"),
@WritesAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"),
Expand All @@ -68,15 +72,24 @@
@WritesAttribute(attribute = "amqp$exchange", description = "The exchange from which AMQP Message was received")
})
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {

private static final String ATTRIBUTES_PREFIX = "amqp$";
public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";

public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING = new AllowableValue("Comma-Separated String", "Comma-Separated String",
"Put all headers as a string with the specified separator in the attribute 'amqp$headers'.");
public static final AllowableValue HEADERS_FORMAT_JSON_STRING = new AllowableValue("JSON String", "JSON String",
"Format all headers as JSON string and output in the attribute 'amqp$headers'. It will include keys with null value as well.");
public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new AllowableValue("FlowFile Attributes", "FlowFile Attributes",
"Put each header as attribute of the flow file with a prefix specified in the properties");

public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("Queue")
.description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder()
.name("auto.acknowledge")
.displayName("Auto-Acknowledge Messages")
.description(" If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing "
Expand All @@ -99,22 +112,43 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.defaultValue("10")
.required(true)
.build();

public static final PropertyDescriptor HEADER_FORMAT = new PropertyDescriptor.Builder()
.name("header.format")
.displayName("Header Output Format")
.description("Defines how to output headers from the received message")
.allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING, HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
.defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
.required(true)
.build();
public static final PropertyDescriptor HEADER_KEY_PREFIX = new PropertyDescriptor.Builder()
.name("header.key.prefix")
.displayName("Header Key Prefix")
.description("Text to be prefixed to header keys as the are added to the FlowFile attributes. Processor will append '.' to the value of this property")
.defaultValue(DEFAULT_HEADERS_KEY_PREFIX)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES)
.required(true)
.build();

public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder()
.name("header.separator")
.displayName("Header Separator")
.description("The character that is used to separate key-value for header in String. The value must only one character."
+ "Otherwise you will get an error message")
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.defaultValue(",")
.required(false)
.build();
.name("header.separator")
.displayName("Header Separator")
.description("The character that is used to separate key-value for header in String. The value must be only one character."
)
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.defaultValue(",")
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
.required(false)
.build();
static final PropertyDescriptor REMOVE_CURLY_BRACES = new PropertyDescriptor.Builder()
.name("remove.curly.braces")
.displayName("Remove Curly Braces")
.description("If true Remove Curly Braces, Curly Braces in the header will be automatically remove.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("False")
.allowableValues("True", "False")
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
.required(false)
.build();

Expand All @@ -126,19 +160,23 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
private static final List<PropertyDescriptor> propertyDescriptors;
private static final Set<Relationship> relationships;

private static final ObjectMapper objectMapper;

static {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QUEUE);
properties.add(AUTO_ACKNOWLEDGE);
properties.add(BATCH_SIZE);
properties.add(REMOVE_CURLY_BRACES);
properties.add(HEADER_FORMAT);
properties.add(HEADER_KEY_PREFIX);
properties.add(HEADER_SEPARATOR);
properties.add(REMOVE_CURLY_BRACES);
properties.addAll(getCommonPropertyDescriptors());
propertyDescriptors = Collections.unmodifiableList(properties);

Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(rels);
relationships = Set.of(REL_SUCCESS);

objectMapper = new ObjectMapper();
}

/**
Expand Down Expand Up @@ -170,8 +208,10 @@ protected void processResource(final Connection connection, final AMQPConsumer c

final BasicProperties amqpProperties = response.getProps();
final Envelope envelope = response.getEnvelope();
final Map<String, String> attributes = buildAttributes(amqpProperties, envelope, context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
context.getProperty(HEADER_SEPARATOR).toString());
final String headerFormat = context.getProperty(HEADER_FORMAT).getValue();
final String headerKeyPrefix = context.getProperty(HEADER_KEY_PREFIX).getValue();
final Map<String, String> attributes = buildAttributes(amqpProperties, envelope, headerFormat, headerKeyPrefix,
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), context.getProperty(HEADER_SEPARATOR).toString());
flowFile = session.putAllAttributes(flowFile, attributes);

session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
Expand All @@ -185,12 +225,13 @@ protected void processResource(final Connection connection, final AMQPConsumer c
}
}

private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, boolean removeCurlyBraces, String valueSeperatorForHeaders) {
private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, String headersStringFormat, String headerAttributePrefix, boolean removeCurlyBraces,
String valueSeparatorForHeaders) {
AllowableValue headerFormat = new AllowableValue(headersStringFormat);
final Map<String, String> attributes = new HashMap<>();
addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", buildHeaders(properties.getHeaders(), removeCurlyBraces, valueSeperatorForHeaders));
addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode());
addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority());
addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId());
Expand All @@ -201,8 +242,19 @@ private Map<String, String> buildAttributes(final BasicProperties properties, fi
addAttribute(attributes, ATTRIBUTES_PREFIX + "type", properties.getType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", properties.getUserId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", properties.getClusterId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey());
addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange());
addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey());
addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange());
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
if (HEADERS_FORMAT_ATTRIBUTES.equals(headerFormat)) {
headers.forEach((key, value) -> addAttribute(attributes,
String.format("%s.%s", headerAttributePrefix, key), value));
} else {
addAttribute(attributes, ATTRIBUTES_PREFIX + "headers",
buildHeaders(properties.getHeaders(), headerFormat, removeCurlyBraces,
valueSeparatorForHeaders));
}
}
return attributes;
}

Expand All @@ -214,14 +266,23 @@ private void addAttribute(final Map<String, String> attributes, final String att
attributes.put(attributeName, value.toString());
}

private String buildHeaders(Map<String, Object> headers, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
private String buildHeaders(Map<String, Object> headers, AllowableValue headerFormat, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
if (headers == null) {
return null;
}
String headerString = convertMapToString(headers,valueSeparatorForHeaders);
String headerString = null;
if (headerFormat.equals(HEADERS_FORMAT_COMMA_SEPARATED_STRING)) {
headerString = convertMapToString(headers, valueSeparatorForHeaders);

if (!removeCurlyBraces) {
headerString = "{" + headerString + "}";
if (!removeCurlyBraces) {
headerString = "{" + headerString + "}";
}
} else if (headerFormat.equals(HEADERS_FORMAT_JSON_STRING)) {
try {
headerString = convertMapToJSONString(headers);
} catch (JsonProcessingException e) {
getLogger().warn("Header formatting as JSON failed", e);
}
}
return headerString;
}
Expand All @@ -231,6 +292,10 @@ private static String convertMapToString(Map<String, Object> headers, String val
.collect(Collectors.joining(valueSeparatorForHeaders));
}

private static String convertMapToJSONString(Map<String, Object> headers) throws JsonProcessingException {
return objectMapper.writeValueAsString(headers);
}

@Override
protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) {
try {
Expand Down
Loading

0 comments on commit 4f4e990

Please sign in to comment.