Skip to content

Commit

Permalink
Expose CreaedOn and Metadata to event wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
NeilMountford committed Jan 16, 2025
1 parent 731d07f commit a8977cd
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 6 deletions.
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<ItemGroup>
<PackageVersion Include="Azure.Identity" Version="1.13.1" />
<PackageVersion Include="Bogus" Version="35.6.1" />
<PackageVersion Include="FluentAssertions" Version="6.12.1" />
<PackageVersion Include="FluentAssertions" Version="[7.0.0]" allowedVersions="[7.0.0]" />
<PackageVersion Include="Meziantou.Extensions.Logging.Xunit" Version="1.0.7" />
<PackageVersion Include="Microsoft.ApplicationInsights.AspNetCore" Version="2.22.0" />
<PackageVersion Include="Microsoft.ApplicationInsights.WorkerService" Version="2.22.0" />
Expand Down Expand Up @@ -35,4 +35,4 @@
<PackageVersion Include="Xunit.SkippableFact" Version="1.4.13" />
<PackageVersion Include="Yarp.ReverseProxy" Version="2.2.0" />
</ItemGroup>
</Project>
</Project>
12 changes: 10 additions & 2 deletions src/LogOtter.CosmosDb.EventStore/CatchUpSubscriptions/Event.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
namespace LogOtter.CosmosDb.EventStore;

public class Event<TBaseEvent>(string streamId, int eventNumber, TBaseEvent body)
public class Event<TBaseEvent>(string streamId, int eventNumber, TBaseEvent body, DateTimeOffset createdOn, Dictionary<string, string> metadata)
where TBaseEvent : class
{
public string StreamId { get; } = streamId;
public int EventNumber { get; } = eventNumber;
public TBaseEvent Body { get; } = body;
public DateTimeOffset CreatedOn { get; } = createdOn;
public Dictionary<string, string> Metadata { get; } = metadata;

public static Event<TBaseEvent> FromStorageEvent(StorageEvent<TBaseEvent> storageEvent)
{
return new Event<TBaseEvent>(storageEvent.StreamId, storageEvent.EventNumber, (TBaseEvent)storageEvent.EventBody);
return new Event<TBaseEvent>(
storageEvent.StreamId,
storageEvent.EventNumber,
(TBaseEvent)storageEvent.EventBody,
storageEvent.CreatedOn,
storageEvent.Metadata
);
}
}
36 changes: 36 additions & 0 deletions src/LogOtter.CosmosDb.EventStore/Repositories/EventRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,40 @@ public async Task<TSnapshot> ApplyEvents(string id, int? expectedRevision, Cance

return entity;
}

public async Task<(TSnapshot, EventData<TBaseEvent>[])> ApplyAndGetEvents(string id, int? expectedRevision, params TBaseEvent[] events)
{
return await ApplyAndGetEvents(id, expectedRevision, CancellationToken.None, events);
}

public async Task<(TSnapshot, EventData<TBaseEvent>[])> ApplyAndGetEvents(
string id,
int? expectedRevision,
CancellationToken cancellationToken,
params TBaseEvent[] events
)
{
if (events.Any(e => e.EventStreamId != id))
{
throw new ArgumentException("All events must be for the same entity", nameof(events));
}

var entity = await Get(id, null, true, cancellationToken) ?? new TSnapshot();

foreach (var eventToApply in events)
{
eventToApply.Apply(entity);
}

var streamId = _options.EscapeIdIfRequired(id);

var now = DateTimeOffset.Now;
var eventData = events.Select(e => new EventData<TBaseEvent>(Guid.NewGuid(), e, now)).ToArray();

await eventStore.AppendToStream(streamId, expectedRevision ?? 0, cancellationToken, eventData);

entity.Revision += events.Length;

return (entity, eventData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ public async Task<TSnapshot> ApplyEventsAndUpdateSnapshotImmediately(
params TBaseEvent[] events
)
{
var snapshot = await eventRepository.ApplyEvents(id, expectedRevision, cancellationToken, events);
var (snapshot, eventData) = await eventRepository.ApplyAndGetEvents(id, expectedRevision, cancellationToken, events);

try
{
var partitionKey = _snapshotPartitionKeyResolver(events.First());
var eventRevision = expectedRevision.GetValueOrDefault(0);
var eventsToUpdate = events.Select(e => new Event<TBaseEvent>(id, eventRevision++, e)).ToList();
var eventsToUpdate = eventData.Select(e => new Event<TBaseEvent>(id, eventRevision++, e.Body, e.CreatedOn, e.Metadata)).ToList();

await snapshotRepository.ApplyEventsToSnapshot(id, partitionKey, eventsToUpdate, cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using LogOtter.CosmosDb.EventStore.Tests.TestEvents;

namespace LogOtter.CosmosDb.EventStore.Tests;

public class EventDataConversionTests
{
[Fact]
public void WrapperEventIsCreatedFromStorageEvent()
{
var id = Guid.NewGuid().ToString();
var name = "Bob Bobertson";

var testEvent = new TestEventCreated(id, name);
var eventId = Guid.NewGuid();

var createdOn = DateTimeOffset.UtcNow;
var eventData = new EventData<TestEvent>(eventId, testEvent, createdOn, new Dictionary<string, string> { { "key1", "value1" } });
var storageEvent = new StorageEvent<TestEvent>(id, eventData, 3);
var wrapperEvent = Event<TestEvent>.FromStorageEvent(storageEvent);

wrapperEvent.StreamId.Should().Be(id);
wrapperEvent.EventNumber.Should().Be(3);
wrapperEvent.CreatedOn.Should().Be(createdOn);
wrapperEvent.Metadata.Should().ContainKey("key1").WhoseValue.Should().Be("value1");
}
}

0 comments on commit a8977cd

Please sign in to comment.