Skip to content

Commit

Permalink
Added public session and transaction control to mongo context. (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgernand authored Dec 28, 2022
1 parent e4abf5e commit feba89b
Showing 1 changed file with 118 additions and 14 deletions.
132 changes: 118 additions & 14 deletions src/Fluxera.Repository.MongoDB/MongoContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,22 @@
/// A base class for context implementations for the MongoDB repository.
/// </summary>
[PublicAPI]
public abstract class MongoContext : Disposable
public abstract class MongoContext : Disposable, IDisposable
{
private static readonly ClientSessionOptions SessionOptions = new ClientSessionOptions
{
DefaultTransactionOptions = new TransactionOptions(ReadConcern.Majority, ReadPreference.Primary, WriteConcern.WMajority)
};

private static readonly ConcurrentDictionary<string, IMongoClient> Clients = new ConcurrentDictionary<string, IMongoClient>();
private readonly object lockObject = new object();

private ConcurrentQueue<Func<Task>> commands;

private bool isConfigured;

private Task<IClientSessionHandle> sessionTask;

/// <summary>
/// Initializes a new instance of the <see cref="MongoContext" /> type.
/// </summary>
Expand Down Expand Up @@ -93,15 +101,15 @@ public async Task SaveChangesAsync(CancellationToken cancellationToken = default
// Commit the transaction if a session with a transaction exists.
if(this.Session is not null && this.Session.IsInTransaction)
{
await this.Session.CommitTransactionAsync(cancellationToken);
await this.CommitTransactionAsync(cancellationToken);
}
}
catch
{
// Abort the transaction on error, if a session with a transaction exists
if(this.Session is not null && this.Session.IsInTransaction)
{
await this.Session.AbortTransactionAsync(cancellationToken);
await this.AbortTransactionAsync(cancellationToken);
}
}
finally
Expand All @@ -112,7 +120,7 @@ public async Task SaveChangesAsync(CancellationToken cancellationToken = default
// Start a new transaction, if a session without transaction exists.
if(this.Session is not null && !this.Session.IsInTransaction)
{
this.Session.StartTransaction();
await this.BeginTransactionAsync(cancellationToken);
}
}
}
Expand All @@ -134,6 +142,103 @@ public void DiscardChanges()
this.ClearCommands();
}

/// <summary>
/// Starts a client session.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<IClientSessionHandle> StartSessionAsync(CancellationToken cancellationToken = default)
{
async Task<IClientSessionHandle> Start()
{
IClientSessionHandle handle = await this.Client.StartSessionAsync(SessionOptions, cancellationToken);

this.Session = handle;

return handle;
}

lock(this.lockObject)
{
if(this.sessionTask != null)
{
return this.sessionTask;
}

this.sessionTask = Start();

return this.sessionTask;
}
}

/// <summary>
/// Starts a transaction.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task BeginTransactionAsync(CancellationToken cancellationToken = default)
{
IClientSessionHandle session = await this.StartSessionAsync(cancellationToken).ConfigureAwait(false);

if(!session.IsInTransaction)
{
session.StartTransaction();
}
}

/// <summary>
/// Commits the transaction.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public Task CommitTransactionAsync(CancellationToken cancellationToken = default)
{
if(this.Session is null)
{
throw new InvalidOperationException("The session hasn't been created.");
}

if(!this.Session.IsInTransaction)
{
throw new InvalidOperationException("The session isn't in an active transaction.");
}

return this.Session.CommitTransactionAsync(cancellationToken);
}

/// <summary>
/// Aborts the transaction.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public Task AbortTransactionAsync(CancellationToken cancellationToken = default)
{
if(this.Session is null)
{
throw new InvalidOperationException("The session hasn't been created.");
}

if(!this.Session.IsInTransaction)
{
throw new InvalidOperationException("The session isn't in an active transaction.");
}

return this.Session.AbortTransactionAsync(cancellationToken);
}

/// <summary>
/// Gets a collection.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public IMongoCollection<T> GetCollection<T>()
{
string collectionName = typeof(T).Name.Pluralize();
return this.Database.GetCollection<T>(collectionName);
}

/// <inheritdoc />
protected override void DisposeManaged()
{
Expand Down Expand Up @@ -195,8 +300,8 @@ internal void Configure(RepositoryName repositoryName, IServiceProvider serviceP
{
if(this.Client.Cluster.Description.Type == ClusterType.ReplicaSet)
{
this.Session = this.Client.StartSession();
this.Session.StartTransaction();
this.Session ??= this.StartSession();
this.BeginTransaction();
}
else
{
Expand All @@ -213,15 +318,14 @@ internal void Configure(RepositoryName repositoryName, IServiceProvider serviceP
}
}

/// <summary>
/// Gets a collection.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
internal IMongoCollection<T> GetCollection<T>()
private IClientSessionHandle StartSession()
{
string collectionName = typeof(T).Name.Pluralize();
return this.Database.GetCollection<T>(collectionName);
return AsyncHelper.RunSync(() => this.StartSessionAsync());
}

private void BeginTransaction()
{
AsyncHelper.RunSync(() => this.BeginTransactionAsync());
}

private void ClearCommands()
Expand Down

0 comments on commit feba89b

Please sign in to comment.