Skip to content

Commit

Permalink
fix: 0.14.0 minor issues
Browse files Browse the repository at this point in the history
  • Loading branch information
maxirmx committed Sep 23, 2024
1 parent 2c9c2bd commit 66804d3
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 168 deletions.
11 changes: 8 additions & 3 deletions dkgServiceNode/Controllers/OpsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,15 @@ public ActionResult<StatusResponse> RegisterNode(Node node)
int? roundId = null;
Round? round = null;
List<Round> rounds = roundContext.GetAllRounds().Where(r => r.StatusValue == (short)RStatus.Registration).ToList();
var xNode = ncContext.GetNodeByAddress(node.Address);
NodesRoundHistory? lastRoundHistory = null;

if (rounds.Count != 0)
{
logger.LogDebug("{count} round{s} open for registration", rounds.Count, rounds.Count != 1 ? "s" : "");
round = rounds[new Random().Next(rounds.Count)];
roundId = round.Id;

var xNode = ncContext.GetNodeByAddress(node.Address);
if (xNode == null)
{
logger.LogDebug("Registering new node for round [{roundId}]", roundId);
Expand Down Expand Up @@ -141,7 +143,6 @@ public ActionResult<StatusResponse> RegisterNode(Node node)
ncContext.UpdateNode(xNode, true);
}

NodesRoundHistory? lastRoundHistory = null;
if (xNode is not null)
{
lastRoundHistory = ncContext.GetLastNodeRoundHistory(xNode.Address, roundId ?? 0);
Expand All @@ -154,8 +155,12 @@ public ActionResult<StatusResponse> RegisterNode(Node node)
}
else
{
if (xNode is not null)
{
lastRoundHistory = ncContext.GetLastNodeRoundHistory(xNode.Address, roundId ?? 0);
}
logger.LogDebug("No rounds open for registration");
res = Ok(CreateStatusResponse(null, null, NStatus.NotRegistered, null));
res = Ok(CreateStatusResponse(null, lastRoundHistory, NStatus.NotRegistered, node.Random));
}
}
stopwatch.Stop();
Expand Down
2 changes: 2 additions & 0 deletions dkgServiceNode/Data/DbEnsure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ public static void Ensure(NpgsqlConnection connection, ILogger logger)
{
logger.LogInformation("Initializing database at 0.14.0");
Ensure_0_14_0(connection);
logger.LogInformation("Tagging 0.14.1");
PuVersionUpdate("0.14.1", connection);
}
catch (Exception ex)
{
Expand Down
33 changes: 30 additions & 3 deletions dkgServiceNode/Program.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,28 @@
// Copyright (C) 2024 Maxim [maxirmx] Samsonov (www.sw.consulting)
// All rights reserved.
// This file is a part of dkg service node
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS
// BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

using Microsoft.EntityFrameworkCore;

using dkgServiceNode.Data;
Expand All @@ -13,6 +38,8 @@
var configuration = builder.Configuration;

int controllers = configuration.GetValue<int?>("Controllers") ?? 5;
int bulkInsertLimit = configuration.GetValue<int?>("BulkInsertLimit") ?? 10000;
int queueReparseDelay = configuration.GetValue<int?>("QueueReparseDelay") ?? 1000;

// Add services to the container.

Expand Down Expand Up @@ -50,13 +77,13 @@
builder.Services.AddSingleton<NrhAddProcessor>(serviceProvider =>
{
var logger = serviceProvider.GetRequiredService<ILogger<NrhAddProcessor>>();
return new NrhAddProcessor(connectionString, logger);
return new NrhAddProcessor(connectionString, bulkInsertLimit, queueReparseDelay, logger);
});

builder.Services.AddSingleton<NodeAddProcessor>(serviceProvider =>
{
var logger = serviceProvider.GetRequiredService<ILogger<NodeAddProcessor>>();
return new NodeAddProcessor(connectionString, logger);
return new NodeAddProcessor(connectionString, bulkInsertLimit, queueReparseDelay, logger);
});


Expand All @@ -74,7 +101,7 @@

app.Services
.GetRequiredService<NrhAddProcessor>()
.Start();
.Start(app.Services.GetRequiredService<NodeCompositeContext>());

app.Services
.GetRequiredService<NodeAddProcessor>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace dkgServiceNode.Services.RequestLimitingMiddleware
public class RequestLimitingMiddleware(RequestDelegate next, int maxConcurrentRequests)
{
private readonly RequestDelegate _next = next;
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(maxConcurrentRequests, maxConcurrentRequests);
private readonly SemaphoreSlim _semaphore = new(maxConcurrentRequests, maxConcurrentRequests);

public async Task InvokeAsync(HttpContext context)
{
Expand Down
117 changes: 34 additions & 83 deletions dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,75 +1,51 @@
using System.Collections.Concurrent;
using dkgCommon.Constants;
using dkgServiceNode.Data;
// Copyright (C) 2024 Maxim [maxirmx] Samsonov (www.sw.consulting)
// All rights reserved.
// This file is a part of dkg service node
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS
// BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

using System.Collections.Concurrent;
using dkgServiceNode.Models;
using dkgServiceNode.Services.Cache;
using Npgsql;
using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;

namespace dkgServiceNode.Services.RequestProcessors
{
public class NodeAddProcessor : IDisposable
public class NodeAddProcessor : RequestProcessorBase
{
private readonly int _database_reconnect_delay = 3000;
private readonly int _queue_reparse_delay = 1000;
private readonly int _bulk_insert_limit = 10000;

private readonly ConcurrentQueue<Node> requestQueue = new();
private readonly CancellationTokenSource cancellationTokenSource = new();

private Task? backgroundTask = null;
private NodeCompositeContext? ncContext = null;

private volatile bool isRunning = false;
private readonly ILogger logger;
private readonly string connectionString;
private bool disposed = false;

public NodeAddProcessor(
string connectionStr,
int bInsertLimit,
int qReparseDelay,
ILogger<NodeAddProcessor> lgger
)
) : base(connectionStr, bInsertLimit, qReparseDelay, lgger)
{
connectionString = connectionStr;
logger = lgger;
}

public void Start(NodeCompositeContext nContext)
{
if (isRunning)
{
logger.LogWarning("Node Request Processor is already running. 'Start' ignored.");
}
else
{
isRunning = true;
ncContext = nContext;
backgroundTask = Task.Run(ProcessRequests, cancellationTokenSource.Token);
logger.LogInformation("Request Processor has been started.");
}
}

public void Stop()
{
if (!isRunning)
{
logger.LogWarning("Request Processor is not running. 'Stop' ignored.");
}
else
{
cancellationTokenSource.Cancel();
backgroundTask?.Wait();
isRunning = false;
logger.LogInformation("Request Processor has been stopped.");
}
}

public void EnqueueRequest(Node request)
{
requestQueue.Enqueue(request);
}

private async Task ProcessRequests()
protected override async Task ProcessRequests()
{
using var dbConnection = new NpgsqlConnection(connectionString);
while (!cancellationTokenSource.Token.IsCancellationRequested)
Expand All @@ -82,15 +58,15 @@ private async Task ProcessRequests()
catch (Exception ex)
{
logger.LogError("Failed to create database connection: {msg}", ex.Message);
await Task.Delay(_database_reconnect_delay, cancellationTokenSource.Token);
await Task.Delay(databaseReconnectDelay, cancellationTokenSource.Token);
}
}

while (!cancellationTokenSource.Token.IsCancellationRequested)
{
var requests = new List<Node>();

while (requestQueue.TryDequeue(out var request) && requests.Count < _bulk_insert_limit)
while (requestQueue.TryDequeue(out var request) && requests.Count < bulkInsertLimit)
{
requests.Add(request);
}
Expand Down Expand Up @@ -129,34 +105,9 @@ private async Task ProcessRequests()
}
else
{
await Task.Delay(_queue_reparse_delay, cancellationTokenSource.Token);
await Task.Delay(queueReparseDelay, cancellationTokenSource.Token);
}
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
Stop();
cancellationTokenSource.Dispose();
backgroundTask?.Dispose();
}
ncContext = null;
disposed = true;
}
}

~NodeAddProcessor()
{
Dispose(false);
}
}
}
Loading

0 comments on commit 66804d3

Please sign in to comment.