From 918d242358ffbde4864ac568b77c3dcc04b4e487 Mon Sep 17 00:00:00 2001 From: kgrudzien Date: Fri, 22 Mar 2024 15:12:14 +0100 Subject: [PATCH 1/6] CoinAPI.WebSocket.Stats.Console --- .../CoinAPI.WebSocket.Stats.Console.csproj | 27 +++ .../Program.cs | 174 ++++++++++++++++++ .../Properties/launchSettings.json | 8 + .../appsettings.json | 3 + .../CoinAPI.WebSocket.V1/DataModels/Hello.cs | 1 + data-api/csharp-ws/CoinAPI.WebSocket.sln | 14 +- 6 files changed, 223 insertions(+), 4 deletions(-) create mode 100644 data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/CoinAPI.WebSocket.Stats.Console.csproj create mode 100644 data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs create mode 100644 data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Properties/launchSettings.json create mode 100644 data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/appsettings.json diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/CoinAPI.WebSocket.Stats.Console.csproj b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/CoinAPI.WebSocket.Stats.Console.csproj new file mode 100644 index 0000000000..a512348f8a --- /dev/null +++ b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/CoinAPI.WebSocket.Stats.Console.csproj @@ -0,0 +1,27 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + + + + + + PreserveNewest + + + + diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs new file mode 100644 index 0000000000..c1ebb7d779 --- /dev/null +++ b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs @@ -0,0 +1,174 @@ +using Cocona; +using CoinAPI.WebSocket.V1; +using CoinAPI.WebSocket.V1.DataModels; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Serilog; +using System.Globalization; +using System.Text; +using System.Xml; +using System.Xml.Xsl; + +internal enum SubType +{ + book5, + book20, + book50, + book, + book_l, + quote, + trade, + ohlcv, + exrate, +} + +internal class Program +{ + public static IConfiguration Configuration { get; private set; } + static async Task Main(string[] args) + { + Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture; + Thread.CurrentThread.CurrentUICulture = CultureInfo.InvariantCulture; + + Log.Logger = new LoggerConfiguration() + .MinimumLevel.Debug() + .WriteTo.Console() + //.WriteTo.File("logs/myapp.txt", rollingInterval: RollingInterval.Day) + .CreateLogger(); + + var confBuilder = new ConfigurationBuilder() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); + + Configuration = confBuilder.Build(); + + var coconaBuilder = CoconaApp.CreateBuilder(); + var host = coconaBuilder.Host; + host.ConfigureLogging(logging => + { + logging.AddSerilog(); + }) + .ConfigureServices((context, services) => + { + + // + }); + //host.Run(args); + var coconaApp = coconaBuilder.Build(); + await coconaApp.RunAsync(); + } + + public async Task MakeRequest([FromService] IConfiguration configuration, + string subscribe_data_type = null, string asset = null, string symbol = null, + string exchange = null, string apikey = null, string type = "hello") + { + var typeNames = Enum.GetNames().ToList(); + if (!typeNames.Any(x => x == subscribe_data_type)) + { + Console.WriteLine($"Invalid subscribe_data_type, valid values: {string.Join(",", typeNames)}"); + return; + } + + using (var wsClient = new CoinApiWsClient()) + { + int msgCount = 0; + List<(DateTime, DateTime)> latencyList = new List<(DateTime, DateTime)>(); + + void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) + { + msgCount++; + if (time_coinapi.HasValue && time_exchange.HasValue) + { + latencyList.Add((time_exchange.Value, time_coinapi.Value)); + } + } + + switch (subscribe_data_type) + { + case "book5": + wsClient.OrderBook5Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); }; + break; + case "book20": + wsClient.OrderBook20Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); }; + break; + case "book50": + wsClient.OrderBook50Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); }; + break; + case "book": + wsClient.OrderBookEvent += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); }; + break; + case "book_l": + wsClient.OrderBookL3Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); }; + break; + case "quote": + wsClient.QuoteEvent += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); }; + break; + case "trade": + wsClient.TradeEvent += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); }; + break; + case "ohlcv": + wsClient.OHLCVEvent += (s, i) => { ProcessMsg(null, null); }; + break; + case "exrate": + wsClient.ExchangeRateEvent += (s, i) => { msgCount++; }; + break; + } + + var hello = new Hello() + { + apikey = new Guid(configuration["ApiKey"] ?? apikey ?? throw new ArgumentNullException("ApiKey is required")), + type = type, + subscribe_data_type = new string[] { subscribe_data_type }, + subscribe_filter_asset_id = string.IsNullOrWhiteSpace(asset) ? null : new string[] { asset }, + subscribe_filter_symbol_id = string.IsNullOrWhiteSpace(symbol) ? null : new string[] { symbol }, + subscribe_filter_exchange_id = string.IsNullOrWhiteSpace(exchange) ? null : new string[] { exchange }, + }; + wsClient.SendHelloMessage(hello); + + Task.Run(async () => + { + if (!wsClient.ConnectedEvent.WaitOne(10000)) return; + + msgCount = 0; + latencyList.Clear(); + + while (true) + { + await Task.Delay(1000); + var count = msgCount; + msgCount = 0; + var latencies = latencyList.Select(x => x.Item2 - x.Item1).ToList(); + latencyList.Clear(); + Console.WriteLine($"Time: {DateTime.UtcNow}"); + var strbld = new StringBuilder(); + strbld.Append($"Subscribed to: subscribe_data_type = {subscribe_data_type}"); + if (!string.IsNullOrWhiteSpace(exchange)) + { + strbld.Append($", exchange = {exchange}"); + } + if (!string.IsNullOrWhiteSpace(asset)) + { + strbld.Append($", asset = {asset}"); + } + if (!string.IsNullOrWhiteSpace(symbol)) + { + strbld.Append($", symbol = {symbol}"); + } + strbld.AppendLine(""); + strbld.Append($"Processed messsages: {count}"); + + if (latencies.Any()) + { + strbld.Append($" Latency min: {latencies.Min().TotalMilliseconds}ms, Latency max: {latencies.Max().TotalMilliseconds}ms"); + } + Console.WriteLine(strbld.ToString()); + + } + } + ); + + await Task.Run(() => Console.ReadKey()); + } + } + +} \ No newline at end of file diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Properties/launchSettings.json b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Properties/launchSettings.json new file mode 100644 index 0000000000..36053d8f3a --- /dev/null +++ b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Properties/launchSettings.json @@ -0,0 +1,8 @@ +{ + "profiles": { + "CoinAPI.WebSocket.Stats.Console": { + "commandName": "Project", + "commandLineArgs": "--subscribe_data_type book --symbol COINBASE_SPOT_BTC_USD$" + } + } +} \ No newline at end of file diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/appsettings.json b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/appsettings.json new file mode 100644 index 0000000000..9944fc50b3 --- /dev/null +++ b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/appsettings.json @@ -0,0 +1,3 @@ +{ + "ApiKey": "a54fdcca-2d82-44c2-ae53-b95ab2363309" +} \ No newline at end of file diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.V1/DataModels/Hello.cs b/data-api/csharp-ws/CoinAPI.WebSocket.V1/DataModels/Hello.cs index 4bb3ea81d7..012d22e2f4 100644 --- a/data-api/csharp-ws/CoinAPI.WebSocket.V1/DataModels/Hello.cs +++ b/data-api/csharp-ws/CoinAPI.WebSocket.V1/DataModels/Hello.cs @@ -4,6 +4,7 @@ namespace CoinAPI.WebSocket.V1.DataModels { public class Hello { + public string type { get; set; } = "hello"; public Guid apikey { get; set; } public bool heartbeat { get; set; } public string[] subscribe_data_type { get; set; } diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.sln b/data-api/csharp-ws/CoinAPI.WebSocket.sln index 11ee4046b0..27a14b2292 100644 --- a/data-api/csharp-ws/CoinAPI.WebSocket.sln +++ b/data-api/csharp-ws/CoinAPI.WebSocket.sln @@ -1,11 +1,13 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.29123.88 +# Visual Studio Version 17 +VisualStudioVersion = 17.8.34330.188 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CoinApi.WEBSOCKET.V1", "CoinAPI.WebSocket.V1\CoinAPI.WebSocket.V1.csproj", "{3A91EE7F-C11C-48E9-8260-6ABFE5C1953E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CoinAPI.WebSocket.V1", "CoinAPI.WebSocket.V1\CoinAPI.WebSocket.V1.csproj", "{3A91EE7F-C11C-48E9-8260-6ABFE5C1953E}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CoinApi.WEBSOCKET.V1.Tests", "CoinAPI.WebSocket.V1.Tests\CoinAPI.WebSocket.V1.Tests.csproj", "{FD8A69DD-C3FE-4186-AA37-90B9FD4F97EB}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CoinAPI.WebSocket.V1.Tests", "CoinAPI.WebSocket.V1.Tests\CoinAPI.WebSocket.V1.Tests.csproj", "{FD8A69DD-C3FE-4186-AA37-90B9FD4F97EB}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CoinAPI.WebSocket.Stats.Console", "CoinAPI.WebSocket.Stats.Console\CoinAPI.WebSocket.Stats.Console.csproj", "{F4E48B9A-A124-4F2A-B2C6-934B2288A533}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -21,6 +23,10 @@ Global {FD8A69DD-C3FE-4186-AA37-90B9FD4F97EB}.Debug|Any CPU.Build.0 = Debug|Any CPU {FD8A69DD-C3FE-4186-AA37-90B9FD4F97EB}.Release|Any CPU.ActiveCfg = Release|Any CPU {FD8A69DD-C3FE-4186-AA37-90B9FD4F97EB}.Release|Any CPU.Build.0 = Release|Any CPU + {F4E48B9A-A124-4F2A-B2C6-934B2288A533}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F4E48B9A-A124-4F2A-B2C6-934B2288A533}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F4E48B9A-A124-4F2A-B2C6-934B2288A533}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F4E48B9A-A124-4F2A-B2C6-934B2288A533}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE From 4cb59cc14442b494a6187fd26f1ca964b99203ab Mon Sep 17 00:00:00 2001 From: kgrudzien Date: Mon, 25 Mar 2024 12:55:01 +0100 Subject: [PATCH 2/6] SDK & console stats update --- .../Program.cs | 130 +++++++++++++----- .../CoinAPI.WebSocket.V1/CoinApiWsClient.cs | 74 +++++++++- 2 files changed, 166 insertions(+), 38 deletions(-) diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs index c1ebb7d779..1b5d2aed16 100644 --- a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs +++ b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Serilog; +using System.Diagnostics; using System.Globalization; using System.Text; using System.Xml; @@ -22,9 +23,23 @@ internal enum SubType exrate, } +internal enum Endpoints +{ + emea, + apac, + ncsa +} + internal class Program { public static IConfiguration Configuration { get; private set; } + + public static Dictionary Endpoints = new Dictionary + { + { global::Endpoints.emea.ToString(), "ws://api-emea.coinapi.net" }, + { global::Endpoints.apac.ToString(), "ws://api-apac.coinapi.net" }, + { global::Endpoints.ncsa.ToString(), "ws://api.ncsa.coinapi.net" } + }; static async Task Main(string[] args) { Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture; @@ -58,7 +73,7 @@ static async Task Main(string[] args) await coconaApp.RunAsync(); } - public async Task MakeRequest([FromService] IConfiguration configuration, + public async Task MakeRequest([FromService] IConfiguration configuration, string endpoint_name = null, string subscribe_data_type = null, string asset = null, string symbol = null, string exchange = null, string apikey = null, string type = "hello") { @@ -69,16 +84,23 @@ public async Task MakeRequest([FromService] IConfiguration configuration, return; } - using (var wsClient = new CoinApiWsClient()) + var endpointNames = Enum.GetNames().ToList(); + if (!string.IsNullOrWhiteSpace(endpoint_name) && !endpointNames.Any(x => x == endpoint_name)) + { + Console.WriteLine($"Invalid endpoint_name, valid values: {string.Join(",", endpointNames)}"); + return; + } + using (var wsClient = string.IsNullOrWhiteSpace(endpoint_name) ? new CoinApiWsClient() : new CoinApiWsClient(Endpoints[endpoint_name])) { int msgCount = 0; + List<(DateTime, DateTime)> latencyList = new List<(DateTime, DateTime)>(); void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) { msgCount++; if (time_coinapi.HasValue && time_exchange.HasValue) - { + { latencyList.Add((time_exchange.Value, time_coinapi.Value)); } } @@ -112,13 +134,13 @@ void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) case "exrate": wsClient.ExchangeRateEvent += (s, i) => { msgCount++; }; break; - } + } var hello = new Hello() { apikey = new Guid(configuration["ApiKey"] ?? apikey ?? throw new ArgumentNullException("ApiKey is required")), type = type, - subscribe_data_type = new string[] { subscribe_data_type }, + subscribe_data_type = new string[] { subscribe_data_type }, subscribe_filter_asset_id = string.IsNullOrWhiteSpace(asset) ? null : new string[] { asset }, subscribe_filter_symbol_id = string.IsNullOrWhiteSpace(symbol) ? null : new string[] { symbol }, subscribe_filter_exchange_id = string.IsNullOrWhiteSpace(exchange) ? null : new string[] { exchange }, @@ -128,41 +150,87 @@ void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) Task.Run(async () => { if (!wsClient.ConnectedEvent.WaitOne(10000)) return; - - msgCount = 0; - latencyList.Clear(); - + + var iterations = 0; + + Console.WriteLine($"Time: {DateTime.UtcNow}"); + var strbld = new StringBuilder(); + strbld.AppendLine($"Endpoint: {(string.IsNullOrEmpty(endpoint_name) ? "global" : Endpoints[endpoint_name])}"); + + strbld.Append($"Subscribed to: subscribe_data_type = {subscribe_data_type}"); + if (!string.IsNullOrWhiteSpace(exchange)) + { + strbld.Append($", exchange = {exchange}"); + } + if (!string.IsNullOrWhiteSpace(asset)) + { + strbld.Append($", asset = {asset}"); + } + if (!string.IsNullOrWhiteSpace(symbol)) + { + strbld.Append($", symbol = {symbol}"); + } + + Console.WriteLine(strbld.ToString()); + + var process = Process.GetCurrentProcess(); + while (true) { + strbld.Clear(); + + if (iterations % 10 == 0) + { + strbld.AppendLine(""); + } + iterations++; + + var msgCountPrev = msgCount; + var totalBytesReceivedPrev = wsClient.TotalBytesReceived; + + (TimeSpan cpuWaiting, TimeSpan cpuParsing, TimeSpan cpuHandling) cpuUsagePrev + = (wsClient.TotalWaitTime, wsClient.TotalParseTime, wsClient.TotalHandleTime); + + //TimeSpan totalCpuTimePrev = process.TotalProcessorTime; + await Task.Delay(1000); - var count = msgCount; - msgCount = 0; + (TimeSpan cpuWaiting, TimeSpan cpuParsing, TimeSpan cpuHandling) cpuUsage + = (wsClient.TotalWaitTime, wsClient.TotalParseTime, wsClient.TotalHandleTime); + + //TimeSpan totalCpuTime = process.TotalProcessorTime; + + var deltaCpuWaiting = cpuUsage.cpuWaiting - cpuUsagePrev.cpuWaiting; + var deltaCpuParsing = cpuUsage.cpuParsing - cpuUsagePrev.cpuParsing; + var deltaCpuHandling = cpuUsage.cpuHandling - cpuUsagePrev.cpuHandling; + //var deltaCpuTime = totalCpuTime - totalCpuTimePrev; + var deltaCpuTime = deltaCpuWaiting + deltaCpuParsing + deltaCpuHandling; + + + var cpuWaitingPercent = 100 * deltaCpuWaiting.TotalMilliseconds / deltaCpuTime.TotalMilliseconds; + var cpuParsingPercent = 100 * deltaCpuParsing.TotalMilliseconds / deltaCpuTime.TotalMilliseconds; + var cpuHandlingPercent = 100 * deltaCpuHandling.TotalMilliseconds / deltaCpuTime.TotalMilliseconds; + + + var msgCountOnInterval = msgCount - msgCountPrev; + var bytesCountOnInterval = wsClient.TotalBytesReceived - totalBytesReceivedPrev; var latencies = latencyList.Select(x => x.Item2 - x.Item1).ToList(); latencyList.Clear(); - Console.WriteLine($"Time: {DateTime.UtcNow}"); - var strbld = new StringBuilder(); - strbld.Append($"Subscribed to: subscribe_data_type = {subscribe_data_type}"); - if (!string.IsNullOrWhiteSpace(exchange)) - { - strbld.Append($", exchange = {exchange}"); - } - if (!string.IsNullOrWhiteSpace(asset)) - { - strbld.Append($", asset = {asset}"); - } - if (!string.IsNullOrWhiteSpace(symbol)) - { - strbld.Append($", symbol = {symbol}"); - } - strbld.AppendLine(""); - strbld.Append($"Processed messsages: {count}"); - + + + strbld.AppendFormat($"Messages: {msgCountOnInterval,-8}"); + strbld.AppendFormat($"| Recv bytes: {bytesCountOnInterval,-8}"); + strbld.Append($"| CPU: wait: {cpuWaitingPercent:F2}% | parse: {cpuParsingPercent:F2}% | process: {cpuHandlingPercent:F2}%"); + if (latencies.Any()) { - strbld.Append($" Latency min: {latencies.Min().TotalMilliseconds}ms, Latency max: {latencies.Max().TotalMilliseconds}ms"); + strbld.AppendFormat($"| Latency min: {latencies.Min().TotalMilliseconds,-8}ms"); + strbld.AppendFormat($"| max: {latencies.Max().TotalMilliseconds,-8}ms"); + } + + Console.WriteLine(strbld.ToString()); - + } } ); diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs b/data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs index ca4379716c..3bd81f35e8 100644 --- a/data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs +++ b/data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs @@ -11,6 +11,10 @@ namespace CoinAPI.WebSocket.V1 { public class CoinApiWsClient : ICoinApiWsClient, IDisposable { + private readonly Stopwatch _waitStopwatch = new Stopwatch(); + private readonly Stopwatch _parseStopwatch = new Stopwatch(); + private readonly Stopwatch _handleStopwatch = new Stopwatch(); + private static readonly int ReceiveBufferSize = 8192; private const string UrlProduction = "wss://ws.coinapi.io/"; @@ -24,20 +28,22 @@ public class CoinApiWsClient : ICoinApiWsClient, IDisposable private readonly TimeSpan _reconnectInterval = TimeSpan.FromSeconds(1); private int _hbLastAction; private int _hbLastActionMaxCount; + private Hello HelloMessage { get; set; } // client reference is leaked here only for testing purposes (forcing reconnects) #pragma warning disable IDE0069 // Disposable fields should be disposed protected ClientWebSocket _client = null; #pragma warning restore IDE0069 // Disposable fields should be disposed - private Hello HelloMessage { get; set; } + protected bool? ForceOverrideHeartbeat { get; set; } = true; public long UnprocessedMessagesQueueSize => _queueThread.QueueSize; public event EventHandler Error; public AutoResetEvent ConnectedEvent { get; } = new AutoResetEvent(false); public DateTime? ConnectedTime { get; private set; } - protected bool? ForceOverrideHeartbeat { get; set; } = true; - - + public ulong TotalBytesReceived { get; private set; } + public TimeSpan TotalWaitTime => _waitStopwatch.Elapsed; + public TimeSpan TotalParseTime => _parseStopwatch.Elapsed; + public TimeSpan TotalHandleTime => _handleStopwatch.Elapsed; public CoinApiWsClient(double hbTimeoutSecs, double reconnectIntervalSecs) : this(UrlProduction) { _hbTimeout = TimeSpan.FromSeconds(hbTimeoutSecs); @@ -78,8 +84,9 @@ public void SendHelloMessage(Hello msg) private void _queueThread_ItemDequeuedEvent(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); - + _parseStopwatch.Stop(); if (!data.type.TryParse(out var messageType)) { // unknown type @@ -129,65 +136,108 @@ private void _queueThread_ItemDequeuedEvent(object sender, MessageData item) private void HandleBookItem(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); OrderBookEvent?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleBook5Item(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); OrderBook5Event?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleBook20Item(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); OrderBook20Event?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleBook50Item(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); OrderBook50Event?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleBookL3Item(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); OrderBookL3Event?.Invoke(sender, data); + _handleStopwatch.Stop(); } - private void HandleOHLCVItem(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); OHLCVEvent?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleQuoteItem(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); QuoteEvent?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleTradeItem(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); TradeEvent?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleVolumeItem(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); VolumeEvent?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleExchangeRateItem(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); ExchangeRateEvent?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleTickerItem(object sender, MessageData item) { + _parseStopwatch.Start(); var data = JsonSerializer.Deserialize(item.Data); + _parseStopwatch.Stop(); + _handleStopwatch.Start(); TickerEvent?.Invoke(sender, data); + _handleStopwatch.Stop(); } private void HandleErrorItem(object sender, MessageData item) @@ -203,10 +253,16 @@ private async Task Connect() using (var connectionCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token)) { await HandleConnection(connectionCts); + ConnectedTime = null; + TotalBytesReceived = 0; + _waitStopwatch.Reset(); + _parseStopwatch.Reset(); + _handleStopwatch.Reset(); + connectionCts.Cancel(); } - + await Task.Delay(_reconnectInterval); } } @@ -261,7 +317,11 @@ private async Task HandleConnection(CancellationTokenSource connectionCts) await _client.SendAsync(helloAs, WebSocketMessageType.Text, true, connectionCts.Token); Interlocked.Exchange(ref _hbLastAction, 0); } + _waitStopwatch.Start(); var messageData = await WSUtils.ReceiveMessage(_client, connectionCts.Token, bufferArray); + _waitStopwatch.Stop(); + + TotalBytesReceived += (ulong)messageData.Data.Length; Interlocked.Exchange(ref _hbLastAction, 0); if (messageData.MessageType == WebSocketMessageType.Close) From 470b21a0d9efa21a8d25cc440f5c43a3398f7991 Mon Sep 17 00:00:00 2001 From: kgrudzien Date: Mon, 25 Mar 2024 14:54:37 +0100 Subject: [PATCH 3/6] fix --- .../Program.cs | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs index 1b5d2aed16..9011d806c1 100644 --- a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs +++ b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs @@ -38,7 +38,7 @@ internal class Program { { global::Endpoints.emea.ToString(), "ws://api-emea.coinapi.net" }, { global::Endpoints.apac.ToString(), "ws://api-apac.coinapi.net" }, - { global::Endpoints.ncsa.ToString(), "ws://api.ncsa.coinapi.net" } + { global::Endpoints.ncsa.ToString(), "ws://api-ncsa.coinapi.net" } }; static async Task Main(string[] args) { @@ -80,19 +80,29 @@ public async Task MakeRequest([FromService] IConfiguration configuration, string var typeNames = Enum.GetNames().ToList(); if (!typeNames.Any(x => x == subscribe_data_type)) { - Console.WriteLine($"Invalid subscribe_data_type, valid values: {string.Join(",", typeNames)}"); + Serilog.Log.Error($"Invalid subscribe_data_type, valid values: {string.Join(",", typeNames)}"); return; } var endpointNames = Enum.GetNames().ToList(); if (!string.IsNullOrWhiteSpace(endpoint_name) && !endpointNames.Any(x => x == endpoint_name)) { - Console.WriteLine($"Invalid endpoint_name, valid values: {string.Join(",", endpointNames)}"); + Serilog.Log.Error($"Invalid endpoint_name, valid values: {string.Join(",", endpointNames)}"); return; } + using (var wsClient = string.IsNullOrWhiteSpace(endpoint_name) ? new CoinApiWsClient() : new CoinApiWsClient(Endpoints[endpoint_name])) { int msgCount = 0; + int errorCount = 0; + + void WsClient_Error(object? sender, Exception e) + { + Serilog.Log.Error(e, "Error in websocket client"); + errorCount++; + } + + wsClient.Error += WsClient_Error; List<(DateTime, DateTime)> latencyList = new List<(DateTime, DateTime)>(); @@ -103,6 +113,7 @@ void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) { latencyList.Add((time_exchange.Value, time_coinapi.Value)); } + //Thread.Sleep(100); } switch (subscribe_data_type) @@ -152,10 +163,9 @@ void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) if (!wsClient.ConnectedEvent.WaitOne(10000)) return; var iterations = 0; - - Console.WriteLine($"Time: {DateTime.UtcNow}"); + var endpoint = (string.IsNullOrEmpty(endpoint_name) ? "global" : Endpoints[endpoint_name]); + Serilog.Log.Information($"Time: {DateTime.UtcNow}"); var strbld = new StringBuilder(); - strbld.AppendLine($"Endpoint: {(string.IsNullOrEmpty(endpoint_name) ? "global" : Endpoints[endpoint_name])}"); strbld.Append($"Subscribed to: subscribe_data_type = {subscribe_data_type}"); if (!string.IsNullOrWhiteSpace(exchange)) @@ -171,7 +181,7 @@ void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) strbld.Append($", symbol = {symbol}"); } - Console.WriteLine(strbld.ToString()); + Serilog.Log.Information(strbld.ToString()); var process = Process.GetCurrentProcess(); @@ -181,7 +191,7 @@ void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) if (iterations % 10 == 0) { - strbld.AppendLine(""); + Serilog.Log.Information($"Endpoint {endpoint}, {iterations} iterations, {msgCount} messages received, {wsClient.TotalBytesReceived} bytes received, Error count {errorCount}"); } iterations++; @@ -223,13 +233,13 @@ void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) if (latencies.Any()) { - strbld.AppendFormat($"| Latency min: {latencies.Min().TotalMilliseconds,-8}ms"); - strbld.AppendFormat($"| max: {latencies.Max().TotalMilliseconds,-8}ms"); + strbld.AppendFormat($" | Latency min: {latencies.Min().TotalMilliseconds,-8}ms"); + strbld.AppendFormat($" | max: {latencies.Max().TotalMilliseconds,-8}ms"); } - Console.WriteLine(strbld.ToString()); + Serilog.Log.Information(strbld.ToString()); } } From 4fe25a8968c52c8954a53adefa8f48e6556aec1f Mon Sep 17 00:00:00 2001 From: kgrudzien Date: Mon, 25 Mar 2024 15:45:26 +0100 Subject: [PATCH 4/6] Supress Heartbeat --- .../csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs | 5 +++-- data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs | 5 +++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs index 9011d806c1..1ad2c37d9a 100644 --- a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs +++ b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs @@ -75,7 +75,7 @@ static async Task Main(string[] args) public async Task MakeRequest([FromService] IConfiguration configuration, string endpoint_name = null, string subscribe_data_type = null, string asset = null, string symbol = null, - string exchange = null, string apikey = null, string type = "hello") + string exchange = null, string apikey = null, string type = "hello", bool supressHb = false) { var typeNames = Enum.GetNames().ToList(); if (!typeNames.Any(x => x == subscribe_data_type)) @@ -93,6 +93,7 @@ public async Task MakeRequest([FromService] IConfiguration configuration, string using (var wsClient = string.IsNullOrWhiteSpace(endpoint_name) ? new CoinApiWsClient() : new CoinApiWsClient(Endpoints[endpoint_name])) { + wsClient.SupressHeartbeat(supressHb); int msgCount = 0; int errorCount = 0; @@ -149,7 +150,7 @@ void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi) var hello = new Hello() { - apikey = new Guid(configuration["ApiKey"] ?? apikey ?? throw new ArgumentNullException("ApiKey is required")), + apikey = new Guid(apikey ?? configuration["ApiKey"] ?? throw new ArgumentNullException("ApiKey is required")), type = type, subscribe_data_type = new string[] { subscribe_data_type }, subscribe_filter_asset_id = string.IsNullOrWhiteSpace(asset) ? null : new string[] { asset }, diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs b/data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs index 3bd81f35e8..51f27990aa 100644 --- a/data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs +++ b/data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs @@ -61,6 +61,11 @@ public CoinApiWsClient(string url) _url = url; } + public void SupressHeartbeat(bool supress) + { + ForceOverrideHeartbeat = !supress; + } + public void SendHelloMessage(Hello msg) { if (msg == null) From fd58bc6a5cabc99069139b4f487bbc8700e9226d Mon Sep 17 00:00:00 2001 From: kgrudzien Date: Mon, 25 Mar 2024 15:57:56 +0100 Subject: [PATCH 5/6] supress_hb --- data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs index 1ad2c37d9a..47a14a5864 100644 --- a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs +++ b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs @@ -75,7 +75,7 @@ static async Task Main(string[] args) public async Task MakeRequest([FromService] IConfiguration configuration, string endpoint_name = null, string subscribe_data_type = null, string asset = null, string symbol = null, - string exchange = null, string apikey = null, string type = "hello", bool supressHb = false) + string exchange = null, string apikey = null, string type = "hello", bool supress_hb = false) { var typeNames = Enum.GetNames().ToList(); if (!typeNames.Any(x => x == subscribe_data_type)) @@ -93,7 +93,7 @@ public async Task MakeRequest([FromService] IConfiguration configuration, string using (var wsClient = string.IsNullOrWhiteSpace(endpoint_name) ? new CoinApiWsClient() : new CoinApiWsClient(Endpoints[endpoint_name])) { - wsClient.SupressHeartbeat(supressHb); + wsClient.SupressHeartbeat(supress_hb); int msgCount = 0; int errorCount = 0; From 6f5dcaf6941d5b0292beb697a5a75a0ff5763b32 Mon Sep 17 00:00:00 2001 From: kgrudzien Date: Tue, 26 Mar 2024 12:35:41 +0100 Subject: [PATCH 6/6] README.md --- .../CoinAPI.WebSocket.Stats.Console/README.md | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/README.md diff --git a/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/README.md b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/README.md new file mode 100644 index 0000000000..25a7c94dc2 --- /dev/null +++ b/data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/README.md @@ -0,0 +1,80 @@ + +# CoinAPI WebSocket Client Application + +This application is a console-based tool for subscribing to various data types from the CoinAPI WebSocket service. It supports multiple subscription types and endpoints, and provides detailed output including message counts, received bytes, CPU usage statistics, and latency measurements. + +## Prerequisites + +- .NET 5.0 or higher +- A valid CoinAPI API key + +## Configuration + +Before running the application, ensure you have an `appsettings.json` file in the application's root directory with the following content: + +```json +{ + "ApiKey": "API_KEY_HERE" +} +``` + +Replace `API_KEY_HERE` with your actual CoinAPI API key. + +## Building the Application + +To build the application, navigate to the project directory in your terminal and run: + +```bash +dotnet build +``` + +This command compiles the application and prepares it for execution. + +## Running the Application + +Run the application with the following command: + +```bash +dotnet run --project CoinAPI.WebSocket.Stats.Console.csproj +``` + +You can also navigate to the `bin/Debug` or `bin/Release` directory and run the application directly using: + +```bash +dotnet CoinAPI.WebSocket.Stats.Console.dll +``` + +## Usage + +The application supports command-line arguments for specifying the endpoint, subscription data type, and other parameters. Here's how you can specify these parameters: + +```bash +dotnet run -- [options] +``` + +### Options + +- `--endpoint_name `: The name of the endpoint (emea, apac, ncsa). Default is the global endpoint. +- `--subscribe_data_type `: The type of data to subscribe to (e.g., book5, book20, quote, trade). Required. +- `--asset `: The asset identifier to filter data by. Optional. +- `--symbol `: The symbol identifier to filter data by. Optional. +- `--exchange `: The exchange identifier to filter data by. Optional. +- `--apikey `: Your CoinAPI API key. Optional if specified in `appsettings.json`. +- `--type `: The type of message to send. Default is "hello". +- `--supress_hb `: Whether to suppress heartbeat messages. Default is false. + +### Example + +```bash +dotnet run -- --endpoint_name emea --subscribe_data_type quote --symbol COINBASE_SPOT_BTC_USD$ +``` + +This command subscribes to quote data for the BTC_USD spot symbol in Coinbase exchange from the EMEA endpoint. + +## Logging + +The application logs detailed information to the console, including subscribed data types, message counts, CPU usage, and latencies. + +## Stopping the Application + +To stop the application, press `Ctrl+C` or any key to terminate the execution.