Skip to content

Commit

Permalink
fix: relax request processors load
Browse files Browse the repository at this point in the history
  • Loading branch information
maxirmx committed Sep 23, 2024
1 parent 66804d3 commit bd8253b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 1 deletion.
2 changes: 2 additions & 0 deletions dkgServiceNode/Data/DbEnsure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ public static void Ensure(NpgsqlConnection connection, ILogger logger)
Ensure_0_14_0(connection);
logger.LogInformation("Tagging 0.14.1");
PuVersionUpdate("0.14.1", connection);
logger.LogInformation("Tagging 0.14.2");
PuVersionUpdate("0.14.2", connection);
}
catch (Exception ex)
{
Expand Down
12 changes: 11 additions & 1 deletion dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,15 @@ protected override async Task ProcessRequests()
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
var requests = new List<Node>();

int counter = 0;
while (requestQueue.TryDequeue(out var request) && requests.Count < bulkInsertLimit)
{
requests.Add(request);
if (counter++ > bulkRestLimit)
{
counter = 0;
await Task.Delay(0);
}
}

if (requests.Count > 0)
Expand Down Expand Up @@ -96,6 +101,11 @@ protected override async Task ProcessRequests()
foreach (var request in requests)
{
ncContext!.FinalizeRegistration(request);
if (counter++ > bulkRestLimit)
{
counter = 0;
await Task.Delay(0);
}
}
}
catch (Exception ex)
Expand Down
7 changes: 7 additions & 0 deletions dkgServiceNode/Services/RequestProcessors/NrhAddProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// POSSIBILITY OF SUCH DAMAGE.

using System.Collections.Concurrent;
using System.Diagnostics.Metrics;
using System.Text.Json;
using dkgServiceNode.Models;
using Npgsql;
Expand Down Expand Up @@ -73,10 +74,16 @@ protected override async Task ProcessRequests()
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
var requests = new List<Node>();
int counter = 0;

while (requestQueue.TryDequeue(out var request) && requests.Count < bulkInsertLimit)
{
requests.Add(request);
if (counter++ > bulkRestLimit)
{
counter = 0;
await Task.Delay(0);
}
}

if (requests.Count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public abstract class RequestProcessorBase : IDisposable
protected readonly int databaseReconnectDelay = 3000;
protected readonly int queueReparseDelay;
protected readonly int bulkInsertLimit;
protected readonly int bulkRestLimit = 100;

protected readonly CancellationTokenSource cancellationTokenSource = new();

Expand Down

0 comments on commit bd8253b

Please sign in to comment.