diff --git a/RabbitMQ.AMQP.Client/IManagement.cs b/RabbitMQ.AMQP.Client/IManagement.cs index f036434..38cd6ec 100644 --- a/RabbitMQ.AMQP.Client/IManagement.cs +++ b/RabbitMQ.AMQP.Client/IManagement.cs @@ -2,51 +2,64 @@ // and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.AMQP.Client { - public class ModelException : Exception - { - public ModelException(string message) : base(message) - { - } - } - - public class PreconditionFailedException : Exception - { - public PreconditionFailedException(string message) : base(message) - { - } - } - - public class BadRequestException : Exception - { - public BadRequestException(string message) : base(message) - { - } - } - /// - /// IManagement interface and is responsible for managing the AMQP resources. - /// RabbitMQ uses AMQP end point: "/management" to manage the resources like queues, exchanges, and bindings. - /// The management endpoint works like an HTTP RPC endpoint where the client sends a request to the server. + /// The interface is used to manage the + /// AMQP 0.9.1 model + /// topology (exchanges, queues, and bindings). /// public interface IManagement : ILifeCycle { + /// + /// Create an , with an auto-generated name. + /// + /// A builder for IQueueSpecification Queue(); + + /// + /// Create an , with the given name. + /// + /// A builder for IQueueSpecification Queue(string name); + /// + /// Get the for the given queue specification. + /// + /// The + /// The + /// The for the given spec. Task GetQueueInfoAsync(IQueueSpecification queueSpec, CancellationToken cancellationToken = default); + + /// + /// Get the for the given queue name. + /// + /// The queue name + /// The + /// The for the given spec. Task GetQueueInfoAsync(string queueName, CancellationToken cancellationToken = default); + /// + /// Create an , with an auto-generated name. + /// + /// A builder for IExchangeSpecification Exchange(); + + /// + /// Create an , with the given name. + /// + /// A builder for IExchangeSpecification Exchange(string name); + /// + /// Create an . + /// + /// A builder for IBindingSpecification Binding(); } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs b/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs index b7bb4e5..43d5f84 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs @@ -16,8 +16,9 @@ namespace RabbitMQ.AMQP.Client.Impl { /// - /// AmqpManagement implements the IManagement - /// See for more information + /// AmqpManagement implements the interface + /// to manage the AMQP 0.9.1 model + /// topology (exchanges, queues, and bindings). /// public class AmqpManagement : AbstractLifeCycle, IManagement, IManagementTopology { @@ -55,10 +56,9 @@ internal AmqpManagement(AmqpManagementParameters amqpManagementParameters) } /// - /// Create a new queue specification - /// See for more information + /// Create an , with an auto-generated name. /// - /// A builder for IQueueSpecification + /// A builder for public IQueueSpecification Queue() { ThrowIfClosed(); @@ -66,20 +66,20 @@ public IQueueSpecification Queue() } /// - /// Create a new queue specification with the given name - /// See for more information + /// Create an , with the given name. /// - /// A builder for IQueueSpecification + /// A builder for public IQueueSpecification Queue(string name) { return Queue().Name(name); } /// - /// Get the queue info for the given queue specification - /// See for more information + /// Get the for the given queue specification /// - /// Queue Information + /// The + /// The + /// The for the given spec. public Task GetQueueInfoAsync(IQueueSpecification queueSpec, CancellationToken cancellationToken = default) { @@ -87,10 +87,11 @@ public Task GetQueueInfoAsync(IQueueSpecification queueSpec, } /// - /// Get the queue info for the given queue name - /// See for more information + /// Get the for the given queue name. /// - /// Queue Information + /// The queue name + /// The + /// The for the given spec. public async Task GetQueueInfoAsync(string queueName, CancellationToken cancellationToken = default) { @@ -105,19 +106,10 @@ public async Task GetQueueInfoAsync(string queueName, return new DefaultQueueInfo((Map)response.Body); } - internal IQueueSpecification Queue(QueueSpec spec) - { - return Queue().Name(spec.QueueName) - .AutoDelete(spec.IsAutoDelete) - .Exclusive(spec.IsExclusive) - .Arguments(spec.QueueArguments); - } - /// - /// Create a new AMQPExchange specification - /// See for more information + /// Create an , with an auto-generated name. /// - /// A builder for IExchangeSpecification + /// A builder for public IExchangeSpecification Exchange() { ThrowIfClosed(); @@ -125,46 +117,25 @@ public IExchangeSpecification Exchange() } /// - /// - /// Create a new AMQPExchange specification with the given name - /// See for more information + /// Create an , with the given name. /// - /// A builder for IExchangeSpecification + /// A builder for public IExchangeSpecification Exchange(string name) { return Exchange().Name(name); } - internal IExchangeSpecification Exchange(ExchangeSpec spec) - { - return Exchange().Name(spec.ExchangeName) - .AutoDelete(spec.IsAutoDelete) - .Type(spec.ExchangeType) - .Arguments(spec.ExchangeArguments); - } - + /// + /// Create an . + /// + /// A builder for public IBindingSpecification Binding() { return new AmqpBindingSpecification(this); } - internal IBindingSpecification Binding(BindingSpec spec) - { - return Binding() - .SourceExchange(spec.SourceExchangeName) - .DestinationQueue(spec.DestinationQueueName) - .DestinationExchange(spec.DestinationExchangeName) - .Key(spec.BindingKey) - .Arguments(spec.BindingArguments); - } - - ITopologyListener IManagementTopology.TopologyListener() - { - return _amqpManagementParameters.TopologyListener(); - } - /// - /// Open the management session to RabbitMQ + /// Open the management session. /// public override async Task OpenAsync() { @@ -193,169 +164,67 @@ await base.OpenAsync() .ConfigureAwait(false); } - private void OnManagementSessionClosed(IAmqpObject sender, Amqp.Framing.Error error) + /// + /// Close the management session. + /// + public override async Task CloseAsync() { - if (State != State.Closed && error != null) - { - Trace.WriteLine(TraceLevel.Warning, $"Management session closed " + - $"with error: {Utils.ConvertError(error)} " + - $" AmqpManagement: {ToString()}"); - } + // TODO 10 seconds seems too long + TimeSpan closeSpan = TimeSpan.FromSeconds(10); - OnNewStatus(State.Closed, Utils.ConvertError(error)); + if (_managementSession is { IsClosed: false }) + { + OnNewStatus(State.Closing, null); - // Note: TrySetResult *must* be used here - _managementSessionClosedTcs.TrySetResult(true); - } + await _managementSession.CloseAsync(closeSpan) + .ConfigureAwait(false); - private async Task ProcessResponses() - { - try - { - while (_managementSession?.IsClosed == false && - _amqpManagementParameters.IsNativeConnectionClosed == false) - { - if (_receiverLink == null) - { - continue; - } + await _managementSessionClosedTcs.Task.WaitAsync(closeSpan) + .ConfigureAwait(false); - TimeSpan timeout = TimeSpan.FromSeconds(59); - using (Message msg = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false)) - { - if (msg == null) - { - // this is not a problem, it is just a timeout. - // the timeout is set to 60 seconds. - // For the moment I'd trace it at some point we can remove it - Trace.WriteLine(TraceLevel.Verbose, - $"{ToString()} - Timeout {timeout.Seconds} s.. waiting for message."); - continue; - } + _managementSession = null; + _senderLink = null; + _receiverLink = null; - _receiverLink.Accept(msg); - HandleResponseMessage(msg); - } - } - } - catch (Exception e) - { - // TODO this is a serious situation that should be thrown - // up to the client application - if (_receiverLink?.IsClosed == false) - { - Trace.WriteLine(TraceLevel.Error, - $"Receiver link error in management session {e}. Receiver link closed: {_receiverLink?.IsClosed}"); - } + // this is actually a double set of the status, but it is needed to ensure that the status is set to closed + // but the `OnNewStatus` is idempotent + OnNewStatus(State.Closed, null); } - - Trace.WriteLine(TraceLevel.Verbose, "ProcessResponses Task closed"); } - private async Task EnsureReceiverLinkAsync() + /// + /// Convert this instance to a string. + /// + /// The string representation of this instance. + public override string ToString() { - if (_receiverLink == null || _receiverLink.IsClosed) - { - var receiveAttach = new Attach() - { - SndSettleMode = SenderSettleMode.Settled, - RcvSettleMode = ReceiverSettleMode.First, - Properties = new Fields { { new Symbol("paired"), true } }, - LinkName = LinkPairName, - Source = - new Source() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("LINK_DETACH"), }, - Handle = 1, - Target = - new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), }, - }; - - TaskCompletionSource tcs = Utils.CreateTaskCompletionSource(); - var tmpReceiverLink = new ReceiverLink( - _managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) => - { - if (link is ReceiverLink receiverLink) - { - tcs.SetResult(receiverLink); - } - else - { - // TODO create "internal bug" exception type? - var ex = new InvalidOperationException( - "invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); - tcs.SetException(ex); - } - }); - - _receiverLink = await tcs.Task - .ConfigureAwait(false); + string info = $"AmqpManagement{{" + + $"AmqpConnection='{_amqpManagementParameters.Connection}', " + + $"Status='{State.ToString()}'" + + $"ReceiverLink closed: {_receiverLink?.IsClosed} " + + $"}}"; - if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink)) - { - // TODO log this case? - } + return info; + } - // TODO - // using a credit of 1 can result in AmqpExceptions in ProcessResponses - _receiverLink.SetCredit(100); - } + ITopologyListener IManagementTopology.TopologyListener() + { + return _amqpManagementParameters.TopologyListener(); } - private async Task EnsureSenderLinkAsync() + internal protected virtual Task InternalSendAsync(Message message, TimeSpan timeout) { - if (_senderLink == null || _senderLink.IsClosed) + if (_senderLink is null) { - var senderAttach = new Attach - { - SndSettleMode = SenderSettleMode.Settled, - RcvSettleMode = ReceiverSettleMode.First, - Properties = new Fields { { new Symbol("paired"), true } }, - LinkName = LinkPairName, - Source = new Source() - { - Address = ManagementNodeAddress, - ExpiryPolicy = new Symbol("LINK_DETACH"), - Timeout = 0, - Dynamic = false, - Durable = 0 - }, - Handle = 0, - Target = new Target() - { - Address = ManagementNodeAddress, - ExpiryPolicy = new Symbol("SESSION_END"), - Timeout = 0, - Dynamic = false, - }, - }; - - TaskCompletionSource tcs = Utils.CreateTaskCompletionSource(); - var tmpSenderLink = new SenderLink( - _managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) => - { - if (link is SenderLink senderLink) - { - tcs.SetResult(senderLink); - } - else - { - // TODO create "internal bug" exception type? - var ex = new InvalidOperationException( - "invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); - tcs.SetException(ex); - } - }); - - _senderLink = await tcs.Task - .ConfigureAwait(false); - - if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink)) - { - // TODO log this case? - } + // TODO create "internal bug" exception type? + throw new InvalidOperationException( + "_senderLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); } + + return _senderLink.SendAsync(message, timeout); } - protected void HandleResponseMessage(Message msg) + internal protected void HandleResponseMessage(Message msg) { if (msg.Properties.CorrelationId != null && _requests.TryRemove(msg.Properties.CorrelationId, out TaskCompletionSource? mre)) @@ -371,6 +240,32 @@ protected void HandleResponseMessage(Message msg) } } + internal IQueueSpecification Queue(QueueSpec spec) + { + return Queue().Name(spec.QueueName) + .AutoDelete(spec.IsAutoDelete) + .Exclusive(spec.IsExclusive) + .Arguments(spec.QueueArguments); + } + + internal IExchangeSpecification Exchange(ExchangeSpec spec) + { + return Exchange().Name(spec.ExchangeName) + .AutoDelete(spec.IsAutoDelete) + .Type(spec.ExchangeType) + .Arguments(spec.ExchangeArguments); + } + + internal IBindingSpecification Binding(BindingSpec spec) + { + return Binding() + .SourceExchange(spec.SourceExchangeName) + .DestinationQueue(spec.DestinationQueueName) + .DestinationExchange(spec.DestinationExchangeName) + .Key(spec.BindingKey) + .Arguments(spec.BindingArguments); + } + internal Task RequestAsync(string path, string method, int[] expectedResponseCodes, TimeSpan? timeout = null, @@ -473,6 +368,11 @@ await sendTask.WaitAsync(linkedCts.Token) return result; } + internal void ChangeStatus(State newState, Error? error) + { + OnNewStatus(newState, error); + } + /// /// Check the response of a request and throw exceptions if needed /// @@ -515,64 +415,166 @@ internal void CheckResponse(Message sentMessage, int[] expectedResponseCodes, Me } } - protected virtual Task InternalSendAsync(Message message, TimeSpan timeout) + private void OnManagementSessionClosed(IAmqpObject sender, Amqp.Framing.Error error) { - if (_senderLink is null) + if (State != State.Closed && error != null) { - // TODO create "internal bug" exception type? - throw new InvalidOperationException( - "_senderLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); + Trace.WriteLine(TraceLevel.Warning, $"Management session closed " + + $"with error: {Utils.ConvertError(error)} " + + $" AmqpManagement: {ToString()}"); } - return _senderLink.SendAsync(message, timeout); + OnNewStatus(State.Closed, Utils.ConvertError(error)); + + // Note: TrySetResult *must* be used here + _managementSessionClosedTcs.TrySetResult(true); } - public override async Task CloseAsync() + private async Task ProcessResponses() { - // TODO 10 seconds seems too long - TimeSpan closeSpan = TimeSpan.FromSeconds(10); + try + { + while (_managementSession?.IsClosed == false && + _amqpManagementParameters.IsNativeConnectionClosed == false) + { + if (_receiverLink == null) + { + continue; + } - if (_managementSession is { IsClosed: false }) + TimeSpan timeout = TimeSpan.FromSeconds(59); + using (Message msg = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false)) + { + if (msg == null) + { + // this is not a problem, it is just a timeout. + // the timeout is set to 60 seconds. + // For the moment I'd trace it at some point we can remove it + Trace.WriteLine(TraceLevel.Verbose, + $"{ToString()} - Timeout {timeout.Seconds} s.. waiting for message."); + continue; + } + + _receiverLink.Accept(msg); + HandleResponseMessage(msg); + } + } + } + catch (Exception e) { - OnNewStatus(State.Closing, null); + // TODO this is a serious situation that should be thrown + // up to the client application + if (_receiverLink?.IsClosed == false) + { + Trace.WriteLine(TraceLevel.Error, + $"Receiver link error in management session {e}. Receiver link closed: {_receiverLink?.IsClosed}"); + } + } - await _managementSession.CloseAsync(closeSpan) - .ConfigureAwait(false); + Trace.WriteLine(TraceLevel.Verbose, "ProcessResponses Task closed"); + } - await _managementSessionClosedTcs.Task.WaitAsync(closeSpan) + private async Task EnsureReceiverLinkAsync() + { + if (_receiverLink == null || _receiverLink.IsClosed) + { + var receiveAttach = new Attach() + { + SndSettleMode = SenderSettleMode.Settled, + RcvSettleMode = ReceiverSettleMode.First, + Properties = new Fields { { new Symbol("paired"), true } }, + LinkName = LinkPairName, + Source = + new Source() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("LINK_DETACH"), }, + Handle = 1, + Target = + new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), }, + }; + + TaskCompletionSource tcs = Utils.CreateTaskCompletionSource(); + var tmpReceiverLink = new ReceiverLink( + _managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) => + { + if (link is ReceiverLink receiverLink) + { + tcs.SetResult(receiverLink); + } + else + { + // TODO create "internal bug" exception type? + var ex = new InvalidOperationException( + "invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); + tcs.SetException(ex); + } + }); + + _receiverLink = await tcs.Task .ConfigureAwait(false); - _managementSession = null; - _senderLink = null; - _receiverLink = null; + if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink)) + { + // TODO log this case? + } - // this is actually a double set of the status, but it is needed to ensure that the status is set to closed - // but the `OnNewStatus` is idempotent - OnNewStatus(State.Closed, null); + // TODO + // using a credit of 1 can result in AmqpExceptions in ProcessResponses + _receiverLink.SetCredit(100); } } - public override string ToString() + private async Task EnsureSenderLinkAsync() { - string info = $"AmqpManagement{{" + - $"AmqpConnection='{_amqpManagementParameters.Connection}', " + - $"Status='{State.ToString()}'" + - $"ReceiverLink closed: {_receiverLink?.IsClosed} " + - $"}}"; + if (_senderLink == null || _senderLink.IsClosed) + { + var senderAttach = new Attach + { + SndSettleMode = SenderSettleMode.Settled, + RcvSettleMode = ReceiverSettleMode.First, + Properties = new Fields { { new Symbol("paired"), true } }, + LinkName = LinkPairName, + Source = new Source() + { + Address = ManagementNodeAddress, + ExpiryPolicy = new Symbol("LINK_DETACH"), + Timeout = 0, + Dynamic = false, + Durable = 0 + }, + Handle = 0, + Target = new Target() + { + Address = ManagementNodeAddress, + ExpiryPolicy = new Symbol("SESSION_END"), + Timeout = 0, + Dynamic = false, + }, + }; - return info; - } + TaskCompletionSource tcs = Utils.CreateTaskCompletionSource(); + var tmpSenderLink = new SenderLink( + _managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) => + { + if (link is SenderLink senderLink) + { + tcs.SetResult(senderLink); + } + else + { + // TODO create "internal bug" exception type? + var ex = new InvalidOperationException( + "invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); + tcs.SetException(ex); + } + }); - internal void ChangeStatus(State newState, Error? error) - { - OnNewStatus(newState, error); - } - } + _senderLink = await tcs.Task + .ConfigureAwait(false); - public class InvalidCodeException : Exception - { - public InvalidCodeException(string message) : base(message) - { + if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink)) + { + // TODO log this case? + } + } } } } diff --git a/RabbitMQ.AMQP.Client/ManagementExceptions.cs b/RabbitMQ.AMQP.Client/ManagementExceptions.cs new file mode 100644 index 0000000..919913e --- /dev/null +++ b/RabbitMQ.AMQP.Client/ManagementExceptions.cs @@ -0,0 +1,64 @@ +// This source code is dual-licensed under the Apache License, version 2.0, +// and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System; + +namespace RabbitMQ.AMQP.Client +{ + /// + /// Thrown when management response code or correlation ID mismatch. + /// + public class ModelException : Exception + { + /// + /// Create a new + /// + /// The exception message. + public ModelException(string message) : base(message) + { + } + } + + /// + /// Thrown when management response is "precondition failed". + /// + public class PreconditionFailedException : Exception + { + /// + /// Create a new + /// + /// The exception message. + public PreconditionFailedException(string message) : base(message) + { + } + } + + /// + /// Thrown when management response is code 400, bad request. + /// + public class BadRequestException : Exception + { + /// + /// Create a new + /// + /// The exception message. + public BadRequestException(string message) : base(message) + { + } + } + + /// + /// Thrown when management response code is not expected. + /// + public class InvalidCodeException : Exception + { + /// + /// Create a new + /// + /// The exception message. + public InvalidCodeException(string message) : base(message) + { + } + } +} diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 6c88407..7b38b66 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -531,8 +531,6 @@ RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Name() -> string! RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Type() -> RabbitMQ.AMQP.Client.QueueType RabbitMQ.AMQP.Client.Impl.FieldNotSetException RabbitMQ.AMQP.Client.Impl.FieldNotSetException.FieldNotSetException() -> void -RabbitMQ.AMQP.Client.Impl.InvalidCodeException -RabbitMQ.AMQP.Client.Impl.InvalidCodeException.InvalidCodeException(string! message) -> void RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder @@ -601,6 +599,8 @@ RabbitMQ.AMQP.Client.InternalBugException.InternalBugException() -> void RabbitMQ.AMQP.Client.InternalBugException.InternalBugException(string! message) -> void RabbitMQ.AMQP.Client.InvalidAddressException RabbitMQ.AMQP.Client.InvalidAddressException.InvalidAddressException(string! message) -> void +RabbitMQ.AMQP.Client.InvalidCodeException +RabbitMQ.AMQP.Client.InvalidCodeException.InvalidCodeException(string! message) -> void RabbitMQ.AMQP.Client.IPublisher RabbitMQ.AMQP.Client.IPublisher.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IPublisherBuilder diff --git a/Tests/Management/MockManagementTests.cs b/Tests/Management/MockManagementTests.cs index 462d920..f1f0472 100644 --- a/Tests/Management/MockManagementTests.cs +++ b/Tests/Management/MockManagementTests.cs @@ -16,7 +16,7 @@ public class MockManagementTests() { public class TestAmqpManagement() : AmqpManagement(new AmqpManagementParameters(null!)) { - protected override async Task InternalSendAsync(Message message, TimeSpan timeout) + internal protected override async Task InternalSendAsync(Message message, TimeSpan timeout) { await Task.Delay(1000); } @@ -29,7 +29,7 @@ public TestAmqpManagementOpen() : base(new AmqpManagementParameters(null!)) State = State.Open; } - protected override async Task InternalSendAsync(Message message, TimeSpan timeout) + internal protected override async Task InternalSendAsync(Message message, TimeSpan timeout) { await Task.Delay(1000); }