diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java index 2ab74e282..621559436 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java @@ -45,6 +45,7 @@ public class ErrorReply implements Reply { public static final ErrorReply ILLEGAL_CLUSTER_HEATBEAT = new ErrorReply("ERR illegal cluster heatbeat"); public static final ErrorReply KEY_TOO_LONG = new ErrorReply("ERR key too long"); + public static final ErrorReply VALUE_TOO_LONG = new ErrorReply("ERR value too long"); private static final char MARKER = Marker.ErrorReply.getMarker(); private final String error; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/RedisLocalStorageClient.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/RedisLocalStorageClient.java index 3468dcd88..7779150a2 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/RedisLocalStorageClient.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/RedisLocalStorageClient.java @@ -61,6 +61,12 @@ public void sendCommand(int db, List commands, List keys = command.getKeys(); + for (byte[] key : keys) { + if (key.length > 1024) { + future.complete(ErrorReply.KEY_TOO_LONG); + return; + } + } if (keys.size() == 1) { byte[] key = keys.getFirst(); sendCommand(redisCommand, key, command, future); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/PSetExCommand.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/PSetExCommand.java index 82f11b0ea..a55b99264 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/PSetExCommand.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/PSetExCommand.java @@ -2,6 +2,7 @@ import com.netease.nim.camellia.redis.proxy.command.Command; import com.netease.nim.camellia.redis.proxy.enums.RedisCommand; +import com.netease.nim.camellia.redis.proxy.reply.ErrorReply; import com.netease.nim.camellia.redis.proxy.reply.Reply; import com.netease.nim.camellia.redis.proxy.reply.StatusReply; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandConfig; @@ -10,6 +11,8 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; import com.netease.nim.camellia.redis.proxy.util.Utils; +import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k; + /** * PSETEX key milliseconds value *

@@ -39,6 +42,10 @@ protected Reply execute(short slot, Command command) throws Exception { long millis = Utils.bytesToNum(objects[2]); byte[] value = objects[3]; + if (value.length > _1024k) { + return ErrorReply.VALUE_TOO_LONG; + } + long expireTime = System.currentTimeMillis() + millis; KeyInfo keyInfo = new KeyInfo(DataType.string, key); keyInfo.setExpireTime(expireTime); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetCommand.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetCommand.java index 1c735f31c..3a80f1c23 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetCommand.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetCommand.java @@ -15,6 +15,8 @@ import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector; import com.netease.nim.camellia.redis.proxy.util.Utils; +import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k; + /** * SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL] * Created by caojiajun on 2025/1/3 @@ -44,6 +46,9 @@ protected Reply execute(short slot, Command command) throws Exception { byte[][] objects = command.getObjects(); Key key = new Key(objects[1]); byte[] value = objects[2]; + if (value.length > _1024k) { + return ErrorReply.VALUE_TOO_LONG; + } int nxxx = -1; long expireTime = -1; boolean get = false; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetExCommand.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetExCommand.java index f54ca1f26..924245ae7 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetExCommand.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetExCommand.java @@ -2,6 +2,7 @@ import com.netease.nim.camellia.redis.proxy.command.Command; import com.netease.nim.camellia.redis.proxy.enums.RedisCommand; +import com.netease.nim.camellia.redis.proxy.reply.ErrorReply; import com.netease.nim.camellia.redis.proxy.reply.Reply; import com.netease.nim.camellia.redis.proxy.reply.StatusReply; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandConfig; @@ -10,6 +11,8 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; import com.netease.nim.camellia.redis.proxy.util.Utils; +import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k; + /** * SETEX key seconds value *

@@ -39,6 +42,10 @@ protected Reply execute(short slot, Command command) throws Exception { long seconds = Utils.bytesToNum(objects[2]); byte[] value = objects[3]; + if (value.length > _1024k) { + return ErrorReply.VALUE_TOO_LONG; + } + long expireTime = System.currentTimeMillis() + seconds * 1000L; KeyInfo keyInfo = new KeyInfo(DataType.string, key); keyInfo.setExpireTime(expireTime); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetNxCommand.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetNxCommand.java index 2417df210..7df485583 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetNxCommand.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetNxCommand.java @@ -2,6 +2,7 @@ import com.netease.nim.camellia.redis.proxy.command.Command; import com.netease.nim.camellia.redis.proxy.enums.RedisCommand; +import com.netease.nim.camellia.redis.proxy.reply.ErrorReply; import com.netease.nim.camellia.redis.proxy.reply.IntegerReply; import com.netease.nim.camellia.redis.proxy.reply.Reply; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandConfig; @@ -10,6 +11,8 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; +import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k; + /** * SETNX key value *

@@ -38,6 +41,10 @@ protected Reply execute(short slot, Command command) throws Exception { Key key = new Key(objects[1]); byte[] value = objects[2]; + if (value.length > _1024k) { + return ErrorReply.VALUE_TOO_LONG; + } + KeyInfo keyInfo = keyReadWrite.get(slot, key); if (keyInfo == null) { keyInfo = new KeyInfo(DataType.string, key.key()); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java index f58d15f24..662fa3450 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java @@ -100,6 +100,8 @@ public void compact(short slot) { recycle = decodeResult.remaining() > BlockType._32k.getBlockSize(); } else if (blockType == BlockType._1024k) { recycle = decodeResult.remaining() > BlockType._256k.getBlockSize(); + } else if (blockType == BlockType._10m) { + recycle = decodeResult.remaining() > BlockType._1024k.getBlockSize(); } } if (recycle) { @@ -149,7 +151,8 @@ private BlockType nextBlockType(short slot) { case _4k -> nextBlockTypeMap.put(slot, BlockType._32k); case _32k -> nextBlockTypeMap.put(slot, BlockType._256k); case _256k -> nextBlockTypeMap.put(slot, BlockType._1024k); - case _1024k -> nextBlockTypeMap.put(slot, BlockType._4k); + case _1024k -> nextBlockTypeMap.put(slot, BlockType._10m); + case _10m -> nextBlockTypeMap.put(slot, BlockType._4k); } return blockType; } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/constants/LocalStorageConstants.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/constants/LocalStorageConstants.java index 272e538ae..fd2946567 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/constants/LocalStorageConstants.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/constants/LocalStorageConstants.java @@ -10,6 +10,7 @@ public class LocalStorageConstants { public static final int _64k = 64*1024; public static final int _256k = 256*1024; public static final int _1024k = 1024*1024; + public static final int _10m = 10*1024*1024; public static final int key_manifest_bit_size = (int)(16*1024*1024*1024L / _64k);//16Gib public static final long data_file_size = 192*1024*1024*1024L;//128Gib public static final int block_header_len = 4+4+2+1+4+4; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/BlockType.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/BlockType.java index fc4015847..6254113a4 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/BlockType.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/BlockType.java @@ -11,6 +11,7 @@ public enum BlockType { _32k(2, 32*1024, 2), _256k(3, 256*1024, 4), _1024k(4, 1024*1024, 4), + _10m(5, 10*1024*1024, 4), ; private final int type; @@ -53,6 +54,8 @@ public static BlockType fromData(byte[] data) { return BlockType._256k; } else if (data.length + 4 + 4 < LocalStorageConstants._1024k) { return BlockType._1024k; + } else if (data.length + 4 + 4 < LocalStorageConstants._10m) { + return BlockType._10m; } else { throw new IllegalArgumentException("data too long"); } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/StringReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/StringReadWrite.java index e5d0cac70..0be7174a6 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/StringReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/StringReadWrite.java @@ -1,6 +1,10 @@ package com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.EstimateSizeValueCalculator; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.LRUCache; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.SizeCalculator; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.persist.ValueFlushExecutor; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.block.StringBlockReadWrite; @@ -10,26 +14,53 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; + /** * Created by caojiajun on 2025/1/3 */ public class StringReadWrite { + private static final String READ_CACHE_CONFIG_KEY = "local.storage.string.read.cache.capacity"; + private static final String WRITE_CACHE_CONFIG_KEY = "local.storage.string.write.cache.capacity"; + private final ConcurrentHashMap map = new ConcurrentHashMap<>(); private final ValueFlushExecutor flushExecutor; private final StringBlockReadWrite stringBlockReadWrite; + private final LRUCache readCache; + private final LRUCache writeCache; + public StringReadWrite(ValueFlushExecutor flushExecutor, StringBlockReadWrite stringBlockReadWrite) { this.flushExecutor = flushExecutor; this.stringBlockReadWrite = stringBlockReadWrite; + this.readCache = new LRUCache<>("string-read-cache", READ_CACHE_CONFIG_KEY, "32M", 1024, new EstimateSizeValueCalculator<>(), SizeCalculator.BYTES_INSTANCE); + this.writeCache = new LRUCache<>("string-write-cache", WRITE_CACHE_CONFIG_KEY, "32M", 1024, new EstimateSizeValueCalculator<>(), SizeCalculator.BYTES_INSTANCE); } public void put(short slot, KeyInfo keyInfo, byte[] data) throws IOException { + Key key = new Key(keyInfo.getKey()); + byte[] bytes = readCache.get(key); + if (bytes != null) { + readCache.put(key, data); + } else { + writeCache.put(key, data); + } get(slot).put(keyInfo, data); } public byte[] get(short slot, KeyInfo keyInfo) throws IOException { + Key key = new Key(keyInfo.getKey()); + byte[] bytes = readCache.get(key); + if (bytes != null) { + return bytes; + } + bytes = writeCache.get(key); + if (bytes != null) { + readCache.put(key, bytes); + writeCache.delete(key); + return bytes; + } return get(slot).get(keyInfo); } @@ -41,10 +72,6 @@ public CompletableFuture flush(short slot) throws IOException { return slotStringReadWrite.flush(); } - private SlotStringReadWrite get(short slot) { - return CamelliaMapUtils.computeIfAbsent(map, slot, s -> new SlotStringReadWrite(slot, flushExecutor, stringBlockReadWrite)); - } - public boolean needFlush(short slot) { SlotStringReadWrite slotStringReadWrite = get(slot); if (slotStringReadWrite == null) { @@ -53,4 +80,8 @@ public boolean needFlush(short slot) { return slotStringReadWrite.needFlush(); } + private SlotStringReadWrite get(short slot) { + return CamelliaMapUtils.computeIfAbsent(map, slot, s -> new SlotStringReadWrite(slot, flushExecutor, stringBlockReadWrite)); + } + } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java index 5a6de8c72..a4643f476 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java @@ -2,6 +2,7 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.LRUCache; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.SizeCalculator; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValue; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueCodec; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueDecodeResult; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.file.FileNames; @@ -12,6 +13,7 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.IValueManifest; import java.io.IOException; +import java.util.Arrays; import java.util.List; import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._4k; @@ -66,10 +68,15 @@ public byte[] get(KeyInfo keyInfo) throws IOException { if (list.isEmpty()) { return null; } - if (list.size() > keyInfo.getValueLocation().offset()) { + if (list.size() <= keyInfo.getValueLocation().offset()) { return null; } - return list.get(keyInfo.getValueLocation().offset()); + byte[] bytes = list.get(keyInfo.getValueLocation().offset()); + StringValue stringValue = StringValue.decode(bytes); + if (Arrays.equals(stringValue.key(), keyInfo.getKey())) { + return stringValue.value(); + } + return null; } @Override