diff --git a/sdk/core/Azure.Core.TestFramework/src/TestEnvironment.cs b/sdk/core/Azure.Core.TestFramework/src/TestEnvironment.cs index f2d5150227dd1..fa1b62a3fc931 100644 --- a/sdk/core/Azure.Core.TestFramework/src/TestEnvironment.cs +++ b/sdk/core/Azure.Core.TestFramework/src/TestEnvironment.cs @@ -309,7 +309,7 @@ private async Task ExtendResourceGroupExpirationAsync() string clientSecret = GetOptionalVariable("CLIENT_SECRET"); string authorityHost = GetOptionalVariable("AZURE_AUTHORITY_HOST"); - if (tenantId == null || clientId == null || clientSecret == null || authorityHost == null) + if (tenantId == null || clientId == null || clientSecret == null || authorityHost == null || ResourceManagerUrl == null) { return; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs index 47d210ebe04eb..edb31918c2c43 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs @@ -1007,6 +1007,14 @@ public TrueRuleFilter() : base (default(string)) { } public override string ToString() { throw null; } } } +namespace Azure.Messaging.ServiceBus.Primitives +{ + public static partial class ServiceBusAmqpExtensions + { + public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage FromAmqpBytes(System.BinaryData messageBytes, System.BinaryData lockTokenBytes) { throw null; } + public static System.BinaryData ToAmqpBytes(this Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message) { throw null; } + } +} namespace Microsoft.Extensions.Azure { public static partial class ServiceBusClientBuilderExtensions diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs index a1ffa09fa4073..6438709c9d9f3 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs @@ -171,26 +171,30 @@ private static ArraySegment ReadStreamToArraySegment(Stream stream) } } - public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) + public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) => AmqpAnnotatedMessageToAmqpMessage(sbMessage.AmqpMessage); + + public virtual AmqpMessage AmqpAnnotatedMessageToAmqpMessage(AmqpAnnotatedMessage annotatedMessage) { + Argument.AssertNotNull(annotatedMessage, nameof(annotatedMessage)); + // body - var amqpMessage = sbMessage.ToAmqpMessage(); + AmqpMessage amqpMessage = annotatedMessage.ToAmqpMessage(); // properties - amqpMessage.Properties.MessageId = sbMessage.MessageId; - amqpMessage.Properties.CorrelationId = sbMessage.CorrelationId; - amqpMessage.Properties.ContentType = sbMessage.ContentType; - amqpMessage.Properties.ContentEncoding = sbMessage.AmqpMessage.Properties.ContentEncoding; - amqpMessage.Properties.Subject = sbMessage.Subject; - amqpMessage.Properties.To = sbMessage.To; - amqpMessage.Properties.ReplyTo = sbMessage.ReplyTo; - amqpMessage.Properties.GroupId = sbMessage.SessionId; - amqpMessage.Properties.ReplyToGroupId = sbMessage.ReplyToSessionId; - amqpMessage.Properties.GroupSequence = sbMessage.AmqpMessage.Properties.GroupSequence; - - if (sbMessage.AmqpMessage.Properties.UserId.HasValue) - { - ReadOnlyMemory userId = sbMessage.AmqpMessage.Properties.UserId.Value; + amqpMessage.Properties.MessageId = annotatedMessage.Properties.MessageId?.ToString(); + amqpMessage.Properties.CorrelationId = annotatedMessage.Properties.CorrelationId?.ToString(); + amqpMessage.Properties.ContentType = annotatedMessage.Properties.ContentType; + amqpMessage.Properties.ContentEncoding = annotatedMessage.Properties.ContentEncoding; + amqpMessage.Properties.Subject = annotatedMessage.Properties.Subject; + amqpMessage.Properties.To = annotatedMessage.Properties.To?.ToString(); + amqpMessage.Properties.ReplyTo = annotatedMessage.Properties.ReplyTo?.ToString(); + amqpMessage.Properties.GroupId = annotatedMessage.Properties.GroupId; + amqpMessage.Properties.ReplyToGroupId = annotatedMessage.Properties.ReplyToGroupId; + amqpMessage.Properties.GroupSequence = annotatedMessage.Properties.GroupSequence; + + if (annotatedMessage.Properties.UserId.HasValue) + { + ReadOnlyMemory userId = annotatedMessage.Properties.UserId.Value; if (MemoryMarshal.TryGetArray(userId, out ArraySegment segment)) { amqpMessage.Properties.UserId = segment; @@ -202,14 +206,15 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) } // If TTL is set, it is used to calculate AbsoluteExpiryTime and CreationTime - if (sbMessage.TimeToLive != TimeSpan.MaxValue) + TimeSpan ttl = annotatedMessage.GetTimeToLive(); + if (ttl != TimeSpan.MaxValue) { - amqpMessage.Header.Ttl = (uint)sbMessage.TimeToLive.TotalMilliseconds; + amqpMessage.Header.Ttl = (uint)ttl.TotalMilliseconds; amqpMessage.Properties.CreationTime = DateTime.UtcNow; - if (AmqpConstants.MaxAbsoluteExpiryTime - amqpMessage.Properties.CreationTime.Value > sbMessage.TimeToLive) + if (AmqpConstants.MaxAbsoluteExpiryTime - amqpMessage.Properties.CreationTime.Value > ttl) { - amqpMessage.Properties.AbsoluteExpiryTime = amqpMessage.Properties.CreationTime.Value + sbMessage.TimeToLive; + amqpMessage.Properties.AbsoluteExpiryTime = amqpMessage.Properties.CreationTime.Value + ttl; } else { @@ -218,38 +223,41 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) } else { - if (sbMessage.AmqpMessage.Properties.CreationTime.HasValue) + if (annotatedMessage.Properties.CreationTime.HasValue) { - amqpMessage.Properties.CreationTime = sbMessage.AmqpMessage.Properties.CreationTime.Value.UtcDateTime; + amqpMessage.Properties.CreationTime = annotatedMessage.Properties.CreationTime.Value.UtcDateTime; } - if (sbMessage.AmqpMessage.Properties.AbsoluteExpiryTime.HasValue) + if (annotatedMessage.Properties.AbsoluteExpiryTime.HasValue) { - amqpMessage.Properties.AbsoluteExpiryTime = sbMessage.AmqpMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime; + amqpMessage.Properties.AbsoluteExpiryTime = annotatedMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime; } } // message annotations - foreach (KeyValuePair kvp in sbMessage.AmqpMessage.MessageAnnotations) + foreach (KeyValuePair kvp in annotatedMessage.MessageAnnotations) { switch (kvp.Key) { case AmqpMessageConstants.ScheduledEnqueueTimeUtcName: - if ((sbMessage.ScheduledEnqueueTime != null) && sbMessage.ScheduledEnqueueTime > DateTimeOffset.MinValue) + DateTimeOffset scheduledEnqueueTime = annotatedMessage.GetScheduledEnqueueTime(); + if (scheduledEnqueueTime != default) { - amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ScheduledEnqueueTimeUtcName, sbMessage.ScheduledEnqueueTime.UtcDateTime); + amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ScheduledEnqueueTimeUtcName, scheduledEnqueueTime.UtcDateTime); } break; case AmqpMessageConstants.PartitionKeyName: - if (sbMessage.PartitionKey != null) + string partitionKey = annotatedMessage.GetPartitionKey(); + if (partitionKey != null) { - amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.PartitionKeyName, sbMessage.PartitionKey); + amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.PartitionKeyName, partitionKey); } break; case AmqpMessageConstants.ViaPartitionKeyName: - if (sbMessage.TransactionPartitionKey != null) + string viaPartitionKey = annotatedMessage.GetViaPartitionKey(); + if (viaPartitionKey != null) { - amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ViaPartitionKeyName, sbMessage.TransactionPartitionKey); + amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ViaPartitionKeyName, viaPartitionKey); } break; default: @@ -260,14 +268,14 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) // application properties - if (sbMessage.ApplicationProperties != null && sbMessage.ApplicationProperties.Count > 0) + if (annotatedMessage.ApplicationProperties.Count > 0) { if (amqpMessage.ApplicationProperties == null) { amqpMessage.ApplicationProperties = new ApplicationProperties(); } - foreach (KeyValuePair pair in sbMessage.ApplicationProperties) + foreach (KeyValuePair pair in annotatedMessage.ApplicationProperties) { if (TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, out var amqpObject)) { @@ -282,7 +290,7 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) // delivery annotations - foreach (KeyValuePair kvp in sbMessage.AmqpMessage.DeliveryAnnotations) + foreach (KeyValuePair kvp in annotatedMessage.DeliveryAnnotations) { if (TryGetAmqpObjectFromNetObject(kvp.Value, MappingType.ApplicationProperty, out var amqpObject)) { @@ -292,26 +300,26 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) // header - except for ttl which is set above with the properties - if (sbMessage.AmqpMessage.Header.DeliveryCount != null) + if (annotatedMessage.Header.DeliveryCount != null) { - amqpMessage.Header.DeliveryCount = sbMessage.AmqpMessage.Header.DeliveryCount; + amqpMessage.Header.DeliveryCount = annotatedMessage.Header.DeliveryCount; } - if (sbMessage.AmqpMessage.Header.Durable != null) + if (annotatedMessage.Header.Durable != null) { - amqpMessage.Header.Durable = sbMessage.AmqpMessage.Header.Durable; + amqpMessage.Header.Durable = annotatedMessage.Header.Durable; } - if (sbMessage.AmqpMessage.Header.FirstAcquirer != null) + if (annotatedMessage.Header.FirstAcquirer != null) { - amqpMessage.Header.FirstAcquirer = sbMessage.AmqpMessage.Header.FirstAcquirer; + amqpMessage.Header.FirstAcquirer = annotatedMessage.Header.FirstAcquirer; } - if (sbMessage.AmqpMessage.Header.Priority != null) + if (annotatedMessage.Header.Priority != null) { - amqpMessage.Header.Priority = sbMessage.AmqpMessage.Header.Priority; + amqpMessage.Header.Priority = annotatedMessage.Header.Priority; } // footer - foreach (KeyValuePair kvp in sbMessage.AmqpMessage.Footer) + foreach (KeyValuePair kvp in annotatedMessage.Footer) { amqpMessage.Footer.Map.Add(kvp.Key, kvp.Value); } @@ -319,11 +327,10 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) return amqpMessage; } - public virtual ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage, bool isPeeked = false) + private static AmqpAnnotatedMessage AmqpMessageToAnnotatedMessage(AmqpMessage amqpMessage, bool isPeeked) { Argument.AssertNotNull(amqpMessage, nameof(amqpMessage)); AmqpAnnotatedMessage annotatedMessage; - // body if ((amqpMessage.BodyType & SectionFlag.Data) != 0 && amqpMessage.DataBody != null) @@ -351,7 +358,6 @@ public virtual ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqp { annotatedMessage = new AmqpAnnotatedMessage(new AmqpMessageBody(Enumerable.Empty>())); } - ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage); SectionFlag sections = amqpMessage.Sections; @@ -540,22 +546,39 @@ public virtual ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqp } } + return annotatedMessage; + } + + public virtual ServiceBusReceivedMessage AmqpMessageToSBReceivedMessage(AmqpMessage amqpMessage, bool isPeeked = false) + { + AmqpAnnotatedMessage annotatedMessage = AmqpMessageToAnnotatedMessage(amqpMessage, isPeeked); + + ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage); + // lock token - if (amqpMessage.DeliveryTag.Count == GuidSizeInBytes) + sbMessage.LockTokenGuid = ParseGuidBytes(amqpMessage.DeliveryTag); + + amqpMessage.Dispose(); + + return sbMessage; + } + + public virtual Guid ParseGuidBytes(ReadOnlyMemory bytes) + { + if (bytes.Length == GuidSizeInBytes) { - Span guidBytes = stackalloc byte[GuidSizeInBytes]; - amqpMessage.DeliveryTag.AsSpan().CopyTo(guidBytes); - if (!MemoryMarshal.TryRead(guidBytes, out var lockTokenGuid)) + // Use TryRead to avoid allocating an array if we are on a little endian machine. + if (!BitConverter.IsLittleEndian || !MemoryMarshal.TryRead(bytes.Span, out var lockTokenGuid)) { - lockTokenGuid = new Guid(guidBytes.ToArray()); + // Either we are on a big endian machine or the bytes were not a valid GUID. + // Even if the bytes were not valid, use the Guid constructor to leverage the Guid validation rather than throwing ourselves. + lockTokenGuid = new Guid(bytes.ToArray()); } - sbMessage.LockTokenGuid = lockTokenGuid; + return lockTokenGuid; } - amqpMessage.Dispose(); - - return sbMessage; + return default; } internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType mappingType, out object amqpObject) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs index 02ad4bf063a43..f9315b500c54f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs @@ -15,13 +15,15 @@ namespace Azure.Messaging.ServiceBus.Amqp { internal static class AmqpMessageExtensions { - public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message) + public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message) => ToAmqpMessage(message.AmqpMessage); + + public static AmqpMessage ToAmqpMessage(this AmqpAnnotatedMessage message) { - if (message.AmqpMessage.Body.TryGetData(out IEnumerable> dataBody)) + if (message.Body.TryGetData(out IEnumerable> dataBody)) { return AmqpMessage.Create(dataBody.AsAmqpData()); } - if (message.AmqpMessage.Body.TryGetValue(out object value)) + if (message.Body.TryGetValue(out object value)) { if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(value, MappingType.MessageBody, out object amqpObject)) { @@ -32,12 +34,12 @@ public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message) throw new NotSupportedException(Resources.InvalidAmqpMessageValueBody.FormatForUser(amqpObject?.GetType())); } } - if (message.AmqpMessage.Body.TryGetSequence(out IEnumerable> sequence)) + if (message.Body.TryGetSequence(out IEnumerable> sequence)) { return AmqpMessage.Create(sequence.Select(s => new AmqpSequence((IList)s)).ToList()); } - throw new NotSupportedException($"{message.AmqpMessage.Body.GetType()} is not a supported message body type."); + throw new NotSupportedException($"{message.Body.GetType()} is not a supported message body type."); } private static IEnumerable AsAmqpData(this IEnumerable> binaryData) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index fe4912319bed6..6e88bbb1e9983 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -385,7 +385,7 @@ private async Task> ReceiveMessagesAsyn link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome); } - receivedMessages.Add(_messageConverter.AmqpMessageToSBMessage(message)); + receivedMessages.Add(_messageConverter.AmqpMessageToSBReceivedMessage(message)); message.Dispose(); } @@ -1001,7 +1001,7 @@ private async Task> PeekMessagesInterna var payload = (ArraySegment)entry[ManagementConstants.Properties.Message]; var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true); - message = _messageConverter.AmqpMessageToSBMessage(amqpMessage, true); + message = _messageConverter.AmqpMessageToSBReceivedMessage(amqpMessage, true); messages.Add(message); } @@ -1311,7 +1311,7 @@ internal virtual async Task> ReceiveDef var payload = (ArraySegment)entry[ManagementConstants.Properties.Message]; var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true); - var message = _messageConverter.AmqpMessageToSBMessage(amqpMessage); + var message = _messageConverter.AmqpMessageToSBReceivedMessage(amqpMessage); if (entry.TryGetValue(ManagementConstants.Properties.LockToken, out var lockToken)) { message.LockTokenGuid = lockToken; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Compatibility/ServiceBusAmqpExtensions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Compatibility/ServiceBusAmqpExtensions.cs new file mode 100644 index 0000000000000..41580ef2dd476 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Compatibility/ServiceBusAmqpExtensions.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using Azure.Messaging.ServiceBus.Amqp; +using Microsoft.Azure.Amqp; +using AmqpLib = Microsoft.Azure.Amqp; + +namespace Azure.Messaging.ServiceBus.Primitives +{ + /// + /// Extensions for converting to and from AMQP. + /// + public static class ServiceBusAmqpExtensions + { + /// + /// Converts the to its serialized AMQP form. + /// + /// + public static BinaryData ToAmqpBytes(this ServiceBusReceivedMessage message) + { + var stream = AmqpMessageConverter.Default.AmqpAnnotatedMessageToAmqpMessage(message.GetRawAmqpMessage()).ToStream(); + return BinaryData.FromStream(stream); + } + + /// + /// Constructs a from its serialized AMQP form. + /// + /// The AMQP message bytes. + /// The lock token bytes. + /// + public static ServiceBusReceivedMessage FromAmqpBytes(BinaryData messageBytes, BinaryData lockTokenBytes) + { + var bufferStream = BufferListStream.Create(messageBytes.ToStream(), 4096); + var amqpMessage = AmqpLib.AmqpMessage.CreateInputMessage(bufferStream); + + var message = AmqpMessageConverter.Default.AmqpMessageToSBReceivedMessage(amqpMessage); + message.LockTokenGuid = AmqpMessageConverter.Default.ParseGuidBytes(lockTokenBytes); + return message; + } + } +} \ No newline at end of file diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs index a0cbc8b92cb44..8a90e891cf77e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs @@ -131,7 +131,7 @@ public static ServiceBusReceivedMessage ServiceBusReceivedMessage( amqpMessage.MessageAnnotations[AmqpMessageConstants.DeadLetterSourceName] = deadLetterSource; amqpMessage.MessageAnnotations[AmqpMessageConstants.EnqueueSequenceNumberName] = enqueuedSequenceNumber; amqpMessage.MessageAnnotations[AmqpMessageConstants.EnqueuedTimeUtcName] = enqueuedTime.UtcDateTime; - amqpMessage.MessageAnnotations[AmqpMessageConstants.MessageStateName] = serviceBusMessageState; + amqpMessage.MessageAnnotations[AmqpMessageConstants.MessageStateName] = (int)serviceBusMessageState; return new ServiceBusReceivedMessage(amqpMessage) { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpConverterTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpConverterTests.cs index 1bc028adcf724..f430cceecf188 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpConverterTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpConverterTests.cs @@ -25,7 +25,7 @@ public void ConvertAmqpMessageToSBMessage() data.Value = messageBody; var amqpMessage = AmqpMessage.Create(data); - var sbMessage = converter.AmqpMessageToSBMessage(amqpMessage); + var sbMessage = converter.AmqpMessageToSBReceivedMessage(amqpMessage); ReadOnlyMemory sbBody = sbMessage.Body; Assert.AreEqual(messageBody, sbBody.ToArray()); } @@ -65,7 +65,7 @@ public void ConvertSBMessageToAmqpMessageAndBack() sbMessage.ApplicationProperties.Add("UserProperty", "SomeUserProperty"); var amqpMessage = converter.SBMessageToAmqpMessage(sbMessage); - var convertedSbMessage = converter.AmqpMessageToSBMessage(amqpMessage); + var convertedSbMessage = converter.AmqpMessageToSBReceivedMessage(amqpMessage); Assert.AreEqual("SomeUserProperty", convertedSbMessage.ApplicationProperties["UserProperty"]); Assert.AreEqual(messageBody, convertedSbMessage.Body.ToArray()); @@ -104,7 +104,7 @@ public void PeekedMessageShouldNotIncrementDeliveryCount() var amqpMessage = AmqpMessage.Create(amqpValue); amqpMessage.Header.DeliveryCount = 2; - var sbMessage = converter.AmqpMessageToSBMessage(amqpMessage, isPeeked: true); + var sbMessage = converter.AmqpMessageToSBReceivedMessage(amqpMessage, isPeeked: true); sbMessage.SequenceNumber = 1L; Assert.AreEqual(2, sbMessage.DeliveryCount); @@ -121,7 +121,7 @@ public void ReceivedMessageShouldIncrementDeliveryCount() var amqpMessage = AmqpMessage.Create(amqpValue); amqpMessage.Header.DeliveryCount = 2; - var sbMessage = converter.AmqpMessageToSBMessage(amqpMessage, isPeeked: false); + var sbMessage = converter.AmqpMessageToSBReceivedMessage(amqpMessage, isPeeked: false); sbMessage.SequenceNumber = 1L; Assert.AreEqual(3, sbMessage.DeliveryCount); @@ -132,13 +132,13 @@ public void CanParseDictionaryValueSection() { var converter = new AmqpMessageConverter(); var amqpMessage = AmqpMessage.Create(new AmqpValue { Value = new Dictionary { { "key", "value" } } }); - var sbMessage = converter.AmqpMessageToSBMessage(amqpMessage); + var sbMessage = converter.AmqpMessageToSBReceivedMessage(amqpMessage); var body = sbMessage.GetRawAmqpMessage().Body; Assert.IsTrue(body.TryGetValue(out object val)); Assert.AreEqual("value", ((Dictionary)val)["key"]); amqpMessage = AmqpMessage.Create(new AmqpValue { Value = new AmqpMap { { new MapKey("key"), "value" } } }); - sbMessage = converter.AmqpMessageToSBMessage(amqpMessage); + sbMessage = converter.AmqpMessageToSBReceivedMessage(amqpMessage); body = sbMessage.GetRawAmqpMessage().Body; Assert.IsTrue(body.TryGetValue(out val)); Assert.AreEqual("value", ((Dictionary)val)["key"]); @@ -152,7 +152,7 @@ public void CanParseMaxAbsoluteExpiryTime() var amqpMessage = AmqpMessage.Create(data); amqpMessage.Properties.AbsoluteExpiryTime = DateTime.MaxValue; - var convertedSbMessage = converter.AmqpMessageToSBMessage(amqpMessage); + var convertedSbMessage = converter.AmqpMessageToSBReceivedMessage(amqpMessage); Assert.AreEqual(DateTimeOffset.MaxValue, convertedSbMessage.ExpiresAt); } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs index 59b4c995d5a96..91af18ea6ec3b 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs @@ -9,6 +9,7 @@ using Azure.Core.Amqp; using Azure.Core.Serialization; using Azure.Messaging.ServiceBus.Amqp; +using Azure.Messaging.ServiceBus.Primitives; using Microsoft.Azure.Amqp.Encoding; using NUnit.Framework; @@ -71,6 +72,33 @@ public async Task MessagePropertiesShouldSupportValidPropertyTypes() Assert.IsInstanceOf(typeof(DateTime), receivedMsg.ApplicationProperties["DateTime"]); Assert.IsInstanceOf(typeof(DateTimeOffset), receivedMsg.ApplicationProperties["DateTimeOffset"]); Assert.IsInstanceOf(typeof(TimeSpan), receivedMsg.ApplicationProperties["TimeSpan"]); + + var bytes = receivedMsg.ToAmqpBytes(); + + var copyReceivedMessage = ServiceBusAmqpExtensions.FromAmqpBytes( + bytes, + BinaryData.FromBytes(receivedMsg.LockTokenGuid.ToByteArray())); + + Assert.AreEqual(receivedMsg.LockToken, copyReceivedMessage.LockToken); + Assert.IsInstanceOf(typeof(byte), copyReceivedMessage.ApplicationProperties["byte"]); + Assert.IsInstanceOf(typeof(sbyte), copyReceivedMessage.ApplicationProperties["sbyte"]); + Assert.IsInstanceOf(typeof(char), copyReceivedMessage.ApplicationProperties["char"]); + Assert.IsInstanceOf(typeof(short), copyReceivedMessage.ApplicationProperties["short"]); + Assert.IsInstanceOf(typeof(ushort), copyReceivedMessage.ApplicationProperties["ushort"]); + Assert.IsInstanceOf(typeof(int), copyReceivedMessage.ApplicationProperties["int"]); + Assert.IsInstanceOf(typeof(uint), copyReceivedMessage.ApplicationProperties["uint"]); + Assert.IsInstanceOf(typeof(long), copyReceivedMessage.ApplicationProperties["long"]); + Assert.IsInstanceOf(typeof(ulong), copyReceivedMessage.ApplicationProperties["ulong"]); + Assert.IsInstanceOf(typeof(float), copyReceivedMessage.ApplicationProperties["float"]); + Assert.IsInstanceOf(typeof(double), copyReceivedMessage.ApplicationProperties["double"]); + Assert.IsInstanceOf(typeof(decimal), copyReceivedMessage.ApplicationProperties["decimal"]); + Assert.IsInstanceOf(typeof(bool), copyReceivedMessage.ApplicationProperties["bool"]); + Assert.IsInstanceOf(typeof(Guid), copyReceivedMessage.ApplicationProperties["Guid"]); + Assert.IsInstanceOf(typeof(string), copyReceivedMessage.ApplicationProperties["string"]); + Assert.IsInstanceOf(typeof(Uri), copyReceivedMessage.ApplicationProperties["Uri"]); + Assert.IsInstanceOf(typeof(DateTime), copyReceivedMessage.ApplicationProperties["DateTime"]); + Assert.IsInstanceOf(typeof(DateTimeOffset), copyReceivedMessage.ApplicationProperties["DateTimeOffset"]); + Assert.IsInstanceOf(typeof(TimeSpan), copyReceivedMessage.ApplicationProperties["TimeSpan"]); } } @@ -615,6 +643,63 @@ public async Task CanRoundTripAbsoluteExpiryCreationTime() } } + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task CanSerializeDeserializeAmqpBytes(bool useSession) + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: true, enableSession: useSession)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + var sender = client.CreateSender(scope.QueueName); + var msg = new ServiceBusMessage(new BinaryData(ServiceBusTestUtilities.GetRandomBuffer(100))); + msg.ContentType = "contenttype"; + msg.CorrelationId = "correlationid"; + msg.Subject = "label"; + msg.MessageId = "messageId"; + msg.PartitionKey = "key"; + msg.ApplicationProperties.Add("testProp", "my prop"); + msg.ReplyTo = "replyto"; + + msg.ScheduledEnqueueTime = DateTimeOffset.Now; + if (useSession) + { + msg.SessionId = "key"; + msg.ReplyToSessionId = "replytosession"; + } + + msg.TimeToLive = TimeSpan.FromSeconds(60); + msg.To = "to"; + await sender.SendMessageAsync(msg); + + ServiceBusReceiver receiver; + if (useSession) + receiver = await client.AcceptNextSessionAsync(scope.QueueName); + else + receiver = client.CreateReceiver(scope.QueueName); + + ServiceBusReceivedMessage received = await receiver.ReceiveMessageAsync(); + + var serializedBytes = received.ToAmqpBytes(); + var deserialized = ServiceBusAmqpExtensions.FromAmqpBytes( + serializedBytes, + BinaryData.FromBytes(received.LockTokenGuid.ToByteArray())); + Assert.AreEqual(received.ContentType, deserialized.ContentType); + Assert.AreEqual(received.CorrelationId, deserialized.CorrelationId); + Assert.AreEqual(received.Subject, deserialized.Subject); + Assert.AreEqual(received.MessageId, deserialized.MessageId); + Assert.AreEqual(received.PartitionKey, deserialized.PartitionKey); + Assert.AreEqual(received.ApplicationProperties["testProp"], deserialized.ApplicationProperties["testProp"]); + Assert.AreEqual(received.ReplyTo, deserialized.ReplyTo); + Assert.AreEqual(received.ReplyToSessionId, deserialized.ReplyToSessionId); + Assert.AreEqual(received.ScheduledEnqueueTime, deserialized.ScheduledEnqueueTime); + Assert.AreEqual(received.SessionId, deserialized.SessionId); + Assert.AreEqual(received.TimeToLive, deserialized.TimeToLive); + Assert.AreEqual(received.To, deserialized.To); + Assert.AreEqual(received.LockTokenGuid, deserialized.LockTokenGuid); + } + } + private class TestBody { public string A { get; set; } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusExtensionConfigProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusExtensionConfigProvider.cs index bc07d2fe0d9f0..7141f4fe1d2bc 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusExtensionConfigProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusExtensionConfigProvider.cs @@ -4,6 +4,7 @@ using System; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Primitives; using Microsoft.Azure.WebJobs.Description; using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; using Microsoft.Azure.WebJobs.Host.Bindings; @@ -96,6 +97,7 @@ public void Initialize(ExtensionConfigContext context) .AddConverter(new MessageToStringConverter()) .AddConverter(new MessageToByteArrayConverter()) .AddConverter(message => message.Body) + .AddConverter(ConvertReceivedMessageToBindingData) .AddOpenConverter(typeof(MessageToPocoConverter<>), _options.JsonSerializerSettings); // register our trigger binding provider @@ -108,6 +110,25 @@ public void Initialize(ExtensionConfigContext context) context.AddBindingRule().Bind(bindingProvider); } + internal static ParameterBindingData ConvertReceivedMessageToBindingData(ServiceBusReceivedMessage message) + { + ReadOnlyMemory messageBytes = message.ToAmqpBytes().ToMemory(); + byte[] lockTokenBytes = Guid.Parse(message.LockToken).ToByteArray(); + + // The lock token is a 16 byte GUID + const int lockTokenLength = 16; + + byte[] combinedBytes = new byte[messageBytes.Length + lockTokenLength]; + + // The 16 lock token bytes go in the beginning + lockTokenBytes.CopyTo(combinedBytes.AsSpan()); + + // The AMQP message bytes go after the lock token bytes + messageBytes.CopyTo(combinedBytes.AsMemory(lockTokenLength)); + + return new ParameterBindingData("1.0", "AzureServiceBusReceivedMessage", BinaryData.FromBytes(combinedBytes), "application/octet-stream"); + } + internal static void LogExceptionReceivedEvent(ProcessErrorEventArgs e, ILoggerFactory loggerFactory) { try diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj index 5360089de09b0..2dfcc3a79b85f 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj @@ -35,4 +35,8 @@ + + + + diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageInteropTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageInteropTests.cs index 0b97ded5a253c..358273f61b2d9 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageInteropTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageInteropTests.cs @@ -9,6 +9,8 @@ using System.IO; using System.Linq; using System.Runtime.Serialization; +using Azure.Messaging.ServiceBus.Primitives; +using Microsoft.Azure.WebJobs.ServiceBus.Config; namespace Microsoft.Azure.ServiceBus.UnitTests.MessageInterop { @@ -33,6 +35,55 @@ public void RunSerializerTests(string testCaseName) Assert.AreEqual(book, returned); } + [Test] + public void ParameterBindingDataTest() + { + var lockToken = Guid.NewGuid(); + var message = ServiceBusModelFactory.ServiceBusReceivedMessage( + body: BinaryData.FromString("body"), + messageId: "messageId", + correlationId: "correlationId", + sessionId: "sessionId", + replyTo: "replyTo", + replyToSessionId: "replyToSessionId", + contentType: "contentType", + subject: "label", + to: "to", + partitionKey: "partitionKey", + viaPartitionKey: "viaPartitionKey", + deadLetterSource: "deadLetterSource", + enqueuedSequenceNumber: 1, + lockTokenGuid: lockToken); + + var bindingData = ServiceBusExtensionConfigProvider.ConvertReceivedMessageToBindingData(message); + Assert.AreEqual("application/octet-stream", bindingData.ContentType); + Assert.AreEqual("1.0", bindingData.Version); + Assert.AreEqual("AzureServiceBusReceivedMessage", bindingData.Source); + + var bytes = bindingData.Content.ToMemory(); + var lockTokenBytes = bytes.Slice(0, 16).ToArray(); + Assert.AreEqual(lockToken.ToByteArray(), lockTokenBytes); + + var deserialized = ServiceBusAmqpExtensions.FromAmqpBytes( + BinaryData.FromBytes(bytes.Slice(16, bytes.Length - 16)), + BinaryData.FromBytes(lockTokenBytes)); + + Assert.AreEqual(message.Body.ToArray(), deserialized.Body.ToArray()); + Assert.AreEqual(message.MessageId, deserialized.MessageId); + Assert.AreEqual(message.CorrelationId, deserialized.CorrelationId); + Assert.AreEqual(message.SessionId, deserialized.SessionId); + Assert.AreEqual(message.ReplyTo, deserialized.ReplyTo); + Assert.AreEqual(message.ReplyToSessionId, deserialized.ReplyToSessionId); + Assert.AreEqual(message.ContentType, deserialized.ContentType); + Assert.AreEqual(message.Subject, deserialized.Subject); + Assert.AreEqual(message.To, deserialized.To); + Assert.AreEqual(message.PartitionKey, deserialized.PartitionKey); + Assert.AreEqual(message.TransactionPartitionKey, deserialized.TransactionPartitionKey); + Assert.AreEqual(message.DeadLetterSource, deserialized.DeadLetterSource); + Assert.AreEqual(message.EnqueuedSequenceNumber, deserialized.EnqueuedSequenceNumber); + Assert.AreEqual(message.LockToken, deserialized.LockToken); + } + private ServiceBusMessage GetBrokeredMessage(XmlObjectSerializer serializer, TestBook book) { byte[] payload = null;