diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/RedisLocalStorageClient.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/RedisLocalStorageClient.java index 64147d29f..18ac14907 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/RedisLocalStorageClient.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/RedisLocalStorageClient.java @@ -12,6 +12,7 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.*; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.db.MemFlushCommand; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.Wal; import com.netease.nim.camellia.redis.proxy.upstream.utils.CompletableFutureUtils; import com.netease.nim.camellia.redis.proxy.util.*; import org.slf4j.Logger; @@ -84,9 +85,13 @@ public void start() { LocalStorageReadWrite readWrite = new LocalStorageReadWrite(dir); CompactExecutor compactExecutor = new CompactExecutor(readWrite); + Wal wal = new Wal(readWrite); + wal.recover(); + CommandConfig commandConfig = new CommandConfig(); commandConfig.setCompactExecutor(compactExecutor); commandConfig.setReadWrite(readWrite); + commandConfig.setWal(wal); commands = new Commands(commandConfig); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/codec/WalEntryCodec.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/codec/WalEntryCodec.java new file mode 100644 index 000000000..68e6a5efa --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/codec/WalEntryCodec.java @@ -0,0 +1,46 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec; + +import com.netease.nim.camellia.codec.Pack; +import com.netease.nim.camellia.codec.Unpack; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.StringWalEntry; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalEntry; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalEntryType; + +/** + * Created by caojiajun on 2025/1/17 + */ +public class WalEntryCodec { + + public static byte[] encode(WalEntry walEntry) { + if (walEntry instanceof StringWalEntry) { + Pack pack = new Pack(); + pack.putByte(WalEntryType.string.getType()); + pack.putMarshallable(((StringWalEntry) walEntry).keyInfo()); + byte[] value = ((StringWalEntry) walEntry).value(); + if (value != null) { + pack.putVarbin(value); + } + pack.getBuffer().capacity(pack.getBuffer().readableBytes()); + return pack.getBuffer().array(); + } else { + throw new IllegalArgumentException("not support WalEntry"); + } + } + + public static WalEntry decode(byte[] data) { + Unpack unpack = new Unpack(data); + byte type = unpack.popByte(); + if (type == WalEntryType.string.getType()) { + KeyInfo keyInfo = new KeyInfo(); + unpack.popMarshallable(keyInfo); + byte[] value = null; + if (unpack.getBuffer().readableBytes() > 0) { + value = unpack.popVarbin(); + } + return new StringWalEntry(keyInfo, value); + } else { + throw new IllegalArgumentException("not support WalEntry"); + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/CommandConfig.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/CommandConfig.java index 30f131e24..fee80a67d 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/CommandConfig.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/CommandConfig.java @@ -1,6 +1,7 @@ package com.netease.nim.camellia.redis.proxy.upstream.local.storage.command; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.Wal; /** * Created by caojiajun on 2025/1/10 @@ -9,6 +10,7 @@ public class CommandConfig { private CompactExecutor compactExecutor; private LocalStorageReadWrite readWrite; + private Wal wal; public CompactExecutor getCompactExecutor() { return compactExecutor; @@ -25,4 +27,12 @@ public LocalStorageReadWrite getReadWrite() { public void setReadWrite(LocalStorageReadWrite readWrite) { this.readWrite = readWrite; } + + public Wal getWal() { + return wal; + } + + public void setWal(Wal wal) { + this.wal = wal; + } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/Commands.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/Commands.java index 9d56a3d56..5202168d4 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/Commands.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/Commands.java @@ -9,11 +9,13 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.db.*; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.string.*; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor; -import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyReadWrite; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.SlotWalOffset; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.Wal; import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector; import java.io.IOException; @@ -33,10 +35,13 @@ public class Commands { protected KeyReadWrite keyReadWrite; protected StringReadWrite stringReadWrite; + protected Wal wal; + public Commands(CommandConfig commandConfig) { compactExecutor = commandConfig.getCompactExecutor(); keyReadWrite = commandConfig.getReadWrite().getKeyReadWrite(); stringReadWrite = commandConfig.getReadWrite().getStringReadWrite(); + wal = commandConfig.getWal(); //db initCommand(new MemFlushCommand(commandConfig)); @@ -93,12 +98,18 @@ private void afterWrite(short slot) throws IOException { compactExecutor.compact(slot); //flush if (keyReadWrite.needFlush(slot) || stringReadWrite.needFlush(slot)) { + //获取slot当前wal写到哪里了 + SlotWalOffset slotWalOffset = wal.getSlotWalOffsetEnd(slot); //key flush prepare Map keyMap = keyReadWrite.flushPrepare(slot); //flush string value - CompletableFuture future = stringReadWrite.flush(slot, keyMap); - //flush key - future.thenAccept(result -> keyReadWrite.flush(slot)); + CompletableFuture future1 = stringReadWrite.flush(slot, keyMap); + future1.thenAccept(flushResult1 -> { + //flush key + CompletableFuture future2 = keyReadWrite.flush(slot); + //flush wal,表示这个offset之前的关于指定slot的日志条目都无效了 + future2.thenAccept(flushResult2 -> wal.flush(slot, slotWalOffset)); + }); } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/ICommand.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/ICommand.java index fac8be681..51da26e16 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/ICommand.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/ICommand.java @@ -6,6 +6,7 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyReadWrite; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.Wal; /** * Created by caojiajun on 2025/1/3 @@ -16,11 +17,13 @@ public abstract class ICommand { protected KeyReadWrite keyReadWrite; protected StringReadWrite stringReadWrite; + protected Wal wal; public ICommand(CommandConfig commandConfig) { compactExecutor = commandConfig.getCompactExecutor(); keyReadWrite = commandConfig.getReadWrite().getKeyReadWrite(); stringReadWrite = commandConfig.getReadWrite().getStringReadWrite(); + wal = commandConfig.getWal(); } /** diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/LocalStorageReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/LocalStorageReadWrite.java index d892cdb59..eb185ed7b 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/LocalStorageReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/LocalStorageReadWrite.java @@ -11,6 +11,9 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.persist.ValueFlushExecutor; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.block.StringBlockReadWrite; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.IWalManifest; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalManifest; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalReadWrite; import java.io.IOException; @@ -19,6 +22,9 @@ */ public class LocalStorageReadWrite { + // + private WalReadWrite walReadWrite; + private IWalManifest walManifest; // private final IKeyManifest keyManifest; private final IValueManifest valueManifest; @@ -50,6 +56,14 @@ public LocalStorageReadWrite(String dir) throws IOException { stringReadWrite = new StringReadWrite(valueFlushExecutor, stringBlockReadWrite); } + public WalReadWrite getWalReadWrite() { + return walReadWrite; + } + + public IWalManifest getWalManifest() { + return walManifest; + } + public IKeyManifest getKeyManifest() { return keyManifest; } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetCommand.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetCommand.java index 3a80f1c23..5cadc4da4 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetCommand.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/command/string/SetCommand.java @@ -12,9 +12,14 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.DataType; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; import com.netease.nim.camellia.redis.proxy.upstream.kv.command.string.SetCommander; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.StringWalEntry; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalWriteResult; import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector; import com.netease.nim.camellia.redis.proxy.util.Utils; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k; /** @@ -120,14 +125,29 @@ protected Reply execute(short slot, Command command) throws Exception { keyInfo = new KeyInfo(DataType.string, key.key()); } + byte[] bigValue = null; if (key.key().length + value.length <= 128) { keyInfo.setExtra(value); } else { keyInfo.setExtra(null); + bigValue = value; + } + + //写入wal + StringWalEntry walEntry = new StringWalEntry(keyInfo, bigValue); + CompletableFuture future = wal.append(slot, walEntry); + WalWriteResult result = future.get(30, TimeUnit.SECONDS); + if (result != WalWriteResult.success) { + return ErrorReply.INTERNAL_ERROR; + } + + //写入memtable + if (bigValue != null) { stringReadWrite.put(slot, keyInfo, value); } keyReadWrite.put(slot, keyInfo); + //响应 if (get) { return new BulkReply(oldValue); } else { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java index 1a5bb68b1..ef46c58f9 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java @@ -12,7 +12,6 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValue; import com.netease.nim.camellia.redis.proxy.util.TimeCache; -import com.netease.nim.camellia.redis.proxy.util.Utils; import com.netease.nim.camellia.tools.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/file/FileNames.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/file/FileNames.java index 1a77d928c..9193d32af 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/file/FileNames.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/file/FileNames.java @@ -15,42 +15,39 @@ public class FileNames { private static final Logger logger = LoggerFactory.getLogger(FileNames.class); - public static String createKeyManifestFileIfNotExists(String dir) throws IOException { + public static void createKeyManifestFileIfNotExists(String dir) throws IOException { String fileName = keyManifestFile(dir); File file = new File(fileName); if (!file.exists()) { boolean result = file.createNewFile(); logger.info("create key manifest file, file = {}, result = {}", fileName, result); } - return fileName; } public static String keyManifestFile(String dir) { return dir + "/" + "key.manifest"; } - public static String createKeyFile(String dir, long fileId) throws IOException { + public static void createKeyFile(String dir, long fileId) throws IOException { String fileName = keyFile(dir, fileId); File file = new File(fileName); if (!file.exists()) { boolean result = file.createNewFile(); logger.info("create key file, file = {}, result = {}", file, result); } - return fileName; } public static String keyFile(String dir, long fileId) { return dir + "/" + fileId + ".key"; } - public static String createStringDataFileIfNotExists(String dir, BlockType blockType, long fileId) throws IOException { + public static void createStringDataFileIfNotExists(String dir, BlockType blockType, long fileId) throws IOException { String fileName = stringBlockFile(dir, blockType, fileId); File file = new File(fileName); if (!file.exists()) { boolean result = file.createNewFile(); - logger.info("create string block file, file = {}, result = {}", file, result); + logger.info("create string data file, file = {}, result = {}", file, result); } - return fileName; } public static String stringBlockFile(String dir, BlockType blockType, long fileId) { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/file/FileReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/file/FileReadWrite.java index ba39af7d1..f3f77362f 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/file/FileReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/file/FileReadWrite.java @@ -45,4 +45,14 @@ public byte[] read(String file, long offset, int size) throws IOException { fileChannel.read(buffer, offset); return buffer.array(); } + + public int readInt(String file, long offset) throws IOException { + FileChannel fileChannel = getFileChannel(file); + if (offset >= fileChannel.size()) { + return -1; + } + ByteBuffer buffer = ByteBuffer.allocate(4); + fileChannel.read(buffer, offset); + return buffer.getInt(); + } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/flush/FlushResult.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/flush/FlushResult.java new file mode 100644 index 000000000..d1532f6d7 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/flush/FlushResult.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush; + +/** + * Created by caojiajun on 2025/1/3 + */ +public enum FlushResult { + OK, + ERROR, + SKIP, + ; +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/flush/FlushStatus.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/flush/FlushStatus.java new file mode 100644 index 000000000..bf8975203 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/flush/FlushStatus.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush; + +/** + * Created by caojiajun on 2025/1/6 + */ +public enum FlushStatus { + PREPARE, + FLUSHING, + FLUSH_OK, + ; +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java index 7f2ed668d..97bcffc1c 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java @@ -3,7 +3,7 @@ import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.ValueWrapper; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.EstimateSizeValueCalculator; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.LRUCache; -import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.persist.KeyFlushExecutor; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.block.KeyBlockReadWrite; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot.SlotKeyReadWrite; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/persist/KeyFlushExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/persist/KeyFlushExecutor.java index 833ef1dd0..b2a29bdd9 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/persist/KeyFlushExecutor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/persist/KeyFlushExecutor.java @@ -1,7 +1,7 @@ package com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.persist; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key; -import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.KeyCodec; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushExecutor; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot.IKeyManifest; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java index 53a695d09..8bc0f3eea 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java @@ -3,8 +3,8 @@ import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.ValueWrapper; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.CacheType; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key; -import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult; -import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushStatus; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushStatus; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.block.KeyBlockReadWrite; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.persist.KeyFlushExecutor; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/persist/ValueFlushExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/persist/ValueFlushExecutor.java index e93aa30aa..69256ddcb 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/persist/ValueFlushExecutor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/persist/ValueFlushExecutor.java @@ -2,7 +2,7 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueCodec; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueEncodeResult; -import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushExecutor; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/SlotStringReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/SlotStringReadWrite.java index b63e28e49..45415a8ed 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/SlotStringReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/SlotStringReadWrite.java @@ -2,8 +2,8 @@ import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.ValueWrapper; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValue; -import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult; -import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushStatus; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushStatus; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.persist.StringValueFlushTask; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/StringReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/StringReadWrite.java index c1fac7868..a22ad91d9 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/StringReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/StringReadWrite.java @@ -4,7 +4,7 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.EstimateSizeValueCalculator; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.LRUCache; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.SizeCalculator; -import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.persist.ValueFlushExecutor; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/IWalManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/IWalManifest.java new file mode 100644 index 000000000..1c7fa9dff --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/IWalManifest.java @@ -0,0 +1,62 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +import java.util.Map; + +/** + * Created by caojiajun on 2025/1/17 + */ +public interface IWalManifest { + + /** + * 初始化加载 + */ + void load(); + + /** + * 这个slot的wal应该写在哪个文件上 + * @param slot slot + * @return 文件id + */ + long fileId(short slot); + + /** + * wal文件已经写到哪里了 + * @param fileId 文件id + * @return 下一个可以写数据的offset + */ + long getFileWriteNextOffset(long fileId); + + /** + * 写完日志后,更新wal文件已经写到哪里了 + * @param fileId 文件id + * @param nextOffset 下一个可以写数据的offset + */ + void updateFileWriteNextOffset(long fileId, long nextOffset); + + /** + * 查询slot最新一条日志写入的offset + * @param slot slot + * @return offset + */ + SlotWalOffset getSlotWalOffsetEnd(short slot); + + /** + * 更新slot最新一条日志写入的offset位置 + * @param slot slot + * @param offset offset + */ + void updateSlotWalOffsetEnd(short slot, SlotWalOffset offset); + + /** + * 更新slot最早一条日志的offset,小于等于这个offset的日志都无效了 + * @param slot slot + * @param offset offset + */ + void updateSlotWalOffsetStart(short slot, SlotWalOffset offset); + + /** + * 查询所有slot的最早有效记录的offset,小于等于这个offset的日志都无效了 + * @return 记录 + */ + Map getSlotWalOffsetStartMap(); +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/LogRecord.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/LogRecord.java new file mode 100644 index 000000000..a433f6291 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/LogRecord.java @@ -0,0 +1,62 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +import com.netease.nim.camellia.codec.Marshallable; +import com.netease.nim.camellia.codec.Pack; +import com.netease.nim.camellia.codec.Unpack; + +/** + * Created by caojiajun on 2025/1/14 + */ +public class LogRecord implements Marshallable { + + private long id; + private short slot; + private byte[] data; + + public LogRecord() { + } + + public LogRecord(long id, short slot, byte[] data) { + this.id = id; + this.slot = slot; + this.data = data; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public short getSlot() { + return slot; + } + + public void setSlot(short slot) { + this.slot = slot; + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + @Override + public void marshal(Pack pack) { + pack.putLong(id); + pack.putShort(slot); + pack.putVarbin(data); + } + + @Override + public void unmarshal(Unpack unpack) { + id = unpack.popLong(); + slot = unpack.popShort(); + data = unpack.popVarbin(); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/LogRecordIdGen.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/LogRecordIdGen.java new file mode 100644 index 000000000..028f67010 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/LogRecordIdGen.java @@ -0,0 +1,14 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +/** + * Created by caojiajun on 2025/1/14 + */ +public class LogRecordIdGen { + + public LogRecordIdGen() { + } + + public long nextId() { + return System.currentTimeMillis(); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/SlotWalOffset.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/SlotWalOffset.java new file mode 100644 index 000000000..5a76efddb --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/SlotWalOffset.java @@ -0,0 +1,7 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +/** + * Created by caojiajun on 2025/1/14 + */ +public record SlotWalOffset(long recordId, long fileId, long fileOffset) { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/StringWalEntry.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/StringWalEntry.java new file mode 100644 index 000000000..4bebe0f73 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/StringWalEntry.java @@ -0,0 +1,22 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.LocalStorageReadWrite; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyReadWrite; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite; + +/** + * Created by caojiajun on 2025/1/16 + */ +public record StringWalEntry(KeyInfo keyInfo, byte[] value) implements WalEntry { + + @Override + public void recover(short slot, LocalStorageReadWrite readWrite) throws Exception { + KeyReadWrite keyReadWrite = readWrite.getKeyReadWrite(); + StringReadWrite stringReadWrite = readWrite.getStringReadWrite(); + keyReadWrite.put(slot, keyInfo); + if (value != null) { + stringReadWrite.put(slot, keyInfo, value); + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/Wal.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/Wal.java new file mode 100644 index 000000000..8646a9a28 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/Wal.java @@ -0,0 +1,117 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + + +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.WalEntryCodec; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.LocalStorageReadWrite; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.CompletableFuture; + +/** + * Created by caojiajun on 2025/1/14 + */ +public class Wal { + + private static final Logger logger = LoggerFactory.getLogger(Wal.class); + + private final LocalStorageReadWrite readWrite; + private final LogRecordIdGen idGen = new LogRecordIdGen(); + + private final WalReadWrite walReadWrite; + private final IWalManifest walManifest; + + private final WalWriteExecutor executor; + + public Wal(LocalStorageReadWrite readWrite) { + this.readWrite = readWrite; + this.walManifest = readWrite.getWalManifest(); + this.walReadWrite = readWrite.getWalReadWrite(); + this.executor = new WalWriteExecutor(walManifest, walReadWrite); + } + + /** + * 追加一条wal日志 + * @param slot 所在slot + * @param walEntry log条目 + * @return 写入结果 + * @throws Exception 异常 + */ + public CompletableFuture append(short slot, WalEntry walEntry) throws Exception { + //获取log_record_id + long recordId = idGen.nextId(); + //构造record + byte[] data = WalEntryCodec.encode(walEntry); + LogRecord record = new LogRecord(recordId, slot, data); + //获取wal_file_id + long fileId = walManifest.fileId(slot); + //异步批量落盘 + CompletableFuture future = new CompletableFuture<>(); + executor.submit(slot, new WalWriteTask(record, fileId, future)); + return future; + } + + /** + * 获取slot当前写到哪里了 + * @param slot slot + * @return SlotWalOffset + */ + public SlotWalOffset getSlotWalOffsetEnd(short slot) { + return walManifest.getSlotWalOffsetEnd(slot); + } + + /** + * flush slot下的SlotWalOffset,则SlotWalOffset之前关于指定slot的wal日志条目都无效了 + * @param slot slot + * @param offset SlotWalOffset + */ + public void flush(short slot, SlotWalOffset offset) { + if (offset == null) { + return; + } + walManifest.updateSlotWalOffsetStart(slot, offset); + } + + /** + * recover from wal + * @throws Exception exception + */ + public void recover() throws Exception { + logger.info("recover from wal start."); + long start = System.currentTimeMillis(); + try { + Map walOffsetMap = walManifest.getSlotWalOffsetStartMap(); + Map fileIdOffsetMap = new TreeMap<>(Comparator.comparingLong(o -> o)); + for (Map.Entry entry : walOffsetMap.entrySet()) { + long fileId = entry.getValue().fileId(); + SlotWalOffset slotWalOffset = entry.getValue(); + Long offset = fileIdOffsetMap.get(fileId); + if (offset == null || slotWalOffset.fileOffset() < offset) { + fileIdOffsetMap.put(fileId, slotWalOffset.fileOffset()); + } + } + for (Map.Entry entry : fileIdOffsetMap.entrySet()) { + Long fileId = entry.getKey(); + Long offset = entry.getValue(); + List records = walReadWrite.read(fileId, offset); + if (records == null) { + continue; + } + for (LogRecord record : records) { + short slot = record.getSlot(); + if (record.getId() <= walOffsetMap.get(slot).recordId()) { + continue; + } + byte[] data = record.getData(); + WalEntry walEntry = WalEntryCodec.decode(data); + walEntry.recover(slot, readWrite); + } + } + logger.info("recover from wal success, spendMs = {}", System.currentTimeMillis() - start); + } catch (Exception e) { + logger.error("recover from wal error, spendMs = {}", System.currentTimeMillis() - start); + throw e; + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalEntry.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalEntry.java new file mode 100644 index 000000000..35cdc8f02 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalEntry.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.LocalStorageReadWrite; + +/** + * Created by caojiajun on 2025/1/16 + */ +public interface WalEntry { + + void recover(short slot, LocalStorageReadWrite readWrite) throws Exception; +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalEntryType.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalEntryType.java new file mode 100644 index 000000000..a15701996 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalEntryType.java @@ -0,0 +1,19 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +/** + * Created by caojiajun on 2025/1/16 + */ +public enum WalEntryType { + string((byte) 1), + ; + + private final byte type; + + WalEntryType(byte type) { + this.type = type; + } + + public byte getType() { + return type; + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalManifest.java new file mode 100644 index 000000000..f1978e026 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalManifest.java @@ -0,0 +1,51 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +import java.util.Map; + +/** + * + * Created by caojiajun on 2025/1/14 + */ +public class WalManifest implements IWalManifest { + + + @Override + public void load() { + + } + + @Override + public long fileId(short slot) { + return 0; + } + + @Override + public long getFileWriteNextOffset(long fileId) { + return 0; + } + + @Override + public void updateFileWriteNextOffset(long fileId, long nextOffset) { + + } + + @Override + public SlotWalOffset getSlotWalOffsetEnd(short slot) { + return null; + } + + @Override + public void updateSlotWalOffsetEnd(short slot, SlotWalOffset offset) { + + } + + @Override + public void updateSlotWalOffsetStart(short slot, SlotWalOffset offset) { + + } + + @Override + public Map getSlotWalOffsetStartMap() { + return Map.of(); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalReadWrite.java new file mode 100644 index 000000000..d80aed6f5 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalReadWrite.java @@ -0,0 +1,48 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +import com.netease.nim.camellia.codec.Pack; +import com.netease.nim.camellia.codec.Unpack; +import com.netease.nim.camellia.redis.proxy.upstream.local.storage.file.FileReadWrite; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by caojiajun on 2025/1/14 + */ +public class WalReadWrite { + + private final FileReadWrite fileReadWrite = new FileReadWrite(); + + private String file(long fileId) { + return fileId + ".wal"; + } + + public long write(long fileId, long offset, List records) throws IOException { + Pack pack = new Pack(); + pack.putInt(records.size()); + for (LogRecord record : records) { + pack.putMarshallable(record); + } + pack.getBuffer().capacity(pack.getBuffer().readableBytes()); + byte[] data = pack.getBuffer().array(); + fileReadWrite.write(file(fileId), offset, data); + return offset + data.length; + } + + public List read(long fileId, long offset) throws IOException { + int size = fileReadWrite.readInt(file(fileId), offset); + if (size <= 0) { + return null; + } + byte[] bytes = fileReadWrite.read(file(fileId), offset + 4, size); + Unpack unpack = new Unpack(bytes); + List list = new ArrayList<>(size); + for (int i=0; i> queueList = new ArrayList<>(); + + private final IWalManifest walManifest; + private final WalReadWrite walReadWrite; + + public WalWriteExecutor(IWalManifest walManifest, WalReadWrite walReadWrite) { + this.walManifest = walManifest; + this.walReadWrite = walReadWrite; + } + + public void start() { + threads = SysUtils.getCpuNum(); + for (int i=0; i queue = new MpscBlockingConsumerArrayQueue<>(10240); + queueList.add(queue); + new Thread(() -> write0(queue), "wal-flush-" + i).start(); + } + logger.info("wal write executor start, threads = {}", threads); + } + + public void submit(short slot, WalWriteTask task) throws Exception { + int index = slot % threads; + queueList.get(index).put(task); + } + + private void write0(BlockingQueue queue) { + List tasks = new ArrayList<>(); + while (true) { + try { + WalWriteTask task = queue.take(); + tasks.add(task); + queue.drainTo(tasks, 100); + try { + flush(tasks); + } finally { + tasks.clear(); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + } + + private void flush(List tasks) { + boolean success; + try { + Map> map = new HashMap<>(); + for (WalWriteTask task : tasks) { + List list = map.computeIfAbsent(task.fileId(), k -> new ArrayList<>()); + list.add(task.record()); + } + for (Map.Entry> entry : map.entrySet()) { + Long fileId = entry.getKey(); + //获取文件已经写到哪里了 + long offset = walManifest.getFileWriteNextOffset(fileId); + //批量写入 + long nextOffset = walReadWrite.write(fileId, offset, entry.getValue()); + //更新文件已经写到哪里了 + walManifest.updateFileWriteNextOffset(fileId, nextOffset); + Map slotWalOffsetMap = new HashMap<>(); + for (LogRecord logRecord : entry.getValue()) { + slotWalOffsetMap.put(logRecord.getSlot(), new SlotWalOffset(logRecord.getId(), fileId, nextOffset)); + } + for (Map.Entry walOffsetEntry : slotWalOffsetMap.entrySet()) { + Short slot = walOffsetEntry.getKey(); + SlotWalOffset slotWalOffset = walOffsetEntry.getValue(); + //更新slot已经写到哪里了 + walManifest.updateSlotWalOffsetEnd(slot, slotWalOffset); + } + } + success = true; + } catch (Exception e) { + logger.error("flush wal error", e); + success = false; + } + if (success) { + for (WalWriteTask task : tasks) { + task.future().complete(WalWriteResult.success); + } + } else { + for (WalWriteTask task : tasks) { + task.future().complete(WalWriteResult.error); + } + } + } + +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalWriteResult.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalWriteResult.java new file mode 100644 index 000000000..17216f6d6 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalWriteResult.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +/** + * Created by caojiajun on 2025/1/14 + */ +public enum WalWriteResult { + success, + error, + ; + +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalWriteTask.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalWriteTask.java new file mode 100644 index 000000000..317c0c366 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalWriteTask.java @@ -0,0 +1,9 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +import java.util.concurrent.CompletableFuture; + +/** + * Created by caojiajun on 2025/1/17 + */ +public record WalWriteTask(LogRecord record, long fileId, CompletableFuture future) { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalWriteType.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalWriteType.java new file mode 100644 index 000000000..4a32727e9 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/wal/WalWriteType.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal; + +/** + * Created by caojiajun on 2025/1/17 + */ +public enum WalWriteType { + none, + async, + sync, + ; +}