Skip to content

Commit

Permalink
CSHARP-2964: Reduce Client Time To Recovery On Topology Changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryLukyanov committed Jul 23, 2020
1 parent 21e68b5 commit e0568c0
Show file tree
Hide file tree
Showing 100 changed files with 6,120 additions and 471 deletions.
5 changes: 4 additions & 1 deletion src/MongoDB.Driver.Core/Core/Configuration/ClusterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,17 @@ private IServerMonitorFactory CreateServerMonitorFactory()
);

var serverMonitorStreamFactory = CreateTcpStreamFactory(serverMonitorTcpStreamSettings);
var serverMonitorSettings = new ServerMonitorSettings(
connectTimeout: serverMonitorTcpStreamSettings.ConnectTimeout,
heartbeatInterval: _serverSettings.HeartbeatInterval);

var serverMonitorConnectionFactory = new BinaryConnectionFactory(
serverMonitorConnectionSettings,
serverMonitorStreamFactory,
new EventAggregator());

return new ServerMonitorFactory(
_serverSettings,
serverMonitorSettings,
serverMonitorConnectionFactory,
_eventAggregator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,11 @@ public Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncod
{
return _connection.SendMessagesAsync(messages, messageEncoderSettings, cancellationToken);
}

public void SetReadTimeout(TimeSpan timeout)
{
_connection.SetReadTimeout(timeout);
}
}

private sealed class AcquiredConnection : IConnectionHandle
Expand Down Expand Up @@ -711,6 +716,12 @@ public Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncod
return _reference.Instance.SendMessagesAsync(messages, messageEncoderSettings, cancellationToken);
}

public void SetReadTimeout(TimeSpan timeout)
{
ThrowIfDisposed();
_reference.Instance.SetReadTimeout(timeout);
}

private void ThrowIfDisposed()
{
if (_disposed)
Expand Down
96 changes: 59 additions & 37 deletions src/MongoDB.Driver.Core/Core/Connections/BinaryConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ namespace MongoDB.Driver.Core.Connections
internal class BinaryConnection : IConnection
{
// fields
private readonly CancellationToken _backgroundTaskCancellationToken;
private readonly CancellationTokenSource _backgroundTaskCancellationTokenSource;
private readonly CommandEventHelper _commandEventHelper;
private readonly ICompressorSource _compressorSource;
private ConnectionId _connectionId;
Expand Down Expand Up @@ -85,9 +83,6 @@ public BinaryConnection(ServerId serverId, EndPoint endPoint, ConnectionSettings
_connectionInitializer = Ensure.IsNotNull(connectionInitializer, nameof(connectionInitializer));
Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));

_backgroundTaskCancellationTokenSource = new CancellationTokenSource();
_backgroundTaskCancellationToken = _backgroundTaskCancellationTokenSource.Token;

_connectionId = new ConnectionId(serverId);
_receiveLock = new SemaphoreSlim(1);
_sendLock = new SemaphoreSlim(1);
Expand Down Expand Up @@ -199,8 +194,6 @@ private void Dispose(bool disposing)
}

var stopwatch = Stopwatch.StartNew();
_backgroundTaskCancellationTokenSource.Cancel();
_backgroundTaskCancellationTokenSource.Dispose();
_receiveLock.Dispose();
_sendLock.Dispose();

Expand Down Expand Up @@ -237,7 +230,7 @@ private void EnsureMessageSizeIsValid(int messageSize)

public void Open(CancellationToken cancellationToken)
{
ThrowIfDisposed();
ThrowIfCancelledOrDisposed(cancellationToken);

TaskCompletionSource<bool> taskCompletionSource = null;
var connecting = false;
Expand Down Expand Up @@ -274,7 +267,7 @@ public void Open(CancellationToken cancellationToken)

public Task OpenAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
ThrowIfCancelledOrDisposed(cancellationToken);

lock (_openLock)
{
Expand Down Expand Up @@ -329,19 +322,19 @@ private async Task OpenHelperAsync(CancellationToken cancellationToken)
}
}

private IByteBuffer ReceiveBuffer()
private IByteBuffer ReceiveBuffer(CancellationToken cancellationToken)
{
try
{
var messageSizeBytes = new byte[4];
_stream.ReadBytes(messageSizeBytes, 0, 4, _backgroundTaskCancellationToken);
_stream.ReadBytes(messageSizeBytes, 0, 4, cancellationToken);
var messageSize = BitConverter.ToInt32(messageSizeBytes, 0);
EnsureMessageSizeIsValid(messageSize);
var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default);
var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize);
buffer.Length = messageSize;
buffer.SetBytes(0, messageSizeBytes, 0, 4);
_stream.ReadBytes(buffer, 4, messageSize - 4, _backgroundTaskCancellationToken);
_stream.ReadBytes(buffer, 4, messageSize - 4, cancellationToken);
_lastUsedAtUtc = DateTime.UtcNow;
buffer.MakeReadOnly();
return buffer;
Expand Down Expand Up @@ -370,7 +363,7 @@ private IByteBuffer ReceiveBuffer(int responseTo, CancellationToken cancellation
receiveLockRequest.Task.GetAwaiter().GetResult(); // propagate exceptions
while (true)
{
var buffer = ReceiveBuffer();
var buffer = ReceiveBuffer(cancellationToken);
_dropbox.AddMessage(buffer);

if (messageTask.IsCompleted)
Expand All @@ -391,20 +384,20 @@ private IByteBuffer ReceiveBuffer(int responseTo, CancellationToken cancellation
}
}

private async Task<IByteBuffer> ReceiveBufferAsync()
private async Task<IByteBuffer> ReceiveBufferAsync(CancellationToken cancellationToken)
{
try
{
var messageSizeBytes = new byte[4];
var readTimeout = _stream.CanTimeout ? TimeSpan.FromMilliseconds(_stream.ReadTimeout) : Timeout.InfiniteTimeSpan;
await _stream.ReadBytesAsync(messageSizeBytes, 0, 4, readTimeout, _backgroundTaskCancellationToken).ConfigureAwait(false);
await _stream.ReadBytesAsync(messageSizeBytes, 0, 4, readTimeout, cancellationToken).ConfigureAwait(false);
var messageSize = BitConverter.ToInt32(messageSizeBytes, 0);
EnsureMessageSizeIsValid(messageSize);
var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default);
var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize);
buffer.Length = messageSize;
buffer.SetBytes(0, messageSizeBytes, 0, 4);
await _stream.ReadBytesAsync(buffer, 4, messageSize - 4, readTimeout, _backgroundTaskCancellationToken).ConfigureAwait(false);
await _stream.ReadBytesAsync(buffer, 4, messageSize - 4, readTimeout, cancellationToken).ConfigureAwait(false);
_lastUsedAtUtc = DateTime.UtcNow;
buffer.MakeReadOnly();
return buffer;
Expand Down Expand Up @@ -433,7 +426,7 @@ private async Task<IByteBuffer> ReceiveBufferAsync(int responseTo, CancellationT
receiveLockRequest.Task.GetAwaiter().GetResult(); // propagate exceptions
while (true)
{
var buffer = await ReceiveBufferAsync().ConfigureAwait(false);
var buffer = await ReceiveBufferAsync(cancellationToken).ConfigureAwait(false);
_dropbox.AddMessage(buffer);

if (messageTask.IsCompleted)
Expand Down Expand Up @@ -461,7 +454,7 @@ public ResponseMessage ReceiveMessage(
CancellationToken cancellationToken)
{
Ensure.IsNotNull(encoderSelector, nameof(encoderSelector));
ThrowIfDisposedOrNotOpen();
ThrowIfCancelledOrDisposedOrNotOpen(cancellationToken);

var helper = new ReceiveMessageHelper(this, responseTo, messageEncoderSettings, _compressorSource);
try
Expand All @@ -477,7 +470,7 @@ public ResponseMessage ReceiveMessage(
catch (Exception ex)
{
helper.FailedReceivingMessage(ex);
throw;
throw WrapInOperationCanceledExceptionIfRequired(ex, cancellationToken);
}
}

Expand All @@ -488,7 +481,7 @@ public async Task<ResponseMessage> ReceiveMessageAsync(
CancellationToken cancellationToken)
{
Ensure.IsNotNull(encoderSelector, nameof(encoderSelector));
ThrowIfDisposedOrNotOpen();
ThrowIfCancelledOrDisposedOrNotOpen(cancellationToken);

var helper = new ReceiveMessageHelper(this, responseTo, messageEncoderSettings, _compressorSource);
try
Expand All @@ -504,7 +497,7 @@ public async Task<ResponseMessage> ReceiveMessageAsync(
catch (Exception ex)
{
helper.FailedReceivingMessage(ex);
throw;
throw WrapInOperationCanceledExceptionIfRequired(ex, cancellationToken);
}
}

Expand All @@ -520,8 +513,7 @@ private void SendBuffer(IByteBuffer buffer, CancellationToken cancellationToken)

try
{
// don't use the caller's cancellationToken because once we start writing a message we have to write the whole thing
_stream.WriteBytes(buffer, 0, buffer.Length, _backgroundTaskCancellationToken);
_stream.WriteBytes(buffer, 0, buffer.Length, cancellationToken);
_lastUsedAtUtc = DateTime.UtcNow;
}
catch (Exception ex)
Expand Down Expand Up @@ -549,9 +541,8 @@ private async Task SendBufferAsync(IByteBuffer buffer, CancellationToken cancell

try
{
// don't use the caller's cancellationToken because once we start writing a message we have to write the whole thing
var writeTimeout = _stream.CanTimeout ? TimeSpan.FromMilliseconds(_stream.WriteTimeout) : Timeout.InfiniteTimeSpan;
await _stream.WriteBytesAsync(buffer, 0, buffer.Length, writeTimeout, _backgroundTaskCancellationToken).ConfigureAwait(false);
await _stream.WriteBytesAsync(buffer, 0, buffer.Length, writeTimeout, cancellationToken).ConfigureAwait(false);
_lastUsedAtUtc = DateTime.UtcNow;
}
catch (Exception ex)
Expand All @@ -570,7 +561,7 @@ private async Task SendBufferAsync(IByteBuffer buffer, CancellationToken cancell
public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
{
Ensure.IsNotNull(messages, nameof(messages));
ThrowIfDisposedOrNotOpen();
ThrowIfCancelledOrDisposedOrNotOpen(cancellationToken);

var helper = new SendMessagesHelper(this, messages, messageEncoderSettings);
try
Expand Down Expand Up @@ -599,14 +590,14 @@ public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSet
catch (Exception ex)
{
helper.FailedSendingMessages(ex);
throw;
throw WrapInOperationCanceledExceptionIfRequired(ex, cancellationToken);
}
}

public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
{
Ensure.IsNotNull(messages, nameof(messages));
ThrowIfDisposedOrNotOpen();
ThrowIfCancelledOrDisposedOrNotOpen(cancellationToken);

var helper = new SendMessagesHelper(this, messages, messageEncoderSettings);
try
Expand Down Expand Up @@ -635,10 +626,16 @@ public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, Messag
catch (Exception ex)
{
helper.FailedSendingMessages(ex);
throw;
throw WrapInOperationCanceledExceptionIfRequired(ex, cancellationToken);
}
}

public void SetReadTimeout(TimeSpan timeout)
{
ThrowIfDisposed();
_stream.ReadTimeout = (int)timeout.TotalMilliseconds;
}

// private methods
private bool AnyMessageNeedsToBeCompressed(IEnumerable<RequestMessage> messages)
{
Expand Down Expand Up @@ -698,17 +695,15 @@ private void CompressMessage(
compressedMessageEncoder.WriteMessage(compressedMessage);
}

private void ThrowIfDisposed()
private void ThrowIfCancelledOrDisposed(CancellationToken cancellationToken = default)
{
if (_state.Value == State.Disposed)
{
throw new ObjectDisposedException(GetType().Name);
}
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
}

private void ThrowIfDisposedOrNotOpen()
private void ThrowIfCancelledOrDisposedOrNotOpen(CancellationToken cancellationToken)
{
ThrowIfDisposed();
ThrowIfCancelledOrDisposed(cancellationToken);
if (_state.Value == State.Failed)
{
throw new MongoConnectionClosedException(_connectionId);
Expand All @@ -719,6 +714,14 @@ private void ThrowIfDisposedOrNotOpen()
}
}

private void ThrowIfDisposed()
{
if (_state.Value == State.Disposed)
{
throw new ObjectDisposedException(GetType().Name);
}
}

private Exception WrapException(Exception ex, string action)
{
if (
Expand All @@ -727,7 +730,9 @@ ex is ThreadAbortException ||
ex is StackOverflowException ||
#endif
ex is MongoAuthenticationException ||
ex is OutOfMemoryException)
ex is OutOfMemoryException ||
ex is OperationCanceledException ||
ex is ObjectDisposedException)
{
return ex;
}
Expand All @@ -738,6 +743,23 @@ ex is MongoAuthenticationException ||
}
}

private Exception WrapInOperationCanceledExceptionIfRequired(Exception exception, CancellationToken cancellationToken)
{
if (exception is ObjectDisposedException objectDisposedException)
{
// We expect two cases here:
// objectDisposedException.ObjectName == GetType().Name
// objectDisposedException.Message == "The semaphore has been disposed."
// but since the last one is language-specific, the only option we have is avoiding any additional conditions for ObjectDisposedException
// TODO: this logic should be reviewed in the scope of https://jira.mongodb.org/browse/CSHARP-3165
return new OperationCanceledException($"The {nameof(BinaryConnection)} operation has been cancelled.", exception);
}
else
{
return exception;
}
}

// nested classes
private class Dropbox
{
Expand Down
9 changes: 6 additions & 3 deletions src/MongoDB.Driver.Core/Core/Connections/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson.Serialization;
using MongoDB.Driver.Core.Configuration;
using MongoDB.Driver.Core.WireProtocol.Messages;
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
Expand Down Expand Up @@ -81,6 +78,12 @@ public interface IConnection : IDisposable
/// </value>
ConnectionSettings Settings { get; }

/// <summary>
/// Set read timeout value.
/// </summary>
/// <param name="timeout">The timeout.</param>
void SetReadTimeout(TimeSpan timeout);

// methods
/// <summary>
/// Opens the connection.
Expand Down
Loading

0 comments on commit e0568c0

Please sign in to comment.