diff --git a/.fernignore b/.fernignore index 118c630..36ac0b1 100644 --- a/.fernignore +++ b/.fernignore @@ -1,2 +1,10 @@ # Specify files that shouldn't be modified by Fern README.md + +src/SchematicHQ.Client.Test/TestCache.cs +src/SchematicHQ.Client.Test/TestEventBuffer.cs +src/SchematicHQ.Client/Cache.cs +src/SchematicHQ.Client/Core/ClientOptionsCustom.cs +src/SchematicHQ.Client/EventBuffer.cs +src/SchematicHQ.Client/Logger.cs +src/SchematicHQ.Client/Schematic.cs diff --git a/src/SchematicHQ.Client.Test/SchematicHQ.Client.Test.csproj b/src/SchematicHQ.Client.Test/SchematicHQ.Client.Test.csproj index 4241ce0..65b5232 100644 --- a/src/SchematicHQ.Client.Test/SchematicHQ.Client.Test.csproj +++ b/src/SchematicHQ.Client.Test/SchematicHQ.Client.Test.csproj @@ -1,7 +1,7 @@ - net7.0 + net8.0 enable enable @@ -21,4 +21,4 @@ - \ No newline at end of file + diff --git a/src/SchematicHQ.Client.Test/TestCache.cs b/src/SchematicHQ.Client.Test/TestCache.cs new file mode 100644 index 0000000..9a15286 --- /dev/null +++ b/src/SchematicHQ.Client.Test/TestCache.cs @@ -0,0 +1,80 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +#nullable enable + +namespace SchematicHQ.Client.Test; + +public class TestCache +{ + [Fact] + public void TestCacheGetAndSet() + { + var cache = new LocalCache(maxSize: 1024, ttl: 5); + + cache.Set("key1", "value1"); + Assert.Equal("value1", cache.Get("key1")); + + cache.Set("key2", "value2", ttlOverride: 1); + Assert.Equal("value2", cache.Get("key2")); + + // Wait for the TTL to expire + Task.Delay(TimeSpan.FromSeconds(2)).Wait(); + Assert.Null(cache.Get("key2")); + } + + [Fact] + public void TestCacheEviction() + { + var cache = new LocalCache(maxSize: 100, ttl: 5); + + cache.Set("key1", "longvalue1"); + cache.Set("key2", "longvalue2"); + cache.Set("key3", "shortvalue"); + + // Least recently used item should be evicted + Assert.Null(cache.Get("key1")); + Assert.Equal("longvalue2", cache.Get("key2")); + Assert.Equal("shortvalue", cache.Get("key3")); + } + + [Fact] + public void TestCacheConcurrency() + { + var cache = new LocalCache(maxSize: 1024, ttl: 5); + + // Simulate concurrent accesses to the cache + Parallel.For(0, 1000, i => + { + cache.Set($"key{i}", i); + Assert.Equal(i, cache.Get($"key{i}")); + }); + } + + [Fact] + public void TestCacheExpiration() + { + var cache = new LocalCache(maxSize: 1024, ttl: 1); + + cache.Set("key1", "value1"); + Assert.Equal("value1", cache.Get("key1")); + + // Wait for the TTL to expire + Task.Delay(TimeSpan.FromSeconds(2)).Wait(); + Assert.Null(cache.Get("key1")); + } + + [Fact] + public void TestCacheMaxSize() + { + var cache = new LocalCache(maxSize: 50, ttl: 5); + + cache.Set("key1", "longvalue1"); + cache.Set("key2", "longvalue2"); + + // Cache size exceeds max size, oldest item should be evicted + Assert.Null(cache.Get("key1")); + Assert.Equal("longvalue2", cache.Get("key2")); + } +} diff --git a/src/SchematicHQ.Client.Test/TestClient.cs b/src/SchematicHQ.Client.Test/TestClient.cs index 22baeb9..b266c15 100644 --- a/src/SchematicHQ.Client.Test/TestClient.cs +++ b/src/SchematicHQ.Client.Test/TestClient.cs @@ -1,3 +1,5 @@ namespace SchematicHQ.Client.Test; +#nullable enable + public class TestClient { } diff --git a/src/SchematicHQ.Client.Test/TestEventBuffer.cs b/src/SchematicHQ.Client.Test/TestEventBuffer.cs new file mode 100644 index 0000000..04c5938 --- /dev/null +++ b/src/SchematicHQ.Client.Test/TestEventBuffer.cs @@ -0,0 +1,81 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using Moq; +using Xunit; + +#nullable enable + +namespace SchematicHQ.Client.Tests; + +public class TestEventBuffer +{ + [Fact] + public void Push_Should_Add_Event_To_Buffer() + { + // Arrange + var eventsApiMock = new Mock(); + var loggerMock = new Mock(); + var eventBuffer = new EventBuffer(eventsApiMock.Object, loggerMock.Object); + var @event = new CreateEventRequestBody(); + + // Act + eventBuffer.Push(@event); + + // Assert + // Since the events list is private, we can't directly assert its contents. + // However, we can verify that the CreateEventBatch method was called with the expected event. + eventsApiMock.Verify(api => api.CreateEventBatch(It.Is>(list => list.Contains(@event))), Times.Once); + } + + [Fact] + public void Push_Should_Flush_Events_When_Buffer_Size_Exceeded() + { + // Arrange + var eventsApiMock = new Mock(); + var loggerMock = new Mock(); + var eventBuffer = new EventBuffer(eventsApiMock.Object, loggerMock.Object, period: 10); + var @event = new CreateEventRequestBody(); + + // Act + for (int i = 0; i < 15; i++) + { + eventBuffer.Push(@event); + } + + // Assert + // Verify that the CreateEventBatch method was called multiple times due to buffer size being exceeded. + eventsApiMock.Verify(api => api.CreateEventBatch(It.IsAny>()), Times.AtLeast(2)); + } + + [Fact] + public void Stop_Should_Prevent_Further_Events_From_Being_Pushed() + { + // Arrange + var eventsApiMock = new Mock(); + var loggerMock = new Mock(); + var eventBuffer = new EventBuffer(eventsApiMock.Object, loggerMock.Object); + var @event = new CreateEventRequestBody(); + + // Act + eventBuffer.Stop(); + eventBuffer.Push(@event); + + // Assert + eventsApiMock.Verify(api => api.CreateEventBatch(It.IsAny>()), Times.Never); + loggerMock.Verify(logger => logger.Error(It.Is(msg => msg.Contains("Event buffer is stopped")), null), Times.Once); + } + + [Fact] + public void Dispose_Should_Stop_Event_Buffer() + { + // Arrange + var eventsApiMock = new Mock(); + var loggerMock = new Mock(); + var eventBuffer = new EventBuffer(eventsApiMock.Object, loggerMock.Object); + + // Act + eventBuffer.Dispose(); + Thread.Sleep(100); // Wait for a short duration to allow the background thread to stop + } +} diff --git a/src/SchematicHQ.Client/Cache.cs b/src/SchematicHQ.Client/Cache.cs new file mode 100644 index 0000000..7b223f5 --- /dev/null +++ b/src/SchematicHQ.Client/Cache.cs @@ -0,0 +1,147 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; + +#nullable enable + +namespace SchematicHQ.Client; + +public interface ICacheProvider +{ + T? Get(string key); + void Set(string key, T val, int? ttlOverride = null); +} + +public class CachedItem +{ + public T Value { get; } + public int AccessCounter { get; set; } + public int Size { get; } + public DateTime Expiration { get; } + + public CachedItem(T value, int accessCounter, int size, DateTime expiration) + { + Value = value; + AccessCounter = accessCounter; + Size = size; + Expiration = expiration; + } +} + +public class LocalCache : ICacheProvider +{ + private const int DEFAULT_CACHE_SIZE = 10 * 1024; // 10KB + private const int DEFAULT_CACHE_TTL = 5; // 5 seconds + + private readonly ConcurrentDictionary> _cache; + private readonly object _lockObject = new object(); + private readonly int _maxSize; + private int _currentSize; + private int _accessCounter; + private readonly int _ttl; + + public LocalCache(int maxSize = DEFAULT_CACHE_SIZE, int ttl = DEFAULT_CACHE_TTL) + { + _cache = new ConcurrentDictionary>(); + _maxSize = maxSize; + _ttl = ttl; + } + + public T? Get(string key) + { + if (_maxSize == 0) + return default; + + if (!_cache.TryGetValue(key, out var item)) + return default; + + // Check if the item has expired + if (DateTime.UtcNow > item.Expiration) + { + lock (_lockObject) + { + if (_cache.TryRemove(key, out var removedItem)) + { + Interlocked.Add(ref _currentSize, -removedItem.Size); + } + } + return default; + } + + // Update the access counter for LRU eviction + Interlocked.Increment(ref _accessCounter); + item.AccessCounter = _accessCounter; + _cache[key] = item; + + return item.Value; + } + + public void Set(string key, T val, int? ttlOverride = null) + { + if (_maxSize == 0) + return; + + var ttl = ttlOverride ?? _ttl; + var size = GetObjectSize(val); + + lock (_lockObject) + { + // Check if the key already exists in the cache + if (_cache.TryGetValue(key, out var item)) + { + Interlocked.Add(ref _currentSize, size - item.Size); + Interlocked.Increment(ref _accessCounter); + _cache[key] = new CachedItem(val, _accessCounter, size, DateTime.UtcNow.AddSeconds(ttl)); + return; + } + + // Evict expired items + foreach (var kvp in _cache) + { + if (DateTime.UtcNow > kvp.Value.Expiration) + { + if (_cache.TryRemove(kvp.Key, out var removedItem)) + { + Interlocked.Add(ref _currentSize, -removedItem.Size); + } + } + } + + // Evict records if the cache size exceeds the max size + while (_currentSize + size > _maxSize) + { + string? oldestKey = null; + var oldestAccessCounter = int.MaxValue; + + foreach (var kvp in _cache) + { + if (kvp.Value.AccessCounter < oldestAccessCounter) + { + oldestKey = kvp.Key; + oldestAccessCounter = kvp.Value.AccessCounter; + } + } + + if (oldestKey != null && _cache.TryRemove(oldestKey, out var removedItem)) + { + Interlocked.Add(ref _currentSize, -removedItem.Size); + } + else + { + break; + } + } + + // Add the new item to the cache + Interlocked.Increment(ref _accessCounter); + _cache[key] = new CachedItem(val, _accessCounter, size, DateTime.UtcNow.AddSeconds(ttl)); + Interlocked.Add(ref _currentSize, size); + } + } + + private static int GetObjectSize(T obj) + { + return System.Runtime.InteropServices.Marshal.SizeOf(obj); + } +} diff --git a/src/SchematicHQ.Client/Core/ClientOptionsCustom.cs b/src/SchematicHQ.Client/Core/ClientOptionsCustom.cs new file mode 100644 index 0000000..4cf3bf0 --- /dev/null +++ b/src/SchematicHQ.Client/Core/ClientOptionsCustom.cs @@ -0,0 +1,35 @@ +using SchematicHQ.Client.Core; + +#nullable enable + +namespace SchematicHQ.Client; + +public partial class ClientOptions +{ + public Dictionary FlagDefaults { get; set; } + public ISchematicLogger Logger { get; set; } + public List> CacheProviders { get; set; } + public TimeSpan? Timeout { get; set; } // TODO + public bool Offline { get; set; } + public int? EventBufferPeriod { get; set; } +} + +public static class ClientOptionsExtensions +{ + public static ClientOptions WithHttpClient(this ClientOptions options, HttpClient httpClient) + { + return new ClientOptions + { + BaseUrl = options.BaseUrl, + HttpClient = httpClient, + MaxRetries = options.MaxRetries, + TimeoutInSeconds = options.TimeoutInSeconds, + FlagDefaults = options.FlagDefaults, + Logger = options.Logger, + CacheProviders = options.CacheProviders, + Timeout = options.Timeout, + Offline = options.Offline, + EventBufferPeriod = options.EventBufferPeriod + }; + } +} diff --git a/src/SchematicHQ.Client/EventBuffer.cs b/src/SchematicHQ.Client/EventBuffer.cs new file mode 100644 index 0000000..8409a9d --- /dev/null +++ b/src/SchematicHQ.Client/EventBuffer.cs @@ -0,0 +1,117 @@ +using System; +using System.Collections.Generic; +using System.Threading; + +#nullable enable + +namespace SchematicHQ.Client; + +public class EventBuffer : IDisposable +{ + private readonly EventsClient _eventsApi; + private readonly ISchematicLogger _logger; + private readonly int _interval; + private readonly int _maxSize; + private readonly List _events; + private int _currentSize; + private readonly object _flushLock = new object(); + private readonly object _pushLock = new object(); + private readonly CancellationTokenSource _cancellationTokenSource; + private bool _stopped; + + public EventBuffer(EventsClient eventsApi, ISchematicLogger logger, int? period = null) + { + _eventsApi = eventsApi; + _logger = logger; + _interval = period ?? DEFAULT_EVENT_BUFFER_PERIOD; + _maxSize = DEFAULT_BUFFER_MAX_SIZE; + _events = new List(); + _cancellationTokenSource = new CancellationTokenSource(); + + // Start periodic flushing thread + var flushThread = new Thread(PeriodicFlush); + flushThread.IsBackground = true; + flushThread.Start(); + } + + private async Task Flush() + { + lock (_flushLock) + { + if (_events.Count == 0) + return; + + var events = _events.FindAll(e => e != null); + try + { + var request = new CreateEventBatchRequestBody + { + Events = events + }; + await _eventsApi.CreateEventBatchAsync(request); + } + catch (Exception ex) + { + _logger.Error("Error flushing events: {0}", ex.Message); + } + + _events.Clear(); + _currentSize = 0; + } + } + + private void PeriodicFlush() + { + while (!_cancellationTokenSource.IsCancellationRequested) + { + Task.Run(async () => await Flush()); + _cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(_interval)); + } + } + + public void Push(CreateEventRequestBody @event) + { + if (_stopped) + { + _logger.Error("Event buffer is stopped, not accepting new events"); + return; + } + + lock (_pushLock) + { + var eventSize = GetEventSize(@event); + if (_currentSize + eventSize > _maxSize) + Task.Run(async () => await Flush()); + + _events.Add(@event); + _currentSize += eventSize; + } + } + + public void Stop() + { + try + { + _stopped = true; + _cancellationTokenSource.Cancel(); + _cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(5)); + } + catch (Exception ex) + { + _logger.Error("Error stopping event buffer: {0}", ex.Message); + } + } + + public void Dispose() + { + Stop(); + } + + private const int DEFAULT_BUFFER_MAX_SIZE = 10 * 1024; // 10KB + private const int DEFAULT_EVENT_BUFFER_PERIOD = 5; // 5 seconds + + private static int GetEventSize(CreateEventRequestBody obj) + { + return System.Runtime.InteropServices.Marshal.SizeOf(obj); + } +} diff --git a/src/SchematicHQ.Client/Logger.cs b/src/SchematicHQ.Client/Logger.cs new file mode 100644 index 0000000..8f9e6fd --- /dev/null +++ b/src/SchematicHQ.Client/Logger.cs @@ -0,0 +1,34 @@ +#nullable enable + +namespace SchematicHQ.Client; + +public interface ISchematicLogger +{ + void Error(string message, params object[] args); + void Warn(string message, params object[] args); + void Info(string message, params object[] args); + void Debug(string message, params object[] args); +} + +public class ConsoleLogger : ISchematicLogger +{ + public void Error(string message, params object[] args) + { + Console.WriteLine($"[ERROR] {string.Format(message, args)}"); + } + + public void Warn(string message, params object[] args) + { + Console.WriteLine($"[WARN] {string.Format(message, args)}"); + } + + public void Info(string message, params object[] args) + { + Console.WriteLine($"[INFO] {string.Format(message, args)}"); + } + + public void Debug(string message, params object[] args) + { + Console.WriteLine($"[DEBUG] {string.Format(message, args)}"); + } +} diff --git a/src/SchematicHQ.Client/Schematic.cs b/src/SchematicHQ.Client/Schematic.cs new file mode 100644 index 0000000..abfb840 --- /dev/null +++ b/src/SchematicHQ.Client/Schematic.cs @@ -0,0 +1,144 @@ +using OneOf; +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Text.Json.Serialization; + +#nullable enable + +namespace SchematicHQ.Client; + +public partial class Schematic +{ + private readonly ClientOptions _options; + private readonly EventBuffer _eventBuffer; + private readonly ISchematicLogger _logger; + private readonly List> _flagCheckCacheProviders; + private readonly bool _offline; + public readonly SchematicApi API; + + public Schematic(string apiKey, ClientOptions? options = null) + { + _options = options ?? new ClientOptions(); + _offline = _options.Offline; + _logger = _options.Logger ?? new ConsoleLogger(); + + var httpClient = _offline ? new OfflineHttpClient() : _options.HttpClient; + API = new SchematicApi(apiKey, _options.WithHttpClient(httpClient)); + + _eventBuffer = new EventBuffer( + API.Events, + _logger, + _options.EventBufferPeriod + ); + + _flagCheckCacheProviders = _options.CacheProviders ?? new List> + { + new LocalCache(DEFAULT_CACHE_SIZE, DEFAULT_CACHE_TTL) + }; + } + + public void Shutdown() + { + _eventBuffer.Stop(); + } + + public async Task CheckFlagAsync(string flagKey, Dictionary? company = null, Dictionary? user = null) + { + if (_offline) + return GetFlagDefault(flagKey); + + try + { + string cacheKey = (company != null || user != null) + ? $"{flagKey}:{company}:{user}" + : flagKey; + + foreach (var provider in _flagCheckCacheProviders) + { + if (provider.Get(cacheKey) is bool cachedValue) + return cachedValue; + } + + var response = await API.Features.CheckFlagAsync(flagKey, company, user); + if (response == null) + return GetFlagDefault(flagKey); + + foreach (var provider in _flagCheckCacheProviders) + { + provider.Set(cacheKey, response.Data.Value); + } + + return response.Data.Value; + } + catch (Exception ex) + { + _logger.Error(ex, "Error checking flag: {0}", ex.Message); + + return GetFlagDefault(flagKey); + } + } + + public void Identify(Dictionary keys, EventBodyIdentifyCompany? company = null, string? name = null, Dictionary? traits = null) + { + EnqueueEvent("identify", new EventBodyIdentify + { + Company = company, + Keys = keys, + Name = name, + Traits = traits + }); + } + + public void Track(string eventName, Dictionary? company = null, Dictionary? user = null, Dictionary? traits = null) + { + EnqueueEvent("track", new EventBodyTrack + { + Company = company, + Event = eventName, + Traits = traits, + User = user + }); + } + + private void EnqueueEvent(string eventType, OneOf body) + { + if (_offline) + return; + + try + { + var eventBody = new CreateEventRequestBody + { + EventType = eventType, + Body = body + }; + + _eventBuffer.Push(eventBody); + } + catch (Exception ex) + { + _logger.Error("Error enqueueing event: {0}", ex.Message); + } + } + + private bool GetFlagDefault(string flagKey) + { + return _options.FlagDefaults?.GetValueOrDefault(flagKey) ?? false; + } + + private const int DEFAULT_CACHE_SIZE = 10 * 1024; // 10KB + private const int DEFAULT_CACHE_TTL = 5; // 5 seconds +} + +public class OfflineHttpClient : HttpClient +{ + public override HttpResponseMessage Send(HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) + { + var response = new HttpResponseMessage(System.Net.HttpStatusCode.OK) + { + Content = new StringContent("{\"data\":{}}") + }; + return response; + } +}