Skip to content

Commit

Permalink
Fix retry logic in LockableContext when trying to acquire a lock (#889)
Browse files Browse the repository at this point in the history
* Fix retry

* Fix retry logic

* Fix
  • Loading branch information
yangmsft authored Nov 10, 2023
1 parent 8ba122e commit 0116754
Showing 1 changed file with 21 additions and 16 deletions.
37 changes: 21 additions & 16 deletions cs/src/core/ClientSession/LockableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ internal static unsafe bool DoInternalTryLock<FasterSession, TLockableKey>(Faste
{
ref var key = ref keys[keyIdx];
if (keyIdx == start || clientSession.fht.LockTable.GetBucketIndex(key.KeyHash) != clientSession.fht.LockTable.GetBucketIndex(keys[keyIdx - 1].KeyHash))
{
{
for (int numRetriesForKey = 0; ;)
{
{
OperationStatus status = clientSession.fht.InternalLock(key.KeyHash, new(LockOperationType.Lock, key.LockType));
bool fail = false;
if (status == OperationStatus.SUCCESS)
Expand All @@ -125,8 +125,13 @@ internal static unsafe bool DoInternalTryLock<FasterSession, TLockableKey>(Faste
// CancellationToken can accompany either of the other two mechanisms
fail |= timeout.Ticks > 0 && DateTime.UtcNow.Ticks - startTime.Ticks > timeout.Ticks;
fail |= cancellationToken.IsCancellationRequested;
if (!fail && status == OperationStatus.SUCCESS)
break; // out of the retry loop
if (!fail)
{
if (status == OperationStatus.SUCCESS)
break; // Out of the retry loop

continue; // Retry in next iteration
}

clientSession.fht.HandleImmediateNonPendingRetryStatus<Input, Output, Context, FasterSession>(status, fasterSession);

Expand Down Expand Up @@ -202,7 +207,7 @@ private static unsafe LockType DoLockOp<FasterSession, TLockableKey>(FasterSessi

/// <inheritdoc/>
public void Lock<TLockableKey>(TLockableKey[] keys) where TLockableKey : ILockableKey => Lock(keys, 0, keys.Length);

/// <inheritdoc/>
public void Lock<TLockableKey>(TLockableKey[] keys, int start, int count)
where TLockableKey : ILockableKey
Expand All @@ -222,8 +227,8 @@ public void Lock<TLockableKey>(TLockableKey[] keys, int start, int count)
}

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, int maxRetriesPerKey, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
public bool TryLock<TLockableKey>(TLockableKey[] keys, int maxRetriesPerKey, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
=> TryLock(keys, 0, keys.Length, maxRetriesPerKey, default, cancellationToken);

/// <inheritdoc/>
Expand All @@ -233,7 +238,7 @@ public bool TryLock<TLockableKey>(TLockableKey[] keys, int start, int count, int

/// <inheritdoc/>
public bool TryLock<TLockableKey>(TLockableKey[] keys, TimeSpan timeout, CancellationToken cancellationToken = default)
where TLockableKey : ILockableKey
where TLockableKey : ILockableKey
=> TryLock(keys, 0, keys.Length, -1, timeout, cancellationToken);

/// <inheritdoc/>
Expand Down Expand Up @@ -392,7 +397,7 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, ref ReadOptions readOptions, Context userContext = default, long serialNo = 0)
public Status Read(ref Key key, ref Input input, ref Output output, ref ReadOptions readOptions, Context userContext = default, long serialNo = 0)
=> Read(ref key, ref input, ref output, ref readOptions, out _, userContext, serialNo);

/// <inheritdoc/>
Expand Down Expand Up @@ -521,7 +526,7 @@ public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> R

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(Key key, Input input, ref ReadOptions readOptions, Context context = default, long serialNo = 0, CancellationToken token = default)
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(Key key, Input input, ref ReadOptions readOptions, Context context = default, long serialNo = 0, CancellationToken token = default)
=> ReadAsync(ref key, ref input, ref readOptions, context, serialNo, token);

/// <inheritdoc/>
Expand All @@ -543,7 +548,7 @@ public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> R

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(Key key, Context context = default, long serialNo = 0, CancellationToken token = default)
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(Key key, Context context = default, long serialNo = 0, CancellationToken token = default)
=> ReadAsync(ref key, context, serialNo, token);

/// <inheritdoc/>
Expand Down Expand Up @@ -720,7 +725,7 @@ public Status RMW(ref Key key, ref Input input, ref Output output, out RecordMet

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(ref Key key, ref Input input, ref Output output, ref RMWOptions rmwOptions, out RecordMetadata recordMetadata, Context userContext = default, long serialNo = 0)
public Status RMW(ref Key key, ref Input input, ref Output output, ref RMWOptions rmwOptions, out RecordMetadata recordMetadata, Context userContext = default, long serialNo = 0)
=> RMW(ref key, rmwOptions.KeyHash ?? clientSession.fht.comparer.GetHashCode64(ref key), ref input, ref output, out recordMetadata, userContext, serialNo);

/// <inheritdoc/>
Expand Down Expand Up @@ -821,7 +826,7 @@ public Status Delete(ref Key key, Context userContext = default, long serialNo =
/// <inheritdoc/>
/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Delete(ref Key key, ref DeleteOptions deleteOptions, Context userContext = default, long serialNo = 0)
public Status Delete(ref Key key, ref DeleteOptions deleteOptions, Context userContext = default, long serialNo = 0)
=> Delete(ref key, deleteOptions.KeyHash ?? clientSession.fht.comparer.GetHashCode64(ref key), userContext, serialNo);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -985,7 +990,7 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
=> _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
{
recordInfo.SetDirtyAndModified();
_clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo);
Expand Down Expand Up @@ -1015,7 +1020,7 @@ public bool SingleDeleter(ref Key key, ref Value value, ref RecordInfo recordInf
=> _clientSession.functions.SingleDeleter(ref key, ref value, ref deleteInfo);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
{
recordInfo.SetDirtyAndModified();
_clientSession.functions.PostSingleDeleter(ref key, ref deleteInfo);
Expand Down Expand Up @@ -1116,4 +1121,4 @@ public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, I
}
#endregion IFasterSession
}
}
}

0 comments on commit 0116754

Please sign in to comment.