diff --git a/README.md b/README.md index 7635676..f667896 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,9 @@ schematic.Identify( { "is_staff", false } } ); + +// to guarantee that all events are sent before the application exits, call this method before your program shuts down +await schematic.Shutdown(); ``` This call is non-blocking and there is no response to check. @@ -76,6 +79,9 @@ schematic.Track( user: new Dictionary { { "user_id", "your-user-id" } }, company: new Dictionary { { "id", "your-company-id" } } ); + +// to guarantee that all events are sent before the application exits, call this method before your program shuts down +await schematic.Shutdown(); ``` This call is non-blocking and there is no response to check. @@ -328,7 +334,7 @@ try { System.Console.WriteLine(e.Message) System.Console.WriteLine(e.StatusCode) } -``` +``` ## Contributing While we value open-source contributions to this SDK, this library diff --git a/src/SchematicHQ.Client.Test/TestClient.cs b/src/SchematicHQ.Client.Test/TestClient.cs index 4e67f9d..bf95f6e 100644 --- a/src/SchematicHQ.Client.Test/TestClient.cs +++ b/src/SchematicHQ.Client.Test/TestClient.cs @@ -34,6 +34,20 @@ private HttpResponseMessage CreateCheckFlagResponse(HttpStatusCode code, bool fl }; } + private HttpResponseMessage CreateEventBatchResponse(HttpStatusCode code) + { + var response = new CreateEventBatchResponse{ + Data = new RawEventBatchResponseData{ + Events = new List() + } + }; + var serializedResponse = JsonSerializer.Serialize(response, new JsonSerializerOptions { WriteIndented = true }); + return new HttpResponseMessage(code) + { + Content = new StringContent(serializedResponse, Encoding.UTF8, "application/json") + }; + } + private void SetupSchematicTestClient(bool isOffline, HttpResponseMessage response, Dictionary? flagDefaults = null) { HttpClient testClient = new HttpClient(new OfflineHttpMessageHandler()); @@ -64,9 +78,9 @@ public void Setup() } [TearDown] - public void TearDown() + public async Task TearDown() { - if (_schematic != null) _schematic.Shutdown(); + if (_schematic != null) await _schematic.Shutdown(); } [Test] @@ -150,7 +164,7 @@ public async Task CheckFlag_LogsErrorAndReturnsDefaultOnException() public async Task Identify_EnqueuesEventNonBlocking() { // Arrange - SetupSchematicTestClient(isOffline: false, response: null); + SetupSchematicTestClient(isOffline: false, response: CreateEventBatchResponse(HttpStatusCode.OK)); var keys = new Dictionary { { "user_id", "12345" } }; var company = new EventBodyIdentifyCompany { Name = "test_company" }; @@ -167,7 +181,7 @@ public async Task Identify_EnqueuesEventNonBlocking() public async Task Track_EnqueuesEventNonBlocking() { // Arrange - SetupSchematicTestClient(isOffline: false, response: CreateCheckFlagResponse(HttpStatusCode.OK, false)); + SetupSchematicTestClient(isOffline: false, response: CreateEventBatchResponse(HttpStatusCode.OK)); var company = new Dictionary { { "company_id", "67890" } }; var user = new Dictionary { { "user_id", "12345" } }; diff --git a/src/SchematicHQ.Client.Test/TestEventBuffer.cs b/src/SchematicHQ.Client.Test/TestEventBuffer.cs index 08ca4b0..ef269c1 100644 --- a/src/SchematicHQ.Client.Test/TestEventBuffer.cs +++ b/src/SchematicHQ.Client.Test/TestEventBuffer.cs @@ -26,9 +26,9 @@ public void SetUp() } [TearDown] - public void TearDown() + public async Task TearDown() { - _buffer.Dispose(); + await _buffer.Stop(); } [Test] @@ -39,11 +39,19 @@ public void Start_BufferStartsSuccessfully() } [Test] - public void Stop_BufferStopsSuccessfully() + public async Task Stop_BufferStopsSuccessfully() { _buffer.Start(); - _buffer.Stop(); + + await _buffer.Stop(); + Assert.Throws(() => _buffer.Push(1)); + + var semaphore = GetPrivateFieldValue(_buffer, "_semaphore"); + var cts = GetPrivateFieldValue(_buffer, "_cts"); + + Assert.That(IsSemaphoreSlimDisposed(semaphore), Is.True, "SemaphoreSlim was not disposed."); + Assert.That(IsCancellationTokenSourceDisposed(cts), Is.True, "CancellationTokenSource was not disposed."); } [Test] @@ -68,23 +76,6 @@ public async Task Flush_ManualFlushWorks() Assert.That(_processedItems[0], Is.EqualTo(1)); } - [Test] - public void Dispose_BufferDisposesSuccessfully() - { - // Arrange - _buffer.Start(); - - // Act - Assert.DoesNotThrow(() => _buffer.Dispose()); - - // Assert - var semaphore = GetPrivateFieldValue(_buffer, "_semaphore"); - var cts = GetPrivateFieldValue(_buffer, "_cts"); - - Assert.That(IsSemaphoreSlimDisposed(semaphore), Is.True, "SemaphoreSlim was not disposed."); - Assert.That(IsCancellationTokenSourceDisposed(cts), Is.True, "CancellationTokenSource was not disposed."); - } - [Test] public void PeriodicFlush_FlushesAtInterval() { @@ -157,7 +148,7 @@ public async Task StartAndStop_ConcurrencyTest() { _buffer.Start(); await Task.Delay(10); - _buffer.Stop(); + await _buffer.Stop(); }).ToArray(); await Task.WhenAll(startStopTasks); @@ -236,4 +227,4 @@ private T GetPrivateFieldValue(object obj, string fieldName) return (T)field?.GetValue(obj); } } -} \ No newline at end of file +} diff --git a/src/SchematicHQ.Client/Core/ClientOptionsCustom.cs b/src/SchematicHQ.Client/Core/ClientOptionsCustom.cs index 3f2b83b..23f6499 100644 --- a/src/SchematicHQ.Client/Core/ClientOptionsCustom.cs +++ b/src/SchematicHQ.Client/Core/ClientOptionsCustom.cs @@ -25,9 +25,9 @@ public static ClientOptions WithHttpClient(this ClientOptions options, HttpClien HttpClient = httpClient, MaxRetries = options.MaxRetries, TimeoutInSeconds = options.TimeoutInSeconds, - FlagDefaults = options.FlagDefaults, - Logger = options.Logger, - CacheProviders = options.CacheProviders, + FlagDefaults = options.FlagDefaults ?? new Dictionary(), + Logger = options.Logger ?? new ConsoleLogger(), + CacheProviders = options.CacheProviders ?? new List>(), Offline = options.Offline, DefaultEventBufferPeriod = options.DefaultEventBufferPeriod, EventBuffer = options.EventBuffer diff --git a/src/SchematicHQ.Client/EventBuffer.cs b/src/SchematicHQ.Client/EventBuffer.cs index 0d9a076..13e5ad7 100644 --- a/src/SchematicHQ.Client/EventBuffer.cs +++ b/src/SchematicHQ.Client/EventBuffer.cs @@ -4,11 +4,11 @@ namespace SchematicHQ.Client; -public interface IEventBuffer : IDisposable +public interface IEventBuffer { void Push(T item); void Start(); - void Stop(); + Task Stop(); Task Flush(); int GetEventCount(); } @@ -17,7 +17,7 @@ public class EventBuffer : IEventBuffer { private const int DefaultMaxSize = 100; private static readonly TimeSpan DefaultFlushPeriod = TimeSpan.FromMilliseconds(5000); - private const int MaxWaitForBuffer = 3; //seconds to wait for event buffer to flush on Stop and Shutdown + private const int MaxWaitForBuffer = 3; //seconds to wait for event buffer to flush on Stop private readonly int _maxSize; private readonly TimeSpan _flushPeriod; @@ -43,7 +43,41 @@ public EventBuffer(Func, Task> action, ISchematicLogger logger, int maxS _cts = new CancellationTokenSource(); _isRunning = false; - _logger.Info("EventBuffer initialized with maxSize: {0}, flushPeriod: {1}", _maxSize, _flushPeriod); + _logger.Debug("EventBuffer initialized with maxSize: {0}, flushPeriod: {1}", _maxSize, _flushPeriod); + + AppDomain.CurrentDomain.ProcessExit += async (s, e) => { + await EmergencyFlush(); + }; + + Console.CancelKeyPress += async (s, e) => { + e.Cancel = true; // Prevent immediate termination + await EmergencyFlush(); + }; + } + + private async Task EmergencyFlush() + { + try + { + _logger.Debug("Emergency flush triggered by program termination"); + // Don't check _isRunning here since we're in an emergency shutdown + var items = new List(); + while (_queue.TryDequeue(out var item)) + { + items.Add(item); + } + + if (items.Count > 0) + { + _logger.Info("Emergency flushing {0} items", items.Count); + await _action(items); + } + _logger.Info("Emergency flush completed"); + } + catch (Exception ex) + { + _logger.Error("Error during emergency flush: {0}", ex.Message); + } } public void Push(T item) @@ -86,32 +120,6 @@ public void Start() _logger.Info("EventBuffer started."); } - public void Stop() - { - lock (_runningLock) - { - if (!_isRunning) return; - - _isRunning = false; - _cts.Cancel(); - } - - // Wait for a maximum of MaxWaitForBuffer seconds for the periodic flush task to complete - if (_periodicFlushTask != Task.CompletedTask) - { - try - { - _periodicFlushTask.Wait(TimeSpan.FromSeconds(MaxWaitForBuffer)); - } - catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException)) - { - _logger.Warn("Periodic flush task was canceled."); - } - } - - _logger.Info("EventBuffer stopped."); - } - public async Task Flush() { lock (_runningLock) @@ -178,57 +186,45 @@ private async Task FlushBufferAsync() } } - public void Dispose() + public async Task Stop() { + if (!_isRunning) return; + try { - _cts.Cancel(); - } - catch (ObjectDisposedException) - { - // The CancellationTokenSource has already been disposed - } + await Flush(); - // Wait for a maximum of MaxWaitForBuffer seconds for the periodic flush task to complete - if (_periodicFlushTask != Task.CompletedTask) - { - try + lock (_runningLock) { - _periodicFlushTask.Wait(TimeSpan.FromSeconds(MaxWaitForBuffer)); + _isRunning = false; + _cts.Cancel(); } - catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException)) + + if (_periodicFlushTask != Task.CompletedTask) { - _logger.Warn("Periodic flush task was canceled."); + var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer)); + await Task.WhenAny(_periodicFlushTask, timeoutTask); } - catch (ObjectDisposedException) + + if (_processBufferTask != Task.CompletedTask) { - _logger.Warn("Periodic flush task's CancellationTokenSource was disposed."); + var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer)); + await Task.WhenAny(_processBufferTask, timeoutTask); } - } - if (_processBufferTask != Task.CompletedTask) + _semaphore.Dispose(); + _cts.Dispose(); + _logger.Info("EventBuffer shut down cleanly."); + } + catch (Exception ex) { - try - { - _processBufferTask.Wait(TimeSpan.FromSeconds(5)); - } - catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException)) - { - _logger.Warn("Process buffer task was canceled."); - } - catch (ObjectDisposedException) - { - _logger.Warn("Process buffer task's CancellationTokenSource was disposed."); - } + _logger.Error("Error during shutdown: {0}", ex.Message); + throw; } - - _semaphore.Dispose(); - _cts.Dispose(); - _logger.Info("EventBuffer disposed."); } public int GetEventCount() { return _queue.Count; } -} \ No newline at end of file +} diff --git a/src/SchematicHQ.Client/Schematic.cs b/src/SchematicHQ.Client/Schematic.cs index 2a58c8c..e469d1e 100644 --- a/src/SchematicHQ.Client/Schematic.cs +++ b/src/SchematicHQ.Client/Schematic.cs @@ -37,7 +37,7 @@ public Schematic(string apiKey, ClientOptions? options = null) await API.Events.CreateEventBatchAsync(request); }, _logger, - flushPeriod: options.DefaultEventBufferPeriod // default flush period + flushPeriod: _options.DefaultEventBufferPeriod ); _eventBuffer.Start(); @@ -47,9 +47,12 @@ public Schematic(string apiKey, ClientOptions? options = null) }; } - public void Shutdown() + public async Task Shutdown() { - _eventBuffer.Dispose(); + if (_eventBuffer != null) + { + await _eventBuffer.Stop(); + } } public async Task CheckFlag(string flagKey, Dictionary? company = null, Dictionary? user = null)