Skip to content

Commit

Permalink
Sqlite pooling for connections and commands (#193)
Browse files Browse the repository at this point in the history
* add ServerObjectManagerFactory

* add usage of a command pool

* add more disposal

* save saving increase

* fix tests

* fixes

* push out concurrency and disposablity

* Add a custom task scheduler

* Better usage, don't wait to enqueue to save to channels

* Completely pre-cal batch size to avoid spinning issues

* Try to fix cache counting

* properly dispose things

* format

* clean up

* adjust count and save on current thread

* move batch it's own file

* update a few packages

* fix build and add batch tests

* revert and format

* Revert "save saving increase"

This reverts commit 3b50c85.

* revert change

* adjust and add tests

* Dispose sqlite manager properly

* Make Batch a IMemoryOwner to allow for pooling

* Fix tests

* Upgrade some deps

* try to make tests more explicit

* remove return value

* Use named tuple for all objects
  • Loading branch information
adamhathcock authored Jan 8, 2025
1 parent 11fe8e8 commit ed5bdc9
Show file tree
Hide file tree
Showing 28 changed files with 489 additions and 206 deletions.
6 changes: 3 additions & 3 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<!-- Keep at exactly 7.0.5 for side by side with V2 -->
<PackageVersion Include="Microsoft.Data.Sqlite" Version="7.0.5" />
<PackageVersion Include="Microsoft.Extensions.ObjectPool" Version="8.0.11" />
<PackageVersion Include="Microsoft.Extensions.ObjectPool" Version="9.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
Expand All @@ -19,15 +19,15 @@
<PackageVersion Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageVersion Include="NUnit" Version="4.2.2" />
<PackageVersion Include="NUnit.Analyzers" Version="4.2.0" />
<PackageVersion Include="Open.ChannelExtensions" Version="8.6.0" />
<PackageVersion Include="Open.ChannelExtensions" Version="9.0.0" />
<PackageVersion Include="Polly" Version="7.2.3" />
<PackageVersion Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageVersion Include="Polly.Extensions.Http" Version="3.0.0" />
<PackageVersion Include="Shouldly" Version="4.2.1" />
<PackageVersion Include="Speckle.Newtonsoft.Json" Version="13.0.2" />
<PackageVersion Include="Speckle.DoubleNumerics" Version="4.0.1" />
<PackageVersion Include="SimpleExec" Version="12.0.0" />
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
<PackageVersion Include="System.Threading.Channels" Version="9.0.0" />
<GlobalPackageReference Include="PolySharp" Version="1.15.0" />
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<GlobalPackageReference Include="GitVersion.MsBuild" Version="5.12.0" />
Expand Down
10 changes: 6 additions & 4 deletions src/Speckle.Sdk.Dependencies/Pools.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Text;
using Microsoft.Extensions.ObjectPool;

namespace Speckle.Sdk.Dependencies;

public static class Pools
{
public const int DefaultCapacity = 50;

public static Pool<Dictionary<string, object?>> ObjectDictionaries { get; } = new(new ObjectDictionaryPolicy());

private sealed class ObjectDictionaryPolicy : IPooledObjectPolicy<Dictionary<string, object?>>
Expand All @@ -25,7 +27,7 @@ public bool Return(Dictionary<string, object?> obj)
private sealed class ObjectDictionaryPolicy<TKey, TValue> : IPooledObjectPolicy<Dictionary<TKey, TValue>>
where TKey : notnull
{
public Dictionary<TKey, TValue> Create() => new(50);
public Dictionary<TKey, TValue> Create() => new(DefaultCapacity);

public bool Return(Dictionary<TKey, TValue> obj)
{
Expand All @@ -38,7 +40,7 @@ private sealed class ObjectConcurrentDictionaryPolicy<TKey, TValue>
: IPooledObjectPolicy<ConcurrentDictionary<TKey, TValue>>
where TKey : notnull
{
public ConcurrentDictionary<TKey, TValue> Create() => new(Environment.ProcessorCount, 50);
public ConcurrentDictionary<TKey, TValue> Create() => new(Environment.ProcessorCount, DefaultCapacity);

public bool Return(ConcurrentDictionary<TKey, TValue> obj)
{
Expand All @@ -49,7 +51,7 @@ public bool Return(ConcurrentDictionary<TKey, TValue> obj)

private sealed class ObjectListPolicy<T> : IPooledObjectPolicy<List<T>>
{
public List<T> Create() => new(50);
public List<T> Create() => new(DefaultCapacity);

public bool Return(List<T> obj)
{
Expand Down
14 changes: 11 additions & 3 deletions src/Speckle.Sdk.Dependencies/Serialization/Batch.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
namespace Speckle.Sdk.Serialisation.V2.Send;
using System.Buffers;
using Speckle.Sdk.Dependencies;

public class Batch<T>(int capacity) : IHasSize
namespace Speckle.Sdk.Serialisation.V2.Send;

public sealed class Batch<T> : IHasSize, IMemoryOwner<T>
where T : IHasSize
{
private static readonly Pool<List<T>> _pool = Pools.CreateListPool<T>();
#pragma warning disable IDE0032
private readonly List<T> _items = new(capacity);
private readonly List<T> _items = _pool.Get();
private int _batchSize;
#pragma warning restore IDE0032

Expand All @@ -22,4 +26,8 @@ public void TrimExcess()

public int Size => _batchSize;
public List<T> Items => _items;

public void Dispose() => _pool.Return(_items);

public Memory<T> Memory => new(_items.ToArray());
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System.Threading.Channels;
using System.Buffers;
using System.Threading.Channels;
using Open.ChannelExtensions;

namespace Speckle.Sdk.Serialisation.V2.Send;

public static class ChannelExtensions
{
public static BatchingChannelReader<T, Batch<T>> BatchBySize<T>(
public static BatchingChannelReader<T, IMemoryOwner<T>> BatchBySize<T>(
this ChannelReader<T> source,
int batchSize,
bool singleReader = false,
Expand Down
11 changes: 9 additions & 2 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Buffers;
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Serialisation.V2.Send;
Expand All @@ -13,7 +14,7 @@ public abstract class ChannelSaver<T>
private const int MAX_PARALLELISM_HTTP = 4;
private const int HTTP_CAPACITY = 500;
private const int MAX_CACHE_WRITE_PARALLELISM = 4;
private const int MAX_CACHE_BATCH = 200;
private const int MAX_CACHE_BATCH = 500;

private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
Expand Down Expand Up @@ -46,7 +47,13 @@ public Task Start(CancellationToken cancellationToken = default) =>
public ValueTask Save(T item, CancellationToken cancellationToken = default) =>
_checkCacheChannel.Writer.WriteAsync(item, cancellationToken);

public abstract Task<List<T>> SendToServer(Batch<T> batch, CancellationToken cancellationToken);
public async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch, CancellationToken cancellationToken)
{
await SendToServer((Batch<T>)batch, cancellationToken).ConfigureAwait(false);
return batch;
}

public abstract Task SendToServer(Batch<T> batch, CancellationToken cancellationToken);

public Task Done()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Buffers;
using System.Threading.Channels;
using Open.ChannelExtensions;

Expand All @@ -13,20 +14,20 @@ public class SizeBatchingChannelReader<T>(
int batchSize,
bool singleReader,
bool syncCont = false
) : BatchingChannelReader<T, Batch<T>>(x => new(x), source, batchSize, singleReader, syncCont)
) : BatchingChannelReader<T, IMemoryOwner<T>>(x => new Batch<T>(), source, batchSize, singleReader, syncCont)
where T : IHasSize
{
protected override Batch<T> CreateBatch(int capacity) => new(capacity);
protected override IMemoryOwner<T> CreateBatch(int capacity) => new Batch<T>();

protected override void TrimBatch(ref Batch<T> batch, bool isVerifiedFull)
protected override void TrimBatch(ref IMemoryOwner<T> batch, bool isVerifiedFull)
{
if (!isVerifiedFull)
{
batch.TrimExcess();
((Batch<T>)batch).TrimExcess();
}
}

protected override void AddBatchItem(Batch<T> batch, T item) => batch.Add(item);
protected override void AddBatchItem(IMemoryOwner<T> batch, T item) => ((Batch<T>)batch).Add(item);

protected override int GetBatchSize(Batch<T> batch) => batch.Size;
protected override int GetBatchSize(IMemoryOwner<T> batch) => ((Batch<T>)batch).Size;
}
51 changes: 26 additions & 25 deletions src/Speckle.Sdk.Dependencies/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
},
"Microsoft.Extensions.ObjectPool": {
"type": "Direct",
"requested": "[8.0.11, )",
"resolved": "8.0.11",
"contentHash": "6ApKcHNJigXBfZa6XlDQ8feJpq7SG1ogZXg6M4FiNzgd6irs3LUAzo0Pfn4F2ZI9liGnH1XIBR/OtSbZmJAV5w=="
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "UbsU/gYe4nv1DeqMXIVzDfNNek7Sk2kKuAOXL/Y+sLcAR0HwFUqzg1EPiU88jeHNe0g81aPvvHbvHarQr3r9IA=="
},
"Microsoft.SourceLink.GitHub": {
"type": "Direct",
Expand All @@ -44,13 +44,13 @@
},
"Open.ChannelExtensions": {
"type": "Direct",
"requested": "[8.6.0, )",
"resolved": "8.6.0",
"contentHash": "g5axz417bA6FXifJaBlB0l62gV7dYmknXx0n8lT/LSA3+7isaGMsOjJp5J+H/yXDRe4r+KZrE+bzQcs4Ets2kA==",
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "DP+l5S6G46wcuY4I4kNXE+RDOmJr0DKuMienOdt0mMBN9z7vmLSC8YQbqCyb9i9LNjXj1tgCx5LyitJiRr/v7g==",
"dependencies": {
"Microsoft.Bcl.AsyncInterfaces": "8.0.0",
"System.Collections.Immutable": "8.0.0",
"System.Threading.Channels": "8.0.0"
"Microsoft.Bcl.AsyncInterfaces": "9.0.0",
"System.Collections.Immutable": "9.0.0",
"System.Threading.Channels": "9.0.0"
}
},
"Polly": {
Expand Down Expand Up @@ -88,10 +88,11 @@
},
"System.Threading.Channels": {
"type": "Direct",
"requested": "[8.0.0, )",
"resolved": "8.0.0",
"contentHash": "CMaFr7v+57RW7uZfZkPExsPB6ljwzhjACWW1gfU35Y56rk72B/Wu+sTqxVmGSk4SFUlPc3cjeKND0zktziyjBA==",
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "hzACdIf1C+4Dqos5ijV404b94+LqfIC8nfS3mNpCDFWowb1N3PNfJPopneq32ahWlDeyaPZJqjBk76YFR69Rpg==",
"dependencies": {
"Microsoft.Bcl.AsyncInterfaces": "9.0.0",
"System.Threading.Tasks.Extensions": "4.5.4"
}
},
Expand Down Expand Up @@ -122,8 +123,8 @@
},
"System.Collections.Immutable": {
"type": "Transitive",
"resolved": "8.0.0",
"contentHash": "AurL6Y5BA1WotzlEvVaIDpqzpIPvYnnldxru8oXJU2yFxFUy3+pNXjXd1ymO+RA0rq0+590Q8gaz2l3Sr7fmqg==",
"resolved": "9.0.0",
"contentHash": "QhkXUl2gNrQtvPmtBTQHb0YsUrDiDQ2QS09YbtTTiSjGcf7NBqtYbrG/BE06zcBPCKEwQGzIv13IVdXNOSub2w==",
"dependencies": {
"System.Memory": "4.5.5",
"System.Runtime.CompilerServices.Unsafe": "6.0.0"
Expand Down Expand Up @@ -160,8 +161,8 @@
"Microsoft.Bcl.AsyncInterfaces": {
"type": "CentralTransitive",
"requested": "[5.0.0, )",
"resolved": "8.0.0",
"contentHash": "3WA9q9yVqJp222P3x1wYIGDAkpjAku0TMUaaQV22g6L67AI0LdOIrVS7Ht2vJfLHGSPVuqN94vIr15qn+HEkHw==",
"resolved": "9.0.0",
"contentHash": "owmu2Cr3IQ8yQiBleBHlGk8dSQ12oaF2e7TpzwJKEl4m84kkZJjEY1n33L67Y3zM5jPOjmmbdHjbfiL0RqcMRQ==",
"dependencies": {
"System.Threading.Tasks.Extensions": "4.5.4"
}
Expand All @@ -185,9 +186,9 @@
},
"Microsoft.Extensions.ObjectPool": {
"type": "Direct",
"requested": "[8.0.11, )",
"resolved": "8.0.11",
"contentHash": "6ApKcHNJigXBfZa6XlDQ8feJpq7SG1ogZXg6M4FiNzgd6irs3LUAzo0Pfn4F2ZI9liGnH1XIBR/OtSbZmJAV5w=="
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "UbsU/gYe4nv1DeqMXIVzDfNNek7Sk2kKuAOXL/Y+sLcAR0HwFUqzg1EPiU88jeHNe0g81aPvvHbvHarQr3r9IA=="
},
"Microsoft.SourceLink.GitHub": {
"type": "Direct",
Expand All @@ -201,9 +202,9 @@
},
"Open.ChannelExtensions": {
"type": "Direct",
"requested": "[8.6.0, )",
"resolved": "8.6.0",
"contentHash": "g5axz417bA6FXifJaBlB0l62gV7dYmknXx0n8lT/LSA3+7isaGMsOjJp5J+H/yXDRe4r+KZrE+bzQcs4Ets2kA=="
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "DP+l5S6G46wcuY4I4kNXE+RDOmJr0DKuMienOdt0mMBN9z7vmLSC8YQbqCyb9i9LNjXj1tgCx5LyitJiRr/v7g=="
},
"Polly": {
"type": "Direct",
Expand Down Expand Up @@ -240,9 +241,9 @@
},
"System.Threading.Channels": {
"type": "Direct",
"requested": "[8.0.0, )",
"resolved": "8.0.0",
"contentHash": "CMaFr7v+57RW7uZfZkPExsPB6ljwzhjACWW1gfU35Y56rk72B/Wu+sTqxVmGSk4SFUlPc3cjeKND0zktziyjBA=="
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "hzACdIf1C+4Dqos5ijV404b94+LqfIC8nfS3mNpCDFWowb1N3PNfJPopneq32ahWlDeyaPZJqjBk76YFR69Rpg=="
},
"ILRepack": {
"type": "Transitive",
Expand Down
17 changes: 13 additions & 4 deletions src/Speckle.Sdk/Credentials/AccountManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

namespace Speckle.Sdk.Credentials;

public partial interface IAccountManager : IDisposable;

/// <summary>
/// Manage accounts locally for desktop applications.
/// </summary>
[GenerateAutoInterface]
public class AccountManager(
public sealed class AccountManager(
ISpeckleApplication application,
ILogger<AccountManager> logger,
ISpeckleHttp speckleHttp,
Expand All @@ -40,6 +42,13 @@ ISqLiteJsonCacheManagerFactory sqLiteJsonCacheManagerFactory
"AccountAddFlow"
);

[AutoInterfaceIgnore]
public void Dispose()
{
_accountStorage.Dispose();
_accountAddLockStorage.Dispose();
}

/// <summary>
/// Gets the basic information about a server.
/// </summary>
Expand Down Expand Up @@ -321,7 +330,7 @@ public IEnumerable<Account> GetAccounts()
{
static bool IsInvalid(Account ac) => ac.userInfo == null || ac.serverInfo == null;

var sqlAccounts = _accountStorage.GetAllObjects().Select(x => JsonConvert.DeserializeObject<Account>(x));
var sqlAccounts = _accountStorage.GetAllObjects().Select(x => JsonConvert.DeserializeObject<Account>(x.Json));
var localAccounts = GetLocalAccounts();

foreach (var acc in sqlAccounts)
Expand Down Expand Up @@ -642,7 +651,7 @@ private void TryLockAccountAddFlow(TimeSpan timespan)
}

// this uses the SQLite transport to store locks
var lockIds = _accountAddLockStorage.GetAllObjects().OrderByDescending(d => d).ToList();
var lockIds = _accountAddLockStorage.GetAllObjects().Select(x => x.Id).OrderByDescending(d => d).ToList();
var now = DateTime.Now;
foreach (var l in lockIds)
{
Expand Down Expand Up @@ -674,7 +683,7 @@ private void UnlockAccountAddFlow()
{
s_isAddingAccount = false;
// make sure all old locks are removed
foreach (var id in _accountAddLockStorage.GetAllObjects())
foreach (var (id, _) in _accountAddLockStorage.GetAllObjects())
{
_accountAddLockStorage.DeleteObject(id);
}
Expand Down
Loading

0 comments on commit ed5bdc9

Please sign in to comment.