Skip to content

Commit

Permalink
Remove MarketItem cache (#1356)
Browse files Browse the repository at this point in the history
  • Loading branch information
karashiiro authored Jul 27, 2024
1 parent abb8ac6 commit ddcc1a9
Showing 1 changed file with 8 additions and 202 deletions.
210 changes: 8 additions & 202 deletions src/Universalis.DbAccess/MarketBoard/MarketItemStore.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MemoryPack;
using Microsoft.Extensions.Logging;
using Npgsql;
using Prometheus;
using StackExchange.Redis;
using Universalis.DbAccess.Queries.MarketBoard;
using Universalis.Entities.MarketBoard;

Expand All @@ -19,29 +15,13 @@ namespace Universalis.DbAccess.MarketBoard;
[SuppressMessage("ReSharper", "ExplicitCallerInfoArgument")]
public class MarketItemStore : IMarketItemStore
{
private static readonly Counter CacheHits =
Prometheus.Metrics.CreateCounter("universalis_market_item_cache_hit", "");

private static readonly Counter
CacheMisses = Prometheus.Metrics.CreateCounter("universalis_market_item_cache_miss", "");

private static readonly Counter CacheUpdates =
Prometheus.Metrics.CreateCounter("universalis_market_item_cache_update", "");

private static readonly Counter CacheTimeouts =
Prometheus.Metrics.CreateCounter("universalis_market_item_cache_timeout", "");

private static readonly TimeSpan MarketItemCacheTime = TimeSpan.FromMinutes(30);

private readonly ILogger<MarketItemStore> _logger;
private readonly NpgsqlDataSource _dataSource;
private readonly ICacheRedisMultiplexer _cache;

public MarketItemStore(NpgsqlDataSource dataSource, ICacheRedisMultiplexer cache, ILogger<MarketItemStore> logger)
public MarketItemStore(NpgsqlDataSource dataSource, ILogger<MarketItemStore> logger)
{
_dataSource = dataSource;
_logger = logger;
_cache = cache;
}

public async Task Insert(MarketItem marketItem, CancellationToken cancellationToken = default)
Expand All @@ -63,9 +43,6 @@ public async Task Insert(MarketItem marketItem, CancellationToken cancellationTo
try
{
await command.ExecuteNonQueryAsync(cancellationToken);

// Update the cache, since this dataset should fit completely in Redis
await StoreMarketItemInCache(marketItem);
}
catch (Exception e)
{
Expand All @@ -79,13 +56,6 @@ public async ValueTask<MarketItem> Retrieve(MarketItemQuery query, CancellationT
{
using var activity = Util.ActivitySource.StartActivity("MarketItemStore.Retrieve");

// Try to fetch the market item from the cache
var (success, cacheValue) = await TryGetMarketItemFromCache(query.WorldId, query.ItemId, cancellationToken);
if (success)
{
return cacheValue;
}

await using var command =
_dataSource.CreateCommand("SELECT updated FROM market_item WHERE item_id = $1 AND world_id = $2");
command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = query.ItemId });
Expand All @@ -106,8 +76,6 @@ public async ValueTask<MarketItem> Retrieve(MarketItemQuery query, CancellationT
LastUploadTime = lastUploadTime.Value,
};

// Cache the result temporarily
await StoreMarketItemInCache(marketItem);
return marketItem;
}
catch (Exception e)
Expand All @@ -128,23 +96,6 @@ public async ValueTask<IEnumerable<MarketItem>> RetrieveMany(MarketItemManyQuery
itemIds.Select(itemId => new WorldItemPair(worldId, itemId)))
.ToList();

var marketItemRecords = new Dictionary<WorldItemPair, MarketItem>();

// Attempt to retrieve market items from the cache
activity?.AddEvent(new ActivityEvent("TryGetMarketItemFromCacheMulti"));
var cacheValues = await TryGetMarketItemFromCacheMulti(worldItemPairs);
if (cacheValues.Count == worldItemPairs.Count)
{
// Retrieved everything from the cache
return cacheValues.Values;
}

foreach (var (wip, cacheValue) in cacheValues)
{
marketItemRecords[wip] = cacheValue;
worldItemPairs.Remove(wip);
}

await using var command = _dataSource.CreateCommand(
"""
SELECT updated, item_id, world_id
Expand All @@ -161,30 +112,18 @@ FROM market_item
await using var reader =
await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken);

var marketItemRecords = new List<MarketItem>();
while (await reader.ReadAsync(cancellationToken))
{
var lastUploadTime = reader.GetDateTime(0);
var itemId = reader.GetInt32(1);
var worldId = reader.GetInt32(2);
var wip = new WorldItemPair(worldId, itemId);

marketItemRecords[wip] = new MarketItem
marketItemRecords.Add(new MarketItem
{
ItemId = itemId,
WorldId = worldId,
LastUploadTime = lastUploadTime,
};
LastUploadTime = reader.GetDateTime(0),
ItemId = reader.GetInt32(1),
WorldId = reader.GetInt32(2),
});
}

// Cache the results, except for things we already got from the cache
activity?.AddEvent(new ActivityEvent("StoreMarketItemInCacheMulti"));
var toCache = marketItemRecords.Where(r => !cacheValues.ContainsKey(r.Key))
.ToDictionary(
kvp => kvp.Key,
kvp => kvp.Value);
await StoreMarketItemInCacheMulti(toCache);

return marketItemRecords.Values.ToList();
return marketItemRecords;
}
catch (Exception e)
{
Expand All @@ -193,137 +132,4 @@ FROM market_item
throw;
}
}

private async Task<IDictionary<WorldItemPair, MarketItem>> TryGetMarketItemFromCacheMulti(IList<WorldItemPair> keys)
{
using var activity = Util.ActivitySource.StartActivity("MarketItemStore.TryGetMarketItemFromCacheMulti");

var results = new Dictionary<WorldItemPair, MarketItem>();

// Try to fetch the listings from the cache. SE.Redis will always skew heavily to one node or the other, but
// we want to load-balance more evenly
var replicaRatio = 1 / (1 + _cache.ReplicaCount); // 1 / (1 + 1 replica) = 0.5, 1 / (1 + 2 replicas) = 0.33...
var commandFlags = Random.Shared.NextDouble() > replicaRatio
? CommandFlags.PreferReplica
: CommandFlags.PreferMaster;

var db = _cache.GetDatabase(RedisDatabases.Cache.Listings);
var cacheKeys = keys.Select(MarketItemKey).Select(k => new RedisKey(k)).ToArray();
try
{
var cacheValues = await db.StringGetAsync(cacheKeys, commandFlags);
for (var i = 0; i < cacheValues.Length; i++)
{
var wip = keys[i];
var cacheValue = cacheValues[i];
if (cacheValue != RedisValue.Null)
{
results[wip] = DeserializeMarketItem(cacheValue);
CacheHits.Inc();
}
else
{
CacheMisses.Inc();
}
}
}
catch (TimeoutException)
{
CacheTimeouts.Inc();
}
catch (OperationCanceledException)
{
CacheTimeouts.Inc();
}

return results;
}

private async Task<(bool, MarketItem)> TryGetMarketItemFromCache(int worldId, int itemId,
CancellationToken cancellationToken = default)
{
using var activity = Util.ActivitySource.StartActivity("MarketItemStore.TryGetMarketItemFromCache");
var db = _cache.GetDatabase(RedisDatabases.Cache.Listings);
var cacheKey = MarketItemKey(worldId, itemId);

try
{
var cacheValue = await db.StringGetAsync(cacheKey, CommandFlags.PreferReplica)
.WaitAsync(TimeSpan.FromSeconds(1), cancellationToken);
if (cacheValue != RedisValue.Null)
{
CacheHits.Inc();
return (true, DeserializeMarketItem(cacheValue));
}
else
{
CacheMisses.Inc();
return (false, null);
}
}
catch (TimeoutException)
{
CacheTimeouts.Inc();
return (false, null);
}
catch (OperationCanceledException)
{
CacheTimeouts.Inc();
return (false, null);
}
}

private async Task StoreMarketItemInCacheMulti(IDictionary<WorldItemPair, MarketItem> marketItems)
{
using var activity = Util.ActivitySource.StartActivity("MarketItemStore.StoreMarketItemInCacheMulti");

var db = _cache.GetDatabase(RedisDatabases.Cache.Listings);
var cacheRecords = marketItems.Select(kvp =>
new KeyValuePair<RedisKey, RedisValue>(MarketItemKey(kvp.Key), SerializeMarketItem(kvp.Value)))
.ToArray();

// Do an MSET and add an expiry to each new key
await db.StringSetAsync(cacheRecords, When.Always, CommandFlags.FireAndForget);
foreach (var wip in marketItems.Keys)
{
await db.KeyExpireAsync(MarketItemKey(wip), MarketItemCacheTime, CommandFlags.FireAndForget);
}

CacheUpdates.Inc(marketItems.Count);
}

private async Task StoreMarketItemInCache(MarketItem marketItem)
{
using var activity = Util.ActivitySource.StartActivity("MarketItemStore.StoreMarketItemInCache");
var db = _cache.GetDatabase(RedisDatabases.Cache.Listings);
var cacheKey = MarketItemKey(marketItem);
await db.StringSetAsync(cacheKey, SerializeMarketItem(marketItem), MarketItemCacheTime, When.Always,
CommandFlags.FireAndForget);
CacheUpdates.Inc();
}

private static string MarketItemKey(MarketItem marketItem)
{
return MarketItemKey(marketItem.WorldId, marketItem.ItemId);
}

private static string MarketItemKey(WorldItemPair wip)
{
return MarketItemKey(wip.WorldId, wip.ItemId);
}

private static string MarketItemKey(int worldId, int itemId)
{
return $"market_item:{worldId}:{itemId}";
}

private static MarketItem DeserializeMarketItem(byte[] value)
{
return MemoryPackSerializer.Deserialize<MarketItem>(value);
}

private static byte[] SerializeMarketItem(MarketItem marketItem)
{
return MemoryPackSerializer.Serialize(marketItem);
}
}

0 comments on commit ddcc1a9

Please sign in to comment.