From 2c9c2bd9cf55d8d5d9451ab5f7480fa416ff7fc1 Mon Sep 17 00:00:00 2001 From: Maxim Samsonov Date: Mon, 23 Sep 2024 02:12:27 +0300 Subject: [PATCH] feat: refactor registration sequence (#32) --- dkg-common | 2 +- dkgNodeLibrary/Services/DkgNodeService.cs | 4 +- dkgNodesTests/NodeComparer.Tests.cs | 34 ++-- dkgServiceNode/Controllers/DControllerBase.cs | 42 ++-- dkgServiceNode/Controllers/NodesController.cs | 67 ++++--- dkgServiceNode/Controllers/OpsController.cs | 181 ++++++++---------- .../Controllers/RoundsController.cs | 55 +++--- dkgServiceNode/Controllers/UsersController.cs | 18 +- dkgServiceNode/Data/DbEnsure.cs | 157 ++++++++------- dkgServiceNode/Data/DkgContext.cs | 171 ----------------- dkgServiceNode/Data/NodeCompositeContext.cs | 134 +++++++++++++ dkgServiceNode/Data/NodeContext.cs | 76 ++++++++ dkgServiceNode/Data/RoundContext.cs | 93 +++++++++ dkgServiceNode/Data/UserContext.cs | 16 +- dkgServiceNode/Models/Node.cs | 8 +- dkgServiceNode/Models/NodesRoundHistory.cs | 8 +- dkgServiceNode/Models/Round.cs | 6 + dkgServiceNode/Program.cs | 31 ++- dkgServiceNode/Services/Cache/NodesCache.cs | 78 ++++---- .../Services/Cache/NodesRoundHistoryCache.cs | 38 ++-- .../Services/Initialization/Initializer.cs | 23 ++- .../Services/NodeComparer/NodeComparer.cs | 2 +- .../RequestLimitingMiddleware.cs | 65 ++++--- .../RequestProcessors/NodeAddProcessor.cs | 162 ++++++++++++++++ .../RequestProcessors/NrhAddProcessor.cs | 157 +++++++++++++++ dkgServiceNode/Services/RoundRunner/Runner.cs | 1 - dkgServiceNode/appsettings.Development.json | 2 +- dkgServiceNode/appsettings.json | 2 +- 28 files changed, 1064 insertions(+), 569 deletions(-) delete mode 100644 dkgServiceNode/Data/DkgContext.cs create mode 100644 dkgServiceNode/Data/NodeCompositeContext.cs create mode 100644 dkgServiceNode/Data/NodeContext.cs create mode 100644 dkgServiceNode/Data/RoundContext.cs create mode 100644 dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs create mode 100644 dkgServiceNode/Services/RequestProcessors/NrhAddProcessor.cs diff --git a/dkg-common b/dkg-common index f271499..bc0aab2 160000 --- a/dkg-common +++ b/dkg-common @@ -1 +1 @@ -Subproject commit f2714994b2523f50d9d7aeb5284e2e3081bf5b2e +Subproject commit bc0aab2b36339436ba00c0bb9746ff7e6c72ed85 diff --git a/dkgNodeLibrary/Services/DkgNodeService.cs b/dkgNodeLibrary/Services/DkgNodeService.cs index 815e263..241f1f7 100644 --- a/dkgNodeLibrary/Services/DkgNodeService.cs +++ b/dkgNodeLibrary/Services/DkgNodeService.cs @@ -204,9 +204,9 @@ public async Task Register(HttpClient httpClient) } else { - Logger.LogInformation("'{Name}': registered with '{ServiceNodeUrl}' [Round {roundId}]", + Logger.LogInformation("'{Name}': waiting registration with '{ServiceNodeUrl}' [Round {roundId}]", Name, ServiceNodeUrl, roundId); - SetStatusOnly(WaitingRoundStart); + // SetStatusOnly(WaitingRoundStart); } } } diff --git a/dkgNodesTests/NodeComparer.Tests.cs b/dkgNodesTests/NodeComparer.Tests.cs index 4250546..239998f 100644 --- a/dkgNodesTests/NodeComparer.Tests.cs +++ b/dkgNodesTests/NodeComparer.Tests.cs @@ -45,12 +45,12 @@ public void Compare_BothNodesNull_ReturnsZero() [Test] public void Compare_FirstNodeNull_ReturnsOne() { - // Arrange + var comparer = new NodeComparer(0, 1, nodesRoundHistoryCache); - var node = new Node() { Id = 100181 }; + var node = new Node() { Address = "100181" }; - var history1 = new NodesRoundHistory { NodeId = 100181, RoundId = 1, NodeRandom = 5 }; - var history2 = new NodesRoundHistory { NodeId = 100182, RoundId = 1, NodeRandom = 3 }; + var history1 = new NodesRoundHistory { NodeAddress = "100181", RoundId = 1, NodeRandom = 5 }; + var history2 = new NodesRoundHistory { NodeAddress = "100182", RoundId = 1, NodeRandom = 3 }; nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(history1); nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(history2); @@ -61,7 +61,7 @@ public void Compare_FirstNodeNull_ReturnsOne() public void Compare_SecondNodeNull_ReturnsMinusOne() { var comparer = new NodeComparer(0, 1, nodesRoundHistoryCache); - var node = new Node { Id = 1001810 }; + var node = new Node { Address = "1001810" }; var result = comparer.Compare(node, null); Assert.That(result, Is.EqualTo(0)); } @@ -70,11 +70,11 @@ public void Compare_SecondNodeNull_ReturnsMinusOne() public void Compare_BothNodesHaveSameNodeRandom_ReturnsZero() { var comparer = new NodeComparer(0, 1,nodesRoundHistoryCache); - var node1 = new Node { Id = 1001811 }; - var node2 = new Node { Id = 1001812 }; + var node1 = new Node { Address = "1001811" }; + var node2 = new Node { Address = "1001812" }; - var history1 = new NodesRoundHistory { NodeId = 1001811, RoundId = 1, NodeRandom = 5 }; - var history2 = new NodesRoundHistory { NodeId = 1001812, RoundId = 1, NodeRandom = 5 }; + var history1 = new NodesRoundHistory { NodeAddress = "1001811", RoundId = 1, NodeRandom = 5 }; + var history2 = new NodesRoundHistory { NodeAddress = "1001812", RoundId = 1, NodeRandom = 5 }; nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(history1); nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(history2); @@ -87,11 +87,11 @@ public void Compare_BothNodesHaveSameNodeRandom_ReturnsZero() public void Compare_FirstNodeHasSmallerNodeRandom_ReturnsMinusOne() { var comparer = new NodeComparer(0, 1, nodesRoundHistoryCache); - var node1 = new Node { Id = 1001821 }; - var node2 = new Node { Id = 1001822 }; + var node1 = new Node { Address = "1001821" }; + var node2 = new Node { Address = "1001822" }; - var history1 = new NodesRoundHistory { NodeId = 1001821, RoundId = 1, NodeRandom = 3 }; - var history2 = new NodesRoundHistory { NodeId = 1001822, RoundId = 1, NodeRandom = 5 }; + var history1 = new NodesRoundHistory { NodeAddress = "1001821", RoundId = 1, NodeRandom = 3 }; + var history2 = new NodesRoundHistory { NodeAddress = "1001822", RoundId = 1, NodeRandom = 5 }; nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(history1); nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(history2); @@ -104,11 +104,11 @@ public void Compare_FirstNodeHasSmallerNodeRandom_ReturnsMinusOne() public void Compare_SecondNodeHasSmallerNodeRandom_ReturnsOne() { var comparer = new NodeComparer(0, 1, nodesRoundHistoryCache); - var node1 = new Node { Id = 1001831 }; - var node2 = new Node { Id = 1001832 }; + var node1 = new Node { Address = "1001831" }; + var node2 = new Node { Address = "1001832" }; - var history1 = new NodesRoundHistory { NodeId = 1001831, RoundId = 1, NodeRandom = 5 }; - var history2 = new NodesRoundHistory { NodeId = 1001832, RoundId = 1, NodeRandom = 3 }; + var history1 = new NodesRoundHistory { NodeAddress = "1001831", RoundId = 1, NodeRandom = 5 }; + var history2 = new NodesRoundHistory { NodeAddress = "1001832", RoundId = 1, NodeRandom = 3 }; nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(history1); nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(history2); diff --git a/dkgServiceNode/Controllers/DControllerBase.cs b/dkgServiceNode/Controllers/DControllerBase.cs index 1feb8f3..5f3a7e1 100644 --- a/dkgServiceNode/Controllers/DControllerBase.cs +++ b/dkgServiceNode/Controllers/DControllerBase.cs @@ -29,6 +29,8 @@ using dkgCommon.Constants; using Microsoft.EntityFrameworkCore; using dkgServiceNode.Services.Cache; +using System.Xml.Linq; +using System.Net.NetworkInformation; namespace dkgServiceNode.Controllers { @@ -140,9 +142,10 @@ protected ObjectResult _404CurrentVersion() return StatusCode(StatusCodes.Status404NotFound, new { message = "Failed to found current database version." }); } - protected ObjectResult _404Node(int id) + protected ObjectResult _404Node(string address) { - return _404(id, "Node"); + return StatusCode(StatusCodes.Status404NotFound, + new { message = $"Failed to find node [@{address}]." }); } protected ObjectResult _404Node(string address, string name) { @@ -203,45 +206,32 @@ protected DControllerBase(IHttpContextAccessor httpContextAccessor, UserContext } } - protected async Task ResetNodeState(DkgContext dkgContext, Node node) + protected void ResetNodeState(NodeCompositeContext ncContext, Node node) { - bool needsUpdate = false; - if (node.StatusValue != (short)NStatus.NotRegistered) - { - node.StatusValue = (short)NStatus.NotRegistered; - needsUpdate = true; - } - - if (node.RoundId != null) + if (node.Status != NStatus.NotRegistered || node.RoundId != null) { + node.Status = NStatus.NotRegistered; node.RoundId = null; - needsUpdate = true; - } - - if (needsUpdate) - { - await dkgContext.UpdateNodeAsync(node); + ncContext.UpdateNode(node); } } - protected async Task ResetNodeStates(DkgContext dkgContext, List nodes) + protected void ResetNodeStates(NodeCompositeContext ncContext, List nodes) { - List tasks = []; foreach (var node in nodes) { - tasks.Add(ResetNodeState(dkgContext, node)); + ResetNodeState(ncContext, node); } - await Task.WhenAll(tasks); } - protected async Task UpdateNodeState(DkgContext dkgContext, Node node, short nStatus, int? roundId) + protected void UpdateNodeState(NodeCompositeContext ncContext, Node node, NStatus nStatus, int? roundId) { - if (node.StatusValue != nStatus || node.RoundId != roundId) + if (node.Status != nStatus || node.RoundId != roundId) { - node.StatusValue = nStatus; + node.Status = nStatus; node.RoundId = roundId; - await dkgContext.UpdateNodeAsync(node); + ncContext.UpdateNode(node); } } } -} \ No newline at end of file +} diff --git a/dkgServiceNode/Controllers/NodesController.cs b/dkgServiceNode/Controllers/NodesController.cs index 9fe0dca..c7f4b44 100644 --- a/dkgServiceNode/Controllers/NodesController.cs +++ b/dkgServiceNode/Controllers/NodesController.cs @@ -44,16 +44,21 @@ namespace dkgServiceNode.Controllers public class NodesController : DControllerBase { - protected readonly DkgContext dkgContext; + protected readonly NodeCompositeContext ncContext; + protected readonly NodeContext nodeContext; protected readonly Runner runner; protected readonly ILogger logger; public NodesController(IHttpContextAccessor httpContextAccessor, - UserContext uContext, DkgContext dContext, - Runner rnner, ILogger lgger) : + UserContext uContext, + NodeContext noContext, + NodeCompositeContext nContext, + Runner rnner, + ILogger lgger) : base(httpContextAccessor, uContext) { - dkgContext = dContext; + nodeContext = noContext; + ncContext = nContext; runner = rnner; logger = lgger; } @@ -66,8 +71,8 @@ public ActionResult FetchNodes(NodesFrame nodesFrame) Stopwatch stopwatch = new(); stopwatch.Start(); - var sf = dkgContext.GetNodeCount(); - var nf = dkgContext.GetFilteredNodes(nodesFrame.Search); + var sf = ncContext.GetNodeCount(); + var nf = ncContext.GetFilteredNodes(nodesFrame.Search); if (nodesFrame.SortBy != null && nodesFrame.SortBy.Length > 0) { @@ -80,9 +85,6 @@ public ActionResult FetchNodes(NodesFrame nodesFrame) case "name": nf = sortOrder == "asc" ? [.. nf.OrderBy(n => n.Name)] : [.. nf.OrderByDescending(n => n.Name)]; break; - case "id": - nf = sortOrder == "asc" ? [.. nf.OrderBy(n => n.Id)] : [.. nf.OrderByDescending(n => n.Id)]; - break; case "address": nf = sortOrder == "asc" ? [.. nf.OrderBy(n => n.Address)] : [.. nf.OrderByDescending(n => n.Address)]; break; @@ -112,32 +114,29 @@ public ActionResult FetchNodes(NodesFrame nodesFrame) return res; } - // RESET: api/nodes/reset/5 - [HttpPost("reset/{id}")] + // RESET: api/nodes/reset/123a5 + [HttpPost("reset/{address}")] [ProducesResponseType(StatusCodes.Status204NoContent)] - public async Task ResetNode(int id) + public async Task ResetNode(string address) { Stopwatch stopwatch = new(); stopwatch.Start(); IActionResult res; - var ch = await userContext.CheckAdmin(curUserId); + var ch = await userContext.CheckAdminAsync(curUserId); if (ch == null || !ch.Value) { res = _403(); } else { - var node = dkgContext.GetNodeById(id); + var node = ncContext.GetNodeByAddress(address); if (node == null) { - res = _404Node(id); + res = _404Node(address); } else { - node.StatusValue = (short)NStatus.NotRegistered; - node.RoundId = null; - await dkgContext.UpdateNodeAsync(node); - + ResetNodeState(ncContext, node); res = NoContent(); } } @@ -146,33 +145,31 @@ public async Task ResetNode(int id) return res; } - // DELETE: api/nodes/5 - [HttpDelete("{id}")] + // DELETE: api/nodes/123a5 + [HttpDelete("{address}")] [ProducesResponseType(StatusCodes.Status204NoContent)] - public async Task DeleteNode(int id) + public async Task DeleteNode(string address) { Stopwatch stopwatch = new(); stopwatch.Start(); IActionResult res; - var ch = await userContext.CheckAdmin(curUserId); + var ch = await userContext.CheckAdminAsync(curUserId); if (ch == null || !ch.Value) { res = _403(); } else { - - - var node = dkgContext.GetNodeById(id); + var node = ncContext.GetNodeByAddress(address); if (node == null) { - res = _404Node(id); + res = _404Node(address); } else { - await dkgContext.DeleteNodeAsync(node); + await nodeContext.DeleteAsync(node); res = NoContent(); } } @@ -189,23 +186,23 @@ public ActionResult> GetNodes() Stopwatch stopwatch = new(); stopwatch.Start(); - var res = dkgContext.GetAllNodesSortedById(); + var res = ncContext.GetAllNodesSortedById(); stopwatch.Stop(); UpdateE2GetAll(stopwatch.Elapsed); return Ok(res); } - // GET: api/nodes/5 - [HttpGet("{id}")] + // GET: api/nodes/123a5 + [HttpGet("{address}")] [ProducesResponseType(StatusCodes.Status200OK, Type = typeof(Node))] - public ActionResult GetNode(int id) + public ActionResult GetNode(string address) { Stopwatch stopwatch = new(); stopwatch.Start(); - var node = dkgContext.GetNodeById(id); - if (node == null) return _404Node(id); + var node = ncContext.GetNodeByAddress(address); + if (node == null) return _404Node(address); stopwatch.Stop(); UpdateE2Get(stopwatch.Elapsed); @@ -222,7 +219,7 @@ public async Task>> GetStatistics() stopwatch.Start(); ActionResult> res; - var ch = await userContext.CheckAdmin(curUserId); + var ch = await userContext.CheckAdminAsync(curUserId); if (ch == null || !ch.Value) { res = _403(); diff --git a/dkgServiceNode/Controllers/OpsController.cs b/dkgServiceNode/Controllers/OpsController.cs index 1e1541a..e46a31a 100644 --- a/dkgServiceNode/Controllers/OpsController.cs +++ b/dkgServiceNode/Controllers/OpsController.cs @@ -36,7 +36,6 @@ using Solnet.Wallet; using System.Diagnostics; -using Microsoft.Extensions.Logging; namespace dkgServiceNode.Controllers { @@ -47,16 +46,21 @@ namespace dkgServiceNode.Controllers public class OpsController : DControllerBase { - protected readonly DkgContext dkgContext; + protected readonly RoundContext roundContext; + protected readonly NodeCompositeContext ncContext; protected readonly Runner runner; protected readonly ILogger logger; - public OpsController(IHttpContextAccessor httpContextAccessor, - UserContext uContext, DkgContext dContext, - Runner rnner, ILogger lgger) : - base(httpContextAccessor, uContext) + public OpsController( + IHttpContextAccessor httpContextAccessor, + UserContext uContext, + RoundContext dContext, + NodeCompositeContext nContext, + Runner rnner, + ILogger lgger) : base(httpContextAccessor, uContext) { - dkgContext = dContext; + roundContext = dContext; + ncContext = nContext; runner = rnner; logger = lgger; } @@ -67,14 +71,11 @@ public OpsController(IHttpContextAccessor httpContextAccessor, [AllowAnonymous] [ProducesResponseType(StatusCodes.Status200OK, Type = typeof(StatusResponse))] [ProducesResponseType(StatusCodes.Status403Forbidden, Type = typeof(ErrMessage))] - public async Task> RegisterNode(Node node) + public ActionResult RegisterNode(Node node) { - Stopwatch stopwatch = new(); stopwatch.Start(); - await UpdateRunningRoundsIfNeeded(); - ActionResult res; bool verified = false; @@ -101,77 +102,62 @@ public async Task> RegisterNode(Node node) int? roundId = null; Round? round = null; - List rounds = dkgContext.GetAllRounds().Where(r => r.StatusValue == (short)RStatus.Registration).ToList(); + List rounds = roundContext.GetAllRounds().Where(r => r.StatusValue == (short)RStatus.Registration).ToList(); if (rounds.Count != 0) { + logger.LogDebug("{count} round{s} open for registration", rounds.Count, rounds.Count != 1 ? "s" : ""); round = rounds[new Random().Next(rounds.Count)]; roundId = round.Id; - } - var xNode = dkgContext.GetNodeByAddress(node.Address); - if (xNode == null) - { - node.RoundId = roundId; - if (roundId == null) + var xNode = ncContext.GetNodeByAddress(node.Address); + if (xNode == null) { - node.StatusValue = (short)NStatus.NotRegistered; + logger.LogDebug("Registering new node for round [{roundId}]", roundId); + node.RoundId = roundId; + if (roundId == null) + { + node.StatusValue = (short)NStatus.NotRegistered; + } + else + { + node.Name = node.Name; + node.RoundId = roundId; + node.Status = NStatus.WaitingRegistration; + node.PublicKey = node.PublicKey; + node.CalculateRandom(); + ncContext.RegisterNode(node); + } + xNode = ncContext.GetNodeByAddress(node.Address); } else { - node.CalculateRandom(); - await dkgContext.AddNodeAsync(node); - } - xNode = dkgContext.GetNodeByAddress(node.Address); - } - else - { - bool modified = false; - if (xNode.Name != node.Name) - { - xNode.Name = node.Name; - modified = true; - } + logger.LogDebug("Registering known node for round [{roundId}", roundId); - if (xNode.RoundId != roundId) - { + xNode.Name = node.Name; xNode.RoundId = roundId; - modified = true; - } - - if (xNode.PublicKey != node.PublicKey) - { + xNode.Status = NStatus.WaitingRoundStart; xNode.PublicKey = node.PublicKey; xNode.CalculateRandom(); - modified = true; + ncContext.UpdateNode(xNode, true); } - if (roundId == null) + NodesRoundHistory? lastRoundHistory = null; + if (xNode is not null) { - if (xNode.StatusValue != (short)NStatus.NotRegistered) - { - xNode.StatusValue = (short)NStatus.NotRegistered; - modified = true; - } + lastRoundHistory = ncContext.GetLastNodeRoundHistory(xNode.Address, roundId ?? 0); } - if (modified) - { - await dkgContext.UpdateNodeAsync(xNode); - } - } + logger.LogDebug("Node registration round [{id}] node [{name}] -> status [{ status }]", + (round != null ? round.Id.ToString() : "null"), node.Name, xNode?.Status ?? NStatus.Unknown); - NodesRoundHistory? lastRoundHistory = null; - if (xNode is not null) + res = Ok(CreateStatusResponse(round, lastRoundHistory, xNode?.Status ?? NStatus.Unknown, node.Random)); + } + else { - lastRoundHistory = dkgContext.GetLastNodeRoundHistory(xNode.Id, roundId ?? 0); + logger.LogDebug("No rounds open for registration"); + res = Ok(CreateStatusResponse(null, null, NStatus.NotRegistered, null)); } - - logger.LogDebug("Node registration round [{id}] node [{name}] -> status [{ status }]", - (round != null ? round.Id.ToString() : "null"), node.Name, xNode?.Status ?? NStatus.Unknown); - - res = Ok(CreateStatusResponse(round, lastRoundHistory, xNode?.Status ?? NStatus.Unknown, node.Random)); } - stopwatch.Stop(); UpdateE2Register(stopwatch.Elapsed); @@ -182,7 +168,7 @@ public async Task> RegisterNode(Node node) // Acknowledges that the status report has been received internal async Task Accept(Round? round, Node node, NodesRoundHistory? lastRoundHistory, StatusReport stReport) { - await UpdateNodeState(dkgContext, node, (short)stReport.Status, round?.Id); + UpdateNodeState(ncContext, node, stReport.Status, round?.Id); if (round != null) { await UpdateRoundState(round); @@ -191,6 +177,13 @@ internal async Task Accept(Round? round, Node node, NodesRoundHist return Accepted(CreateStatusResponse(round, lastRoundHistory, stReport.Status, node.Random)); } + internal async Task TrToWaitingRoundStartConditional(Round? round, Node node, NodesRoundHistory? lastRoundHistory, StatusReport stReport) + { + await Task.Delay(0); + return Ok(CreateStatusResponse(round, lastRoundHistory, node.Status, node.Random)); + } + + internal async Task TrToNotRegistered(Round? round, Node node, NodesRoundHistory? lastRoundHistory, StatusReport stReport) { if (round != null) @@ -198,7 +191,8 @@ internal async Task TrToNotRegistered(Round? round, Node node, Nod runner.SetNoResult(round, node); } - await ResetNodeState(dkgContext, node); + ResetNodeState(ncContext, node); + await Task.Delay(0); var response = CreateStatusResponse(round, lastRoundHistory, NStatus.NotRegistered, node.Random); if (stReport.Status != NStatus.NotRegistered || stReport.RoundId != 0) { @@ -216,7 +210,7 @@ internal async Task TrToRunningStepOne(Round? round, Node node, No if (!runner.CheckNode(round, node)) { - await ResetNodeState(dkgContext, node); + ResetNodeState(ncContext, node); var response = CreateStatusResponse(round, lastRoundHistory, NStatus.NotRegistered, node.Random); if (stReport.Status != NStatus.NotRegistered || stReport.RoundId != 0) { @@ -241,7 +235,7 @@ internal async Task TrToRunningStepTwoConditional(Round? round, No if (runner.CheckTimedOutNode(round, node)) { - await UpdateNodeState(dkgContext, node, (short)NStatus.TimedOut, round.Id); + UpdateNodeState(ncContext, node, NStatus.TimedOut, round.Id); var response = CreateStatusResponse(round, lastRoundHistory, NStatus.TimedOut, node.Random); if (stReport.Status != NStatus.TimedOut) { @@ -250,7 +244,7 @@ internal async Task TrToRunningStepTwoConditional(Round? round, No } runner.SetStepTwoWaitingTime(round); - await UpdateNodeState(dkgContext, node, (short)stReport.Status, round?.Id); + UpdateNodeState(ncContext, node, stReport.Status, round?.Id); if (stReport.Data.Length != 0) { @@ -274,7 +268,7 @@ internal async Task TrToRunningStepThreeConditional(Round? round, if (runner.CheckTimedOutNode(round, node)) { - await UpdateNodeState(dkgContext, node, (short)NStatus.TimedOut, round.Id); + UpdateNodeState(ncContext, node, NStatus.TimedOut, round.Id); var response = CreateStatusResponse(round, lastRoundHistory, NStatus.TimedOut, node.Random); if (stReport.Status != NStatus.TimedOut) { @@ -283,7 +277,7 @@ internal async Task TrToRunningStepThreeConditional(Round? round, } runner.SetStepThreeWaitingTime(round); - await UpdateNodeState(dkgContext, node, (short)stReport.Status, round?.Id); + UpdateNodeState(ncContext, node, stReport.Status, round?.Id); if (stReport.Data.Length != 0) { @@ -305,7 +299,8 @@ internal async Task WrongStatus(Round? round, Node node, NodesRoun runner.SetNoResult(round, node); } - await ResetNodeState(dkgContext, node); + ResetNodeState(ncContext, node); + await Task.Delay(0); string rStatus = round == null ? "null" : GetRoundStatusById(round.StatusValue).ToString(); return _409Status(stReport.Address, stReport.Name, GetNodeStatusById(stReport.Status).ToString(), rStatus); } @@ -327,14 +322,14 @@ internal async Task AcceptFinished(Round? round, Node node, NodesR if (stReport.Data.Length != 0) { runner.SetResult(round, node, stReport.Data); - await UpdateNodeState(dkgContext, node, (short)stReport.Status, round.Id); + UpdateNodeState(ncContext, node, stReport.Status, round.Id); await UpdateRoundState(round); return Accepted(CreateStatusResponse(round, lastRoundHistory, stReport.Status, node.Random)); } else { runner.SetNoResult(round, node); - await UpdateNodeState(dkgContext, node, (short)stReport.Status, round.Id); + UpdateNodeState(ncContext, node, stReport.Status, round.Id); await UpdateRoundState(round); return _400NoResult(round.Id, node.Name, node.PublicKey); } @@ -350,7 +345,7 @@ internal async Task AcceptFailed(Round? round, Node node, NodesRou runner.SetResultWaitingTime(round); runner.SetNoResult(round, node); - await UpdateNodeState(dkgContext, node, (short)stReport.Status, round.Id); + UpdateNodeState(ncContext, node, stReport.Status, round.Id); await UpdateRoundState(round); return Accepted(CreateStatusResponse(round, lastRoundHistory, stReport.Status, node.Random)); @@ -363,7 +358,7 @@ internal StatusResponse CreateStatusResponse(Round? round, int roundId = round != null ? round.Id : 0; RStatus roundStatus = round != null ? (RStatus)round.StatusValue : RStatus.Unknown; int lastRoundId = lastRoundHistory?.RoundId ?? 0; - Round? lastRound = lastRoundId == 0 ? null : dkgContext.GetRoundById(lastRoundId); + Round? lastRound = lastRoundId == 0 ? null : roundContext.GetRoundById(lastRoundId); RStatus lastRoundStatus = lastRound != null ? (RStatus)lastRound.StatusValue : RStatus.Unknown; NStatus lastNodeStatus = lastRoundHistory != null ? (NStatus)lastRoundHistory.NodeFinalStatus : NStatus.Unknown; int? lastRoundResult = lastRound?.Result; @@ -408,9 +403,20 @@ public async Task> Status(StatusReport statusReport { (RStatus.Failed, NStatus.NotRegistered), WrongStatus }, { (RStatus.Unknown, NStatus.NotRegistered), WrongStatus }, + { (null, NStatus.WaitingRegistration), WrongStatus }, + { (RStatus.NotStarted, NStatus.WaitingRegistration), WrongStatus }, + { (RStatus.Registration, NStatus.WaitingRegistration), TrToWaitingRoundStartConditional }, + { (RStatus.CreatingDeals, NStatus.WaitingRegistration), WrongStatus }, + { (RStatus.ProcessingDeals, NStatus.WaitingRegistration), WrongStatus }, + { (RStatus.ProcessingResponses, NStatus.WaitingRegistration), WrongStatus }, + { (RStatus.Finished, NStatus.WaitingRegistration), WrongStatus }, + { (RStatus.Cancelled, NStatus.WaitingRegistration), WrongStatus }, + { (RStatus.Failed, NStatus.WaitingRegistration), WrongStatus }, + { (RStatus.Unknown, NStatus.WaitingRegistration), WrongStatus }, + { (null, NStatus.WaitingRoundStart), WrongStatus }, { (RStatus.NotStarted, NStatus.WaitingRoundStart), WrongStatus }, - { (RStatus.Registration, NStatus.WaitingRoundStart), Accept }, + { (RStatus.Registration, NStatus.WaitingRoundStart), TrToWaitingRoundStartConditional }, { (RStatus.CreatingDeals, NStatus.WaitingRoundStart), TrToRunningStepOne }, { (RStatus.ProcessingDeals, NStatus.WaitingRoundStart), TrToNotRegistered }, { (RStatus.ProcessingResponses, NStatus.WaitingRoundStart), TrToNotRegistered }, @@ -509,15 +515,15 @@ public async Task> Status(StatusReport statusReport }; - var node = dkgContext.GetNodeByAddress(statusReport.Address); + var node = ncContext.GetNodeByAddress(statusReport.Address); if (node == null) { res = _404Node(statusReport.Address, statusReport.Name); } else { - var round = statusReport.RoundId == 0 ? null : dkgContext.GetRoundById(statusReport.RoundId); - var lastRoundHistory = dkgContext.GetLastNodeRoundHistory(node.Id, statusReport.RoundId); + var round = statusReport.RoundId == 0 ? null : roundContext.GetRoundById(statusReport.RoundId); + var lastRoundHistory = ncContext.GetLastNodeRoundHistory(node.Address, statusReport.RoundId); RStatus? rStatus = null; if (round != null) @@ -579,33 +585,12 @@ internal async Task UpdateRoundState(Round round) round.ModifiedOn = DateTime.Now.ToUniversalTime(); try { - await dkgContext.UpdateRoundAsync(round); + await roundContext.UpdateRoundAsync(round); } catch { } } } - - private static DateTime lastUpdateRunningRoundsTime = DateTime.MinValue; - private static readonly TimeSpan updateRunningRoundsInterval = TimeSpan.FromSeconds(120); - - internal async Task UpdateRunningRounds() - { - var rounds = dkgContext.GetAllRounds().Where(r => RoundStatus.IsRunning(r.Status)); - foreach (var round in rounds) - { - await UpdateRoundState(round); - } - } - private async Task UpdateRunningRoundsIfNeeded() - { - if (DateTime.Now - lastUpdateRunningRoundsTime >= updateRunningRoundsInterval) - { - await UpdateRunningRounds(); - lastUpdateRunningRoundsTime = DateTime.Now; - } - } - } } \ No newline at end of file diff --git a/dkgServiceNode/Controllers/RoundsController.cs b/dkgServiceNode/Controllers/RoundsController.cs index d0d1ecb..7253414 100644 --- a/dkgServiceNode/Controllers/RoundsController.cs +++ b/dkgServiceNode/Controllers/RoundsController.cs @@ -44,20 +44,23 @@ namespace dkgServiceNode.Controllers public class RoundsController : DControllerBase { - protected readonly DkgContext dkgContext; + protected readonly RoundContext roundContext; + protected readonly NodeCompositeContext ncContext; protected readonly Runner runner; protected readonly ILogger logger; private readonly NodesRoundHistoryCache nodesRoundHistoryCache; public RoundsController(IHttpContextAccessor httpContextAccessor, UserContext uContext, - DkgContext dContext, + RoundContext rContext, + NodeCompositeContext nContext, Runner rnner, NodesRoundHistoryCache nrhc, ILogger lgger) : base(httpContextAccessor, uContext) { - dkgContext = dContext; + roundContext = rContext; + ncContext = nContext; runner = rnner; logger = lgger; nodesRoundHistoryCache = nrhc; @@ -68,11 +71,12 @@ public RoundsController(IHttpContextAccessor httpContextAccessor, [ProducesResponseType(StatusCodes.Status200OK, Type = typeof(IEnumerable))] public ActionResult> GetRounds() { - var rounds = dkgContext.GetAllRoundsSortedByIdDescending(); + var rounds = roundContext.GetAllRoundsSortedByIdDescending(); foreach (var round in rounds) { - int nodeCountWaitingRoundStart = nodesRoundHistoryCache.GetNodeCountForRound(round.Id, NStatus.WaitingRoundStart); + round.NodeCountWRegistration = nodesRoundHistoryCache.GetNodeCountForRound(round.Id, NStatus.WaitingRegistration); + round.NodeCountWRoundStart = nodesRoundHistoryCache.GetNodeCountForRound(round.Id, NStatus.WaitingRoundStart); round.NodeCountStepOne = nodesRoundHistoryCache.GetNodeCountForRound(round.Id, NStatus.RunningStepOne); round.NodeCountWStepTwo = nodesRoundHistoryCache.GetNodeCountForRound(round.Id, NStatus.WaitingStepTwo); round.NodeCountStepTwo = nodesRoundHistoryCache.GetNodeCountForRound(round.Id, NStatus.RunningStepTwo); @@ -92,7 +96,8 @@ public ActionResult> GetRounds() round.NodeCountFailed + round.NodeCountFinished + round.NodeCountTimedOut + - nodeCountWaitingRoundStart; + round.NodeCountWRoundStart + + round.NodeCountWRegistration; round.NodeCount = eaCount; } @@ -106,9 +111,9 @@ public ActionResult> GetRounds() [ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(ErrMessage))] public ActionResult GetRound(int id) { - var round = dkgContext.GetRoundById(id); + var round = roundContext.GetRoundById(id); if (round == null) return _404Round(id); - round.NodeCount = dkgContext.GetAllNodes().Count(n => n.RoundId == round.Id); + round.NodeCount = ncContext.GetAllNodes().Count(n => n.RoundId == round.Id); return round; } @@ -119,7 +124,7 @@ public ActionResult GetRound(int id) [ProducesResponseType(StatusCodes.Status403Forbidden, Type = typeof(ErrMessage))] public async Task> AddRound(RoundSettings roundSettings) { - var ch = await userContext.CheckAdmin(curUserId); + var ch = await userContext.CheckAdminAsync(curUserId); if (ch == null || !ch.Value) return _403(); Round round = new() @@ -130,7 +135,7 @@ public async Task> AddRound(RoundSettings roundSettings) TimeoutR = roundSettings.TimeoutR }; - await dkgContext.AddRoundAsync(round); + await roundContext.AddRoundAsync(round); var reference = new Reference(round.Id); return CreatedAtAction(nameof(AddRound), new { id = round.Id }, reference); @@ -144,10 +149,10 @@ public async Task> AddRound(RoundSettings roundSettings) [ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(ErrMessage))] public async Task> NextRoundStep(int id) { - var ch = await userContext.CheckAdmin(curUserId); + var ch = await userContext.CheckAdminAsync(curUserId); if (ch == null || !ch.Value) return _403(); - Round? round = dkgContext.GetRoundById(id); + Round? round = roundContext.GetRoundById(id); if (round == null) return _404Round(id); round.ModifiedOn = DateTime.Now.ToUniversalTime(); @@ -182,7 +187,7 @@ public async Task> NextRoundStep(int id) break; } - return await UpdateRoundState(dkgContext, round); + return await UpdateRoundState(roundContext, round); } // POST: api/rounds/cancel/5 @@ -193,10 +198,10 @@ public async Task> NextRoundStep(int id) [ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(ErrMessage))] public async Task> CancelRound(int id) { - var ch = await userContext.CheckAdmin(curUserId); + var ch = await userContext.CheckAdminAsync(curUserId); if (ch == null || !ch.Value) return _403(); - Round? round = dkgContext.GetRoundById(id); + Round? round = roundContext.GetRoundById(id); if (round == null) return _404Round(id); round.ModifiedOn = DateTime.Now.ToUniversalTime(); @@ -204,52 +209,54 @@ public async Task> CancelRound(int id) round.Status = RoundStatusConstants.Cancelled; runner.CancelRound(round); - return await UpdateRoundState(dkgContext, round); + return await UpdateRoundState(roundContext, round); } internal async Task TryRunRound(Round round) { - List rNodes = dkgContext.GetAllNodes() + List rNodes = ncContext.GetAllNodes() .Where(n => n.RoundId == round.Id && n.Status == NStatus.WaitingRoundStart) .ToList(); List fiNodes = rNodes - .Where(node => dkgContext.CheckNodeQualification(node.Id, round.Id - 1)) + .Where(node => ncContext.CheckNodeQualification(node.Address, round.Id - 1)) .ToList(); + await Task.Delay(0); + if (fiNodes.Count < 3) { logger.LogWarning("Not enough nodes has been qualified to start a round. Count = {count}, minimum = 3", fiNodes.Count); - await ResetNodeStates(dkgContext, rNodes); + ResetNodeStates(ncContext, rNodes); round.Result = null; round.StatusValue = (short)RStatus.Failed; } else { List reNodes = rNodes.Except(fiNodes).ToList(); - await ResetNodeStates(dkgContext, reNodes); + ResetNodeStates(ncContext, reNodes); List fi2Nodes = fiNodes; reNodes = []; if (round.MaxNodes < fiNodes.Count) { - int lastRR = dkgContext.LastRoundResult() ?? new Random().Next(); + int lastRR = roundContext.LastRoundResult() ?? new Random().Next(); fiNodes.Sort(new NodeComparer(lastRR, round.Id - 1, nodesRoundHistoryCache)); fi2Nodes = fiNodes.Take(round.MaxNodes).ToList(); reNodes = fiNodes.Skip(round.MaxNodes).ToList(); } runner.RunRound(round, fi2Nodes); - await ResetNodeStates(dkgContext, reNodes); + ResetNodeStates(ncContext, reNodes); } } - internal async Task> UpdateRoundState(DkgContext dkgContext, Round round) + internal async Task> UpdateRoundState(RoundContext roundContext, Round round) { try { - await dkgContext.UpdateRoundAsync(round); + await roundContext.UpdateRoundAsync(round); } catch { diff --git a/dkgServiceNode/Controllers/UsersController.cs b/dkgServiceNode/Controllers/UsersController.cs index cb9a41a..e5e9b7a 100644 --- a/dkgServiceNode/Controllers/UsersController.cs +++ b/dkgServiceNode/Controllers/UsersController.cs @@ -45,20 +45,20 @@ public UsersController(IHttpContextAccessor httpContextAccessor, UserContext uCo [HttpGet] public async Task>> GetUsers() { - var ch = await userContext.CheckAdmin(curUserId); + var ch = await userContext.CheckAdminAsync(curUserId); if (ch == null || !ch.Value) return _403(); - return await userContext.UserViewItems(); + return await userContext.UserViewItemsAsync(); } // GET: api/users/5 [HttpGet("{id}")] public async Task> GetUser(int id) { - var ch = await userContext.CheckAdminOrSameUser(id, curUserId); + var ch = await userContext.CheckAdminOrSameUserAsync(id, curUserId); if (ch == null || !ch.Value) return _403(); - var user = await userContext.UserViewItem(id); + var user = await userContext.UserViewItemAsync(id); return (user == null) ? _404User(id) : user; } @@ -66,10 +66,10 @@ public async Task> GetUser(int id) [HttpPost("add")] public async Task> AddUser(User user) { - var ch = await userContext.CheckAdmin(curUserId); + var ch = await userContext.CheckAdminAsync(curUserId); if (ch == null || !ch.Value) return _403(); - if (await userContext.Exists(user.Email)) return _409Email(user.Email); + if (await userContext.ExistsAsync(user.Email)) return _409Email(user.Email); user.Password = BCrypt.Net.BCrypt.HashPassword(user.Password); @@ -92,8 +92,8 @@ public async Task UpdateUser(int id, UserUpdateItem update) bool adminRequired = (user.IsEnabled != update.IsEnabled) || (user.IsAdmin != update.IsAdmin); ActionResult ch; - ch = adminRequired ? await userContext.CheckAdmin(curUserId) : - await userContext.CheckAdminOrSameUser(id, curUserId); + ch = adminRequired ? await userContext.CheckAdminAsync(curUserId) : + await userContext.CheckAdminOrSameUserAsync(id, curUserId); if (ch == null || !ch.Value) return _403(); user.Name = update.Name; @@ -115,7 +115,7 @@ public async Task DeleteUser(int id) { if (id==1) return _403Protect(); - var ch = await userContext.CheckAdmin(curUserId); + var ch = await userContext.CheckAdminAsync(curUserId); if (ch == null || !ch.Value) return _403(); var user = await userContext.Users.FindAsync(id); diff --git a/dkgServiceNode/Data/DbEnsure.cs b/dkgServiceNode/Data/DbEnsure.cs index 9cf497c..a3d718e 100644 --- a/dkgServiceNode/Data/DbEnsure.cs +++ b/dkgServiceNode/Data/DbEnsure.cs @@ -29,7 +29,7 @@ namespace dkgServiceNode.Data { public static class DbEnsure { - readonly static string sqlScript_0_12_1 = @" + readonly static string sqlScript_0_14_0 = @" START TRANSACTION; DROP TABLE IF EXISTS ""users""; @@ -67,32 +67,25 @@ public static class DbEnsure DROP TABLE IF EXISTS ""nodes""; CREATE TABLE ""nodes"" ( - ""id"" SERIAL PRIMARY KEY, - ""address"" VARCHAR(128) NOT NULL, - ""name"" VARCHAR(64) NOT NULL DEFAULT '--', - ""public_key"" VARCHAR(128) NOT NULL DEFAULT '', - ""status"" SMALLINT NOT NULL DEFAULT 0, - ""random"" INTEGER, - ""round_id"" INTEGER REFERENCES ""rounds"" (""id"") ON DELETE RESTRICT + ""address"" VARCHAR(128) NOT NULL PRIMARY KEY, + ""name"" VARCHAR(64) NOT NULL DEFAULT '--' ); - CREATE UNIQUE INDEX ""idx_nodes_address"" ON ""nodes"" (""address""); - DROP TABLE IF EXISTS ""nodes_round_history""; CREATE TABLE ""nodes_round_history"" ( ""id"" SERIAL PRIMARY KEY, ""round_id"" INTEGER NOT NULL REFERENCES ""rounds"" (""id"") ON DELETE CASCADE, - ""node_id"" INTEGER NOT NULL REFERENCES ""nodes"" (""id"") ON DELETE CASCADE, + ""node_address"" VARCHAR(128) NOT NULL REFERENCES ""nodes"" (""address"") ON DELETE CASCADE, ""node_final_status"" SMALLINT NOT NULL DEFAULT 0, ""node_random"" INTEGER ); CREATE INDEX ""idx_nodes_round_history_round_id"" ON ""nodes_round_history"" (""round_id""); - CREATE INDEX ""idx_nodes_round_history_node_id"" ON ""nodes_round_history"" (""node_id""); + CREATE INDEX ""idx_nodes_round_history_node_address"" ON ""nodes_round_history"" (""node_address""); CREATE OR REPLACE PROCEDURE upsert_node_round_history( - p_node_id INT, + p_node_address VARCHAR(128), p_round_id INT, p_node_final_status SMALLINT, p_node_random INT @@ -104,72 +97,106 @@ LANGUAGE plpgsql IF EXISTS( SELECT 1 FROM nodes_round_history - WHERE node_id = p_node_id AND round_id = p_round_id + WHERE node_address = p_node_address AND round_id = p_round_id ) THEN -- Update the existing record and return it UPDATE nodes_round_history SET node_final_status = p_node_final_status, node_random = p_node_random - WHERE node_id = p_node_id AND round_id = p_round_id; + WHERE node_address = p_node_address AND round_id = p_round_id; ELSE -- Insert a new record and return it - INSERT INTO nodes_round_history(node_id, round_id, node_final_status, node_random) - VALUES(p_node_id, p_round_id, p_node_final_status, p_node_random); + INSERT INTO nodes_round_history(node_address, round_id, node_final_status, node_random) + VALUES(p_node_address, p_round_id, p_node_final_status, p_node_random); END IF; END; $$; - DROP TABLE IF EXISTS ""versions""; - - CREATE TABLE ""versions"" ( - ""id"" SERIAL PRIMARY KEY, - ""version"" VARCHAR(16) NOT NULL, - ""date"" DATE NOT NULL DEFAULT now() - ); - - INSERT INTO ""versions"" (""version"", ""date"") VALUES - ('0.12.1', '" + DateTime.Now.ToString("yyyy-MM-dd") + @"'); + CREATE OR REPLACE PROCEDURE bulk_upsert_node_round_history( + p_items JSON + ) + LANGUAGE plpgsql + AS $$ + DECLARE + item JSON; + node_address VARCHAR(128); + round_id INT; + node_final_status SMALLINT; + node_random INT; + BEGIN + FOR item IN SELECT * FROM json_array_elements(p_items) + LOOP + node_address := (item->>'node_address')::VARCHAR(128); + round_id := (item->>'round_id')::INT; + node_final_status := (item->>'node_final_status')::SMALLINT; + node_random := (item->>'node_random')::INT; + + -- Call the existing upsert_node_round_history procedure + CALL upsert_node_round_history(node_address, round_id, node_final_status, node_random); + END LOOP; + END; + $$; - COMMIT; - "; + CREATE OR REPLACE PROCEDURE insert_node_with_round_history( + p_node_address VARCHAR(128), + p_node_name VARCHAR(64), + p_round_id INT, + p_node_final_status SMALLINT, + p_node_random INT + ) + LANGUAGE plpgsql + AS $$ + BEGIN + -- + INSERT INTO nodes(address, name) + VALUES(p_node_address, p_node_name) + ON CONFLICT (address) DO NOTHING; - readonly static string sqlScript_0_13_0 = @" - START TRANSACTION; + -- + INSERT INTO nodes_round_history(node_address, round_id, node_final_status, node_random) + VALUES(p_node_address, p_round_id, p_node_final_status, p_node_random); - ALTER TABLE nodes - DROP COLUMN IF EXISTS public_key, - DROP COLUMN IF EXISTS status, - DROP COLUMN IF EXISTS random; + END; + $$; - CREATE OR REPLACE FUNCTION get_node_counts() - RETURNS TABLE (round_id INT, status INT, count INT) AS $$ + CREATE OR REPLACE PROCEDURE bulk_insert_node_with_round_history( + p_items JSON + ) + LANGUAGE plpgsql + AS $$ + DECLARE + item JSON; + node_address VARCHAR(128); + node_name VARCHAR(64); + round_id INT; + node_final_status SMALLINT; + node_random INT; BEGIN - RETURN QUERY - SELECT - round_id, - node_final_status AS status, - COUNT(*) AS count - FROM - nodes_round_history - GROUP BY - round_id, - node_final_status; + FOR item IN SELECT * FROM json_array_elements(p_items) + LOOP + node_address := (item->>'node_address')::VARCHAR(128); + node_name := (item->>'node_name')::VARCHAR(64); + round_id := (item->>'round_id')::INT; + node_final_status := (item->>'node_final_status')::SMALLINT; + node_random := (item->>'node_random')::INT; + + -- Call the existing insert_node_with_round_history procedure + CALL insert_node_with_round_history(node_address, node_name, round_id, node_final_status, node_random); + END LOOP; END; - $$ LANGUAGE plpgsql; - - INSERT INTO ""versions"" (""version"", ""date"") VALUES - ('0.13.0', '" + DateTime.Now.ToString("yyyy-MM-dd") + @"'); - - COMMIT; - "; + $$; - readonly static string sqlScript_0_13_1 = @" - START TRANSACTION; + DROP TABLE IF EXISTS ""versions""; - DROP FUNCTION IF EXISTS update_nodes_round_history(); + CREATE TABLE ""versions"" ( + ""id"" SERIAL PRIMARY KEY, + ""version"" VARCHAR(16) NOT NULL, + ""date"" DATE NOT NULL DEFAULT now() + ); INSERT INTO ""versions"" (""version"", ""date"") VALUES - ('0.13.1', '" + DateTime.Now.ToString("yyyy-MM-dd") + @"'); + ('0.14.0', '" + DateTime.Now.ToString("yyyy-MM-dd") + @"'); + COMMIT; "; @@ -194,7 +221,7 @@ private static bool VCheck(string v, NpgsqlConnection connection) return (rows != null && (long)rows != 0); } - public static int Ensure_0_12_1(NpgsqlConnection connection) + public static int Ensure_0_14_0(NpgsqlConnection connection) { // Check if table 'versions' exists var sql = "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'versions';"; @@ -205,14 +232,14 @@ public static int Ensure_0_12_1(NpgsqlConnection connection) if (rows != null && (long)rows != 0) { - sql = "SELECT COUNT(*) FROM versions WHERE version = '0.12.1';"; + sql = "SELECT COUNT(*) FROM versions WHERE version = '0.14.0';"; command = new NpgsqlCommand(sql, connection); rows = command.ExecuteScalar(); } if (rows == null || (long)rows == 0) { - var scriptCommand = new NpgsqlCommand(sqlScript_0_12_1, connection); + var scriptCommand = new NpgsqlCommand(sqlScript_0_14_0, connection); r = scriptCommand.ExecuteNonQuery(); } @@ -239,12 +266,8 @@ public static void Ensure(NpgsqlConnection connection, ILogger logger) { try { - logger.LogInformation("Initializing database at 0.12.1"); - Ensure_0_12_1(connection); - logger.LogInformation("Update to 0.13.0"); - EnsureVersion("0.13.0", sqlScript_0_13_0, connection); - logger.LogInformation("Update to 0.13.1"); - EnsureVersion("0.13.1", sqlScript_0_13_1, connection); + logger.LogInformation("Initializing database at 0.14.0"); + Ensure_0_14_0(connection); } catch (Exception ex) { diff --git a/dkgServiceNode/Data/DkgContext.cs b/dkgServiceNode/Data/DkgContext.cs deleted file mode 100644 index 3f07e3b..0000000 --- a/dkgServiceNode/Data/DkgContext.cs +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright (C) 2024 Maxim [maxirmx] Samsonov (www.sw.consulting) -// All rights reserved. -// This file is a part of dkg service node -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions -// are met: -// 1. Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// 2. Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED -// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS -// BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. - -using dkgCommon.Constants; -using dkgServiceNode.Models; -using dkgServiceNode.Services.Cache; -using Microsoft.EntityFrameworkCore; - -namespace dkgServiceNode.Data -{ - public class DkgContext : DbContext - { - private DbSet Nodes { get; set; } - private DbSet Rounds { get; set; } - private DbSet NodesRoundHistory { get; set; } - - private readonly NodesCache nodesCache; - private readonly RoundsCache roundsCache; - private readonly NodesRoundHistoryCache nodesRoundHistoryCache; - - private readonly ILogger logger; - public DkgContext(DbContextOptions options, - NodesCache nc, - RoundsCache rc, - NodesRoundHistoryCache nrhc, - ILogger lggr) : base(options) - { - nodesCache = nc; - roundsCache = rc; - nodesRoundHistoryCache = nrhc; - - logger = lggr; - } - public Node? GetNodeById(int id) => nodesCache.GetNodeById(id); - public Node? GetNodeByAddress(string address) => nodesCache.GetNodeByAddress(address); - public async Task AddNodeAsync(Node node) - { - try - { - Nodes.Add(node); - await SaveChangesAsync(); - nodesCache.SaveNodeToCache(node); - await SaveNodesRoundHistory(node); - nodesRoundHistoryCache.UpdateNodeCounts(null, NStatus.NotRegistered, node.RoundId, node.Status); - } - catch (Exception ex) - { - logger.LogError("Error adding node: {msg}", ex.Message); - } - } - - private async Task SaveNodesRoundHistory(Node node) - { - if (node.RoundId != null && node.Random != null) - { - nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(new NodesRoundHistory(node)); - - await Database.ExecuteSqlRawAsync( - "CALL upsert_node_round_history({0}, {1}, {2}, {3})", - node.Id, node.RoundId, node.StatusValue, node.Random); - } - } - public List GetAllNodes() => nodesCache.GetAllNodes(); - - public int GetNodeCount() => nodesCache.GetNodeCount(); - public List GetAllNodesSortedById() => nodesCache.GetAllNodesSortedById(); - public List GetFilteredNodes(string search = "") => nodesCache.GetFilteredNodes(search); - public async Task UpdateNodeAsync(Node node) - { - try - { - NodesRoundHistory? nrh = GetLastNodeRoundHistory(node.Id, -1); - nodesRoundHistoryCache.UpdateNodeCounts(nrh?.RoundId, nrh != null ? nrh.NodeFinalStatus : NStatus.NotRegistered, - node.RoundId, node.Status); - - await SaveNodesRoundHistory(node); - nodesCache.UpdateNodeInCache(node); - } - catch (Exception ex) - { - logger.LogError("Error updating node: {msg}", ex.Message); - } - } - - public async Task DeleteNodeAsync(Node node) - { - try - { - nodesRoundHistoryCache.UpdateNodeCounts(node.RoundId, node.Status, null, NStatus.NotRegistered); - Nodes.Remove(node); - await SaveChangesAsync(); - nodesCache.DeleteNodeFromCache(node); - } - catch (Exception ex) - { - logger.LogError("Error deleting node: {msg}", ex.Message); - } - } - - public Round? GetRoundById(int id) => roundsCache.GetRoundById(id); - public List GetAllRounds() => roundsCache.GetAllRounds(); - public List GetAllRoundsSortedByIdDescending() => roundsCache.GetAllRoundsSortedByIdDescending(); - public async Task AddRoundAsync(Round round) - { - try - { - Rounds.Add(round); - await SaveChangesAsync(); - roundsCache.SaveRoundToCache(round); - } - catch (Exception ex) - { - logger.LogError("Error adding round: {msg}", ex.Message); - } - } - public async Task UpdateRoundAsync(Round round) - { - try - { - Rounds.Update(round); - await SaveChangesAsync(); - roundsCache.UpdateRoundInCache(round); - } - catch (Exception ex) - { - logger.LogError("Error updating round: {msg}", ex.Message); - } - } - public async Task DeleteRoundAsync(Round round) - { - try - { - Rounds.Remove(round); - await SaveChangesAsync(); - roundsCache.DeleteRoundFromCache(round.Id); - } - catch (Exception ex) - { - logger.LogError("Error deleting round: {msg}", ex.Message); - } - } - public bool RoundExists(int id) => roundsCache.RoundExists(id); - public int? LastRoundResult() => roundsCache.LastRoundResult(); - public NodesRoundHistory? GetLastNodeRoundHistory(int nodeId, int RoundId) => nodesRoundHistoryCache.GetLastNodeRoundHistory(nodeId, RoundId); - public bool CheckNodeQualification(int nodeId, int previousRoundId) => nodesRoundHistoryCache.CheckNodeQualification(nodeId, previousRoundId); - public int? GetNodeRandomForRound(int nodeId, int roundId) => nodesRoundHistoryCache.GetNodeRandomForRound(nodeId, roundId); - - } -} diff --git a/dkgServiceNode/Data/NodeCompositeContext.cs b/dkgServiceNode/Data/NodeCompositeContext.cs new file mode 100644 index 0000000..bf8b133 --- /dev/null +++ b/dkgServiceNode/Data/NodeCompositeContext.cs @@ -0,0 +1,134 @@ +// Copyright (C) 2024 Maxim [maxirmx] Samsonov (www.sw.consulting) +// All rights reserved. +// This file is a part of dkg service node +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS +// BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +using dkgCommon.Constants; +using dkgServiceNode.Models; +using dkgServiceNode.Services.Cache; +using dkgServiceNode.Services.RequestProcessors; + +namespace dkgServiceNode.Data +{ + public class NodeCompositeContext + { + private readonly NodesCache nodesCache; + private readonly NodesRoundHistoryCache nodesRoundHistoryCache; + private readonly NodeAddProcessor nodeRequestProcessor; + private readonly NrhAddProcessor nrhRequestProcessor; + + private readonly ILogger logger; + public NodeCompositeContext( + NodesCache nc, + NodesRoundHistoryCache nrhc, + NodeAddProcessor nodep, + NrhAddProcessor nrhp, + ILogger lggr) + { + nodesCache = nc; + nodesRoundHistoryCache = nrhc; + nodeRequestProcessor = nodep; + nrhRequestProcessor = nrhp; + + logger = lggr; + } + public Node? GetNodeByAddress(string address) => nodesCache.GetNodeByAddress(address); + public List GetAllNodes() => nodesCache.GetAllNodes(); + public int GetNodeCount() => nodesCache.GetNodeCount(); + public List GetAllNodesSortedById() => nodesCache.GetAllNodesSortedById(); + public List GetFilteredNodes(string search = "") => nodesCache.GetFilteredNodes(search); + public NodesRoundHistory? GetLastNodeRoundHistory(string nodeAddress, int RoundId) => nodesRoundHistoryCache.GetLastNodeRoundHistory(nodeAddress, RoundId); + public bool CheckNodeQualification(string nodeAddress, int previousRoundId) => nodesRoundHistoryCache.CheckNodeQualification(nodeAddress, previousRoundId); + public int? GetNodeRandomForRound(string nodeAddress, int roundId) => nodesRoundHistoryCache.GetNodeRandomForRound(nodeAddress, roundId); + public void RegisterNode(Node node) + { + try + { + nodeRequestProcessor.EnqueueRequest(new Node(node)); + nodesCache.SaveNodeToCache(node); + SaveNodesRoundHistory(node); + nodesRoundHistoryCache.UpdateNodeCounts(null, NStatus.NotRegistered, node.RoundId, node.Status); + } + catch (Exception ex) + { + logger.LogError("Error adding node: {msg}", ex.Message); + } + } + public void UpdateNode(Node node, bool fullUpdate = false, bool regCompletion = false) + { + try + { + NodesRoundHistory? nrh = GetLastNodeRoundHistory(node.Address, -1); + if (regCompletion) + { + nodesRoundHistoryCache.UpdateNodeCounts( + node.RoundId, + NStatus.WaitingRegistration, + node.RoundId, + node.Status); + + } + else + { + nodesRoundHistoryCache.UpdateNodeCounts( + nrh?.RoundId, + nrh != null ? nrh.NodeFinalStatus : NStatus.NotRegistered, + node.RoundId, + node.Status); + } + + SaveNodesRoundHistory(node); + if (fullUpdate) + { + nodesCache.UpdateNodeInCache(node); + } + else + { + nodesCache.UpdateNodeInCache(node.Address, node.Status, node.RoundId); + } + } + catch (Exception ex) + { + logger.LogError("Error updating node: {msg}", ex.Message); + } + } + + private void SaveNodesRoundHistory(Node node) + { + if (node.RoundId != null && + node.Random != null && + node.Status != NStatus.NotRegistered && + node.Status != NStatus.WaitingRegistration) + { + nodesRoundHistoryCache.SaveNodesRoundHistoryToCache(new NodesRoundHistory(node)); + nrhRequestProcessor.EnqueueRequest(new Node(node)); + } + } + + public void FinalizeRegistration(Node node) + { + node.Status = NStatus.WaitingRoundStart; + UpdateNode(node, false, true); + } + } +} diff --git a/dkgServiceNode/Data/NodeContext.cs b/dkgServiceNode/Data/NodeContext.cs new file mode 100644 index 0000000..e5c9847 --- /dev/null +++ b/dkgServiceNode/Data/NodeContext.cs @@ -0,0 +1,76 @@ +// Copyright (C) 2024 Maxim [maxirmx] Samsonov (www.sw.consulting) +// All rights reserved. +// This file is a part of dkg service node +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS +// BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +using Microsoft.EntityFrameworkCore; +using dkgServiceNode.Models; +using dkgServiceNode.Services.Cache; +using dkgCommon.Constants; + +namespace dkgServiceNode.Data +{ + public class NodeContext : DbContext + { + private readonly ILogger _logger; + private readonly NodesCache _nodesCache; + private readonly NodesRoundHistoryCache _nodesRoundHistoryCache; + + public NodeContext( + DbContextOptions options, + NodesCache nc, + NodesRoundHistoryCache nrhc, + ILogger lggr) : base(options) + { + _logger = lggr; + _nodesCache = nc; + _nodesRoundHistoryCache = nrhc; + } + public DbSet Nodes { get; set; } + public async Task ExistsAsync(string address) + { + return await Nodes.AnyAsync(e => e.Address == address); + } + + public async Task DeleteAsync(string address) + { + return await Nodes.AnyAsync(e => e.Address == address); + } + + public async Task DeleteAsync(Node node) + { + try + { + _nodesRoundHistoryCache.UpdateNodeCounts(node.RoundId, node.Status, null, NStatus.NotRegistered); + Nodes.Remove(node); + await SaveChangesAsync(); + _nodesCache.DeleteNodeFromCache(node); + } + catch (Exception ex) + { + _logger.LogError("Error deleting node: {msg}", ex.Message); + } + } + + } +} diff --git a/dkgServiceNode/Data/RoundContext.cs b/dkgServiceNode/Data/RoundContext.cs new file mode 100644 index 0000000..33ec647 --- /dev/null +++ b/dkgServiceNode/Data/RoundContext.cs @@ -0,0 +1,93 @@ +// Copyright (C) 2024 Maxim [maxirmx] Samsonov (www.sw.consulting) +// All rights reserved. +// This file is a part of dkg service node +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS +// BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +using dkgServiceNode.Models; +using dkgServiceNode.Services.Cache; +using Microsoft.EntityFrameworkCore; + +namespace dkgServiceNode.Data +{ + public class RoundContext : DbContext + { + private DbSet Rounds { get; set; } + + private readonly RoundsCache roundsCache; + + private readonly ILogger logger; + public RoundContext( + DbContextOptions options, + RoundsCache rc, + ILogger lggr) : base(options) + { + roundsCache = rc; + logger = lggr; + } + public Round? GetRoundById(int id) => roundsCache.GetRoundById(id); + public List GetAllRounds() => roundsCache.GetAllRounds(); + public List GetAllRoundsSortedByIdDescending() => roundsCache.GetAllRoundsSortedByIdDescending(); + public async Task AddRoundAsync(Round round) + { + try + { + Rounds.Add(round); + await SaveChangesAsync(); + roundsCache.SaveRoundToCache(round); + } + catch (Exception ex) + { + logger.LogError("Error adding round: {msg}", ex.Message); + } + } + public async Task UpdateRoundAsync(Round round) + { + try + { + Rounds.Update(round); + await SaveChangesAsync(); + roundsCache.UpdateRoundInCache(round); + } + catch (Exception ex) + { + logger.LogError("Error updating round: {msg}", ex.Message); + } + } + public async Task DeleteRoundAsync(Round round) + { + try + { + Rounds.Remove(round); + await SaveChangesAsync(); + roundsCache.DeleteRoundFromCache(round.Id); + } + catch (Exception ex) + { + logger.LogError("Error deleting round: {msg}", ex.Message); + } + } + public bool RoundExists(int id) => roundsCache.RoundExists(id); + public int? LastRoundResult() => roundsCache.LastRoundResult(); + + } +} diff --git a/dkgServiceNode/Data/UserContext.cs b/dkgServiceNode/Data/UserContext.cs index 161801f..f6b3b01 100644 --- a/dkgServiceNode/Data/UserContext.cs +++ b/dkgServiceNode/Data/UserContext.cs @@ -34,33 +34,33 @@ public class UserContext : DbContext { public UserContext(DbContextOptions options) : base(options) { } public DbSet Users { get; set; } - public async Task Exists(int id) + public async Task ExistsAsync(int id) { return await Users.AnyAsync(e => e.Id == id); } - public async Task Exists(string email) + public async Task ExistsAsync(string email) { return await Users.AnyAsync(e => e.Email == email); } - public async Task> UserViewItems() + public async Task> UserViewItemsAsync() { return await Users.AsNoTracking().Select(x => new UserViewItem(x)).ToListAsync(); } - public async Task UserViewItem(int id) + public async Task UserViewItemAsync(int id) { var user = await Users.AsNoTracking().Where(x => x.Id == id).Select(x => new UserViewItem(x)).FirstOrDefaultAsync(); return user ?? null; } - public async Task> CheckAdmin(int cuid) + public async Task> CheckAdminAsync(int cuid) { - var curUser = await UserViewItem(cuid); + var curUser = await UserViewItemAsync(cuid); return curUser != null && curUser.IsAdmin; } - public async Task> CheckAdminOrSameUser(int id, int cuid) + public async Task> CheckAdminOrSameUserAsync(int id, int cuid) { if (cuid == 0) return false; if (cuid == id) return true; - return await CheckAdmin(cuid); + return await CheckAdminAsync(cuid); } public bool CheckSameUser(int id, int cuid) { diff --git a/dkgServiceNode/Models/Node.cs b/dkgServiceNode/Models/Node.cs index 4e7760f..fc680a3 100644 --- a/dkgServiceNode/Models/Node.cs +++ b/dkgServiceNode/Models/Node.cs @@ -34,14 +34,10 @@ namespace dkgServiceNode.Models [Table("nodes")] public class Node { - [Key] - [DatabaseGenerated(DatabaseGeneratedOption.Identity)] - [Column("id")] - public int Id { get; set; } - [Column("name")] public string Name { get; set; } = "--"; + [Key] [Column("address")] // Solana Wallet Address public string Address { get; set; } = string.Empty; @@ -85,7 +81,7 @@ public Node() } public Node(Node other) { - Id = other.Id; + Address = other.Address; Name = other.Name; PublicKey = other.PublicKey; RoundId = other.RoundId; diff --git a/dkgServiceNode/Models/NodesRoundHistory.cs b/dkgServiceNode/Models/NodesRoundHistory.cs index 282eea1..218f790 100644 --- a/dkgServiceNode/Models/NodesRoundHistory.cs +++ b/dkgServiceNode/Models/NodesRoundHistory.cs @@ -43,8 +43,8 @@ public class NodesRoundHistory public int RoundId { get; set; } [Required] - [Column("node_id")] - public int NodeId { get; set; } + [Column("node_address")] + public string NodeAddress { get; set; } = ""; [Required] [Column("node_final_status")] @@ -65,7 +65,7 @@ public NodesRoundHistory() } public NodesRoundHistory(Node node) { - NodeId = node.Id; + NodeAddress = node.Address; NodeFinalStatus = node.Status; NodeRandom = node.Random; RoundId = node.RoundId ?? 0; @@ -74,7 +74,7 @@ public NodesRoundHistory(NodesRoundHistory other) { Id = other.Id; RoundId = other.RoundId; - NodeId = other.NodeId; + NodeAddress = other.NodeAddress; NodeFinalStatusValue = other.NodeFinalStatusValue; NodeRandom = other.NodeRandom; NodeFinalStatus = other.NodeFinalStatus; diff --git a/dkgServiceNode/Models/Round.cs b/dkgServiceNode/Models/Round.cs index 25fa006..3fd4bc1 100644 --- a/dkgServiceNode/Models/Round.cs +++ b/dkgServiceNode/Models/Round.cs @@ -57,6 +57,12 @@ public class Round [NotMapped] public int NodeCount { get; set; } = 0; + [NotMapped] + public int NodeCountWRegistration { get; set; } = 0; + + [NotMapped] + public int NodeCountWRoundStart { get; set; } = 0; + [NotMapped] public int NodeCountStepOne { get; set; } = 0; diff --git a/dkgServiceNode/Program.cs b/dkgServiceNode/Program.cs index d496086..e064f47 100644 --- a/dkgServiceNode/Program.cs +++ b/dkgServiceNode/Program.cs @@ -2,9 +2,11 @@ using dkgServiceNode.Data; using dkgServiceNode.Services.Authorization; -using dkgServiceNode.Services.RoundRunner; using dkgServiceNode.Services.Cache; using dkgServiceNode.Services.Initialization; +using dkgServiceNode.Services.RequestProcessors; +using dkgServiceNode.Services.RequestLimitingMiddleware; +using dkgServiceNode.Services.RoundRunner; var builder = WebApplication.CreateBuilder(args); @@ -31,18 +33,33 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); +builder.Services.AddSingleton(); var connectionString = configuration.GetConnectionString("DefaultConnection") ?? ""; builder.Services.AddDbContext(options => options.UseNpgsql(connectionString)); builder.Services.AddDbContext(options => options.UseNpgsql(connectionString)); -builder.Services.AddDbContext(options => { +builder.Services.AddDbContext(options => options.UseNpgsql(connectionString)); +builder.Services.AddDbContext(options => { options.UseNpgsql(connectionString); options.EnableSensitiveDataLogging(); }); builder.Services.AddSingleton(); +builder.Services.AddSingleton(serviceProvider => +{ + var logger = serviceProvider.GetRequiredService>(); + return new NrhAddProcessor(connectionString, logger); +}); + +builder.Services.AddSingleton(serviceProvider => +{ + var logger = serviceProvider.GetRequiredService>(); + return new NodeAddProcessor(connectionString, logger); +}); + + var app = builder.Build(); // -------------- Initialize database and caches --------------- @@ -54,8 +71,16 @@ app.Services.GetRequiredService>() ); initializer.Initialize(connectionString); -// ------------------------------------------------------------- +app.Services + .GetRequiredService() + .Start(); + +app.Services + .GetRequiredService() + .Start(app.Services.GetRequiredService()); + +// ------------------------------------------------------------- app.UseMiddleware(controllers); diff --git a/dkgServiceNode/Services/Cache/NodesCache.cs b/dkgServiceNode/Services/Cache/NodesCache.cs index e933f54..bebac85 100644 --- a/dkgServiceNode/Services/Cache/NodesCache.cs +++ b/dkgServiceNode/Services/Cache/NodesCache.cs @@ -25,37 +25,32 @@ using dkgCommon.Constants; using dkgServiceNode.Models; -using Npgsql; -using System.Collections.Concurrent; namespace dkgServiceNode.Services.Cache { - public class NodesCache + public class NodesCache { - private readonly Dictionary _cacheNodes = new(); - private readonly Dictionary _addressToId = new(); + private readonly Dictionary _cacheNodes = new(); private readonly object _cacheNodesLock = new(); public void SaveNodeToCacheNoLock(Node node) { - _cacheNodes[node.Id] = new Node(node); - _addressToId[node.Address] = node.Id; + _cacheNodes[node.Address] = new Node(node); } - public void SaveNodeToCache(Node node) + public void SaveNodeToCache(Node node) { lock (_cacheNodesLock) { - _cacheNodes[node.Id] = new Node(node); - _addressToId[node.Address] = node.Id; + _cacheNodes[node.Address] = new Node(node); } } - public Node? GetNodeById(int id) + public Node? GetNodeByAddress(string address) { Node? res = null; lock (_cacheNodesLock) { - if (_cacheNodes.TryGetValue(id, out Node? node)) + if (_cacheNodes.TryGetValue(address, out Node? node)) { res = new Node(node); } @@ -63,23 +58,7 @@ public void SaveNodeToCache(Node node) return res; } - public Node? GetNodeByAddress(string address) - { - Node? res = null; - lock (_cacheNodesLock) - { - if (_addressToId.TryGetValue(address, out var id)) - { - if (_cacheNodes.TryGetValue(id, out Node? node)) - { - res = new Node(node); - } - } - } - return res; - } - - public List GetAllNodes() + public List GetAllNodes() { List copiedNodes; lock (_cacheNodesLock) @@ -89,7 +68,7 @@ public List GetAllNodes() return copiedNodes; } - public int GetNodeCount() + public int GetNodeCount() { int res = 0; lock (_cacheNodesLock) @@ -99,7 +78,7 @@ public int GetNodeCount() return res; } - public List GetAllNodesSortedById() + public List GetAllNodesSortedById() { List copiedNodes; lock (_cacheNodesLock) @@ -111,12 +90,12 @@ public List GetAllNodesSortedById() return copiedNodes; } - public List GetFilteredNodes(string search = "") + public List GetFilteredNodes(string search = "") { return string.IsNullOrWhiteSpace(search) ? GetAllNodes() : GetFilteredNodesInternal(search); } - private List GetFilteredNodesInternal(string search) + private List GetFilteredNodesInternal(string search) { List filteredNodes; lock (_cacheNodesLock) @@ -124,7 +103,6 @@ private List GetFilteredNodesInternal(string search) filteredNodes = _cacheNodes .Where(kvp => kvp.Value.Name.Contains(search) || - kvp.Value.Id.ToString().Contains(search) || kvp.Value.Address.Contains(search) || (kvp.Value.RoundId != null && kvp.Value.RoundId.ToString()!.Contains(search)) || (kvp.Value.RoundId == null && ("null".Contains(search) || "--".Contains(search))) || @@ -135,26 +113,40 @@ private List GetFilteredNodesInternal(string search) return filteredNodes; } - public void UpdateNodeInCache(Node node) + public void UpdateNodeInCache(Node node) + { + lock (_cacheNodesLock) + { + _cacheNodes[node.Address] = new Node(node); + } + } + public void UpdateNodeInCache(string address, NodeStatus status) + { + lock (_cacheNodesLock) + { + if (_cacheNodes.TryGetValue(address, out Node? node)) + { + node.Status = status; + } + } + } + public void UpdateNodeInCache(string address, NodeStatus status, int? roundId) { lock (_cacheNodesLock) { - var addr = _cacheNodes[node.Id]?.Address; - if (addr != null && addr!= node.Address) + if (_cacheNodes.TryGetValue(address, out Node? node)) { - _addressToId.Remove(addr); + node.Status = status; + node.RoundId = roundId; } - _cacheNodes[node.Id] = new Node(node); - _addressToId[node.Address] = node.Id; } } - public void DeleteNodeFromCache(Node node) + public void DeleteNodeFromCache(Node node) { lock (_cacheNodesLock) { - _addressToId.Remove(node.Address); - _cacheNodes.Remove(node.Id); + _cacheNodes.Remove(node.Address); } } } diff --git a/dkgServiceNode/Services/Cache/NodesRoundHistoryCache.cs b/dkgServiceNode/Services/Cache/NodesRoundHistoryCache.cs index 2b97a4e..da8ffe6 100644 --- a/dkgServiceNode/Services/Cache/NodesRoundHistoryCache.cs +++ b/dkgServiceNode/Services/Cache/NodesRoundHistoryCache.cs @@ -30,11 +30,11 @@ namespace dkgServiceNode.Services.Cache { public class NodesRoundHistoryCache { - private readonly Dictionary> _cacheNodesRoundHistory = []; - private readonly object _cacheNodesRoundHistoryLock = new(); + private readonly Dictionary> _cacheNodesRoundHistory = []; + private readonly object _cacheNodesRoundHistoryLock = new(); private readonly object _cacheCountsRoundHistoryLock = new(); - private Dictionary<(int RoundId, NStatus Status), int> _nodeCounts = []; + private readonly Dictionary<(int RoundId, NStatus Status), int> _nodeCounts = []; public void SetHistoryCounterNoLock(int roundId, NStatus status, int count) { @@ -83,20 +83,20 @@ public int GetNodeCountForRound(int roundId, NStatus status) public void SaveNodesRoundHistoryToCacheNoLock(NodesRoundHistory history) { - if (!_cacheNodesRoundHistory.ContainsKey(history.NodeId)) + if (!_cacheNodesRoundHistory.ContainsKey(history.NodeAddress)) { - _cacheNodesRoundHistory[history.NodeId] = []; + _cacheNodesRoundHistory[history.NodeAddress] = []; } - _cacheNodesRoundHistory[history.NodeId].Add(new NodesRoundHistory(history)); + _cacheNodesRoundHistory[history.NodeAddress].Add(new NodesRoundHistory(history)); } public void SaveNodesRoundHistoryToCache(NodesRoundHistory history) { lock (_cacheNodesRoundHistoryLock) { - if (!_cacheNodesRoundHistory.TryGetValue(history.NodeId, out List? histories)) + if (!_cacheNodesRoundHistory.TryGetValue(history.NodeAddress, out List? histories)) { - _cacheNodesRoundHistory[history.NodeId] = [new NodesRoundHistory(history)]; + _cacheNodesRoundHistory[history.NodeAddress] = [new NodesRoundHistory(history)]; } else { @@ -106,25 +106,25 @@ public void SaveNodesRoundHistoryToCache(NodesRoundHistory history) // If the existing record is found, replace it with the new history if (index != -1) { - _cacheNodesRoundHistory[history.NodeId][index] = new NodesRoundHistory(history); + _cacheNodesRoundHistory[history.NodeAddress][index] = new NodesRoundHistory(history); } else { // Add the new history to the list if it doesn't exist - _cacheNodesRoundHistory[history.NodeId].Add(new NodesRoundHistory(history)); + _cacheNodesRoundHistory[history.NodeAddress].Add(new NodesRoundHistory(history)); // Remove excess items if the list has more than 10 items - if (_cacheNodesRoundHistory[history.NodeId].Count > 10) + if (_cacheNodesRoundHistory[history.NodeAddress].Count > 10) { // Find the two items with the largest RoundId - var largestRoundIds = _cacheNodesRoundHistory[history.NodeId] + var largestRoundIds = _cacheNodesRoundHistory[history.NodeAddress] .OrderByDescending(nrh => nrh.RoundId) .Take(2) .Select(nrh => nrh.RoundId) .ToList(); // Remove all items except the two with the largest RoundId - _cacheNodesRoundHistory[history.NodeId] = _cacheNodesRoundHistory[history.NodeId] + _cacheNodesRoundHistory[history.NodeAddress] = _cacheNodesRoundHistory[history.NodeAddress] .Where(nrh => largestRoundIds.Contains(nrh.RoundId)) .ToList(); } @@ -133,12 +133,12 @@ public void SaveNodesRoundHistoryToCache(NodesRoundHistory history) } } - public NodesRoundHistory? GetLastNodeRoundHistory(int nodeId, int RoundId) + public NodesRoundHistory? GetLastNodeRoundHistory(string nodeAddress, int RoundId) { NodesRoundHistory? res = null; lock (_cacheNodesRoundHistoryLock) { - if (_cacheNodesRoundHistory.TryGetValue(nodeId, out List? histories)) + if (_cacheNodesRoundHistory.TryGetValue(nodeAddress, out List? histories)) { var history = histories.Where(nrh => nrh.RoundId != RoundId) .OrderByDescending(nrh => nrh.RoundId) @@ -151,16 +151,16 @@ public void SaveNodesRoundHistoryToCache(NodesRoundHistory history) } return res; } - public bool CheckNodeQualification(int nodeId, int previousRoundId) + public bool CheckNodeQualification(string nodeAddress, int previousRoundId) { - return previousRoundId > 0 && GetNodeRandomForRound(nodeId, previousRoundId) != null; + return previousRoundId > 0 && GetNodeRandomForRound(nodeAddress, previousRoundId) != null; } - public int? GetNodeRandomForRound(int nodeId, int roundId) + public int? GetNodeRandomForRound(string nodeAddress, int roundId) { int? res = null; lock (_cacheNodesRoundHistoryLock) { - if (_cacheNodesRoundHistory.TryGetValue(nodeId, out List? histories)) + if (_cacheNodesRoundHistory.TryGetValue(nodeAddress, out List? histories)) { var history = histories.Where(nrh => nrh.RoundId == roundId) .FirstOrDefault(); diff --git a/dkgServiceNode/Services/Initialization/Initializer.cs b/dkgServiceNode/Services/Initialization/Initializer.cs index 1332547..fc702e3 100644 --- a/dkgServiceNode/Services/Initialization/Initializer.cs +++ b/dkgServiceNode/Services/Initialization/Initializer.cs @@ -35,7 +35,7 @@ public class Initializer( NodesCache nodesCache, RoundsCache roundsCache, NodesRoundHistoryCache nodesRoundHistoryCache, - ILogger logger) + ILogger logger): IDisposable { private readonly NodesCache _nodesCache = nodesCache; private readonly RoundsCache _roundsCache = roundsCache; @@ -80,13 +80,11 @@ private void InitializeNodesCache(NpgsqlConnection connection) while (reader.Read()) { - int id = reader.GetInt32(0); - string address = reader.GetString(1); - string name = reader.GetString(2); + string address = reader.GetString(reader.GetOrdinal("address")); + string name = reader.GetString(reader.GetOrdinal("name")); Node node = new() { - Id = id, Address = address, Name = name }; @@ -167,16 +165,16 @@ private void InitializeNodesRoundHistoryCache(NpgsqlConnection connection) try { string selectQuery = @" - SELECT DISTINCT ON (node_id) + SELECT DISTINCT ON (node_address) id, round_id, - node_id, + node_address, node_final_status, node_random FROM nodes_round_history ORDER BY - node_id, + node_address, round_id DESC, id ASC;"; @@ -191,9 +189,9 @@ ORDER BY { Id = reader.GetInt32(reader.GetOrdinal("id")), RoundId = reader.GetInt32(reader.GetOrdinal("round_id")), - NodeId = reader.GetInt32(reader.GetOrdinal("node_id")), + NodeAddress = reader.GetString(reader.GetOrdinal("node_address")), NodeFinalStatusValue = reader.GetInt16(reader.GetOrdinal("node_final_status")), - NodeRandom = reader.IsDBNull(reader.GetOrdinal("node_random")) ? (int?)null : reader.GetInt32(reader.GetOrdinal("node_random")) + NodeRandom = reader.IsDBNull(reader.GetOrdinal("node_random")) ? null : reader.GetInt32(reader.GetOrdinal("node_random")) }; _nodesRoundHistoryCache.SaveNodesRoundHistoryToCacheNoLock(history); @@ -242,6 +240,11 @@ GROUP BY _logger.LogError("Failed to populate history counters from database: {msg}", ex.Message); } } + + public void Dispose() + { + throw new NotImplementedException(); + } } } diff --git a/dkgServiceNode/Services/NodeComparer/NodeComparer.cs b/dkgServiceNode/Services/NodeComparer/NodeComparer.cs index a2352c5..68b6ce9 100644 --- a/dkgServiceNode/Services/NodeComparer/NodeComparer.cs +++ b/dkgServiceNode/Services/NodeComparer/NodeComparer.cs @@ -42,7 +42,7 @@ public NodeComparer(int zPoint, int roundId, NodesRoundHistoryCache nrhc) private int? GetInt(Node? x) { - return x != null ? nodesRoundHistoryCache.GetNodeRandomForRound(x.Id, RoundId) : null; + return x != null ? nodesRoundHistoryCache.GetNodeRandomForRound(x.Address, RoundId) : null; } public int Compare(Node? x, Node? y) { diff --git a/dkgServiceNode/Services/RequestLimitingMiddleware/RequestLimitingMiddleware.cs b/dkgServiceNode/Services/RequestLimitingMiddleware/RequestLimitingMiddleware.cs index 8195042..cda6028 100644 --- a/dkgServiceNode/Services/RequestLimitingMiddleware/RequestLimitingMiddleware.cs +++ b/dkgServiceNode/Services/RequestLimitingMiddleware/RequestLimitingMiddleware.cs @@ -1,33 +1,54 @@ -// RequestLimitingMiddleware.cs -public class RequestLimitingMiddleware -{ - private readonly RequestDelegate _next; - private readonly SemaphoreSlim _semaphore; +// Copyright (C) 2024 Maxim [maxirmx] Samsonov (www.sw.consulting) +// All rights reserved. +// This file is a part of dkg service node +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS +// BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. - public RequestLimitingMiddleware(RequestDelegate next, int maxConcurrentRequests) +namespace dkgServiceNode.Services.RequestLimitingMiddleware +{ + public class RequestLimitingMiddleware(RequestDelegate next, int maxConcurrentRequests) { - _next = next; - _semaphore = new SemaphoreSlim(maxConcurrentRequests, maxConcurrentRequests); - } + private readonly RequestDelegate _next = next; + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(maxConcurrentRequests, maxConcurrentRequests); - public async Task InvokeAsync(HttpContext context) - { - if (context.Request.Path.StartsWithSegments($"/api/ops", StringComparison.OrdinalIgnoreCase)) + public async Task InvokeAsync(HttpContext context) { - await _semaphore.WaitAsync(); - - try + if (context.Request.Path.StartsWithSegments($"/api/ops", StringComparison.OrdinalIgnoreCase)) { - await _next(context); + await _semaphore.WaitAsync(); + + try + { + await _next(context); + } + finally + { + _semaphore.Release(); + } } - finally + else { - _semaphore.Release(); + await _next(context); } } - else - { - await _next(context); - } } } \ No newline at end of file diff --git a/dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs b/dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs new file mode 100644 index 0000000..0074dca --- /dev/null +++ b/dkgServiceNode/Services/RequestProcessors/NodeAddProcessor.cs @@ -0,0 +1,162 @@ +using System.Collections.Concurrent; +using dkgCommon.Constants; +using dkgServiceNode.Data; +using dkgServiceNode.Models; +using dkgServiceNode.Services.Cache; +using Npgsql; +using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database; + +namespace dkgServiceNode.Services.RequestProcessors +{ + public class NodeAddProcessor : IDisposable + { + private readonly int _database_reconnect_delay = 3000; + private readonly int _queue_reparse_delay = 1000; + private readonly int _bulk_insert_limit = 10000; + + private readonly ConcurrentQueue requestQueue = new(); + private readonly CancellationTokenSource cancellationTokenSource = new(); + + private Task? backgroundTask = null; + private NodeCompositeContext? ncContext = null; + + private volatile bool isRunning = false; + private readonly ILogger logger; + private readonly string connectionString; + private bool disposed = false; + + public NodeAddProcessor( + string connectionStr, + ILogger lgger + ) + { + connectionString = connectionStr; + logger = lgger; + } + + public void Start(NodeCompositeContext nContext) + { + if (isRunning) + { + logger.LogWarning("Node Request Processor is already running. 'Start' ignored."); + } + else + { + isRunning = true; + ncContext = nContext; + backgroundTask = Task.Run(ProcessRequests, cancellationTokenSource.Token); + logger.LogInformation("Request Processor has been started."); + } + } + + public void Stop() + { + if (!isRunning) + { + logger.LogWarning("Request Processor is not running. 'Stop' ignored."); + } + else + { + cancellationTokenSource.Cancel(); + backgroundTask?.Wait(); + isRunning = false; + logger.LogInformation("Request Processor has been stopped."); + } + } + + public void EnqueueRequest(Node request) + { + requestQueue.Enqueue(request); + } + + private async Task ProcessRequests() + { + using var dbConnection = new NpgsqlConnection(connectionString); + while (!cancellationTokenSource.Token.IsCancellationRequested) + { + try + { + await dbConnection.OpenAsync(cancellationTokenSource.Token); + break; + } + catch (Exception ex) + { + logger.LogError("Failed to create database connection: {msg}", ex.Message); + await Task.Delay(_database_reconnect_delay, cancellationTokenSource.Token); + } + } + + while (!cancellationTokenSource.Token.IsCancellationRequested) + { + var requests = new List(); + + while (requestQueue.TryDequeue(out var request) && requests.Count < _bulk_insert_limit) + { + requests.Add(request); + } + + if (requests.Count > 0) + { + try + { + // Convert the list of requests to a JSON array + var jsonItems = System.Text.Json.JsonSerializer.Serialize(requests.Select(r => new + { + node_address = r.Address, + node_name = r.Name, + round_id = r.RoundId, + node_final_status = r.StatusValue, + node_random = r.Random + })); + + // Call the bulk_insert_node_with_round_history procedure + using (var command = new NpgsqlCommand("CALL bulk_insert_node_with_round_history(@p_items)", dbConnection)) + { + command.Parameters.AddWithValue("p_items", NpgsqlTypes.NpgsqlDbType.Json, jsonItems); + await command.ExecuteNonQueryAsync(); + } + + // Finalize registration for each request + foreach (var request in requests) + { + ncContext!.FinalizeRegistration(request); + } + } + catch (Exception ex) + { + logger.LogError("An error occurred while processing requests: {msg}.", ex.Message); + } + } + else + { + await Task.Delay(_queue_reparse_delay, cancellationTokenSource.Token); + } + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + protected virtual void Dispose(bool disposing) + { + if (!disposed) + { + if (disposing) + { + Stop(); + cancellationTokenSource.Dispose(); + backgroundTask?.Dispose(); + } + ncContext = null; + disposed = true; + } + } + + ~NodeAddProcessor() + { + Dispose(false); + } + } +} diff --git a/dkgServiceNode/Services/RequestProcessors/NrhAddProcessor.cs b/dkgServiceNode/Services/RequestProcessors/NrhAddProcessor.cs new file mode 100644 index 0000000..a54f5a2 --- /dev/null +++ b/dkgServiceNode/Services/RequestProcessors/NrhAddProcessor.cs @@ -0,0 +1,157 @@ +using System.Collections.Concurrent; +using System.Text.Json; +using dkgServiceNode.Data; +using dkgServiceNode.Models; +using Npgsql; + +namespace dkgServiceNode.Services.RequestProcessors +{ + public class NrhAddProcessor : IDisposable + { + private readonly int _database_reconnect_delay = 3000; + private readonly int _queue_reparse_delay = 1000; + private readonly int _bulk_upsert_limit = 10000; + + private readonly ConcurrentQueue requestQueue = new(); + private readonly CancellationTokenSource cancellationTokenSource = new(); + private Task? backgroundTask = null; + private volatile bool isRunning = false; + private readonly ILogger logger; + private readonly string connectionString; + private bool disposed = false; + + public NrhAddProcessor(string connectionStr, ILogger lgger) + { + connectionString = connectionStr; + logger = lgger; + } + + public void Start() + { + if (isRunning) + { + logger.LogWarning("Nodes Round History Request Processor is already running. 'Start' ignored."); + } + else + { + isRunning = true; + backgroundTask = Task.Run(ProcessRequests, cancellationTokenSource.Token); + logger.LogInformation("Request Processor has been started."); + } + } + + public void Stop() + { + if (!isRunning) + { + logger.LogWarning("Request Processor is not running. 'Stop' ignored."); + } + else + { + cancellationTokenSource.Cancel(); + backgroundTask?.Wait(); + isRunning = false; + logger.LogInformation("Request Processor has been stopped."); + } + } + + public void EnqueueRequest(Node request) + { + if (request.RoundId != null) + { + requestQueue.Enqueue(request); + } + else + { + logger.LogWarning("Ignoring history record with null RoundId."); + } + } + + private async Task ProcessRequests() + { + using var dbConnection = new NpgsqlConnection(connectionString); + while (!cancellationTokenSource.Token.IsCancellationRequested) + { + try + { + await dbConnection.OpenAsync(cancellationTokenSource.Token); + break; + } + catch (Exception ex) + { + logger.LogError("Failed to create database connection: {msg}", ex.Message); + await Task.Delay(_database_reconnect_delay, cancellationTokenSource.Token); + } + } + + while (!cancellationTokenSource.Token.IsCancellationRequested) + { + var requests = new List(); + + while (requestQueue.TryDequeue(out var request) && requests.Count < _bulk_upsert_limit) + { + requests.Add(request); + } + + if (requests.Count > 0) + { + try + { + var items = requests.Select(r => new + { + node_address = r.Address, + round_id = r.RoundId, + node_final_status = r.StatusValue, + node_random = r.Random + }).ToArray(); + + var jsonItems = JsonSerializer.Serialize(items); + + using var command = new NpgsqlCommand("CALL bulk_upsert_node_round_history(@p_items)", dbConnection); + var parameter = new NpgsqlParameter("p_items", NpgsqlTypes.NpgsqlDbType.Json) + { + Value = jsonItems + }; + command.Parameters.Add(parameter); + + await command.ExecuteNonQueryAsync(cancellationTokenSource.Token); + } + catch (Exception ex) + { + logger.LogError("An error occurred while processing requests: {msg}.", ex.Message); + } + } + else + { + await Task.Delay(_queue_reparse_delay, cancellationTokenSource.Token); + } + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (!disposed) + { + if (disposing) + { + Stop(); + cancellationTokenSource.Dispose(); + backgroundTask?.Dispose(); + } + disposed = true; + } + } + + ~NrhAddProcessor() + { + Dispose(false); + } + } +} + diff --git a/dkgServiceNode/Services/RoundRunner/Runner.cs b/dkgServiceNode/Services/RoundRunner/Runner.cs index a1d67d8..4b050e8 100644 --- a/dkgServiceNode/Services/RoundRunner/Runner.cs +++ b/dkgServiceNode/Services/RoundRunner/Runner.cs @@ -26,7 +26,6 @@ using dkgServiceNode.Models; using System.Runtime.CompilerServices; -using System.Text; [assembly: InternalsVisibleTo("dkgNodesTests")] namespace dkgServiceNode.Services.RoundRunner diff --git a/dkgServiceNode/appsettings.Development.json b/dkgServiceNode/appsettings.Development.json index 5016258..834e8d3 100644 --- a/dkgServiceNode/appsettings.Development.json +++ b/dkgServiceNode/appsettings.Development.json @@ -7,7 +7,7 @@ } }, "AllowedHosts": "*", - "Controllers": "9", + "Controllers": "50", "ConnectionStrings": { "DefaultConnection": "Host=dkgservice_db;Port=5432;Database=dkgservice;Username=postgres;Password=postgres" }, diff --git a/dkgServiceNode/appsettings.json b/dkgServiceNode/appsettings.json index 381a367..1775066 100644 --- a/dkgServiceNode/appsettings.json +++ b/dkgServiceNode/appsettings.json @@ -7,7 +7,7 @@ } }, "AllowedHosts": "*", - "Controllers": "5", + "Controllers": "50", "ConnectionStrings": { "DefaultConnection": "Host=dkgservice_db;Port=5432;Database=dkgservice;Username=postgres;Password=postgres" },