From c35a6104d16fae706ad563a8f35b7df8f2fa1181 Mon Sep 17 00:00:00 2001 From: caojiajun Date: Tue, 7 Jan 2025 20:38:10 +0800 Subject: [PATCH] feat: string value (#364) --- .../storage/codec/StringValueCodec.java | 173 ++++++++++++++++++ .../command/CommandOnEmbeddedStorage.java | 18 ++ .../embedded/storage/command/string/Get.java | 10 +- .../embedded/storage/command/string/Set.java | 8 +- .../storage/compress/ZstdCompressor.java | 2 +- .../constants/EmbeddedStorageConstants.java | 5 + .../embedded/storage/enums/FlushStatus.java | 11 ++ .../embedded/storage/file/ByteBufferUtil.java | 31 ++++ .../embedded/storage/flush/FlushExecutor.java | 32 ++++ .../embedded/storage/flush/FlushThread.java | 42 +++++ .../storage/flush/FlushThreadFactory.java | 39 ++++ .../embedded/storage/key/KeyInfo.java | 31 +++- .../embedded/storage/key/KeyReadWrite.java | 22 ++- .../storage/key/persist/KeyFlushExecutor.java | 23 ++- .../storage/key/slot/IKeyManifest.java | 43 +++++ ...tKeyBlockCache.java => KeyBlockCache.java} | 6 +- .../{KeySlotMap.java => KeyManifest.java} | 40 ++-- .../storage/key/slot/SlotKeyReadWrite.java | 61 +++--- .../storage/value/block/BlockInfo.java | 9 + .../storage/value/block/BlockLocation.java | 7 + .../storage/value/block/BlockType.java | 52 ++++++ .../storage/value/block/IValueManifest.java | 41 +++++ .../value/{ => block}/ValueLocation.java | 4 +- .../storage/value/block/ValueManifest.java | 29 +++ .../value/persist/StringValueFlushTask.java | 11 ++ .../value/persist/ValueFlushExecutor.java | 82 +++++++++ .../value/string/SlotStringReadWrite.java | 83 +++++++++ .../value/string/StringBlockCache.java | 17 ++ .../storage/value/string/StringReadWrite.java | 48 ++++- .../redis/proxy/test/KeyCodecTest.java | 9 +- ...ySlotMapTest.java => KeyManifestTest.java} | 10 +- .../proxy/test/StringValueCodecTest.java | 127 +++++++++++++ 32 files changed, 1025 insertions(+), 101 deletions(-) create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/StringValueCodec.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/FlushStatus.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/file/ByteBufferUtil.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushExecutor.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushThread.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushThreadFactory.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/IKeyManifest.java rename camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/{SlotKeyBlockCache.java => KeyBlockCache.java} (95%) rename camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/{KeySlotMap.java => KeyManifest.java} (89%) create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockInfo.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockLocation.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockType.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/IValueManifest.java rename camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/{ => block}/ValueLocation.java (55%) create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueManifest.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/StringValueFlushTask.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/ValueFlushExecutor.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/SlotStringReadWrite.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringBlockCache.java rename camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/{KeySlotMapTest.java => KeyManifestTest.java} (94%) create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/StringValueCodec.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/StringValueCodec.java new file mode 100644 index 000000000..f50a1b6cc --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/StringValueCodec.java @@ -0,0 +1,173 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.codec; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressType; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressUtils; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.*; +import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils; +import com.netease.nim.camellia.tools.utils.Pair; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants.block_header_len; + +/** + * Created by caojiajun on 2025/1/6 + */ +public class StringValueCodec { + + public static List encode(short slot, BlockType blockType, IValueManifest valueManifest, List> values) throws IOException { + // + //data to sub-block + List subBlocks = new ArrayList<>(); + { + SubBlock subBlock = new SubBlock(); + //new sub-block + ByteBuffer decompressed = ByteBuffer.allocate(blockType.getBlockSize() - block_header_len); + int size = 0; + for (Pair entry : values) { + // + byte[] data = entry.getSecond(); + int length = data.length; + if (decompressed.remaining() < length + blockType.getSizeLen()) { + //sub-block compressed + CompressType compressType = CompressType.zstd; + byte[] compressed = CompressUtils.get(compressType).compress(decompressed.array(), 0, size); + if (compressed.length >= size) { + compressType = CompressType.none; + compressed = new byte[size]; + System.arraycopy(decompressed.array(), 0, compressed, 0, compressed.length); + } + subBlock.compressType = compressType;//1 + subBlock.decompressLen = size;//4 + subBlock.compressLen = compressed.length;//4 + subBlock.compressed = compressed;//n + subBlocks.add(subBlock); + // + //new sub-block + decompressed = ByteBuffer.allocate(blockType.getBlockSize() - block_header_len); + size = 0; + subBlock = new SubBlock(); + } + subBlock.keyInfos.add(entry.getFirst()); + //add data to sub-block + if (blockType == BlockType._4k || blockType == BlockType._32k) { + decompressed.putShort((short) length); + } else { + decompressed.putInt(length); + } + decompressed.put(data); + size += blockType.getSizeLen(); + size += length; + } + //sub-block compressed + CompressType compressType = CompressType.zstd; + byte[] compressed = CompressUtils.get(compressType).compress(decompressed.array(), 0, size); + if (compressed.length >= size) { + compressType = CompressType.none; + compressed = new byte[size]; + System.arraycopy(decompressed.array(), 0, compressed, 0, compressed.length); + } + subBlock.compressType = compressType;//1 + subBlock.decompressLen = size;//4 + subBlock.compressLen = compressed.length;//4 + subBlock.compressed = compressed;//n + subBlocks.add(subBlock); + } + + //sub-block to block + List blockInfos = new ArrayList<>(); + + { + //new block + BlockLocation location = valueManifest.allocate(slot, blockType); + ByteBuffer buffer = ByteBuffer.allocate(blockType.getBlockSize()); + int offset = 0; + short subBlockCount = 0; + buffer.putInt(0);//4 + buffer.putShort(subBlockCount);//2 + for (SubBlock block : subBlocks) { + // + if (buffer.remaining() < block.size()) { + //sub block merge + int crc = RedisClusterCRC16Utils.getCRC16(buffer.array(), 6, buffer.array().length); + buffer.putInt(0, crc);//4 + buffer.putShort(4, subBlockCount);//2 + blockInfos.add(new BlockInfo(blockType, location, buffer.array())); + // + //new block + location = valueManifest.allocate(slot, blockType); + buffer = ByteBuffer.allocate(blockType.getBlockSize()); + subBlockCount = 0; + buffer.putInt(0);//4 + buffer.putShort(subBlockCount);//2 + } + //add sub-block to block + buffer.put(block.compressType.getType()); + buffer.putInt(block.decompressLen); + buffer.putInt(block.compressLen); + buffer.put(block.compressed); + for (KeyInfo keyInfo : block.keyInfos) { + keyInfo.setValueLocation(new ValueLocation(location, offset)); + offset++; + } + subBlockCount++; + } + //sub block merge + int crc = RedisClusterCRC16Utils.getCRC16(buffer.array(), 6, buffer.array().length); + buffer.putInt(0, crc); + buffer.putShort(4, subBlockCount); + blockInfos.add(new BlockInfo(blockType, location, buffer.array())); + } + return blockInfos; + } + + public static List decode(byte[] data, BlockType blockType) { + ByteBuffer buffer = ByteBuffer.wrap(data); + int crc1 = buffer.getInt(); + int crc2 = RedisClusterCRC16Utils.getCRC16(data, 6, data.length); + if (crc1 != crc2) { + return new ArrayList<>(); + } + + List values = new ArrayList<>(); + + short subBlockCount = buffer.getShort(); + for (int i=0; i keyInfos = new ArrayList<>(); + CompressType compressType; + int decompressLen = 0; + int compressLen = 0; + byte[] compressed; + + int size() { + return 1 + 4 + 4 + compressed.length; + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/CommandOnEmbeddedStorage.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/CommandOnEmbeddedStorage.java index a1ccd56f6..66b021367 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/CommandOnEmbeddedStorage.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/CommandOnEmbeddedStorage.java @@ -3,10 +3,15 @@ import com.netease.nim.camellia.redis.proxy.command.Command; import com.netease.nim.camellia.redis.proxy.enums.RedisCommand; import com.netease.nim.camellia.redis.proxy.reply.Reply; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyReadWrite; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.string.StringReadWrite; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.wal.WalGroup; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + /** * Created by caojiajun on 2025/1/3 */ @@ -37,4 +42,17 @@ public abstract class CommandOnEmbeddedStorage { * @return reply */ protected abstract Reply execute(short slot, Command command) throws Exception; + + /** + * check and flush + * @param slot slot + * @throws IOException exception + */ + protected void checkAndFlush(short slot) throws IOException { + if (keyReadWrite.needFlush(slot) || stringReadWrite.needFlush(slot)) { + keyReadWrite.flushPrepare(slot); + CompletableFuture future = stringReadWrite.flush(slot); + future.thenAccept(flushResult -> keyReadWrite.flush(slot)); + } + } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Get.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Get.java index 276a7f433..25352afca 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Get.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Get.java @@ -8,9 +8,10 @@ import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.CommandOnEmbeddedStorage; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; -import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation; import com.netease.nim.camellia.tools.utils.BytesKey; +import java.io.IOException; + /** * GET key *

@@ -34,10 +35,10 @@ protected Reply execute(short slot, Command command) throws Exception { byte[][] objects = command.getObjects(); BytesKey key = new BytesKey(objects[1]); KeyInfo keyInfo = keyReadWrite.get(slot, key); - return execute0(keyInfo); + return execute0(slot, keyInfo); } - private Reply execute0(KeyInfo keyInfo) { + private Reply execute0(short slot, KeyInfo keyInfo) throws IOException { if (keyInfo == null) { return BulkReply.NIL_REPLY; } @@ -47,8 +48,7 @@ private Reply execute0(KeyInfo keyInfo) { if (keyInfo.containsExtra()) { return new BulkReply(keyInfo.getExtra()); } - ValueLocation valueLocation = keyInfo.getValueLocation(); - byte[] bytes = stringReadWrite.get(valueLocation); + byte[] bytes = stringReadWrite.get(slot, keyInfo); if (bytes == null) { return BulkReply.NIL_REPLY; } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Set.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Set.java index 55a396094..6782d48dc 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Set.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Set.java @@ -8,8 +8,8 @@ import com.netease.nim.camellia.redis.proxy.reply.StatusReply; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.CommandOnEmbeddedStorage; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; -import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation; import com.netease.nim.camellia.redis.proxy.upstream.kv.command.string.SetCommander; import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector; import com.netease.nim.camellia.redis.proxy.util.Utils; @@ -117,10 +117,12 @@ protected Reply execute(short slot, Command command) throws Exception { keyInfo.setExtra(value); } else { keyInfo.setExtra(null); - ValueLocation location = stringReadWrite.put(slot, keyInfo, value); - keyInfo.setValueLocation(location); + stringReadWrite.put(slot, keyInfo, value); } keyReadWrite.put(slot, keyInfo); + + checkAndFlush(slot); + if (get) { return new BulkReply(oldValue); } else { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ZstdCompressor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ZstdCompressor.java index ba2e6525d..6a19c3127 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ZstdCompressor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ZstdCompressor.java @@ -11,7 +11,7 @@ public class ZstdCompressor implements ICompressor { private final int compressionLevel; public ZstdCompressor() { - compressionLevel = Zstd.maxCompressionLevel(); + compressionLevel = Zstd.defaultCompressionLevel(); } @Override diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java index b0fd5e54a..682be689f 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java @@ -6,6 +6,11 @@ public class EmbeddedStorageConstants { public static final int _4k = 4*1024; + public static final int _32k = 32*1024; public static final int _64k = 64*1024; + public static final int _256k = 256*1024; + public static final int _1024k = 1024*1024; public static final int bit_size = (int)(16*1024*1024*1024L / _64k);//16Gib + public static final long block_size = 128*1024*1024*1024L;//128Gib + public static final int block_header_len = 4+2+1+4+4; } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/FlushStatus.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/FlushStatus.java new file mode 100644 index 000000000..45ba890d2 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/FlushStatus.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums; + +/** + * Created by caojiajun on 2025/1/6 + */ +public enum FlushStatus { + PREPARE, + FLUSHING, + FLUSH_OK, + ; +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/file/ByteBufferUtil.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/file/ByteBufferUtil.java new file mode 100644 index 000000000..dc4381c3a --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/file/ByteBufferUtil.java @@ -0,0 +1,31 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.file; + +import java.nio.ByteBuffer; + +/** + * Created by caojiajun on 2025/1/6 + */ +public class ByteBufferUtil { + + public static void writeInt(int value, ByteBuffer buffer) { + while ((value & 0xFFFFFF80) != 0L) { + buffer.put((byte) ((value & 0x7F) | 0x80)); + value >>>= 7; + } + buffer.put((byte) (value & 0x7F)); + } + + public static int readInt(ByteBuffer buffer, int idx) { + int value = 0; + int i = 0; + while ((buffer.get(idx) & 0x80) != 0) { + value |= (buffer.get(idx) & 0x7F) << i; + i += 7; + idx++; + if (i > 21) { + throw new IllegalArgumentException("Variable length quantity is too long"); + } + } + return value | (buffer.get(idx) << i); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushExecutor.java new file mode 100644 index 000000000..b209a7f3d --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushExecutor.java @@ -0,0 +1,32 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.flush; + + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Created by caojiajun on 2025/1/7 + */ +public class FlushExecutor { + + private final ThreadPoolExecutor executor; + + public FlushExecutor(int poolSize, int queueSize) { + this.executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueSize), new FlushThreadFactory("flush", false)); + } + + public boolean isInFlushThread() { + Thread thread = Thread.currentThread(); + return thread instanceof FlushThread; + } + + public void submit(Runnable task) { + if (isInFlushThread()) { + task.run(); + } else { + executor.submit(task); + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushThread.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushThread.java new file mode 100644 index 000000000..b34edef6b --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushThread.java @@ -0,0 +1,42 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.flush; + +/** + * Created by caojiajun on 2025/1/7 + */ +public class FlushThread extends Thread { + + public FlushThread() { + } + + public FlushThread(Runnable task) { + super(task); + } + + public FlushThread(ThreadGroup group, Runnable task) { + super(group, task); + } + + public FlushThread(String name) { + super(name); + } + + public FlushThread(ThreadGroup group, String name) { + super(group, name); + } + + public FlushThread(Runnable task, String name) { + super(task, name); + } + + public FlushThread(ThreadGroup group, Runnable task, String name) { + super(group, task, name); + } + + public FlushThread(ThreadGroup group, Runnable task, String name, long stackSize) { + super(group, task, name, stackSize); + } + + public FlushThread(ThreadGroup group, Runnable task, String name, long stackSize, boolean inheritInheritableThreadLocals) { + super(group, task, name, stackSize, inheritInheritableThreadLocals); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushThreadFactory.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushThreadFactory.java new file mode 100644 index 000000000..45659d9d9 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/flush/FlushThreadFactory.java @@ -0,0 +1,39 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.flush; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created by caojiajun on 2025/1/7 + */ +public class FlushThreadFactory implements ThreadFactory { + + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + private final boolean daemon; + + public FlushThreadFactory(String name, boolean daemon) { + this.group = Thread.currentThread().getThreadGroup(); + this.namePrefix = name + "-pool-" + poolNumber.getAndIncrement() + "-thread-"; + this.daemon = daemon; + } + + public Thread newThread(Runnable r) { + Thread t = new FlushThread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + if (daemon) { + if (!t.isDaemon()) { + t.setDaemon(true); + } + } else { + if (t.isDaemon()) { + t.setDaemon(false); + } + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java index 441946254..78b3afbdf 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java @@ -4,7 +4,11 @@ import com.netease.nim.camellia.codec.Pack; import com.netease.nim.camellia.codec.Unpack; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType; -import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.BlockLocation; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.ValueLocation; + +import java.util.Arrays; +import java.util.Objects; /** * Created by caojiajun on 2025/1/2 @@ -161,8 +165,9 @@ public void marshal(Pack pack) { pack.putLong(expireTime); } if (containsValue()) { - pack.putLong(valueLocation.fileId()); - pack.putLong(valueLocation.offset()); + pack.putLong(valueLocation.blockLocation().fileId()); + pack.putInt(valueLocation.blockLocation().blockId()); + pack.putInt(valueLocation.offset()); } if (containsExtra()) { pack.putVarbin(extra); @@ -178,12 +183,26 @@ public void unmarshal(Unpack unpack) { expireTime = unpack.popLong(); } if (containsValue()) { - long valueFileId = unpack.popLong(); - long valueOffset = unpack.popLong(); - valueLocation = new ValueLocation(valueFileId, valueOffset); + long fileId = unpack.popLong(); + int blockId = unpack.popInt(); + int offset = unpack.popInt(); + valueLocation = new ValueLocation(new BlockLocation(fileId, blockId), offset); } if (containsExtra()) { extra = unpack.popVarbin(); } } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + KeyInfo keyInfo = (KeyInfo) o; + return Objects.deepEquals(key, keyInfo.key); + } + + @Override + public int hashCode() { + return Arrays.hashCode(key); + } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyReadWrite.java index fca7de73b..d0f85b17f 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyReadWrite.java @@ -2,7 +2,7 @@ import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.persist.KeyFlushExecutor; -import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotKeyBlockCache; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.KeyBlockCache; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotKeyReadWrite; import com.netease.nim.camellia.tools.utils.BytesKey; import com.netease.nim.camellia.tools.utils.CamelliaMapUtils; @@ -17,11 +17,11 @@ public class KeyReadWrite { private final KeyFlushExecutor executor; - private final SlotKeyBlockCache blockCache; + private final KeyBlockCache blockCache; private final ConcurrentHashMap map = new ConcurrentHashMap<>(); - public KeyReadWrite(KeyFlushExecutor executor, SlotKeyBlockCache blockCache) { + public KeyReadWrite(KeyFlushExecutor executor, KeyBlockCache blockCache) { this.executor = executor; this.blockCache = blockCache; } @@ -49,6 +49,14 @@ public void delete(short slot, BytesKey key) { get(slot).delete(key); } + public void flushPrepare(short slot) { + SlotKeyReadWrite slotKeyReadWrite = map.get(slot); + if (slotKeyReadWrite == null) { + return; + } + slotKeyReadWrite.flushPrepare(); + } + public CompletableFuture flush(short slot) { SlotKeyReadWrite slotKeyReadWrite = map.get(slot); if (slotKeyReadWrite == null) { @@ -56,4 +64,12 @@ public CompletableFuture flush(short slot) { } return slotKeyReadWrite.flush(); } + + public boolean needFlush(short slot) { + SlotKeyReadWrite slotKeyReadWrite = map.get(slot); + if (slotKeyReadWrite == null) { + return false; + } + return slotKeyReadWrite.needFlush(); + } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushExecutor.java index 09ceef9d6..0471d4b72 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushExecutor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushExecutor.java @@ -1,13 +1,13 @@ package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.persist; - import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.file.FileReadWrite; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.codec.KeyCodec; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.flush.FlushExecutor; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.util.KeyHashUtils; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; -import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotKeyBlockCache; -import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.KeySlotMap; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.KeyBlockCache; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.KeyManifest; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotInfo; import com.netease.nim.camellia.tools.utils.BytesKey; import org.slf4j.Logger; @@ -15,7 +15,6 @@ import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadPoolExecutor; import static com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants.*; @@ -26,14 +25,14 @@ public class KeyFlushExecutor { private static final Logger logger = LoggerFactory.getLogger(KeyFlushExecutor.class); - private final ThreadPoolExecutor executor; - private final KeySlotMap keySlotMap; + private final FlushExecutor executor; + private final KeyManifest keyManifest; private final FileReadWrite fileReadWrite; - private final SlotKeyBlockCache blockCache; + private final KeyBlockCache blockCache; - public KeyFlushExecutor(ThreadPoolExecutor executor, KeySlotMap keySlotMap, FileReadWrite fileReadWrite, SlotKeyBlockCache blockCache) { + public KeyFlushExecutor(FlushExecutor executor, KeyManifest keyManifest, FileReadWrite fileReadWrite, KeyBlockCache blockCache) { this.executor = executor; - this.keySlotMap = keySlotMap; + this.keyManifest = keyManifest; this.fileReadWrite = fileReadWrite; this.blockCache = blockCache; } @@ -60,9 +59,9 @@ public CompletableFuture submit(KeyFlushTask flushTask) { private void execute(KeyFlushTask task) throws Exception { short slot = task.slot(); Map flushKeys = task.flushKeys(); - SlotInfo source = keySlotMap.get(slot); + SlotInfo source = keyManifest.get(slot); if (source == null) { - source = keySlotMap.init(slot); + source = keyManifest.init(slot); clear(source); } SlotInfo target = source; @@ -72,7 +71,7 @@ private void execute(KeyFlushTask task) throws Exception { if (lastWrite.success) { break; } - target = keySlotMap.expand(slot); + target = keyManifest.expand(slot); } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/IKeyManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/IKeyManifest.java new file mode 100644 index 000000000..5eeea4b20 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/IKeyManifest.java @@ -0,0 +1,43 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot; + +import java.io.IOException; + +/** + * Created by caojiajun on 2025/1/6 + */ +public interface IKeyManifest { + + /** + * init and load + * 初始化 + * @throws IOException exception + */ + void load() throws IOException; + + /** + * get slot info + * 获取slot-info + * @param slot slot + * @return slot info + * @throws IOException exception + */ + SlotInfo get(short slot) throws IOException; + + /** + * init slot info + * 初始化slot info + * @param slot slot + * @return slot info + * @throws IOException exception + */ + SlotInfo init(short slot) throws IOException; + + /** + * expand slot info, capacity will expand to double size + * 扩容slot-info,容量会在当前基础上加倍 + * @param slot slot + * @return slot info + * @throws IOException exception + */ + SlotInfo expand(short slot) throws IOException; +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/SlotKeyBlockCache.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyBlockCache.java similarity index 95% rename from camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/SlotKeyBlockCache.java rename to camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyBlockCache.java index 13aac02be..28110328f 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/SlotKeyBlockCache.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyBlockCache.java @@ -15,16 +15,16 @@ /** * Created by caojiajun on 2025/1/2 */ -public class SlotKeyBlockCache { +public class KeyBlockCache { - private final KeySlotMap keySlotMap; + private final KeyManifest keySlotMap; private final FileReadWrite fileReadWrite; private final ConcurrentLinkedHashMap readCache; private final ConcurrentLinkedHashMap writeCache; - public SlotKeyBlockCache(KeySlotMap keySlotMap, FileReadWrite fileReadWrite) { + public KeyBlockCache(KeyManifest keySlotMap, FileReadWrite fileReadWrite) { this.keySlotMap = keySlotMap; this.fileReadWrite = fileReadWrite; readCache = new ConcurrentLinkedHashMap.Builder() diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeySlotMap.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyManifest.java similarity index 89% rename from camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeySlotMap.java rename to camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyManifest.java index 7a50d325c..1610cd61f 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeySlotMap.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyManifest.java @@ -20,9 +20,9 @@ /** * Created by caojiajun on 2024/12/31 */ -public class KeySlotMap { +public class KeyManifest implements IKeyManifest { - private static final Logger logger = LoggerFactory.getLogger(KeySlotMap.class); + private static final Logger logger = LoggerFactory.getLogger(KeyManifest.class); private static final byte[] magic_header = "camellia_header".getBytes(StandardCharsets.UTF_8); private static final byte[] magic_footer = "camellia_footer".getBytes(StandardCharsets.UTF_8); @@ -35,33 +35,34 @@ public class KeySlotMap { private final String fileName; private FileChannel fileChannel; - public KeySlotMap(String fileName) { + public KeyManifest(String fileName) { this.fileName = fileName; } + @Override public void load() throws IOException { - logger.info("try load key.slot.map.file = {}", fileName); + logger.info("try load key.manifest.file = {}", fileName); File file = new File(fileName); if (!file.exists()) { boolean newFile = file.createNewFile(); - logger.info("create key.slot.map.file = {}, result = {}", fileName, newFile); + logger.info("create key.manifest.file = {}, result = {}", fileName, newFile); } fileChannel = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ, StandardOpenOption.WRITE); if (fileChannel.size() == 0) { ByteBuffer buffer1 = ByteBuffer.wrap(magic_header); while (buffer1.hasRemaining()) { int write = fileChannel.write(buffer1); - logger.info("init slot map file, magic_header, key.slot.map.file = {}, result = {}", fileName, write); + logger.info("init key.manifest.file, magic_header, key.manifest.file = {}, result = {}", fileName, write); } ByteBuffer buffer2 = ByteBuffer.wrap(magic_footer); while (buffer2.hasRemaining()) { int write = fileChannel.write(buffer2, magic_header.length + RedisClusterCRC16Utils.SLOT_SIZE * (8+8+4)); - logger.info("init slot map file, magic_footer, key.slot.map.file = {}, result = {}", fileName, write); + logger.info("init key.manifest.file, magic_footer, key.manifest.file = {}, result = {}", fileName, write); } } else { int len = magic_header.length + magic_footer.length + RedisClusterCRC16Utils.SLOT_SIZE * (8+8+4); if (fileChannel.size() != len) { - throw new IOException("slot map file illegal size"); + throw new IOException("key.manifest.file illegal size"); } ByteBuffer buffer = ByteBuffer.allocate(len); fileChannel.read(buffer); @@ -69,7 +70,7 @@ public void load() throws IOException { byte[] realMagicHeader = new byte[magic_header.length]; buffer.get(realMagicHeader); if (!Arrays.equals(realMagicHeader, magic_header)) { - throw new IOException("slot map file magic_header not match!"); + throw new IOException("key.manifest.file magic_header not match!"); } long totalCapacity = 0; for (short slot=0; slot mutable = new HashMap<>(); - private Map immutable = new HashMap<>(); - private volatile boolean flushing = false; + private final Map mutable = new HashMap<>(); + private final Map immutable = new HashMap<>(); - public SlotKeyReadWrite(short slot, KeyFlushExecutor executor, SlotKeyBlockCache blockCache) { + private volatile FlushStatus flushStatus = FlushStatus.FLUSH_OK; + + public SlotKeyReadWrite(short slot, KeyFlushExecutor executor, KeyBlockCache blockCache) { this.slot = slot; this.executor = executor; this.blockCache = blockCache; @@ -61,7 +63,6 @@ public KeyInfo get(BytesKey key) throws IOException { */ public void put(KeyInfo key) { mutable.put(new BytesKey(key.getKey()), key); - checkAndFlush(); } /** @@ -70,7 +71,6 @@ public void put(KeyInfo key) { */ public void delete(BytesKey key) { mutable.put(key, KeyInfo.DELETE); - checkAndFlush(); } /** @@ -78,16 +78,12 @@ public void delete(BytesKey key) { */ public CompletableFuture flush() { CompletableFuture future = new CompletableFuture<>(); - if (flushing) { + if (flushStatus == FlushStatus.FLUSHING || flushStatus == FlushStatus.FLUSH_OK) { future.complete(FlushResult.SKIP); return future; } - if (mutable.isEmpty()) { - future.complete(FlushResult.OK); - return future; - } - Map flushKeys = flushPrepare(); - CompletableFuture submit = executor.submit(new KeyFlushTask(slot, flushKeys)); + flushStatus = FlushStatus.FLUSHING; + CompletableFuture submit = executor.submit(new KeyFlushTask(slot, immutable)); submit.thenAccept(b -> { flushDone(); future.complete(b); @@ -95,21 +91,34 @@ public CompletableFuture flush() { return future; } - private Map flushPrepare() { - immutable = mutable; - mutable = new HashMap<>(); - flushing = true; - return immutable; + /** + * flush prepare + */ + public void flushPrepare() { + if (flushStatus != FlushStatus.FLUSH_OK) { + return; + } + if (mutable.isEmpty()) { + return; + } + immutable.putAll(mutable); + mutable.clear(); + flushStatus = FlushStatus.PREPARE; } - private void flushDone() { - immutable.clear(); - flushing = false; + /** + * check need flush + * @return true/false + */ + public boolean needFlush() { + if (flushStatus != FlushStatus.FLUSH_OK) { + return false; + } + return mutable.size() >= 200; } - private void checkAndFlush() { - if (mutable.size() >= 200 && !flushing) { - flush(); - } + private void flushDone() { + immutable.clear(); + flushStatus = FlushStatus.FLUSH_OK; } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockInfo.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockInfo.java new file mode 100644 index 000000000..84a018aa2 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockInfo.java @@ -0,0 +1,9 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block; + + + +/** + * Created by caojiajun on 2025/1/6 + */ +public record BlockInfo(BlockType blockType, BlockLocation blockLocation, byte[] data) { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockLocation.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockLocation.java new file mode 100644 index 000000000..e297bab3e --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockLocation.java @@ -0,0 +1,7 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block; + +/** + * Created by caojiajun on 2025/1/6 + */ +public record BlockLocation(long fileId, int blockId) { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockType.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockType.java new file mode 100644 index 000000000..03cab76a9 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/BlockType.java @@ -0,0 +1,52 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants; + + +/** + * Created by caojiajun on 2025/1/6 + */ +public enum BlockType { + _4k(1, 4*1024, 2), + _32k(2, 32*1024, 2), + _256k(3, 256*1024, 4), + _1024k(4, 1024*1024, 4), + no_align(5, -1, 4), + ; + + private final int type; + private final int blockSize; + private final int sizeLen; + + BlockType(int type, int blockSize, int sizeLen) { + this.type = type; + this.blockSize = blockSize; + this.sizeLen = sizeLen; + } + + public int getType() { + return type; + } + + public int getBlockSize() { + return blockSize; + } + + public int getSizeLen() { + return sizeLen; + } + + public static BlockType fromData(byte[] data) { + if (data.length + 4 + 2 < EmbeddedStorageConstants._4k) { + return BlockType._4k; + } else if (data.length + 4 + 2 < EmbeddedStorageConstants._32k) { + return BlockType._32k; + } else if (data.length + 4 + 4 < EmbeddedStorageConstants._256k) { + return BlockType._256k; + } else if (data.length + 4 + 4 < EmbeddedStorageConstants._1024k) { + return BlockType._1024k; + } else { + return BlockType.no_align; + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/IValueManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/IValueManifest.java new file mode 100644 index 000000000..648815496 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/IValueManifest.java @@ -0,0 +1,41 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block; + +import java.io.IOException; + +/** + * Created by caojiajun on 2025/1/6 + */ +public interface IValueManifest { + + /** + * init and load + * @throws IOException exception + */ + void load() throws IOException; + + /** + * allocate block + * @param slot slot + * @param blockType block type, 4k、32k、256k、1024k、no_align + * @return block location + * @throws IOException exception + */ + BlockLocation allocate(short slot, BlockType blockType) throws IOException; + + /** + * block type + * @param fileId fileId + * @return block type + */ + BlockType blockType(long fileId) throws IOException; + + /** + * block flush to disk + * @param slot slot + * @param location location + * @throws IOException exception + */ + void flush(short slot, BlockLocation location) throws IOException; + + +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/ValueLocation.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueLocation.java similarity index 55% rename from camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/ValueLocation.java rename to camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueLocation.java index ffee18d1d..c243b117c 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/ValueLocation.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueLocation.java @@ -1,8 +1,8 @@ -package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value; +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block; /** * Created by caojiajun on 2025/1/3 */ -public record ValueLocation(long fileId, long offset) { +public record ValueLocation(BlockLocation blockLocation, int offset) { } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueManifest.java new file mode 100644 index 000000000..97d23a812 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/ValueManifest.java @@ -0,0 +1,29 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block; + +import java.io.IOException; + +/** + * Created by caojiajun on 2025/1/6 + */ +public class ValueManifest implements IValueManifest { + + @Override + public void load() throws IOException { + + } + + @Override + public BlockLocation allocate(short slot, BlockType blockType) throws IOException { + return null; + } + + @Override + public BlockType blockType(long fileId) throws IOException { + return null; + } + + @Override + public void flush(short slot, BlockLocation location) throws IOException { + + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/StringValueFlushTask.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/StringValueFlushTask.java new file mode 100644 index 000000000..277865b0d --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/StringValueFlushTask.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.persist; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; + +import java.util.Map; + +/** + * Created by caojiajun on 2025/1/6 + */ +public record StringValueFlushTask(short slot, Map flushValues) { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/ValueFlushExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/ValueFlushExecutor.java new file mode 100644 index 000000000..9d601b132 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/persist/ValueFlushExecutor.java @@ -0,0 +1,82 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.persist; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.codec.StringValueCodec; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.file.FileReadWrite; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.flush.FlushExecutor; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.*; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.string.StringBlockCache; +import com.netease.nim.camellia.tools.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Created by caojiajun on 2025/1/6 + */ +public class ValueFlushExecutor { + + private static final Logger logger = LoggerFactory.getLogger(ValueFlushExecutor.class); + + private final FlushExecutor executor; + private final ValueManifest valueManifest; + private final FileReadWrite fileReadWrite; + private final StringBlockCache blockCache; + + public ValueFlushExecutor(FlushExecutor executor, ValueManifest valueManifest, FileReadWrite fileReadWrite, StringBlockCache blockCache) { + this.executor = executor; + this.valueManifest = valueManifest; + this.fileReadWrite = fileReadWrite; + this.blockCache = blockCache; + } + + public CompletableFuture submit(StringValueFlushTask flushTask) { + CompletableFuture future = new CompletableFuture<>(); + try { + executor.submit(() -> { + try { + execute(flushTask); + future.complete(FlushResult.OK); + } catch (Exception e) { + logger.error("string value flush error, slot = {}", flushTask.slot(), e); + future.complete(FlushResult.ERROR); + } + }); + } catch (Exception e) { + logger.error("submit string value flush error, slot = {}", flushTask.slot(), e); + future.complete(FlushResult.ERROR); + } + return future; + } + + private void execute(StringValueFlushTask task) throws Exception { + short slot = task.slot(); + Map flushValues = task.flushValues(); + Map>> blockMap = new HashMap<>(); + for (Map.Entry entry : flushValues.entrySet()) { + byte[] data = entry.getValue(); + BlockType blockType = BlockType.fromData(data); + List> buffers = blockMap.computeIfAbsent(blockType, k -> new ArrayList<>()); + buffers.add(new Pair<>(entry.getKey(), entry.getValue())); + } + List list = new ArrayList<>(); + for (Map.Entry>> entry : blockMap.entrySet()) { + List blockInfos = StringValueCodec.encode(slot, entry.getKey(), valueManifest, entry.getValue()); + list.addAll(blockInfos); + } + for (BlockInfo blockInfo : list) { + BlockLocation blockLocation = blockInfo.blockLocation(); + long fileId = blockLocation.fileId(); + long offset = (long) blockLocation.blockId() * blockInfo.blockType().getBlockSize(); + fileReadWrite.write(fileId, offset, blockInfo.data()); + blockCache.updateBlockCache(slot, fileId, offset, blockInfo.data()); + } + } + +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/SlotStringReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/SlotStringReadWrite.java new file mode 100644 index 000000000..142b3856f --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/SlotStringReadWrite.java @@ -0,0 +1,83 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.string; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushStatus; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.persist.StringValueFlushTask; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.persist.ValueFlushExecutor; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class SlotStringReadWrite { + + private final short slot; + private final StringBlockCache slotStringBlockCache; + private final ValueFlushExecutor flushExecutor; + + private final Map mutable = new HashMap<>(); + private final Map immutable = new HashMap<>(); + private volatile FlushStatus flushStatus = FlushStatus.FLUSH_OK; + + public SlotStringReadWrite(short slot, ValueFlushExecutor flushExecutor, StringBlockCache slotStringBlockCache) { + this.slot = slot; + this.flushExecutor = flushExecutor; + this.slotStringBlockCache = slotStringBlockCache; + } + + public void put(KeyInfo keyInfo, byte[] data) throws IOException { + mutable.put(keyInfo, data); + } + + public void delete(KeyInfo keyInfo) throws IOException { + mutable.remove(keyInfo); + } + + public byte[] get(KeyInfo keyInfo) throws IOException { + byte[] data = mutable.get(keyInfo); + if (data != null) { + return data; + } + data = immutable.get(keyInfo); + if (data != null) { + return data; + } + return slotStringBlockCache.get(slot, keyInfo); + } + + public CompletableFuture flush() throws IOException { + if (flushStatus != FlushStatus.FLUSH_OK) { + return CompletableFuture.completedFuture(FlushResult.SKIP); + } + if (mutable.isEmpty()) { + return CompletableFuture.completedFuture(FlushResult.OK); + } + CompletableFuture future = new CompletableFuture<>(); + immutable.putAll(mutable); + mutable.clear(); + flushStatus = FlushStatus.FLUSHING; + CompletableFuture submit = flushExecutor.submit(new StringValueFlushTask(slot, immutable)); + submit.thenAccept(flushResult -> { + flushDone(); + future.complete(flushResult); + }); + return future; + } + + public boolean needFlush() { + if (flushStatus != FlushStatus.FLUSH_OK) { + return false; + } + return mutable.size() >= 200; + } + + private void flushDone() { + immutable.clear(); + flushStatus = FlushStatus.FLUSH_OK; + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringBlockCache.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringBlockCache.java new file mode 100644 index 000000000..518bdabff --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringBlockCache.java @@ -0,0 +1,17 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.string; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; + +/** + * Created by caojiajun on 2025/1/6 + */ +public class StringBlockCache { + + public byte[] get(short slot, KeyInfo keyInfo) { + return null; + } + + public void updateBlockCache(short slot, long fileId, long offset, byte[] block) { + + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringReadWrite.java index 745705c34..c0e15264f 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringReadWrite.java @@ -1,18 +1,56 @@ package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.string; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; -import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.*; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.persist.ValueFlushExecutor; +import com.netease.nim.camellia.tools.utils.CamelliaMapUtils; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; /** * Created by caojiajun on 2025/1/3 */ public class StringReadWrite { - public ValueLocation put(short slot, KeyInfo keyInfo, byte[] data) { - return null; + private final ConcurrentHashMap map = new ConcurrentHashMap<>(); + + private final ValueFlushExecutor flushExecutor; + private final StringBlockCache stringBlockCache; + + public StringReadWrite(ValueFlushExecutor flushExecutor, StringBlockCache stringBlockCache) { + this.flushExecutor = flushExecutor; + this.stringBlockCache = stringBlockCache; + } + + public void put(short slot, KeyInfo keyInfo, byte[] data) throws IOException { + get(slot).put(keyInfo, data); + } + + public byte[] get(short slot, KeyInfo keyInfo) throws IOException { + return get(slot).get(keyInfo); + } + + public CompletableFuture flush(short slot) throws IOException { + SlotStringReadWrite slotStringReadWrite = get(slot); + if (slotStringReadWrite == null) { + return CompletableFuture.completedFuture(FlushResult.OK); + } + return slotStringReadWrite.flush(); } - public byte[] get(ValueLocation location) { - return null; + private SlotStringReadWrite get(short slot) { + return CamelliaMapUtils.computeIfAbsent(map, slot, s -> new SlotStringReadWrite(slot, flushExecutor, stringBlockCache)); } + + public boolean needFlush(short slot) { + SlotStringReadWrite slotStringReadWrite = get(slot); + if (slotStringReadWrite == null) { + return false; + } + return slotStringReadWrite.needFlush(); + } + } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyCodecTest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyCodecTest.java index 9fae6f971..149514735 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyCodecTest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyCodecTest.java @@ -3,7 +3,8 @@ import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.codec.KeyCodec; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; -import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.BlockLocation; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.ValueLocation; import com.netease.nim.camellia.tools.utils.BytesKey; import org.junit.Assert; import org.junit.Test; @@ -25,9 +26,9 @@ public void test() { KeyInfo keyInfo1 = keyInfo("k1", 0, null, "v1"); KeyInfo keyInfo2 = keyInfo("k2", 10000, null, "v2"); KeyInfo keyInfo3 = keyInfo("k3", 20000, null, null); - KeyInfo keyInfo4 = keyInfo("k4", 0, new ValueLocation(1, 1000), null); - KeyInfo keyInfo5 = keyInfo("k5", 30000, new ValueLocation(2, 2000), "v5"); - KeyInfo keyInfo6 = keyInfo("k6", 40000, new ValueLocation(3, 3000), null); + KeyInfo keyInfo4 = keyInfo("k4", 0, new ValueLocation(new BlockLocation(1, 10), (short) 1000), null); + KeyInfo keyInfo5 = keyInfo("k5", 30000, new ValueLocation(new BlockLocation(2, 30), (short) 2000), "v5"); + KeyInfo keyInfo6 = keyInfo("k6", 40000, new ValueLocation(new BlockLocation(1, 50), (short) 3000), null); map.put(new BytesKey(keyInfo1.getKey()), keyInfo1); map.put(new BytesKey(keyInfo2.getKey()), keyInfo2); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeySlotMapTest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyManifestTest.java similarity index 94% rename from camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeySlotMapTest.java rename to camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyManifestTest.java index a61af29c1..f714d4f92 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeySlotMapTest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyManifestTest.java @@ -1,6 +1,6 @@ package com.netease.nim.camellia.redis.proxy.test; -import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.KeySlotMap; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.KeyManifest; import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotInfo; import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils; import org.junit.After; @@ -16,7 +16,7 @@ /** * Created by caojiajun on 2025/1/2 */ -public class KeySlotMapTest { +public class KeyManifestTest { private final String fileName1 = "/tmp/" + UUID.randomUUID(); private final String fileName2 = "/tmp/" + UUID.randomUUID(); @@ -44,7 +44,7 @@ public void before() { @Test public void test() throws IOException { - KeySlotMap keySlotMap = new KeySlotMap(fileName1); + KeyManifest keySlotMap = new KeyManifest(fileName1); keySlotMap.load(); short slot = (short)ThreadLocalRandom.current().nextInt(RedisClusterCRC16Utils.SLOT_SIZE / 2); @@ -68,7 +68,7 @@ public void test() throws IOException { SlotInfo slotInfo3 = keySlotMap.init((short) (slot + 200)); Assert.assertEquals(slotInfo3, slotInfo); - KeySlotMap keySlotMap2 = new KeySlotMap(fileName1); + KeyManifest keySlotMap2 = new KeyManifest(fileName1); keySlotMap2.load(); SlotInfo slotInfo11 = keySlotMap2.get(slot); @@ -81,7 +81,7 @@ public void test() throws IOException { @Test public void test2() throws IOException { - KeySlotMap keySlotMap = new KeySlotMap(fileName2); + KeyManifest keySlotMap = new KeyManifest(fileName2); keySlotMap.load(); long time1 = System.nanoTime(); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java new file mode 100644 index 000000000..d60d6740a --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java @@ -0,0 +1,127 @@ +package com.netease.nim.camellia.redis.proxy.test; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.codec.StringValueCodec; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.*; +import com.netease.nim.camellia.tools.utils.Pair; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created by caojiajun on 2025/1/7 + */ +public class StringValueCodecTest { + + private static final long fileId = System.currentTimeMillis(); + + + @Test + public void test() throws IOException { + short slot = 1; + BlockType blockType = BlockType._4k; + IValueManifest valueManifest = new MockValueManifest(); + + List> values = new ArrayList<>(); + + values.add(new Pair<>(keyInfo("k1", 1000L, null, null), UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))); + values.add(new Pair<>(keyInfo("k2", 1000L, null, null), UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))); + values.add(new Pair<>(keyInfo("k3", 1000L, null, null), UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))); + values.add(new Pair<>(keyInfo("k4", 1000L, null, null), "111".getBytes(StandardCharsets.UTF_8))); + values.add(new Pair<>(keyInfo("k5", 1000L, null, null), "hhhhh".getBytes(StandardCharsets.UTF_8))); + values.add(new Pair<>(keyInfo("k6", 1000L, null, null), "1k12l12121".getBytes(StandardCharsets.UTF_8))); + values.add(new Pair<>(keyInfo("k7", 1000L, null, null), "sasasas".getBytes(StandardCharsets.UTF_8))); + + List blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values); + + Assert.assertEquals(blockInfos.size(), 1); + + BlockInfo blockInfo = blockInfos.getFirst(); + List list = StringValueCodec.decode(blockInfo.data(), blockType); + + Assert.assertEquals(list.size(), values.size()); + + for (int i=0; i> values = new ArrayList<>(); + + for (int i=0; i<10000; i++) { + values.add(new Pair<>(keyInfo("k" + i, 1000L, null, null), UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))); + } + + List blockInfos = StringValueCodec.encode(slot, blockType, valueManifest, values); + + for (int i=0;i result = new ArrayList<>(); + for (BlockInfo blockInfo : blockInfos) { + List list = StringValueCodec.decode(blockInfo.data(), blockType); + result.addAll(list); + } + + Assert.assertEquals(result.size(), values.size()); + + for (int i=0; i