From bd8253bfa3ac2396a6c58a950388c0e2f6235112 Mon Sep 17 00:00:00 2001 From: Maxim Samsonov Date: Mon, 23 Sep 2024 13:45:34 +0300 Subject: [PATCH] fix: relax request processors load --- dkgServiceNode/Data/DbEnsure.cs | 2 ++ .../Services/RequestProcessors/NodeAddProcessor.cs | 12 +++++++++++- .../Services/RequestProcessors/NrhAddProcessor.cs | 7 +++++++ .../RequestProcessors/RequestProcessorBase.cs | 1 + 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/dkgServiceNode/Data/DbEnsure.cs b/dkgServiceNode/Data/DbEnsure.cs index 3a83101..308bf5a 100644 --- a/dkgServiceNode/Data/DbEnsure.cs +++ b/dkgServiceNode/Data/DbEnsure.cs @@ -270,6 +270,8 @@ public static void Ensure(NpgsqlConnection connection, ILogger logger) Ensure_0_14_0(connection); logger.LogInformation("Tagging 0.14.1"); PuVersionUpdate("0.14.1", connection); + logger.LogInformation("Tagging 0.14.2"); + PuVersionUpdate("0.14.2", connection); } catch (Exception ex) { diff --git a/dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs b/dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs index 9861b2e..b9b3c85 100644 --- a/dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs +++ b/dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs @@ -65,10 +65,15 @@ protected override async Task ProcessRequests() while (!cancellationTokenSource.Token.IsCancellationRequested) { var requests = new List(); - + int counter = 0; while (requestQueue.TryDequeue(out var request) && requests.Count < bulkInsertLimit) { requests.Add(request); + if (counter++ > bulkRestLimit) + { + counter = 0; + await Task.Delay(0); + } } if (requests.Count > 0) @@ -96,6 +101,11 @@ protected override async Task ProcessRequests() foreach (var request in requests) { ncContext!.FinalizeRegistration(request); + if (counter++ > bulkRestLimit) + { + counter = 0; + await Task.Delay(0); + } } } catch (Exception ex) diff --git a/dkgServiceNode/Services/RequestProcessors/NrhAddProcessor.cs b/dkgServiceNode/Services/RequestProcessors/NrhAddProcessor.cs index 602dd9a..0d22423 100644 --- a/dkgServiceNode/Services/RequestProcessors/NrhAddProcessor.cs +++ b/dkgServiceNode/Services/RequestProcessors/NrhAddProcessor.cs @@ -24,6 +24,7 @@ // POSSIBILITY OF SUCH DAMAGE. using System.Collections.Concurrent; +using System.Diagnostics.Metrics; using System.Text.Json; using dkgServiceNode.Models; using Npgsql; @@ -73,10 +74,16 @@ protected override async Task ProcessRequests() while (!cancellationTokenSource.Token.IsCancellationRequested) { var requests = new List(); + int counter = 0; while (requestQueue.TryDequeue(out var request) && requests.Count < bulkInsertLimit) { requests.Add(request); + if (counter++ > bulkRestLimit) + { + counter = 0; + await Task.Delay(0); + } } if (requests.Count > 0) diff --git a/dkgServiceNode/Services/RequestProcessors/RequestProcessorBase.cs b/dkgServiceNode/Services/RequestProcessors/RequestProcessorBase.cs index d7c5dfd..59ae171 100644 --- a/dkgServiceNode/Services/RequestProcessors/RequestProcessorBase.cs +++ b/dkgServiceNode/Services/RequestProcessors/RequestProcessorBase.cs @@ -34,6 +34,7 @@ public abstract class RequestProcessorBase : IDisposable protected readonly int databaseReconnectDelay = 3000; protected readonly int queueReparseDelay; protected readonly int bulkInsertLimit; + protected readonly int bulkRestLimit = 100; protected readonly CancellationTokenSource cancellationTokenSource = new();