From cf3667444bce4ff9da8c9097d7c7a2fa2f767386 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 6 Mar 2024 17:59:05 -0500 Subject: [PATCH] PROTON-2802 Make scripting message payload of a transfer simpler Provide a simpler API for scripting the expected message payload that will accompany an incoming transfer and provide some updates to match that API on the remote inject of transfer with message payloads. --- .../protonj2/client/impl/MessageSendTest.java | 21 +- .../driver/actions/TransferInjectAction.java | 52 +- .../expectations/TransferExpectation.java | 5 + .../AbstractMessageSectionMatcher.java | 10 +- .../transport/TransferMessageMatcher.java | 811 ++++++++++++++++++ .../TransferPayloadCompositeMatcher.java | 2 +- .../types/EncodedAmqpTypeMatcher.java | 16 +- .../test/driver/SenderHandlingTest.java | 366 ++++++++ 8 files changed, 1264 insertions(+), 19 deletions(-) create mode 100644 protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java index 319597c35..3746ea087 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java @@ -95,19 +95,16 @@ public void testSendMessageWithHeaderValuesPopulated() throws Exception { // Gates send on remote flow having been sent and received session.openReceiver("dummy").openFuture().get(); - HeaderMatcher headerMatcher = new HeaderMatcher(true); - headerMatcher.withDurable(true); - headerMatcher.withPriority((byte) 1); - headerMatcher.withTtl(65535); - headerMatcher.withFirstAcquirer(true); - headerMatcher.withDeliveryCount(2); - EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher("Hello World"); - TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher(); - payloadMatcher.setHeadersMatcher(headerMatcher); - payloadMatcher.setMessageContentMatcher(bodyMatcher); - peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - peer.expectTransfer().withMessageFormat(0).withPayload(payloadMatcher).accept(); + peer.expectTransfer().withMessage().withMessageFormat(0) + .withHeader() + .withDurability(true) + .withPriority((byte) 1) + .withTimeToLive(65535) + .withFirstAcquirer(true) + .withDeliveryCount(2) + .also() + .withValue("Hello World"); peer.expectDetach().respond(); peer.expectClose().respond(); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java index dd2c6d013..e434a486f 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java @@ -257,6 +257,10 @@ public FooterBuilder withFooter() { return new FooterBuilder(); } + public MessageBuilder withMessage() { + return new MessageBuilder(); + } + private Header getOrCreateHeader() { if (header == null) { header = new Header(); @@ -479,6 +483,11 @@ public PropertiesBuilder withReplyToGroupId(String value) { public final class ApplicationPropertiesBuilder extends SectionBuilder { + public ApplicationPropertiesBuilder withProperty(String key, Object value) { + getOrCreateApplicationProperties().setApplicationProperty(key, value); + return this; + } + public ApplicationPropertiesBuilder withApplicationProperty(String key, Object value) { getOrCreateApplicationProperties().setApplicationProperty(key, value); return this; @@ -540,7 +549,12 @@ public BodySectionBuilder withDescribed(DescribedType described) { public final class FooterBuilder extends SectionBuilder { - public FooterBuilder withFooter(Object key, Object value) { + public FooterBuilder withFooter(String key, Object value) { + getOrCreateFooter().setFooterProperty(Symbol.valueOf(key), value); + return this; + } + + public FooterBuilder withFooter(Symbol key, Object value) { getOrCreateFooter().setFooterProperty(key, value); return this; } @@ -666,6 +680,42 @@ public TransactionalStateBuilder withModified(boolean failed, boolean undelivera } } + public final class MessageBuilder extends SectionBuilder { + + public MessageBuilder withMessageFormat(int format) { + TransferInjectAction.this.withMessageFormat(format); + return this; + } + + public HeaderBuilder withHeader() { + return TransferInjectAction.this.withHeader(); + } + + public DeliveryAnnotationsBuilder withDeliveryAnnotations() { + return TransferInjectAction.this.withDeliveryAnnotations(); + } + + public MessageAnnotationsBuilder withMessageAnnotations() { + return TransferInjectAction.this.withMessageAnnotations(); + } + + public PropertiesBuilder withProperties() { + return TransferInjectAction.this.withProperties(); + } + + public ApplicationPropertiesBuilder withApplicationProperties() { + return TransferInjectAction.this.withApplicationProperties(); + } + + public BodySectionBuilder withBody() { + return TransferInjectAction.this.withBody(); + } + + public FooterBuilder withFooter() { + return TransferInjectAction.this.withFooter(); + } + } + private static byte[] generateUniqueDeliveryTag() { final byte[] tag = new byte[Long.BYTES + Long.BYTES]; final UUID uuid = UUID.randomUUID(); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java index 6476a2ac2..0a0398f05 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java @@ -44,6 +44,7 @@ import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer; import org.apache.qpid.protonj2.test.driver.matchers.transactions.TransactionalStateMatcher; import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferMessageMatcher; import org.hamcrest.Matcher; import org.hamcrest.Matchers; @@ -285,6 +286,10 @@ public TransferExpectation withPayload(byte[] buffer) { return this; } + public TransferMessageMatcher withMessage() { + return (TransferMessageMatcher) (this.payloadMatcher = new TransferMessageMatcher(this)); + } + //----- Matcher based with methods for more complex validation public TransferExpectation withHandle(Matcher m) { diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java index ac1746859..9fb760136 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java @@ -41,7 +41,7 @@ public abstract class AbstractMessageSectionMatcher> fieldMatchers; private Map receivedFields; - private final boolean allowTrailingBytes; + private boolean allowTrailingBytes; protected AbstractMessageSectionMatcher(UnsignedLong numericDescriptor, Symbol symbolicDescriptor, Map> fieldMatchers, boolean expectTrailingBytes) { this.numericDescriptor = numericDescriptor; @@ -50,6 +50,14 @@ protected AbstractMessageSectionMatcher(UnsignedLong numericDescriptor, Symbol s this.allowTrailingBytes = expectTrailingBytes; } + public void setAllowTrailingBytes(boolean allowTrailingBytes) { + this.allowTrailingBytes = allowTrailingBytes; + } + + public boolean isAllowTrailngBytes() { + return allowTrailingBytes; + } + protected Map> getMatchers() { return fieldMatchers; } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java new file mode 100644 index 000000000..25cb5a809 --- /dev/null +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java @@ -0,0 +1,811 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.protonj2.test.driver.matchers.transport; + +import static org.hamcrest.CoreMatchers.equalTo; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary; +import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol; +import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedByte; +import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger; +import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer; +import org.apache.qpid.protonj2.test.driver.expectations.TransferExpectation; +import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpSequenceMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpTypeMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedDataMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.StringDescription; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matcher used by a {@link TransferExpectation} to build a matcher for the message + * payload that accompanies the {@link Transfer}. The matcher generally adheres to + * the standard AMQP message format zero layout. + */ +public class TransferMessageMatcher extends TypeSafeMatcher { + + private final TransferExpectation expectation; + + private HeaderMatcher headersMatcher; + private DeliveryAnnotationsMatcher deliveryAnnotationsMatcher; + private MessageAnnotationsMatcher messageAnnotationsMatcher; + private PropertiesMatcher propertiesMatcher; + private ApplicationPropertiesMatcher applicationPropertiesMatcher; + private List bodySectionMatchers = new ArrayList<>(); + private FooterMatcher footersMatcher; + + // String buckets for mismatch error descriptions. + private String headerMatcherFailureDescription; + private String deliveryAnnotationsMatcherFailureDescription; + private String messageAnnotationsMatcherFailureDescription; + private String propertiesMatcherFailureDescription; + private String applicationPropertiesMatcherFailureDescription; + private String msgContentMatcherFailureDescription; + private String footerMatcherFailureDescription; + + public TransferMessageMatcher(TransferExpectation expectation) { + this.expectation = expectation; + } + + public TransferExpectation also() { + return expectation; + } + + public TransferExpectation and() { + return expectation; + } + + @Override + protected boolean matchesSafely(ByteBuffer receivedBinary) { + final ByteBuffer receivedSlice = receivedBinary.slice().asReadOnlyBuffer(); + + int bytesConsumed = 0; + + // MessageHeader Section + if (headersMatcher != null) { + try { + bytesConsumed += headersMatcher.getInnerMatcher().verify(receivedSlice.slice()); + receivedSlice.position(bytesConsumed); + } catch (Throwable t) { + headerMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to MessageHeaderMatcher: " + receivedSlice; + headerMatcherFailureDescription += "\nMessageHeaderMatcher generated throwable: " + t; + + return false; + } + } + + // DeliveryAnnotations Section + if (deliveryAnnotationsMatcher != null) { + try { + bytesConsumed += deliveryAnnotationsMatcher.getInnerMatcher().verify(receivedSlice.slice()); + receivedSlice.position(bytesConsumed); + } catch (Throwable t) { + deliveryAnnotationsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed " + + "to DeliveryAnnotationsMatcher: " + receivedSlice; + deliveryAnnotationsMatcherFailureDescription += "\nDeliveryAnnotationsMatcher generated throwable: " + t; + + return false; + } + } + + // MessageAnnotations Section + if (messageAnnotationsMatcher != null) { + try { + bytesConsumed += messageAnnotationsMatcher.getInnerMatcher().verify(receivedSlice.slice()); + receivedSlice.position(bytesConsumed); + } catch (Throwable t) { + messageAnnotationsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " + + "MessageAnnotationsMatcher: " + receivedSlice; + messageAnnotationsMatcherFailureDescription += "\nMessageAnnotationsMatcher generated throwable: " + t; + + return false; + } + } + + // Properties Section + if (propertiesMatcher != null) { + try { + bytesConsumed += propertiesMatcher.getInnerMatcher().verify(receivedSlice.slice()); + receivedSlice.position(bytesConsumed); + } catch (Throwable t) { + propertiesMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " + + "PropertiesMatcher: " + receivedSlice; + propertiesMatcherFailureDescription += "\nPropertiesMatcher generated throwable: " + t; + + return false; + } + } + + // Application Properties Section + if (applicationPropertiesMatcher != null) { + try { + bytesConsumed += applicationPropertiesMatcher.getInnerMatcher().verify(receivedSlice.slice()); + receivedSlice.position(bytesConsumed); + } catch (Throwable t) { + applicationPropertiesMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " + + "ApplicationPropertiesMatcher: " + receivedSlice; + applicationPropertiesMatcherFailureDescription += "\nApplicationPropertiesMatcher generated throwable: " + t; + + return false; + } + } + + // Message Content Body Section, already a Matcher + if (!bodySectionMatchers.isEmpty()) { + final ByteBuffer slicedMsgContext = receivedSlice.slice(); + + for (Matcher msgContentMatcher : bodySectionMatchers) { + final int originalReadableBytes = slicedMsgContext.remaining(); + final boolean contentMatches = msgContentMatcher.matches(slicedMsgContext); + if (!contentMatches) { + Description desc = new StringDescription(); + msgContentMatcher.describeTo(desc); + msgContentMatcher.describeMismatch(receivedSlice, desc); + + msgContentMatcherFailureDescription = "\nMessageContentMatcher mismatch Description:"; + msgContentMatcherFailureDescription += desc.toString(); + + return false; + } + + bytesConsumed += originalReadableBytes - slicedMsgContext.remaining(); + receivedSlice.position(bytesConsumed); + } + } + + // Footers Section + if (footersMatcher != null) { + try { + bytesConsumed += footersMatcher.getInnerMatcher().verify(receivedSlice.slice()); + } catch (Throwable t) { + footerMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " + + "FooterMatcher: " + receivedSlice; + footerMatcherFailureDescription += "\nFooterMatcher generated throwable: " + t; + + return false; + } + } + + return true; + } + + public TransferMessageMatcher withMessageFormat(int format) { + this.expectation.withMessageFormat(format); + return this; + } + + public HeaderMatcher withHeader() { + if (headersMatcher == null) { + headersMatcher = new HeaderMatcher(this); + } + + if (deliveryAnnotationsMatcher != null || messageAnnotationsMatcher != null || + propertiesMatcher != null || applicationPropertiesMatcher != null || + !bodySectionMatchers.isEmpty() || footersMatcher != null) { + + headersMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } else { + headersMatcher.getInnerMatcher().setAllowTrailingBytes(false); + } + + return headersMatcher; + } + + public DeliveryAnnotationsMatcher withDeliveryAnnotations() { + if (deliveryAnnotationsMatcher == null) { + deliveryAnnotationsMatcher = new DeliveryAnnotationsMatcher(this); + } + + if (headersMatcher != null) { + headersMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + + if (messageAnnotationsMatcher != null || propertiesMatcher != null || applicationPropertiesMatcher != null || + !bodySectionMatchers.isEmpty() || footersMatcher != null) { + + deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } else { + deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(false); + } + + return deliveryAnnotationsMatcher; + } + + public MessageAnnotationsMatcher withMessageAnnotations() { + if (messageAnnotationsMatcher == null) { + messageAnnotationsMatcher = new MessageAnnotationsMatcher(this); + } + + if (headersMatcher != null) { + headersMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (deliveryAnnotationsMatcher != null) { + deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + + if (propertiesMatcher != null || applicationPropertiesMatcher != null || + !bodySectionMatchers.isEmpty() || footersMatcher != null) { + + messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } else { + messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(false); + } + + return messageAnnotationsMatcher; + } + + public PropertiesMatcher withProperties() { + if (propertiesMatcher == null) { + propertiesMatcher = new PropertiesMatcher(this); + } + + if (headersMatcher != null) { + headersMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (deliveryAnnotationsMatcher != null) { + deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (messageAnnotationsMatcher != null) { + messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + + if (applicationPropertiesMatcher != null || !bodySectionMatchers.isEmpty() || footersMatcher != null) { + propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } else { + propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(false); + } + + return propertiesMatcher; + } + + public ApplicationPropertiesMatcher withApplicationProperties() { + if (applicationPropertiesMatcher == null) { + applicationPropertiesMatcher = new ApplicationPropertiesMatcher(this); + } + + if (headersMatcher != null) { + headersMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (deliveryAnnotationsMatcher != null) { + deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (messageAnnotationsMatcher != null) { + messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (propertiesMatcher != null) { + propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + + if (!bodySectionMatchers.isEmpty() || footersMatcher != null) { + applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } else { + applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(false); + } + + return applicationPropertiesMatcher; + } + + public TransferMessageMatcher withSequence(List sequence) { + final EncodedAmqpSequenceMatcher matcher = new EncodedAmqpSequenceMatcher(sequence, footersMatcher != null); + + if (!bodySectionMatchers.isEmpty()) { + bodySectionMatchers.get(bodySectionMatchers.size() - 1).setAllowTrailingBytes(true); + } + + bodySectionMatchers.add(matcher); + + return this; + } + + public TransferMessageMatcher withData(byte[] payload) { + final EncodedDataMatcher matcher = new EncodedDataMatcher(payload, footersMatcher != null); + + if (headersMatcher != null) { + headersMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (deliveryAnnotationsMatcher != null) { + deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (messageAnnotationsMatcher != null) { + messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (propertiesMatcher != null) { + propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (applicationPropertiesMatcher != null) { + applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + + if (!bodySectionMatchers.isEmpty()) { + bodySectionMatchers.get(bodySectionMatchers.size() - 1).setAllowTrailingBytes(true); + } + + bodySectionMatchers.add(matcher); + + return this; + } + + public TransferMessageMatcher withValue(Object value) { + final EncodedAmqpValueMatcher matcher = new EncodedAmqpValueMatcher(value, footersMatcher != null); + + if (headersMatcher != null) { + headersMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (deliveryAnnotationsMatcher != null) { + deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (messageAnnotationsMatcher != null) { + messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (propertiesMatcher != null) { + propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (applicationPropertiesMatcher != null) { + applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + + if (!bodySectionMatchers.isEmpty()) { + bodySectionMatchers.get(bodySectionMatchers.size() - 1).setAllowTrailingBytes(true); + } + + bodySectionMatchers.add(matcher); + + return this; + } + + public FooterMatcher withFooters() { + if (footersMatcher == null) { + footersMatcher = new FooterMatcher(this); + } + + if (headersMatcher != null) { + headersMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (deliveryAnnotationsMatcher != null) { + deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (messageAnnotationsMatcher != null) { + messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (propertiesMatcher != null) { + propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + if (applicationPropertiesMatcher != null) { + applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true); + } + + if (!bodySectionMatchers.isEmpty()) { + bodySectionMatchers.get(bodySectionMatchers.size() - 1).setAllowTrailingBytes(true); + } + + return footersMatcher; + } + + @Override + public void describeTo(Description description) { + description.appendText("a Binary encoding of a Transfer frames payload, containing an AMQP message"); + } + + @Override + protected void describeMismatchSafely(ByteBuffer item, Description mismatchDescription) { + mismatchDescription.appendText("\nActual encoded form of the full Transfer frame payload: ").appendValue(item); + + // MessageHeaders Section + if (headerMatcherFailureDescription != null) { + mismatchDescription.appendText("\nMessageHeadersMatcherFailed!"); + mismatchDescription.appendText(headerMatcherFailureDescription); + return; + } + + // MessageHeaders Section + if (deliveryAnnotationsMatcherFailureDescription != null) { + mismatchDescription.appendText("\nDeliveryAnnotationsMatcherFailed!"); + mismatchDescription.appendText(deliveryAnnotationsMatcherFailureDescription); + return; + } + + // MessageAnnotations Section + if (messageAnnotationsMatcherFailureDescription != null) { + mismatchDescription.appendText("\nMessageAnnotationsMatcherFailed!"); + mismatchDescription.appendText(messageAnnotationsMatcherFailureDescription); + return; + } + + // Properties Section + if (propertiesMatcherFailureDescription != null) { + mismatchDescription.appendText("\nPropertiesMatcherFailed!"); + mismatchDescription.appendText(propertiesMatcherFailureDescription); + return; + } + + // Application Properties Section + if (applicationPropertiesMatcherFailureDescription != null) { + mismatchDescription.appendText("\nApplicationPropertiesMatcherFailed!"); + mismatchDescription.appendText(applicationPropertiesMatcherFailureDescription); + return; + } + + // Message Content Body Section + if (msgContentMatcherFailureDescription != null) { + mismatchDescription.appendText("\nContentMatcherFailed!"); + mismatchDescription.appendText(msgContentMatcherFailureDescription); + return; + } + + // Footer Section + if (footerMatcherFailureDescription != null) { + mismatchDescription.appendText("\nContentMatcherFailed!"); + mismatchDescription.appendText(footerMatcherFailureDescription); + } + } + + public static final class HeaderMatcher { + + private final org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher matcher = + new org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher(false); + + private final TransferMessageMatcher transferMatcher; + + public HeaderMatcher(TransferMessageMatcher transferMatcher) { + this.transferMatcher = transferMatcher; + } + + public TransferMessageMatcher also() { + return transferMatcher; + } + + public TransferMessageMatcher and() { + return transferMatcher; + } + + public HeaderMatcher withDurability(boolean durable) { + matcher.withDurable(equalTo(durable)); + return this; + } + + public HeaderMatcher withDurability(Boolean durable) { + matcher.withDurable(equalTo(durable)); + return this; + } + + public HeaderMatcher withPriority(byte priority) { + matcher.withPriority(equalTo(UnsignedByte.valueOf(priority))); + return this; + } + + public HeaderMatcher withPriority(UnsignedByte priority) { + matcher.withPriority(equalTo(priority)); + return this; + } + + public HeaderMatcher withTimeToLive(int timeToLive) { + matcher.withTtl(equalTo(UnsignedInteger.valueOf(timeToLive))); + return this; + } + + public HeaderMatcher withTimeToLive(long timeToLive) { + matcher.withTtl(equalTo(UnsignedInteger.valueOf(timeToLive))); + return this; + } + + public HeaderMatcher withTimeToLive(UnsignedInteger timeToLive) { + matcher.withTtl(equalTo(timeToLive)); + return this; + } + + public HeaderMatcher withFirstAcquirer(boolean durable) { + matcher.withFirstAcquirer(equalTo(durable)); + return this; + } + + public HeaderMatcher withFirstAcquirer(Boolean durable) { + matcher.withFirstAcquirer(equalTo(durable)); + return this; + } + + public HeaderMatcher withDeliveryCount(int deliveryCount) { + matcher.withDeliveryCount(equalTo(UnsignedInteger.valueOf(deliveryCount))); + return this; + } + + public HeaderMatcher withDeliveryCount(long deliveryCount) { + matcher.withDeliveryCount(equalTo(UnsignedInteger.valueOf(deliveryCount))); + return this; + } + + public HeaderMatcher withDeliveryCount(UnsignedInteger deliveryCount) { + matcher.withDeliveryCount(equalTo(deliveryCount)); + return this; + } + + org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher getInnerMatcher() { + return matcher; + } + } + + public static final class DeliveryAnnotationsMatcher { + + private final org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher matcher = + new org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher(false); + + private final TransferMessageMatcher transferMatcher; + + public DeliveryAnnotationsMatcher(TransferMessageMatcher transferMatcher) { + this.transferMatcher = transferMatcher; + } + + public TransferMessageMatcher also() { + return transferMatcher; + } + + public TransferMessageMatcher and() { + return transferMatcher; + } + + public DeliveryAnnotationsMatcher withAnnotation(String key, Object value) { + matcher.withEntry(Symbol.valueOf(key), value); + return this; + } + + public DeliveryAnnotationsMatcher withAnnotation(Symbol key, Object value) { + matcher.withEntry(key, value); + return this; + } + + org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher getInnerMatcher() { + return matcher; + } + } + + public static final class MessageAnnotationsMatcher { + + private final org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher matcher = + new org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher(false); + + private final TransferMessageMatcher transferMatcher; + + public MessageAnnotationsMatcher(TransferMessageMatcher transferMatcher) { + this.transferMatcher = transferMatcher; + } + + public TransferMessageMatcher also() { + return transferMatcher; + } + + public TransferMessageMatcher and() { + return transferMatcher; + } + + public MessageAnnotationsMatcher withAnnotation(String key, Object value) { + matcher.withEntry(Symbol.valueOf(key), value); + return this; + } + + public MessageAnnotationsMatcher withAnnotation(Symbol key, Object value) { + matcher.withEntry(key, value); + return this; + } + + org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher getInnerMatcher() { + return matcher; + } + } + + public static final class PropertiesMatcher { + + private final org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher matcher = + new org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher(false); + + private final TransferMessageMatcher transferMatcher; + + public PropertiesMatcher(TransferMessageMatcher transferMatcher) { + this.transferMatcher = transferMatcher; + } + + public TransferMessageMatcher also() { + return transferMatcher; + } + + public TransferMessageMatcher and() { + return transferMatcher; + } + + public PropertiesMatcher withMessageId(Object messageId) { + matcher.withMessageId(messageId); + return this; + } + + public PropertiesMatcher withUserId(byte[] userId) { + matcher.withUserId(userId); + return this; + } + + public PropertiesMatcher withUserId(Binary userId) { + matcher.withUserId(userId); + return this; + } + + public PropertiesMatcher withTo(String to) { + matcher.withTo(to); + return this; + } + + public PropertiesMatcher withSubject(String subject) { + matcher.withSubject(subject); + return this; + } + + public PropertiesMatcher withReplyTo(String replyTo) { + matcher.withReplyTo(replyTo); + return this; + } + + public PropertiesMatcher withCorrelationId(Object correlationId) { + matcher.withCorrelationId(correlationId); + return this; + } + + public PropertiesMatcher withContentType(String contentType) { + matcher.withContentType(contentType); + return this; + } + + public PropertiesMatcher withContentType(Symbol contentType) { + matcher.withContentType(contentType); + return this; + } + + public PropertiesMatcher withContentEncoding(String contentEncoding) { + matcher.withContentEncoding(contentEncoding); + return this; + } + + public PropertiesMatcher withContentEncoding(Symbol contentEncoding) { + matcher.withContentEncoding(contentEncoding); + return this; + } + + public PropertiesMatcher withAbsoluteExpiryTime(int absoluteExpiryTime) { + matcher.withAbsoluteExpiryTime(absoluteExpiryTime); + return this; + } + + public PropertiesMatcher withAbsoluteExpiryTime(long absoluteExpiryTime) { + matcher.withAbsoluteExpiryTime(absoluteExpiryTime); + return this; + } + + public PropertiesMatcher withAbsoluteExpiryTime(Long absoluteExpiryTime) { + matcher.withAbsoluteExpiryTime(absoluteExpiryTime); + return this; + } + + public PropertiesMatcher withCreationTime(int creationTime) { + matcher.withCreationTime(creationTime); + return this; + } + + public PropertiesMatcher withCreationTime(long creationTime) { + matcher.withCreationTime(creationTime); + return this; + } + + public PropertiesMatcher withCreationTime(Long creationTime) { + matcher.withCreationTime(creationTime); + return this; + } + + public PropertiesMatcher withGroupId(String groupId) { + matcher.withGroupId(groupId); + return this; + } + + public PropertiesMatcher withGroupSequence(int groupSequence) { + matcher.withGroupSequence(groupSequence); + return this; + } + + public PropertiesMatcher withGroupSequence(long groupSequence) { + matcher.withGroupSequence(groupSequence); + return this; + } + + public PropertiesMatcher withGroupSequence(Long groupSequence) { + matcher.withGroupSequence(groupSequence); + return this; + } + + public PropertiesMatcher withReplyToGroupId(String replyToGroupId) { + matcher.withReplyToGroupId(replyToGroupId); + return this; + } + + org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher getInnerMatcher() { + return matcher; + } + } + + public static final class ApplicationPropertiesMatcher { + + private final org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher matcher = + new org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher(false); + + private final TransferMessageMatcher transferMatcher; + + public ApplicationPropertiesMatcher(TransferMessageMatcher transferMatcher) { + this.transferMatcher = transferMatcher; + } + + public TransferMessageMatcher also() { + return transferMatcher; + } + + public TransferMessageMatcher and() { + return transferMatcher; + } + + public ApplicationPropertiesMatcher withProperty(String key, Object value) { + matcher.withEntry(key, value); + return this; + } + + org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher getInnerMatcher() { + return matcher; + } + } + + public static final class FooterMatcher { + + private final org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher matcher = + new org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher(false); + + private final TransferMessageMatcher transferMatcher; + + public FooterMatcher(TransferMessageMatcher transferMatcher) { + this.transferMatcher = transferMatcher; + } + + public TransferMessageMatcher also() { + return transferMatcher; + } + + public TransferMessageMatcher and() { + return transferMatcher; + } + + public FooterMatcher withFooter(String key, Object value) { + matcher.withEntry(Symbol.valueOf(key), value); + return this; + } + + public FooterMatcher withFooter(Symbol key, Object value) { + matcher.withEntry(key, value); + return this; + } + + org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher getInnerMatcher() { + return matcher; + } + } +} diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java index 95dcd64d4..9e78b1815 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java @@ -173,7 +173,7 @@ protected boolean matchesSafely(final ByteBuffer receivedBinary) { } } - // MessageAnnotations Section + // Footers Section if (footersMatcher != null) { try { bytesConsumed += footersMatcher.verify(receivedSlice.slice()); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java index 2c6539a3f..0eaf4de01 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java @@ -40,7 +40,7 @@ public abstract class EncodedAmqpTypeMatcher extends TypeSafeMatcher private final Symbol descriptorSymbol; private final UnsignedLong descriptorCode; private final Object expectedValue; - private boolean permitTrailingBytes; + private boolean allowTrailingBytes; private DescribedType decodedDescribedType; private boolean unexpectedTrailingBytes; @@ -48,11 +48,19 @@ public EncodedAmqpTypeMatcher(Symbol symbol, UnsignedLong code, Object expectedV this(symbol, code, expectedValue, false); } - public EncodedAmqpTypeMatcher(Symbol symbol, UnsignedLong code, Object expectedValue, boolean permitTrailingBytes) { + public EncodedAmqpTypeMatcher(Symbol symbol, UnsignedLong code, Object expectedValue, boolean allowTrailingBytes) { this.descriptorSymbol = symbol; this.descriptorCode = code; this.expectedValue = expectedValue; - this.permitTrailingBytes = permitTrailingBytes; + this.allowTrailingBytes = allowTrailingBytes; + } + + public void setAllowTrailingBytes(boolean allowTrailingBytes) { + this.allowTrailingBytes = allowTrailingBytes; + } + + public boolean isAllowTrailngBytes() { + return allowTrailingBytes; } protected Object getExpectedValue() { @@ -110,7 +118,7 @@ protected boolean matchesSafely(ByteBuffer receivedBinary) { } } - if (decoded < length && !permitTrailingBytes) { + if (decoded < length && !allowTrailingBytes) { unexpectedTrailingBytes = true; return false; } diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java index e57208363..5982968bc 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java @@ -580,4 +580,370 @@ public void testDetachCanExpectMatcherInDescription() throws Exception { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testTransferInjectAndExpectAPIs() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.remoteFlow().withLinkCredit(1).queue(); + // Script a full message using the inject API + peer.expectTransfer().withMessage() + .withProperties().withCorrelationId("test").and() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withApplicationProperties().withProperty("ap", "pa").and() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withData(new byte[] {0, 1, 2}) + .withHeader().withDurability(true).and() + .withFooters().withFooter("footer", "value"); + peer.expectDetach().respond(); + peer.expectEnd().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectFlow().withLinkCredit(1).withHandle(42); + client.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("ap", "pa").also() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withProperties().withCorrelationId("test").also() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withFooter().withFooter("footer", "value").also() + .withBody().withData(new byte[] {0, 1, 2}).also() + .queue(); + + // Now start and then await the remote grant of credit and out send of a transfer + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().withHandle(2).now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + client.expectDetach().withHandle(42); + client.expectEnd(); + + client.remoteDetach().now(); + client.remoteEnd().now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + + @Test + public void testTransferInjectAndExpectAPIsFailOnNoMatchInHeader() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.remoteFlow().withLinkCredit(1).queue(); + // Script a full message using the inject API + peer.expectTransfer().withMessage() + .withProperties().withCorrelationId("test").and() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withApplicationProperties().withProperty("ap", "pa").and() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withData(new byte[] {0, 1, 2}) + .withHeader().withDurability(true).and() + .withFooters().withFooter("footer", "value"); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectFlow().withLinkCredit(1).withHandle(42); + client.remoteTransfer().withHeader().withDurability(false).also() + .withApplicationProperties().withProperty("ap", "pa").also() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withProperties().withCorrelationId("test").also() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withFooter().withFooter("footer", "value").also() + .withBody().withData(new byte[] {0, 1, 2}).also() + .queue(); + + // Now start and then await the remote grant of credit and out send of a transfer + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().withHandle(2).now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + assertThrows(AssertionError.class, () -> peer.waitForScriptToComplete(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testTransferInjectAndExpectAPIsFailOnNoMatchDeliveryAnnotations() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.remoteFlow().withLinkCredit(1).queue(); + // Script a full message using the inject API + peer.expectTransfer().withMessage() + .withProperties().withCorrelationId("test").and() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withApplicationProperties().withProperty("ap", "pa").and() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withData(new byte[] {0, 1, 2}) + .withHeader().withDurability(true).and() + .withFooters().withFooter("footer", "value"); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectFlow().withLinkCredit(1).withHandle(42); + client.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("ap", "pa").also() + .withDeliveryAnnotations().withAnnotation("da", "1").also() + .withProperties().withCorrelationId("test").also() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withFooter().withFooter("footer", "value").also() + .withBody().withData(new byte[] {0, 1, 2}).also() + .queue(); + + // Now start and then await the remote grant of credit and out send of a transfer + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().withHandle(2).now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + assertThrows(AssertionError.class, () -> peer.waitForScriptToComplete(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testTransferInjectAndExpectAPIsFailOnNoMatchMessageAnnotations() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.remoteFlow().withLinkCredit(1).queue(); + // Script a full message using the inject API + peer.expectTransfer().withMessage() + .withProperties().withCorrelationId("test").and() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withApplicationProperties().withProperty("ap", "pa").and() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withData(new byte[] {0, 1, 2}) + .withHeader().withDurability(true).and() + .withFooters().withFooter("footer", "value"); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectFlow().withLinkCredit(1).withHandle(42); + client.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("ap", "pa").also() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withProperties().withCorrelationId("test").also() + .withMessageAnnotations().withAnnotation("ma", "1").also() + .withFooter().withFooter("footer", "value").also() + .withBody().withData(new byte[] {0, 1, 2}).also() + .queue(); + + // Now start and then await the remote grant of credit and out send of a transfer + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().withHandle(2).now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + assertThrows(AssertionError.class, () -> peer.waitForScriptToComplete(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testTransferInjectAndExpectAPIsFailOnNoMatchFooter() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.remoteFlow().withLinkCredit(1).queue(); + // Script a full message using the inject API + peer.expectTransfer().withMessage() + .withProperties().withCorrelationId("test").and() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withApplicationProperties().withProperty("ap", "pa").and() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withData(new byte[] {0, 1, 2}) + .withHeader().withDurability(true).and() + .withFooters().withFooter("footer", "value"); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectFlow().withLinkCredit(1).withHandle(42); + client.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("ap", "pa").also() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withProperties().withCorrelationId("test").also() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withFooter().withFooter("footer", "1").also() + .withBody().withData(new byte[] {0, 1, 2}).also() + .queue(); + + // Now start and then await the remote grant of credit and out send of a transfer + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().withHandle(2).now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + assertThrows(AssertionError.class, () -> peer.waitForScriptToComplete(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testTransferInjectAndExpectAPIsFailOnMissingSection() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.remoteFlow().withLinkCredit(1).queue(); + // Script a full message using the inject API + peer.expectTransfer().withMessage() + .withProperties().withCorrelationId("test").and() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withApplicationProperties().withProperty("ap", "pa").and() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withData(new byte[] {0, 1, 2}) + .withHeader().withDurability(true).and() + .withFooters().withFooter("footer", "value"); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectFlow().withLinkCredit(1).withHandle(42); + client.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("ap", "pa").also() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withProperties().withCorrelationId("test").also() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withBody().withData(new byte[] {0, 1, 2}).also() + .queue(); + + // Now start and then await the remote grant of credit and out send of a transfer + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().withHandle(2).now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + assertThrows(AssertionError.class, () -> peer.waitForScriptToComplete(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testTransferInjectAndExpectAPIsMapTypePresenceOnly() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.remoteFlow().withLinkCredit(1).queue(); + // Script a full message using the inject API + peer.expectTransfer().withMessage().withMessageFormat(1) + .withProperties().withCorrelationId("test").and() + .withDeliveryAnnotations().also() + .withApplicationProperties().and() + .withMessageAnnotations().also() + .withData(new byte[] {0, 1, 2}) + .withHeader().withDurability(true).and() + .withFooters(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectFlow().withLinkCredit(1).withHandle(42); + client.remoteTransfer().withMessage().withMessageFormat(1) + .withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("ap", "pa").also() + .withDeliveryAnnotations().withAnnotation("da", "ad").also() + .withProperties().withCorrelationId("test").also() + .withMessageAnnotations().withAnnotation("ma", "am").also() + .withFooter().withFooter("footer", "1").also() + .withBody().withData(new byte[] {0, 1, 2}).also() + .queue(); + + // Now start and then await the remote grant of credit and out send of a transfer + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().withHandle(2).now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } }