Skip to content

Commit

Permalink
Buffer shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
bpapillon committed Jan 20, 2025
1 parent eb042da commit c9fdc45
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 99 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ schematic.Identify(
{ "is_staff", false }
}
);

// to guarantee that all events are sent before the application exits, call this method before your program shuts down
await schematic.Shutdown();
```

This call is non-blocking and there is no response to check.
Expand All @@ -76,6 +79,9 @@ schematic.Track(
user: new Dictionary<string, string> { { "user_id", "your-user-id" } },
company: new Dictionary<string, string> { { "id", "your-company-id" } }
);

// to guarantee that all events are sent before the application exits, call this method before your program shuts down
await schematic.Shutdown();
```

This call is non-blocking and there is no response to check.
Expand Down Expand Up @@ -328,7 +334,7 @@ try {
System.Console.WriteLine(e.Message)
System.Console.WriteLine(e.StatusCode)
}
```
```

## Contributing
While we value open-source contributions to this SDK, this library
Expand Down
22 changes: 18 additions & 4 deletions src/SchematicHQ.Client.Test/TestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ private HttpResponseMessage CreateCheckFlagResponse(HttpStatusCode code, bool fl
};
}

private HttpResponseMessage CreateEventBatchResponse(HttpStatusCode code)
{
var response = new CreateEventBatchResponse{
Data = new RawEventBatchResponseData{
Events = new List<RawEventResponseData>()
}
};
var serializedResponse = JsonSerializer.Serialize(response, new JsonSerializerOptions { WriteIndented = true });
return new HttpResponseMessage(code)
{
Content = new StringContent(serializedResponse, Encoding.UTF8, "application/json")
};
}

private void SetupSchematicTestClient(bool isOffline, HttpResponseMessage response, Dictionary<string, bool>? flagDefaults = null)
{
HttpClient testClient = new HttpClient(new OfflineHttpMessageHandler());
Expand Down Expand Up @@ -64,9 +78,9 @@ public void Setup()
}

[TearDown]
public void TearDown()
public async Task TearDown()
{
if (_schematic != null) _schematic.Shutdown();
if (_schematic != null) await _schematic.Shutdown();
}

[Test]
Expand Down Expand Up @@ -150,7 +164,7 @@ public async Task CheckFlag_LogsErrorAndReturnsDefaultOnException()
public async Task Identify_EnqueuesEventNonBlocking()
{
// Arrange
SetupSchematicTestClient(isOffline: false, response: null);
SetupSchematicTestClient(isOffline: false, response: CreateEventBatchResponse(HttpStatusCode.OK));
var keys = new Dictionary<string, string> { { "user_id", "12345" } };
var company = new EventBodyIdentifyCompany { Name = "test_company" };

Expand All @@ -167,7 +181,7 @@ public async Task Identify_EnqueuesEventNonBlocking()
public async Task Track_EnqueuesEventNonBlocking()
{
// Arrange
SetupSchematicTestClient(isOffline: false, response: CreateCheckFlagResponse(HttpStatusCode.OK, false));
SetupSchematicTestClient(isOffline: false, response: CreateEventBatchResponse(HttpStatusCode.OK));
var company = new Dictionary<string, string> { { "company_id", "67890" } };
var user = new Dictionary<string, string> { { "user_id", "12345" } };

Expand Down
37 changes: 14 additions & 23 deletions src/SchematicHQ.Client.Test/TestEventBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public void SetUp()
}

[TearDown]
public void TearDown()
public async Task TearDown()
{
_buffer.Dispose();
await _buffer.Stop();
}

[Test]
Expand All @@ -39,11 +39,19 @@ public void Start_BufferStartsSuccessfully()
}

[Test]
public void Stop_BufferStopsSuccessfully()
public async Task Stop_BufferStopsSuccessfully()
{
_buffer.Start();
_buffer.Stop();

await _buffer.Stop();

Assert.Throws<InvalidOperationException>(() => _buffer.Push(1));

var semaphore = GetPrivateFieldValue<SemaphoreSlim>(_buffer, "_semaphore");
var cts = GetPrivateFieldValue<CancellationTokenSource>(_buffer, "_cts");

Assert.That(IsSemaphoreSlimDisposed(semaphore), Is.True, "SemaphoreSlim was not disposed.");
Assert.That(IsCancellationTokenSourceDisposed(cts), Is.True, "CancellationTokenSource was not disposed.");
}

[Test]
Expand All @@ -68,23 +76,6 @@ public async Task Flush_ManualFlushWorks()
Assert.That(_processedItems[0], Is.EqualTo(1));
}

[Test]
public void Dispose_BufferDisposesSuccessfully()
{
// Arrange
_buffer.Start();

// Act
Assert.DoesNotThrow(() => _buffer.Dispose());

// Assert
var semaphore = GetPrivateFieldValue<SemaphoreSlim>(_buffer, "_semaphore");
var cts = GetPrivateFieldValue<CancellationTokenSource>(_buffer, "_cts");

Assert.That(IsSemaphoreSlimDisposed(semaphore), Is.True, "SemaphoreSlim was not disposed.");
Assert.That(IsCancellationTokenSourceDisposed(cts), Is.True, "CancellationTokenSource was not disposed.");
}

[Test]
public void PeriodicFlush_FlushesAtInterval()
{
Expand Down Expand Up @@ -157,7 +148,7 @@ public async Task StartAndStop_ConcurrencyTest()
{
_buffer.Start();
await Task.Delay(10);
_buffer.Stop();
await _buffer.Stop();
}).ToArray();

await Task.WhenAll(startStopTasks);
Expand Down Expand Up @@ -236,4 +227,4 @@ private T GetPrivateFieldValue<T>(object obj, string fieldName)
return (T)field?.GetValue(obj);
}
}
}
}
6 changes: 3 additions & 3 deletions src/SchematicHQ.Client/Core/ClientOptionsCustom.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public static ClientOptions WithHttpClient(this ClientOptions options, HttpClien
HttpClient = httpClient,
MaxRetries = options.MaxRetries,
TimeoutInSeconds = options.TimeoutInSeconds,
FlagDefaults = options.FlagDefaults,
Logger = options.Logger,
CacheProviders = options.CacheProviders,
FlagDefaults = options.FlagDefaults ?? new Dictionary<string, bool>(),
Logger = options.Logger ?? new ConsoleLogger(),
CacheProviders = options.CacheProviders ?? new List<ICacheProvider<bool?>>(),
Offline = options.Offline,
DefaultEventBufferPeriod = options.DefaultEventBufferPeriod,
EventBuffer = options.EventBuffer
Expand Down
126 changes: 61 additions & 65 deletions src/SchematicHQ.Client/EventBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace SchematicHQ.Client;

public interface IEventBuffer<T> : IDisposable
public interface IEventBuffer<T>
{
void Push(T item);
void Start();
void Stop();
Task Stop();
Task Flush();
int GetEventCount();
}
Expand All @@ -17,7 +17,7 @@ public class EventBuffer<T> : IEventBuffer<T>
{
private const int DefaultMaxSize = 100;
private static readonly TimeSpan DefaultFlushPeriod = TimeSpan.FromMilliseconds(5000);
private const int MaxWaitForBuffer = 3; //seconds to wait for event buffer to flush on Stop and Shutdown
private const int MaxWaitForBuffer = 3; //seconds to wait for event buffer to flush on Stop

private readonly int _maxSize;
private readonly TimeSpan _flushPeriod;
Expand All @@ -43,7 +43,41 @@ public EventBuffer(Func<List<T>, Task> action, ISchematicLogger logger, int maxS
_cts = new CancellationTokenSource();
_isRunning = false;

_logger.Info("EventBuffer initialized with maxSize: {0}, flushPeriod: {1}", _maxSize, _flushPeriod);
_logger.Debug("EventBuffer initialized with maxSize: {0}, flushPeriod: {1}", _maxSize, _flushPeriod);

AppDomain.CurrentDomain.ProcessExit += async (s, e) => {
await EmergencyFlush();
};

Console.CancelKeyPress += async (s, e) => {
e.Cancel = true; // Prevent immediate termination
await EmergencyFlush();
};
}

private async Task EmergencyFlush()
{
try
{
_logger.Debug("Emergency flush triggered by program termination");
// Don't check _isRunning here since we're in an emergency shutdown
var items = new List<T>();
while (_queue.TryDequeue(out var item))
{
items.Add(item);
}

if (items.Count > 0)
{
_logger.Info("Emergency flushing {0} items", items.Count);
await _action(items);
}
_logger.Info("Emergency flush completed");
}
catch (Exception ex)
{
_logger.Error("Error during emergency flush: {0}", ex.Message);
}
}

public void Push(T item)
Expand Down Expand Up @@ -86,32 +120,6 @@ public void Start()
_logger.Info("EventBuffer started.");
}

public void Stop()
{
lock (_runningLock)
{
if (!_isRunning) return;

_isRunning = false;
_cts.Cancel();
}

// Wait for a maximum of MaxWaitForBuffer seconds for the periodic flush task to complete
if (_periodicFlushTask != Task.CompletedTask)
{
try
{
_periodicFlushTask.Wait(TimeSpan.FromSeconds(MaxWaitForBuffer));
}
catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException))
{
_logger.Warn("Periodic flush task was canceled.");
}
}

_logger.Info("EventBuffer stopped.");
}

public async Task Flush()
{
lock (_runningLock)
Expand Down Expand Up @@ -178,57 +186,45 @@ private async Task FlushBufferAsync()
}
}

public void Dispose()
public async Task Stop()
{
if (!_isRunning) return;

try
{
_cts.Cancel();
}
catch (ObjectDisposedException)
{
// The CancellationTokenSource has already been disposed
}
await Flush();

// Wait for a maximum of MaxWaitForBuffer seconds for the periodic flush task to complete
if (_periodicFlushTask != Task.CompletedTask)
{
try
lock (_runningLock)
{
_periodicFlushTask.Wait(TimeSpan.FromSeconds(MaxWaitForBuffer));
_isRunning = false;
_cts.Cancel();
}
catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException))

if (_periodicFlushTask != Task.CompletedTask)
{
_logger.Warn("Periodic flush task was canceled.");
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer));
await Task.WhenAny(_periodicFlushTask, timeoutTask);
}
catch (ObjectDisposedException)

if (_processBufferTask != Task.CompletedTask)
{
_logger.Warn("Periodic flush task's CancellationTokenSource was disposed.");
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer));
await Task.WhenAny(_processBufferTask, timeoutTask);
}
}

if (_processBufferTask != Task.CompletedTask)
_semaphore.Dispose();
_cts.Dispose();
_logger.Info("EventBuffer shut down cleanly.");
}
catch (Exception ex)
{
try
{
_processBufferTask.Wait(TimeSpan.FromSeconds(5));
}
catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is TaskCanceledException))
{
_logger.Warn("Process buffer task was canceled.");
}
catch (ObjectDisposedException)
{
_logger.Warn("Process buffer task's CancellationTokenSource was disposed.");
}
_logger.Error("Error during shutdown: {0}", ex.Message);
throw;
}

_semaphore.Dispose();
_cts.Dispose();
_logger.Info("EventBuffer disposed.");
}

public int GetEventCount()
{
return _queue.Count;
}
}
}
9 changes: 6 additions & 3 deletions src/SchematicHQ.Client/Schematic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public Schematic(string apiKey, ClientOptions? options = null)
await API.Events.CreateEventBatchAsync(request);
},
_logger,
flushPeriod: options.DefaultEventBufferPeriod // default flush period
flushPeriod: _options.DefaultEventBufferPeriod
);
_eventBuffer.Start();

Expand All @@ -47,9 +47,12 @@ public Schematic(string apiKey, ClientOptions? options = null)
};
}

public void Shutdown()
public async Task Shutdown()
{
_eventBuffer.Dispose();
if (_eventBuffer != null)
{
await _eventBuffer.Stop();
}
}

public async Task<bool> CheckFlag(string flagKey, Dictionary<string, string>? company = null, Dictionary<string, string>? user = null)
Expand Down

0 comments on commit c9fdc45

Please sign in to comment.