Skip to content

Commit

Permalink
V1 custom code
Browse files Browse the repository at this point in the history
  • Loading branch information
bpapillon committed Jun 26, 2024
1 parent 90a52f6 commit c5ba86d
Show file tree
Hide file tree
Showing 9 changed files with 512 additions and 2 deletions.
8 changes: 8 additions & 0 deletions .fernignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
# Specify files that shouldn't be modified by Fern
README.md

src/SchematicHQ.Client.Test/TestCache.cs
src/SchematicHQ.Client.Test/TestEventBuffer.cs
src/SchematicHQ.Client/Cache.cs
src/SchematicHQ.Client/Core/ClientOptionsCustom.cs
src/SchematicHQ.Client/EventBuffer.cs
src/SchematicHQ.Client/Logger.cs
src/SchematicHQ.Client/Schematic.cs
4 changes: 2 additions & 2 deletions src/SchematicHQ.Client.Test/SchematicHQ.Client.Test.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

Expand All @@ -21,4 +21,4 @@
<ProjectReference Include="..\SchematicHQ.Client\SchematicHQ.Client.csproj" />
</ItemGroup>

</Project>
</Project>
61 changes: 61 additions & 0 deletions src/SchematicHQ.Client.Test/TestCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using NUnit.Framework;
using System;
using System.Threading.Tasks;

#nullable enable

namespace SchematicHQ.Client.Test
{
[TestFixture]
public class TestCache
{
[Test]
public void TestCacheGetAndSet()
{
var cache = new LocalCache<string>(maxItems: 1000, ttl: 5000);

cache.Set("key1", "value1");
Assert.AreEqual("value1", cache.Get("key1"));

cache.Set("key2", "value2", ttlOverride: 1);
Assert.AreEqual("value2", cache.Get("key2"));

// Wait for the TTL to expire
Task.Delay(TimeSpan.FromSeconds(2)).Wait();
Assert.IsNull(cache.Get("key2"));
}

[Test]
public void TestCacheEviction()
{
var cache = new LocalCache<string>(maxItems: 2, ttl: 5000);

cache.Set("key1", "a");
cache.Set("key2", "b");

// Access key1, making it more recently used than key2
Assert.AreEqual("a", cache.Get("key1"));

// Adding a new key should evict the least recently used key,
// which will now be key2
cache.Set("key3", "c");

Assert.IsNull(cache.Get("key2"));
Assert.AreEqual("a", cache.Get("key1"));
Assert.AreEqual("c", cache.Get("key3"));
}

[Test]
public void TestCacheExpiration()
{
var cache = new LocalCache<string>(maxItems: 1000, ttl: 1000);

cache.Set("key1", "value1");
Assert.AreEqual("value1", cache.Get("key1"));

// Wait for the TTL to expire
Task.Delay(TimeSpan.FromSeconds(2)).Wait();
Assert.IsNull(cache.Get("key1"));
}
}
}
2 changes: 2 additions & 0 deletions src/SchematicHQ.Client.Test/TestClient.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
namespace SchematicHQ.Client.Test;

#nullable enable

public class TestClient { }
114 changes: 114 additions & 0 deletions src/SchematicHQ.Client/Cache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

#nullable enable

namespace SchematicHQ.Client;

public interface ICacheProvider<T>
{
T? Get(string key);
void Set(string key, T val, int? ttlOverride = null);
}

public class LocalCache<T> : ICacheProvider<T>
{
private const int DEFAULT_CACHE_CAPACITY = 1000;
private const int DEFAULT_CACHE_TTL = 5000; // 5 seconds

private readonly ConcurrentDictionary<string, CachedItem<T>> _cache;
private readonly LinkedList<string> _lruList;
private readonly object _lock = new object();
private readonly int _maxItems;
private readonly int _ttl;

public LocalCache(int maxItems = DEFAULT_CACHE_CAPACITY, int ttl = DEFAULT_CACHE_TTL)
{
_cache = new ConcurrentDictionary<string, CachedItem<T>>();
_lruList = new LinkedList<string>();
_maxItems = maxItems;
_ttl = ttl;
}

public T? Get(string key)
{
if (_maxItems == 0)
return default;

if (!_cache.TryGetValue(key, out var item))
return default;

if (DateTime.UtcNow > item.Expiration)
{
Remove(key);
return default;
}

lock (_lock)
{
_lruList.Remove(item.Node);
_lruList.AddFirst(item.Node);
}

return item.Value;
}

public void Set(string key, T val, int? ttlOverride = null)
{
if (_maxItems == 0)
return;

var ttl = ttlOverride ?? _ttl;
var expiration = DateTime.UtcNow.AddMilliseconds(ttl);

lock (_lock)
{
if (_cache.TryGetValue(key, out var existingItem))
{
existingItem.Value = val;
existingItem.Expiration = expiration;
_lruList.Remove(existingItem.Node);
_lruList.AddFirst(existingItem.Node);
}
else
{
if (_cache.Count >= _maxItems)
{
var lruKey = _lruList.Last!.Value;
Remove(lruKey);
}

var node = _lruList.AddFirst(key);
var newItem = new CachedItem<T>(val, expiration, node);
_cache[key] = newItem;
}
}
}

private void Remove(string key)
{
if (_cache.TryRemove(key, out var removedItem))
{
lock (_lock)
{
_lruList.Remove(removedItem.Node);
}
}
}
}

public class CachedItem<T>
{
public T Value { get; set; }
public DateTime Expiration { get; set; }
public LinkedListNode<string> Node { get; }

public CachedItem(T value, DateTime expiration, LinkedListNode<string> node)
{
Value = value;
Expiration = expiration;
Node = node;
}
}
33 changes: 33 additions & 0 deletions src/SchematicHQ.Client/Core/ClientOptionsCustom.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using SchematicHQ.Client.Core;

#nullable enable

namespace SchematicHQ.Client;

public partial class ClientOptions
{
public Dictionary<string, bool> FlagDefaults { get; set; }
public ISchematicLogger Logger { get; set; }
public List<ICacheProvider<bool>> CacheProviders { get; set; }
public bool Offline { get; set; }
public int? EventBufferPeriod { get; set; }
}

public static class ClientOptionsExtensions
{
public static ClientOptions WithHttpClient(this ClientOptions options, HttpClient httpClient)
{
return new ClientOptions
{
BaseUrl = options.BaseUrl,
HttpClient = httpClient,
MaxRetries = options.MaxRetries,
TimeoutInSeconds = options.TimeoutInSeconds,
FlagDefaults = options.FlagDefaults,
Logger = options.Logger,
CacheProviders = options.CacheProviders,
Offline = options.Offline,
EventBufferPeriod = options.EventBufferPeriod
};
}
}
111 changes: 111 additions & 0 deletions src/SchematicHQ.Client/EventBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using System.Collections.Generic;
using System.Threading;

#nullable enable

namespace SchematicHQ.Client;

public class EventBuffer : IDisposable
{
private readonly EventsClient _eventsApi;
private readonly ISchematicLogger _logger;
private readonly int _interval;
private readonly int _maxSize;
private readonly List<CreateEventRequestBody> _events;
private int _currentSize;
private readonly object _flushLock = new object();
private readonly object _pushLock = new object();
private readonly CancellationTokenSource _cancellationTokenSource;
private bool _stopped;

public EventBuffer(EventsClient eventsApi, ISchematicLogger logger, int? period = null)
{
_eventsApi = eventsApi;
_logger = logger;
_interval = period ?? DEFAULT_EVENT_BUFFER_PERIOD;
_maxSize = DEFAULT_BUFFER_MAX_SIZE;
_events = new List<CreateEventRequestBody>();
_cancellationTokenSource = new CancellationTokenSource();

var flushThread = new Thread(PeriodicFlush);
flushThread.IsBackground = true;
flushThread.Start();
}

private async Task Flush()
{
List<CreateEventRequestBody> eventsToFlush;
lock (_flushLock)
{
if (_events.Count == 0)
return;

eventsToFlush = _events.FindAll(e => e != null);
_events.Clear();
_currentSize = 0;
}

try
{
var request = new CreateEventBatchRequestBody
{
Events = eventsToFlush
};
await _eventsApi.CreateEventBatchAsync(request);
}
catch (Exception ex)
{
_logger.Error("Error flushing events: {0}", ex.Message);
}
}

private void PeriodicFlush()
{
while (!_cancellationTokenSource.IsCancellationRequested)
{
Task.Run(async () => await Flush());
_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(_interval));
}
}

public void Push(CreateEventRequestBody @event)
{
if (_stopped)
{
_logger.Error("Event buffer is stopped, not accepting new events");
return;
}

lock (_pushLock)
{
if (_currentSize + 1 > _maxSize)
Task.Run(async () => await Flush());

_events.Add(@event);
_currentSize += 1;
}
}

public void Stop()
{
try
{
_stopped = true;
_cancellationTokenSource.Cancel();
_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(5));
}
catch (Exception ex)
{
_logger.Error("Error stopping event buffer: {0}", ex.Message);
}
}

public void Dispose()
{
Stop();
}

private const int DEFAULT_BUFFER_MAX_SIZE = 100; // Flush after 100 events
private const int DEFAULT_EVENT_BUFFER_PERIOD = 5; // Flush after 5 seconds
}
34 changes: 34 additions & 0 deletions src/SchematicHQ.Client/Logger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#nullable enable

namespace SchematicHQ.Client;

public interface ISchematicLogger
{
void Error(string message, params object[] args);
void Warn(string message, params object[] args);
void Info(string message, params object[] args);
void Debug(string message, params object[] args);
}

public class ConsoleLogger : ISchematicLogger
{
public void Error(string message, params object[] args)
{
Console.WriteLine($"[ERROR] {string.Format(message, args)}");
}

public void Warn(string message, params object[] args)
{
Console.WriteLine($"[WARN] {string.Format(message, args)}");
}

public void Info(string message, params object[] args)
{
Console.WriteLine($"[INFO] {string.Format(message, args)}");
}

public void Debug(string message, params object[] args)
{
Console.WriteLine($"[DEBUG] {string.Format(message, args)}");
}
}
Loading

0 comments on commit c5ba86d

Please sign in to comment.