diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
index 5d241f62647d..bf6f7b40d47f 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
@@ -45,6 +45,14 @@ language governing permissions and limitations under the License. -->
org.apache.commons
commons-lang3
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
org.apache.nifi
nifi-mock
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 87cefc7922da..23552d64306c 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.amqp.processors;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
@@ -26,6 +28,7 @@
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -39,7 +42,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -53,7 +55,9 @@
@WritesAttribute(attribute = "amqp$appId", description = "The App ID field from the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentType", description = "The Content Type reported by the AMQP Message"),
- @WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message"),
+ @WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message. Added only if processor is configured to output this attribute."),
+ @WritesAttribute(attribute = ".",
+ description = "Each message header will be inserted with this attribute name, if processor is configured to output headers as attribute"),
@WritesAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"),
@WritesAttribute(attribute = "amqp$priority", description = "The Message priority"),
@WritesAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"),
@@ -68,7 +72,16 @@
@WritesAttribute(attribute = "amqp$exchange", description = "The exchange from which AMQP Message was received")
})
public class ConsumeAMQP extends AbstractAMQPProcessor {
+
private static final String ATTRIBUTES_PREFIX = "amqp$";
+ public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
+
+ public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING = new AllowableValue("Comma-Separated String", "Comma-Separated String",
+ "Put all headers as a string with the specified separator in the attribute 'amqp$headers'.");
+ public static final AllowableValue HEADERS_FORMAT_JSON_STRING = new AllowableValue("JSON String", "JSON String",
+ "Format all headers as JSON string and output in the attribute 'amqp$headers'. It will include keys with null value as well.");
+ public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new AllowableValue("FlowFile Attributes", "FlowFile Attributes",
+ "Put each header as attribute of the flow file with a prefix specified in the properties");
public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("Queue")
@@ -76,7 +89,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder()
.name("auto.acknowledge")
.displayName("Auto-Acknowledge Messages")
.description(" If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing "
@@ -99,15 +112,35 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
.defaultValue("10")
.required(true)
.build();
+
+ public static final PropertyDescriptor HEADER_FORMAT = new PropertyDescriptor.Builder()
+ .name("header.format")
+ .displayName("Header Output Format")
+ .description("Defines how to output headers from the received message")
+ .allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING, HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
+ .defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
+ .required(true)
+ .build();
+ public static final PropertyDescriptor HEADER_KEY_PREFIX = new PropertyDescriptor.Builder()
+ .name("header.key.prefix")
+ .displayName("Header Key Prefix")
+ .description("Text to be prefixed to header keys as the are added to the FlowFile attributes. Processor will append '.' to the value of this property")
+ .defaultValue(DEFAULT_HEADERS_KEY_PREFIX)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES)
+ .required(true)
+ .build();
+
public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder()
- .name("header.separator")
- .displayName("Header Separator")
- .description("The character that is used to separate key-value for header in String. The value must only one character."
- + "Otherwise you will get an error message")
- .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
- .defaultValue(",")
- .required(false)
- .build();
+ .name("header.separator")
+ .displayName("Header Separator")
+ .description("The character that is used to separate key-value for header in String. The value must be only one character."
+ )
+ .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
+ .defaultValue(",")
+ .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
+ .required(false)
+ .build();
static final PropertyDescriptor REMOVE_CURLY_BRACES = new PropertyDescriptor.Builder()
.name("remove.curly.braces")
.displayName("Remove Curly Braces")
@@ -115,6 +148,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("False")
.allowableValues("True", "False")
+ .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
.required(false)
.build();
@@ -126,19 +160,23 @@ public class ConsumeAMQP extends AbstractAMQPProcessor {
private static final List propertyDescriptors;
private static final Set relationships;
+ private static final ObjectMapper objectMapper;
+
static {
List properties = new ArrayList<>();
properties.add(QUEUE);
properties.add(AUTO_ACKNOWLEDGE);
properties.add(BATCH_SIZE);
- properties.add(REMOVE_CURLY_BRACES);
+ properties.add(HEADER_FORMAT);
+ properties.add(HEADER_KEY_PREFIX);
properties.add(HEADER_SEPARATOR);
+ properties.add(REMOVE_CURLY_BRACES);
properties.addAll(getCommonPropertyDescriptors());
propertyDescriptors = Collections.unmodifiableList(properties);
- Set rels = new HashSet<>();
- rels.add(REL_SUCCESS);
- relationships = Collections.unmodifiableSet(rels);
+ relationships = Set.of(REL_SUCCESS);
+
+ objectMapper = new ObjectMapper();
}
/**
@@ -170,8 +208,10 @@ protected void processResource(final Connection connection, final AMQPConsumer c
final BasicProperties amqpProperties = response.getProps();
final Envelope envelope = response.getEnvelope();
- final Map attributes = buildAttributes(amqpProperties, envelope, context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
- context.getProperty(HEADER_SEPARATOR).toString());
+ final String headerFormat = context.getProperty(HEADER_FORMAT).getValue();
+ final String headerKeyPrefix = context.getProperty(HEADER_KEY_PREFIX).getValue();
+ final Map attributes = buildAttributes(amqpProperties, envelope, headerFormat, headerKeyPrefix,
+ context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), context.getProperty(HEADER_SEPARATOR).toString());
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
@@ -185,12 +225,13 @@ protected void processResource(final Connection connection, final AMQPConsumer c
}
}
- private Map buildAttributes(final BasicProperties properties, final Envelope envelope, boolean removeCurlyBraces, String valueSeperatorForHeaders) {
+ private Map buildAttributes(final BasicProperties properties, final Envelope envelope, String headersStringFormat, String headerAttributePrefix, boolean removeCurlyBraces,
+ String valueSeparatorForHeaders) {
+ AllowableValue headerFormat = new AllowableValue(headersStringFormat);
final Map attributes = new HashMap<>();
addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType());
- addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", buildHeaders(properties.getHeaders(), removeCurlyBraces, valueSeperatorForHeaders));
addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode());
addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority());
addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId());
@@ -201,8 +242,19 @@ private Map buildAttributes(final BasicProperties properties, fi
addAttribute(attributes, ATTRIBUTES_PREFIX + "type", properties.getType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", properties.getUserId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", properties.getClusterId());
- addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey());
- addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange());
+ addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey());
+ addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange());
+ Map headers = properties.getHeaders();
+ if (headers != null) {
+ if (HEADERS_FORMAT_ATTRIBUTES.equals(headerFormat)) {
+ headers.forEach((key, value) -> addAttribute(attributes,
+ String.format("%s.%s", headerAttributePrefix, key), value));
+ } else {
+ addAttribute(attributes, ATTRIBUTES_PREFIX + "headers",
+ buildHeaders(properties.getHeaders(), headerFormat, removeCurlyBraces,
+ valueSeparatorForHeaders));
+ }
+ }
return attributes;
}
@@ -214,14 +266,23 @@ private void addAttribute(final Map attributes, final String att
attributes.put(attributeName, value.toString());
}
- private String buildHeaders(Map headers, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
+ private String buildHeaders(Map headers, AllowableValue headerFormat, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
if (headers == null) {
return null;
}
- String headerString = convertMapToString(headers,valueSeparatorForHeaders);
+ String headerString = null;
+ if (headerFormat.equals(HEADERS_FORMAT_COMMA_SEPARATED_STRING)) {
+ headerString = convertMapToString(headers, valueSeparatorForHeaders);
- if (!removeCurlyBraces) {
- headerString = "{" + headerString + "}";
+ if (!removeCurlyBraces) {
+ headerString = "{" + headerString + "}";
+ }
+ } else if (headerFormat.equals(HEADERS_FORMAT_JSON_STRING)) {
+ try {
+ headerString = convertMapToJSONString(headers);
+ } catch (JsonProcessingException e) {
+ getLogger().warn("Header formatting as JSON failed", e);
+ }
}
return headerString;
}
@@ -231,6 +292,10 @@ private static String convertMapToString(Map headers, String val
.collect(Collectors.joining(valueSeparatorForHeaders));
}
+ private static String convertMapToJSONString(Map headers) throws JsonProcessingException {
+ return objectMapper.writeValueAsString(headers);
+ }
+
@Override
protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) {
try {
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index c382e730ddd4..0655caaa217d 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.amqp.processors;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
@@ -61,12 +63,12 @@ public void testMessageAcked() throws TimeoutException, IOException {
runner.run();
- runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2);
+ runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 2);
- final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile helloFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
helloFF.assertContentEquals("hello");
- final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1);
+ final MockFlowFile worldFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(1);
worldFF.assertContentEquals("world");
// A single cumulative ack should be used
@@ -92,12 +94,12 @@ public void testBatchSizeAffectsAcks() throws TimeoutException, IOException {
runner.run(2);
- runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2);
+ runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 2);
- final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile helloFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
helloFF.assertContentEquals("hello");
- final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1);
+ final MockFlowFile worldFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(1);
worldFF.assertContentEquals("world");
// A single cumulative ack should be used
@@ -125,9 +127,9 @@ public void testConsumerStopped() throws TimeoutException, IOException {
runner.run();
proc.close();
- runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 1);
+ runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 1);
- final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile helloFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
helloFF.assertContentEquals("hello");
@@ -156,13 +158,83 @@ public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
TestRunner runner = initTestRunner(proc);
runner.run();
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
}
}
+ @Test
+ public void validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess() throws Exception {
+ final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+ final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
+ final Map headersMap = new HashMap<>();
+ headersMap.put("foo1", "bar,bar");
+ headersMap.put("foo2", "bar,bar");
+ headersMap.put("foo3", "null");
+ headersMap.put("foo4", null);
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode expectedJson = objectMapper.valueToTree(headersMap);
+
+ AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
+ builderBasicProperties.headers(headersMap);
+
+ final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
+
+ try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
+ sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange");
+
+ ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+ TestRunner runner = initTestRunner(proc);
+ runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
+ runner.run();
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ assertNotNull(successFF);
+ successFF.assertAttributeEquals("amqp$routingKey", "key1");
+ successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+ String headers = successFF.getAttribute("amqp$headers");
+ JsonNode jsonNode = objectMapper.readTree(headers);
+ assertEquals(expectedJson, jsonNode);
+ }
+ }
+
+ @Test
+ public void validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAndTransferToSuccess() throws Exception {
+ final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+ final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
+ final Map expectedHeadersMap = new HashMap<>();
+ expectedHeadersMap.put("foo1", "bar,bar");
+ expectedHeadersMap.put("foo2", "bar,bar");
+ expectedHeadersMap.put("foo3", "null");
+ final Map headersMap = new HashMap<>(expectedHeadersMap);
+ headersMap.put("foo4", null);
+
+ final String headerPrefix = "test.header";
+
+ AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
+ builderBasicProperties.headers(headersMap);
+
+ final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
+
+ try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
+ sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange");
+
+ ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+ TestRunner runner = initTestRunner(proc);
+ runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_ATTRIBUTES);
+ runner.setProperty(ConsumeAMQP.HEADER_KEY_PREFIX,headerPrefix);
+ runner.run();
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ assertNotNull(successFF);
+ successFF.assertAttributeEquals("amqp$routingKey", "key1");
+ successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+ successFF.assertAttributeNotExists("amqp$headers");
+ expectedHeadersMap.forEach((key, value) ->{
+ successFF.assertAttributeEquals(headerPrefix + "." + key, value.toString());
+ } );
+ }
+ }
@Test
public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() throws Exception {
final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
@@ -186,7 +258,7 @@ public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransfer
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|");
runner.run();
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
@@ -223,7 +295,7 @@ public void validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSucc
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
runner.run();
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
@@ -255,7 +327,7 @@ public void validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParamet
runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR,"|");
runner.run();
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
@@ -288,7 +360,7 @@ public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() throws E
TestRunner runner = initTestRunner(proc);
runner.run();
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
@@ -321,7 +393,7 @@ protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection conne
throw new IllegalStateException("Consumer already created");
}
- consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(), getLogger());
+ consumer = new AMQPConsumer(connection, context.getProperty(ConsumeAMQP.QUEUE).getValue(), context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), getLogger());
return consumer;
} catch (IOException e) {
throw new ProcessException(e);