Skip to content

Commit

Permalink
[Host.Memory] Ability to use headers in Memory provider without enabl…
Browse files Browse the repository at this point in the history
…ing message serialization #295

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Aug 22, 2024
1 parent 8f8b340 commit 50277ab
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 13 deletions.
21 changes: 19 additions & 2 deletions docs/provider_memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Introduction](#introduction)
- [Configuration](#configuration)
- [Serialization](#serialization)
- [Virtual Topics](#virtual-topics)
- [Headers](#headers)
- [Auto Declaration](#auto-declaration)
- [Polymorphic message support](#polymorphic-message-support)
- [Lifecycle](#lifecycle)
Expand Down Expand Up @@ -68,6 +68,23 @@ services.AddSlimMessageBus(mbb =>
> When serialization is disabled for in memory passed messages, the exact same object instance send by the producer will be received by the consumer. Therefore state changes on the consumer end will be visible by the producer.
> Consider making the messages immutable (read only) in that case.
### Headers

The headers published to the memory bus are delivered to the consumer.
This is managed by the `cfg.EnableMessageHeaders` setting, which is enabled by default.

````cs
services.AddSlimMessageBus(mbb =>
{
mbb.WithProviderMemory(cfg =>
{
// Header passing can be disabled when not needed (and to save on memory allocations)
cfg.EnableMessageHeaders = false
});
});

Before version v2.5.1, to enable header passing the [serialization](#serialization) had to be enabled.

### Virtual Topics

Unlike other transport providers, memory transport does not have true notion of topics (or queues). However, it is still required to use topic names. This is required, so that the bus knows on which virtual topic to deliver the message to, and from what virtual topic to consume from.
Expand All @@ -80,7 +97,7 @@ mbb.Consume<OrderSubmittedEvent>(x => x.Topic(x.MessageType.Name).WithConsumer<O

// alternatively
mbb.Consume<OrderSubmittedEvent>(x => x.Topic("OrderSubmittedEvent").WithConsumer<OrderSubmittedHandler>());
```
````

The producer configuration side should use `.DefaultTopic()` to set the virtual topic name:

Expand Down
21 changes: 19 additions & 2 deletions docs/provider_memory.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Introduction](#introduction)
- [Configuration](#configuration)
- [Serialization](#serialization)
- [Virtual Topics](#virtual-topics)
- [Headers](#headers)
- [Auto Declaration](#auto-declaration)
- [Polymorphic message support](#polymorphic-message-support)
- [Lifecycle](#lifecycle)
Expand Down Expand Up @@ -68,6 +68,23 @@ services.AddSlimMessageBus(mbb =>
> When serialization is disabled for in memory passed messages, the exact same object instance send by the producer will be received by the consumer. Therefore state changes on the consumer end will be visible by the producer.
> Consider making the messages immutable (read only) in that case.
### Headers

The headers published to the memory bus are delivered to the consumer.
This is managed by the `cfg.EnableMessageHeaders` setting, which is enabled by default.

````cs
services.AddSlimMessageBus(mbb =>
{
mbb.WithProviderMemory(cfg =>
{
// Header passing can be disabled when not needed (and to save on memory allocations)
cfg.EnableMessageHeaders = false
});
});

Before version v2.5.1, to enable header passing the [serialization](#serialization) had to be enabled.

### Virtual Topics

Unlike other transport providers, memory transport does not have true notion of topics (or queues). However, it is still required to use topic names. This is required, so that the bus knows on which virtual topic to deliver the message to, and from what virtual topic to consume from.
Expand All @@ -80,7 +97,7 @@ mbb.Consume<OrderSubmittedEvent>(x => x.Topic(x.MessageType.Name).WithConsumer<O

// alternatively
mbb.Consume<OrderSubmittedEvent>(x => x.Topic("OrderSubmittedEvent").WithConsumer<OrderSubmittedHandler>());
```
````

The producer configuration side should use `.DefaultTopic()` to set the virtual topic name:

Expand Down
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.5.0</Version>
<Version>2.5.1</Version>
</PropertyGroup>

</Project>
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ protected override void BuildPendingRequestStore()

public override IDictionary<string, object> CreateHeaders()
{
if (ProviderSettings.EnableMessageSerialization)
if (ProviderSettings.EnableMessageHeaders)
{
return base.CreateHeaders();
}
// Memory bus does not require headers
// Do not use headers
return null;
}

Expand Down
7 changes: 7 additions & 0 deletions src/SlimMessageBus.Host.Memory/MemoryMessageBusSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,11 @@ public class MemoryMessageBusSettings
/// However, if you prefer to have Publish operations non-blocking (asynchronous), you can disable this setting.
/// </summary>
public bool EnableBlockingPublish { get; set; } = true;

/// <summary>
/// This setting allows you to enable or disable the passing of message headers from the publisher to the consumer.
/// By default, message headers are enabled for ease of use. However, if you are not using headers in the memory bus, disabling them can save on memory allocations.
/// </summary>
public bool EnableMessageHeaders { get; set; } = true;

}
40 changes: 34 additions & 6 deletions src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,34 +72,43 @@ public void When_Create_Given_MessageSerializationEnabled_And_NoSerializerProvid
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task When_Publish_Given_MessageSerializationSetting_Then_DeliversMessageInstanceToRespectiveConsumers(bool enableMessageSerialization)
[InlineData(false, false)]
[InlineData(false, true)]
[InlineData(true, true)]
[InlineData(true, false)]
public async Task When_Publish_Given_MessageSerializationSetting_Then_DeliversMessageInstanceToRespectiveConsumers(bool enableMessageSerialization, bool enableMessageHeaders)
{
// arrange
const string topicA = "topic-a";
const string topicA2 = "topic-a-2";
const string topicB = "topic-b";
var headers = new Dictionary<string, object>
{
["key1"] = "str",
["key2"] = 2
};

_builder.Produce<SomeMessageA>(x => x.DefaultTopic(topicA));
_builder.Produce<SomeMessageB>(x => x.DefaultTopic(topicB));
_builder.Consume<SomeMessageA>(x => x.Topic(topicA).WithConsumer<SomeMessageAConsumer>());
_builder.Consume<SomeMessageA>(x => x.Topic(topicA2).WithConsumer<SomeMessageAConsumer2>());
_builder.Consume<SomeMessageB>(x => x.Topic(topicB).WithConsumer<SomeMessageBConsumer>());

var aConsumerMock = new Mock<SomeMessageAConsumer>();
var aConsumerMock = new Mock<SomeMessageAConsumer>() { CallBase = true };
var aConsumer2Mock = new Mock<SomeMessageAConsumer2>();
var bConsumerMock = new Mock<SomeMessageBConsumer>();

_serviceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(SomeMessageAConsumer))).Returns(aConsumerMock.Object);
_serviceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(SomeMessageAConsumer2))).Returns(aConsumer2Mock.Object);
_serviceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(SomeMessageBConsumer))).Returns(bConsumerMock.Object);

_providerSettings.EnableMessageSerialization = enableMessageSerialization;
_providerSettings.EnableMessageHeaders = enableMessageHeaders;

var m = new SomeMessageA(Guid.NewGuid());

// act
await _subject.Value.ProducePublish(m);
await _subject.Value.ProducePublish(m, headers: headers);

// assert
if (enableMessageSerialization)
Expand All @@ -110,8 +119,21 @@ public async Task When_Publish_Given_MessageSerializationSetting_Then_DeliversMe
{
aConsumerMock.Verify(x => x.OnHandle(m), Times.Once);
}

aConsumerMock.VerifySet(x => x.Context = It.IsAny<IConsumerContext>(), Times.Once);
aConsumerMock.VerifyNoOtherCalls();

if (enableMessageHeaders)
{
// All passed headers should be present in the consumer headers
headers.Should().BeSubsetOf(aConsumerMock.Object.Context.Headers);
}
else
{
// The headers should not be present in the consumer headers
aConsumerMock.Object.Context.Headers.Should().BeNull();
}

aConsumer2Mock.Verify(x => x.OnHandle(It.IsAny<SomeMessageA>()), Times.Never);
aConsumer2Mock.VerifyNoOtherCalls();

Expand Down Expand Up @@ -172,6 +194,7 @@ public async Task When_Publish_Given_PerMessageScopeEnabled_Then_TheScopeIsCreat
scopeProviderMock.Verify(x => x.GetService(typeof(SomeMessageAConsumer)), Times.Once);
scopeProviderMock.Verify(x => x.GetService(typeof(IEnumerable<IConsumerInterceptor<SomeMessageA>>)), Times.Once);

consumerMock.VerifySet(x => x.Context = It.IsAny<IConsumerContext>(), Times.Once);
consumerMock.Verify(x => x.OnHandle(m), Times.Once);
consumerMock.Verify(x => x.Dispose(), Times.Never);
consumerMock.VerifyNoOtherCalls();
Expand Down Expand Up @@ -210,6 +233,7 @@ public async Task When_Publish_Given_PerMessageScopeDisabled_Then_TheScopeIsNotC
_serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once);
_serviceProviderMock.ProviderMock.VerifyNoOtherCalls();

consumerMock.VerifySet(x => x.Context = It.IsAny<IConsumerContext>(), Times.Once);
consumerMock.Verify(x => x.OnHandle(m), Times.Once);
consumerMock.Verify(x => x.Dispose(), Times.Once);
consumerMock.VerifyNoOtherCalls();
Expand Down Expand Up @@ -255,6 +279,7 @@ public async Task When_ProducePublish_Given_PerMessageScopeDisabledOrEnabled_And
// current scope is not changed
MessageScope.Current.Should().BeNull();

consumerMock.VerifySet(x => x.Context = It.IsAny<IConsumerContext>(), Times.Once);
consumerMock.Verify(x => x.OnHandle(m), Times.Once);
consumerMock.Verify(x => x.Dispose(), Times.Once);
consumerMock.VerifyNoOtherCalls();
Expand Down Expand Up @@ -318,6 +343,7 @@ public async Task When_Publish_Given_TwoConsumersOnSameTopic_Then_BothAreInvoked
_serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once);
_serviceProviderMock.ProviderMock.VerifyNoOtherCalls();

consumer1Mock.VerifySet(x => x.Context = It.IsAny<IConsumerContext>(), Times.Once);
consumer1Mock.Verify(x => x.OnHandle(m), Times.Once);
consumer1Mock.VerifyNoOtherCalls();

Expand Down Expand Up @@ -475,8 +501,10 @@ public record SomeMessageA(Guid Value);

public record SomeMessageB(Guid Value);

public class SomeMessageAConsumer : IConsumer<SomeMessageA>, IDisposable
public class SomeMessageAConsumer : IConsumer<SomeMessageA>, IConsumerWithContext, IDisposable
{
public virtual IConsumerContext Context { get; set; }

public virtual void Dispose()
{
// Needed to check disposing
Expand Down

0 comments on commit 50277ab

Please sign in to comment.