Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support serializing received message to/from AMQP bytes #33682

Merged
merged 15 commits into from
Feb 2, 2023
2 changes: 1 addition & 1 deletion sdk/core/Azure.Core.TestFramework/src/TestEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private async Task ExtendResourceGroupExpirationAsync()
string clientSecret = GetOptionalVariable("CLIENT_SECRET");
string authorityHost = GetOptionalVariable("AZURE_AUTHORITY_HOST");

if (tenantId == null || clientId == null || clientSecret == null || authorityHost == null)
if (tenantId == null || clientId == null || clientSecret == null || authorityHost == null || ResourceManagerUrl == null)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,14 @@ public TrueRuleFilter() : base (default(string)) { }
public override string ToString() { throw null; }
}
}
namespace Azure.Messaging.ServiceBus.Primitives
{
public static partial class ServiceBusAmqpExtensions
{
public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage FromAmqpBytes(System.BinaryData messageBytes, System.BinaryData lockTokenBytes) { throw null; }
public static System.BinaryData ToAmqpBytes(this Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message) { throw null; }
}
}
namespace Microsoft.Extensions.Azure
{
public static partial class ServiceBusClientBuilderExtensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,26 +171,30 @@ private static ArraySegment<byte> ReadStreamToArraySegment(Stream stream)
}
}

public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) => AmqpAnnotatedMessageToAmqpMessage(sbMessage.AmqpMessage);

public virtual AmqpMessage AmqpAnnotatedMessageToAmqpMessage(AmqpAnnotatedMessage annotatedMessage)
{
Argument.AssertNotNull(annotatedMessage, nameof(annotatedMessage));

// body
var amqpMessage = sbMessage.ToAmqpMessage();
AmqpMessage amqpMessage = annotatedMessage.ToAmqpMessage();

// properties
amqpMessage.Properties.MessageId = sbMessage.MessageId;
amqpMessage.Properties.CorrelationId = sbMessage.CorrelationId;
amqpMessage.Properties.ContentType = sbMessage.ContentType;
amqpMessage.Properties.ContentEncoding = sbMessage.AmqpMessage.Properties.ContentEncoding;
amqpMessage.Properties.Subject = sbMessage.Subject;
amqpMessage.Properties.To = sbMessage.To;
amqpMessage.Properties.ReplyTo = sbMessage.ReplyTo;
amqpMessage.Properties.GroupId = sbMessage.SessionId;
amqpMessage.Properties.ReplyToGroupId = sbMessage.ReplyToSessionId;
amqpMessage.Properties.GroupSequence = sbMessage.AmqpMessage.Properties.GroupSequence;

if (sbMessage.AmqpMessage.Properties.UserId.HasValue)
{
ReadOnlyMemory<byte> userId = sbMessage.AmqpMessage.Properties.UserId.Value;
amqpMessage.Properties.MessageId = annotatedMessage.Properties.MessageId?.ToString();
amqpMessage.Properties.CorrelationId = annotatedMessage.Properties.CorrelationId?.ToString();
amqpMessage.Properties.ContentType = annotatedMessage.Properties.ContentType;
amqpMessage.Properties.ContentEncoding = annotatedMessage.Properties.ContentEncoding;
amqpMessage.Properties.Subject = annotatedMessage.Properties.Subject;
amqpMessage.Properties.To = annotatedMessage.Properties.To?.ToString();
amqpMessage.Properties.ReplyTo = annotatedMessage.Properties.ReplyTo?.ToString();
amqpMessage.Properties.GroupId = annotatedMessage.Properties.GroupId;
amqpMessage.Properties.ReplyToGroupId = annotatedMessage.Properties.ReplyToGroupId;
amqpMessage.Properties.GroupSequence = annotatedMessage.Properties.GroupSequence;

if (annotatedMessage.Properties.UserId.HasValue)
{
ReadOnlyMemory<byte> userId = annotatedMessage.Properties.UserId.Value;
if (MemoryMarshal.TryGetArray(userId, out ArraySegment<byte> segment))
{
amqpMessage.Properties.UserId = segment;
Expand All @@ -202,14 +206,15 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
}

// If TTL is set, it is used to calculate AbsoluteExpiryTime and CreationTime
if (sbMessage.TimeToLive != TimeSpan.MaxValue)
TimeSpan ttl = annotatedMessage.GetTimeToLive();
if (ttl != TimeSpan.MaxValue)
{
amqpMessage.Header.Ttl = (uint)sbMessage.TimeToLive.TotalMilliseconds;
amqpMessage.Header.Ttl = (uint)ttl.TotalMilliseconds;
amqpMessage.Properties.CreationTime = DateTime.UtcNow;

if (AmqpConstants.MaxAbsoluteExpiryTime - amqpMessage.Properties.CreationTime.Value > sbMessage.TimeToLive)
if (AmqpConstants.MaxAbsoluteExpiryTime - amqpMessage.Properties.CreationTime.Value > ttl)
{
amqpMessage.Properties.AbsoluteExpiryTime = amqpMessage.Properties.CreationTime.Value + sbMessage.TimeToLive;
amqpMessage.Properties.AbsoluteExpiryTime = amqpMessage.Properties.CreationTime.Value + ttl;
}
else
{
Expand All @@ -218,38 +223,41 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
}
else
{
if (sbMessage.AmqpMessage.Properties.CreationTime.HasValue)
if (annotatedMessage.Properties.CreationTime.HasValue)
{
amqpMessage.Properties.CreationTime = sbMessage.AmqpMessage.Properties.CreationTime.Value.UtcDateTime;
amqpMessage.Properties.CreationTime = annotatedMessage.Properties.CreationTime.Value.UtcDateTime;
}
if (sbMessage.AmqpMessage.Properties.AbsoluteExpiryTime.HasValue)
if (annotatedMessage.Properties.AbsoluteExpiryTime.HasValue)
{
amqpMessage.Properties.AbsoluteExpiryTime = sbMessage.AmqpMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime;
amqpMessage.Properties.AbsoluteExpiryTime = annotatedMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime;
}
}

// message annotations

foreach (KeyValuePair<string, object> kvp in sbMessage.AmqpMessage.MessageAnnotations)
foreach (KeyValuePair<string, object> kvp in annotatedMessage.MessageAnnotations)
{
switch (kvp.Key)
{
case AmqpMessageConstants.ScheduledEnqueueTimeUtcName:
if ((sbMessage.ScheduledEnqueueTime != null) && sbMessage.ScheduledEnqueueTime > DateTimeOffset.MinValue)
DateTimeOffset scheduledEnqueueTime = annotatedMessage.GetScheduledEnqueueTime();
if (scheduledEnqueueTime != default)
{
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ScheduledEnqueueTimeUtcName, sbMessage.ScheduledEnqueueTime.UtcDateTime);
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ScheduledEnqueueTimeUtcName, scheduledEnqueueTime.UtcDateTime);
}
break;
case AmqpMessageConstants.PartitionKeyName:
if (sbMessage.PartitionKey != null)
string partitionKey = annotatedMessage.GetPartitionKey();
if (partitionKey != null)
{
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.PartitionKeyName, sbMessage.PartitionKey);
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.PartitionKeyName, partitionKey);
}
break;
case AmqpMessageConstants.ViaPartitionKeyName:
if (sbMessage.TransactionPartitionKey != null)
string viaPartitionKey = annotatedMessage.GetViaPartitionKey();
if (viaPartitionKey != null)
{
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ViaPartitionKeyName, sbMessage.TransactionPartitionKey);
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ViaPartitionKeyName, viaPartitionKey);
}
break;
default:
Expand All @@ -260,14 +268,14 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)

// application properties

if (sbMessage.ApplicationProperties != null && sbMessage.ApplicationProperties.Count > 0)
if (annotatedMessage.ApplicationProperties.Count > 0)
{
if (amqpMessage.ApplicationProperties == null)
{
amqpMessage.ApplicationProperties = new ApplicationProperties();
}

foreach (KeyValuePair<string, object> pair in sbMessage.ApplicationProperties)
foreach (KeyValuePair<string, object> pair in annotatedMessage.ApplicationProperties)
{
if (TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, out var amqpObject))
{
Expand All @@ -282,7 +290,7 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)

// delivery annotations

foreach (KeyValuePair<string, object> kvp in sbMessage.AmqpMessage.DeliveryAnnotations)
foreach (KeyValuePair<string, object> kvp in annotatedMessage.DeliveryAnnotations)
{
if (TryGetAmqpObjectFromNetObject(kvp.Value, MappingType.ApplicationProperty, out var amqpObject))
{
Expand All @@ -292,38 +300,37 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)

// header - except for ttl which is set above with the properties

if (sbMessage.AmqpMessage.Header.DeliveryCount != null)
if (annotatedMessage.Header.DeliveryCount != null)
{
amqpMessage.Header.DeliveryCount = sbMessage.AmqpMessage.Header.DeliveryCount;
amqpMessage.Header.DeliveryCount = annotatedMessage.Header.DeliveryCount;
}
if (sbMessage.AmqpMessage.Header.Durable != null)
if (annotatedMessage.Header.Durable != null)
{
amqpMessage.Header.Durable = sbMessage.AmqpMessage.Header.Durable;
amqpMessage.Header.Durable = annotatedMessage.Header.Durable;
}
if (sbMessage.AmqpMessage.Header.FirstAcquirer != null)
if (annotatedMessage.Header.FirstAcquirer != null)
{
amqpMessage.Header.FirstAcquirer = sbMessage.AmqpMessage.Header.FirstAcquirer;
amqpMessage.Header.FirstAcquirer = annotatedMessage.Header.FirstAcquirer;
}
if (sbMessage.AmqpMessage.Header.Priority != null)
if (annotatedMessage.Header.Priority != null)
{
amqpMessage.Header.Priority = sbMessage.AmqpMessage.Header.Priority;
amqpMessage.Header.Priority = annotatedMessage.Header.Priority;
}

// footer

foreach (KeyValuePair<string, object> kvp in sbMessage.AmqpMessage.Footer)
foreach (KeyValuePair<string, object> kvp in annotatedMessage.Footer)
{
amqpMessage.Footer.Map.Add(kvp.Key, kvp.Value);
}

return amqpMessage;
}

public virtual ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage, bool isPeeked = false)
private static AmqpAnnotatedMessage AmqpMessageToAnnotatedMessage(AmqpMessage amqpMessage, bool isPeeked)
{
Argument.AssertNotNull(amqpMessage, nameof(amqpMessage));
AmqpAnnotatedMessage annotatedMessage;

// body

if ((amqpMessage.BodyType & SectionFlag.Data) != 0 && amqpMessage.DataBody != null)
Expand Down Expand Up @@ -351,7 +358,6 @@ public virtual ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqp
{
annotatedMessage = new AmqpAnnotatedMessage(new AmqpMessageBody(Enumerable.Empty<ReadOnlyMemory<byte>>()));
}
ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage);

SectionFlag sections = amqpMessage.Sections;

Expand Down Expand Up @@ -540,22 +546,39 @@ public virtual ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqp
}
}

return annotatedMessage;
}

public virtual ServiceBusReceivedMessage AmqpMessageToSBReceivedMessage(AmqpMessage amqpMessage, bool isPeeked = false)
{
AmqpAnnotatedMessage annotatedMessage = AmqpMessageToAnnotatedMessage(amqpMessage, isPeeked);

ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage);

// lock token

if (amqpMessage.DeliveryTag.Count == GuidSizeInBytes)
sbMessage.LockTokenGuid = ParseGuidBytes(amqpMessage.DeliveryTag);

amqpMessage.Dispose();

return sbMessage;
}

public virtual Guid ParseGuidBytes(ReadOnlyMemory<byte> bytes)
{
if (bytes.Length == GuidSizeInBytes)
{
Span<byte> guidBytes = stackalloc byte[GuidSizeInBytes];
amqpMessage.DeliveryTag.AsSpan().CopyTo(guidBytes);
if (!MemoryMarshal.TryRead<Guid>(guidBytes, out var lockTokenGuid))
// Use TryRead to avoid allocating an array if we are on a little endian machine.
if (!BitConverter.IsLittleEndian || !MemoryMarshal.TryRead<Guid>(bytes.Span, out var lockTokenGuid))
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
lockTokenGuid = new Guid(guidBytes.ToArray());
// Either we are on a big endian machine or the bytes were not a valid GUID.
// Even if the bytes were not valid, use the Guid constructor to leverage the Guid validation rather than throwing ourselves.
lockTokenGuid = new Guid(bytes.ToArray());
}
sbMessage.LockTokenGuid = lockTokenGuid;
return lockTokenGuid;
}

amqpMessage.Dispose();

return sbMessage;
return default;
}

internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType mappingType, out object amqpObject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ namespace Azure.Messaging.ServiceBus.Amqp
{
internal static class AmqpMessageExtensions
{
public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message)
public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message) => ToAmqpMessage(message.AmqpMessage);

public static AmqpMessage ToAmqpMessage(this AmqpAnnotatedMessage message)
{
if (message.AmqpMessage.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody))
if (message.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody))
{
return AmqpMessage.Create(dataBody.AsAmqpData());
}
if (message.AmqpMessage.Body.TryGetValue(out object value))
if (message.Body.TryGetValue(out object value))
{
if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(value, MappingType.MessageBody, out object amqpObject))
{
Expand All @@ -32,12 +34,12 @@ public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message)
throw new NotSupportedException(Resources.InvalidAmqpMessageValueBody.FormatForUser(amqpObject?.GetType()));
}
}
if (message.AmqpMessage.Body.TryGetSequence(out IEnumerable<IList<object>> sequence))
if (message.Body.TryGetSequence(out IEnumerable<IList<object>> sequence))
{
return AmqpMessage.Create(sequence.Select(s => new AmqpSequence((IList)s)).ToList());
}

throw new NotSupportedException($"{message.AmqpMessage.Body.GetType()} is not a supported message body type.");
throw new NotSupportedException($"{message.Body.GetType()} is not a supported message body type.");
}

private static IEnumerable<Data> AsAmqpData(this IEnumerable<ReadOnlyMemory<byte>> binaryData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
}

receivedMessages.Add(_messageConverter.AmqpMessageToSBMessage(message));
receivedMessages.Add(_messageConverter.AmqpMessageToSBReceivedMessage(message));
message.Dispose();
}

Expand Down Expand Up @@ -1001,7 +1001,7 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInterna

var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true);
message = _messageConverter.AmqpMessageToSBMessage(amqpMessage, true);
message = _messageConverter.AmqpMessageToSBReceivedMessage(amqpMessage, true);
messages.Add(message);
}

Expand Down Expand Up @@ -1311,7 +1311,7 @@ internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDef
var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
var amqpMessage =
AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true);
var message = _messageConverter.AmqpMessageToSBMessage(amqpMessage);
var message = _messageConverter.AmqpMessageToSBReceivedMessage(amqpMessage);
if (entry.TryGetValue<Guid>(ManagementConstants.Properties.LockToken, out var lockToken))
{
message.LockTokenGuid = lockToken;
Expand Down
Loading