From 4f4e99085d1ab94b0e598cccc6ff75270c523af3 Mon Sep 17 00:00:00 2001 From: Umar Hussain Date: Sun, 27 Aug 2023 20:19:08 +0200 Subject: [PATCH] NIFI-11995 Added Header Format configuration to ConsumeAMQP This closes #7652 Signed-off-by: David Handermann --- .../nifi-amqp-processors/pom.xml | 8 ++ .../nifi/amqp/processors/ConsumeAMQP.java | 115 ++++++++++++++---- .../nifi/amqp/processors/ConsumeAMQPTest.java | 100 ++++++++++++--- 3 files changed, 184 insertions(+), 39 deletions(-) diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml index 5d241f62647d..bf6f7b40d47f 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml @@ -45,6 +45,14 @@ language governing permissions and limitations under the License. --> org.apache.commons commons-lang3 + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + org.apache.nifi nifi-mock diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java index 87cefc7922da..23552d64306c 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.amqp.processors; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Envelope; @@ -26,6 +28,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -39,7 +42,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,7 +55,9 @@ @WritesAttribute(attribute = "amqp$appId", description = "The App ID field from the AMQP Message"), @WritesAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding reported by the AMQP Message"), @WritesAttribute(attribute = "amqp$contentType", description = "The Content Type reported by the AMQP Message"), - @WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message"), + @WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message. Added only if processor is configured to output this attribute."), + @WritesAttribute(attribute = "
.", + description = "Each message header will be inserted with this attribute name, if processor is configured to output headers as attribute"), @WritesAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"), @WritesAttribute(attribute = "amqp$priority", description = "The Message priority"), @WritesAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"), @@ -68,7 +72,16 @@ @WritesAttribute(attribute = "amqp$exchange", description = "The exchange from which AMQP Message was received") }) public class ConsumeAMQP extends AbstractAMQPProcessor { + private static final String ATTRIBUTES_PREFIX = "amqp$"; + public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp"; + + public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING = new AllowableValue("Comma-Separated String", "Comma-Separated String", + "Put all headers as a string with the specified separator in the attribute 'amqp$headers'."); + public static final AllowableValue HEADERS_FORMAT_JSON_STRING = new AllowableValue("JSON String", "JSON String", + "Format all headers as JSON string and output in the attribute 'amqp$headers'. It will include keys with null value as well."); + public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new AllowableValue("FlowFile Attributes", "FlowFile Attributes", + "Put each header as attribute of the flow file with a prefix specified in the properties"); public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() .name("Queue") @@ -76,7 +89,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder() .name("auto.acknowledge") .displayName("Auto-Acknowledge Messages") .description(" If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing " @@ -99,15 +112,35 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { .defaultValue("10") .required(true) .build(); + + public static final PropertyDescriptor HEADER_FORMAT = new PropertyDescriptor.Builder() + .name("header.format") + .displayName("Header Output Format") + .description("Defines how to output headers from the received message") + .allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING, HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES) + .defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue()) + .required(true) + .build(); + public static final PropertyDescriptor HEADER_KEY_PREFIX = new PropertyDescriptor.Builder() + .name("header.key.prefix") + .displayName("Header Key Prefix") + .description("Text to be prefixed to header keys as the are added to the FlowFile attributes. Processor will append '.' to the value of this property") + .defaultValue(DEFAULT_HEADERS_KEY_PREFIX) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES) + .required(true) + .build(); + public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder() - .name("header.separator") - .displayName("Header Separator") - .description("The character that is used to separate key-value for header in String. The value must only one character." - + "Otherwise you will get an error message") - .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR) - .defaultValue(",") - .required(false) - .build(); + .name("header.separator") + .displayName("Header Separator") + .description("The character that is used to separate key-value for header in String. The value must be only one character." + ) + .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR) + .defaultValue(",") + .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING) + .required(false) + .build(); static final PropertyDescriptor REMOVE_CURLY_BRACES = new PropertyDescriptor.Builder() .name("remove.curly.braces") .displayName("Remove Curly Braces") @@ -115,6 +148,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .defaultValue("False") .allowableValues("True", "False") + .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING) .required(false) .build(); @@ -126,19 +160,23 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { private static final List propertyDescriptors; private static final Set relationships; + private static final ObjectMapper objectMapper; + static { List properties = new ArrayList<>(); properties.add(QUEUE); properties.add(AUTO_ACKNOWLEDGE); properties.add(BATCH_SIZE); - properties.add(REMOVE_CURLY_BRACES); + properties.add(HEADER_FORMAT); + properties.add(HEADER_KEY_PREFIX); properties.add(HEADER_SEPARATOR); + properties.add(REMOVE_CURLY_BRACES); properties.addAll(getCommonPropertyDescriptors()); propertyDescriptors = Collections.unmodifiableList(properties); - Set rels = new HashSet<>(); - rels.add(REL_SUCCESS); - relationships = Collections.unmodifiableSet(rels); + relationships = Set.of(REL_SUCCESS); + + objectMapper = new ObjectMapper(); } /** @@ -170,8 +208,10 @@ protected void processResource(final Connection connection, final AMQPConsumer c final BasicProperties amqpProperties = response.getProps(); final Envelope envelope = response.getEnvelope(); - final Map attributes = buildAttributes(amqpProperties, envelope, context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), - context.getProperty(HEADER_SEPARATOR).toString()); + final String headerFormat = context.getProperty(HEADER_FORMAT).getValue(); + final String headerKeyPrefix = context.getProperty(HEADER_KEY_PREFIX).getValue(); + final Map attributes = buildAttributes(amqpProperties, envelope, headerFormat, headerKeyPrefix, + context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), context.getProperty(HEADER_SEPARATOR).toString()); flowFile = session.putAllAttributes(flowFile, attributes); session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue()); @@ -185,12 +225,13 @@ protected void processResource(final Connection connection, final AMQPConsumer c } } - private Map buildAttributes(final BasicProperties properties, final Envelope envelope, boolean removeCurlyBraces, String valueSeperatorForHeaders) { + private Map buildAttributes(final BasicProperties properties, final Envelope envelope, String headersStringFormat, String headerAttributePrefix, boolean removeCurlyBraces, + String valueSeparatorForHeaders) { + AllowableValue headerFormat = new AllowableValue(headersStringFormat); final Map attributes = new HashMap<>(); addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId()); addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding()); addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType()); - addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", buildHeaders(properties.getHeaders(), removeCurlyBraces, valueSeperatorForHeaders)); addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode()); addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority()); addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId()); @@ -201,8 +242,19 @@ private Map buildAttributes(final BasicProperties properties, fi addAttribute(attributes, ATTRIBUTES_PREFIX + "type", properties.getType()); addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", properties.getUserId()); addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", properties.getClusterId()); - addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey()); - addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey()); + addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange()); + Map headers = properties.getHeaders(); + if (headers != null) { + if (HEADERS_FORMAT_ATTRIBUTES.equals(headerFormat)) { + headers.forEach((key, value) -> addAttribute(attributes, + String.format("%s.%s", headerAttributePrefix, key), value)); + } else { + addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", + buildHeaders(properties.getHeaders(), headerFormat, removeCurlyBraces, + valueSeparatorForHeaders)); + } + } return attributes; } @@ -214,14 +266,23 @@ private void addAttribute(final Map attributes, final String att attributes.put(attributeName, value.toString()); } - private String buildHeaders(Map headers, boolean removeCurlyBraces, String valueSeparatorForHeaders) { + private String buildHeaders(Map headers, AllowableValue headerFormat, boolean removeCurlyBraces, String valueSeparatorForHeaders) { if (headers == null) { return null; } - String headerString = convertMapToString(headers,valueSeparatorForHeaders); + String headerString = null; + if (headerFormat.equals(HEADERS_FORMAT_COMMA_SEPARATED_STRING)) { + headerString = convertMapToString(headers, valueSeparatorForHeaders); - if (!removeCurlyBraces) { - headerString = "{" + headerString + "}"; + if (!removeCurlyBraces) { + headerString = "{" + headerString + "}"; + } + } else if (headerFormat.equals(HEADERS_FORMAT_JSON_STRING)) { + try { + headerString = convertMapToJSONString(headers); + } catch (JsonProcessingException e) { + getLogger().warn("Header formatting as JSON failed", e); + } } return headerString; } @@ -231,6 +292,10 @@ private static String convertMapToString(Map headers, String val .collect(Collectors.joining(valueSeparatorForHeaders)); } + private static String convertMapToJSONString(Map headers) throws JsonProcessingException { + return objectMapper.writeValueAsString(headers); + } + @Override protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) { try { diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index c382e730ddd4..0655caaa217d 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.amqp.processors; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; @@ -61,12 +63,12 @@ public void testMessageAcked() throws TimeoutException, IOException { runner.run(); - runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2); + runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 2); - final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + final MockFlowFile helloFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); helloFF.assertContentEquals("hello"); - final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1); + final MockFlowFile worldFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(1); worldFF.assertContentEquals("world"); // A single cumulative ack should be used @@ -92,12 +94,12 @@ public void testBatchSizeAffectsAcks() throws TimeoutException, IOException { runner.run(2); - runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2); + runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 2); - final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + final MockFlowFile helloFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); helloFF.assertContentEquals("hello"); - final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1); + final MockFlowFile worldFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(1); worldFF.assertContentEquals("world"); // A single cumulative ack should be used @@ -125,9 +127,9 @@ public void testConsumerStopped() throws TimeoutException, IOException { runner.run(); proc.close(); - runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 1); + runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 1); - final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + final MockFlowFile helloFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); helloFF.assertContentEquals("hello"); @@ -156,13 +158,83 @@ public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { TestRunner runner = initTestRunner(proc); runner.run(); - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); assertNotNull(successFF); successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$exchange", "myExchange"); } } + @Test + public void validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess() throws Exception { + final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map headersMap = new HashMap<>(); + headersMap.put("foo1", "bar,bar"); + headersMap.put("foo2", "bar,bar"); + headersMap.put("foo3", "null"); + headersMap.put("foo4", null); + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode expectedJson = objectMapper.valueToTree(headersMap); + + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_JSON_STRING); + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + String headers = successFF.getAttribute("amqp$headers"); + JsonNode jsonNode = objectMapper.readTree(headers); + assertEquals(expectedJson, jsonNode); + } + } + + @Test + public void validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAndTransferToSuccess() throws Exception { + final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + final Map expectedHeadersMap = new HashMap<>(); + expectedHeadersMap.put("foo1", "bar,bar"); + expectedHeadersMap.put("foo2", "bar,bar"); + expectedHeadersMap.put("foo3", "null"); + final Map headersMap = new HashMap<>(expectedHeadersMap); + headersMap.put("foo4", null); + + final String headerPrefix = "test.header"; + + AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); + builderBasicProperties.headers(headersMap); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = initTestRunner(proc); + runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_ATTRIBUTES); + runner.setProperty(ConsumeAMQP.HEADER_KEY_PREFIX,headerPrefix); + runner.run(); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeEquals("amqp$routingKey", "key1"); + successFF.assertAttributeEquals("amqp$exchange", "myExchange"); + successFF.assertAttributeNotExists("amqp$headers"); + expectedHeadersMap.forEach((key, value) ->{ + successFF.assertAttributeEquals(headerPrefix + "." + key, value.toString()); + } ); + } + } @Test public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() throws Exception { final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); @@ -186,7 +258,7 @@ public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransfer TestRunner runner = initTestRunner(proc); runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|"); runner.run(); - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); assertNotNull(successFF); successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$exchange", "myExchange"); @@ -223,7 +295,7 @@ public void validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSucc TestRunner runner = initTestRunner(proc); runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True"); runner.run(); - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); assertNotNull(successFF); successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$exchange", "myExchange"); @@ -255,7 +327,7 @@ public void validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParamet runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR,"|"); runner.run(); - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); assertNotNull(successFF); successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$exchange", "myExchange"); @@ -288,7 +360,7 @@ public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() throws E TestRunner runner = initTestRunner(proc); runner.run(); - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0); assertNotNull(successFF); successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$exchange", "myExchange"); @@ -321,7 +393,7 @@ protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection conne throw new IllegalStateException("Consumer already created"); } - consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(), getLogger()); + consumer = new AMQPConsumer(connection, context.getProperty(ConsumeAMQP.QUEUE).getValue(), context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), getLogger()); return consumer; } catch (IOException e) { throw new ProcessException(e);