Skip to content

Commit

Permalink
feat(storage): wal (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 17, 2025
1 parent b39f116 commit 6dec13b
Show file tree
Hide file tree
Showing 32 changed files with 706 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.*;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.db.MemFlushCommand;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.Wal;
import com.netease.nim.camellia.redis.proxy.upstream.utils.CompletableFutureUtils;
import com.netease.nim.camellia.redis.proxy.util.*;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,9 +85,13 @@ public void start() {
LocalStorageReadWrite readWrite = new LocalStorageReadWrite(dir);
CompactExecutor compactExecutor = new CompactExecutor(readWrite);

Wal wal = new Wal(readWrite);
wal.recover();

CommandConfig commandConfig = new CommandConfig();
commandConfig.setCompactExecutor(compactExecutor);
commandConfig.setReadWrite(readWrite);
commandConfig.setWal(wal);

commands = new Commands(commandConfig);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec;

import com.netease.nim.camellia.codec.Pack;
import com.netease.nim.camellia.codec.Unpack;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.StringWalEntry;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalEntry;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalEntryType;

/**
* Created by caojiajun on 2025/1/17
*/
public class WalEntryCodec {

public static byte[] encode(WalEntry walEntry) {
if (walEntry instanceof StringWalEntry) {
Pack pack = new Pack();
pack.putByte(WalEntryType.string.getType());
pack.putMarshallable(((StringWalEntry) walEntry).keyInfo());
byte[] value = ((StringWalEntry) walEntry).value();
if (value != null) {
pack.putVarbin(value);
}
pack.getBuffer().capacity(pack.getBuffer().readableBytes());
return pack.getBuffer().array();
} else {
throw new IllegalArgumentException("not support WalEntry");
}
}

public static WalEntry decode(byte[] data) {
Unpack unpack = new Unpack(data);
byte type = unpack.popByte();
if (type == WalEntryType.string.getType()) {
KeyInfo keyInfo = new KeyInfo();
unpack.popMarshallable(keyInfo);
byte[] value = null;
if (unpack.getBuffer().readableBytes() > 0) {
value = unpack.popVarbin();
}
return new StringWalEntry(keyInfo, value);
} else {
throw new IllegalArgumentException("not support WalEntry");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.command;

import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.Wal;

/**
* Created by caojiajun on 2025/1/10
Expand All @@ -9,6 +10,7 @@ public class CommandConfig {

private CompactExecutor compactExecutor;
private LocalStorageReadWrite readWrite;
private Wal wal;

public CompactExecutor getCompactExecutor() {
return compactExecutor;
Expand All @@ -25,4 +27,12 @@ public LocalStorageReadWrite getReadWrite() {
public void setReadWrite(LocalStorageReadWrite readWrite) {
this.readWrite = readWrite;
}

public Wal getWal() {
return wal;
}

public void setWal(Wal wal) {
this.wal = wal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.db.*;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.string.*;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.SlotWalOffset;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.Wal;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;

import java.io.IOException;
Expand All @@ -33,10 +35,13 @@ public class Commands {
protected KeyReadWrite keyReadWrite;
protected StringReadWrite stringReadWrite;

protected Wal wal;

public Commands(CommandConfig commandConfig) {
compactExecutor = commandConfig.getCompactExecutor();
keyReadWrite = commandConfig.getReadWrite().getKeyReadWrite();
stringReadWrite = commandConfig.getReadWrite().getStringReadWrite();
wal = commandConfig.getWal();

//db
initCommand(new MemFlushCommand(commandConfig));
Expand Down Expand Up @@ -93,12 +98,18 @@ private void afterWrite(short slot) throws IOException {
compactExecutor.compact(slot);
//flush
if (keyReadWrite.needFlush(slot) || stringReadWrite.needFlush(slot)) {
//获取slot当前wal写到哪里了
SlotWalOffset slotWalOffset = wal.getSlotWalOffsetEnd(slot);
//key flush prepare
Map<Key, KeyInfo> keyMap = keyReadWrite.flushPrepare(slot);
//flush string value
CompletableFuture<FlushResult> future = stringReadWrite.flush(slot, keyMap);
//flush key
future.thenAccept(result -> keyReadWrite.flush(slot));
CompletableFuture<FlushResult> future1 = stringReadWrite.flush(slot, keyMap);
future1.thenAccept(flushResult1 -> {
//flush key
CompletableFuture<FlushResult> future2 = keyReadWrite.flush(slot);
//flush wal,表示这个offset之前的关于指定slot的日志条目都无效了
future2.thenAccept(flushResult2 -> wal.flush(slot, slotWalOffset));
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.Wal;

/**
* Created by caojiajun on 2025/1/3
Expand All @@ -16,11 +17,13 @@ public abstract class ICommand {

protected KeyReadWrite keyReadWrite;
protected StringReadWrite stringReadWrite;
protected Wal wal;

public ICommand(CommandConfig commandConfig) {
compactExecutor = commandConfig.getCompactExecutor();
keyReadWrite = commandConfig.getReadWrite().getKeyReadWrite();
stringReadWrite = commandConfig.getReadWrite().getStringReadWrite();
wal = commandConfig.getWal();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.persist.ValueFlushExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.block.StringBlockReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.IWalManifest;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalManifest;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalReadWrite;

import java.io.IOException;

Expand All @@ -19,6 +22,9 @@
*/
public class LocalStorageReadWrite {

//
private WalReadWrite walReadWrite;
private IWalManifest walManifest;
//
private final IKeyManifest keyManifest;
private final IValueManifest valueManifest;
Expand Down Expand Up @@ -50,6 +56,14 @@ public LocalStorageReadWrite(String dir) throws IOException {
stringReadWrite = new StringReadWrite(valueFlushExecutor, stringBlockReadWrite);
}

public WalReadWrite getWalReadWrite() {
return walReadWrite;
}

public IWalManifest getWalManifest() {
return walManifest;
}

public IKeyManifest getKeyManifest() {
return keyManifest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.DataType;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.string.SetCommander;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.StringWalEntry;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalWriteResult;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;
import com.netease.nim.camellia.redis.proxy.util.Utils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k;

/**
Expand Down Expand Up @@ -120,14 +125,29 @@ protected Reply execute(short slot, Command command) throws Exception {
keyInfo = new KeyInfo(DataType.string, key.key());
}

byte[] bigValue = null;
if (key.key().length + value.length <= 128) {
keyInfo.setExtra(value);
} else {
keyInfo.setExtra(null);
bigValue = value;
}

//写入wal
StringWalEntry walEntry = new StringWalEntry(keyInfo, bigValue);
CompletableFuture<WalWriteResult> future = wal.append(slot, walEntry);
WalWriteResult result = future.get(30, TimeUnit.SECONDS);
if (result != WalWriteResult.success) {
return ErrorReply.INTERNAL_ERROR;
}

//写入memtable
if (bigValue != null) {
stringReadWrite.put(slot, keyInfo, value);
}
keyReadWrite.put(slot, keyInfo);

//响应
if (get) {
return new BulkReply(oldValue);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValue;
import com.netease.nim.camellia.redis.proxy.util.TimeCache;
import com.netease.nim.camellia.redis.proxy.util.Utils;
import com.netease.nim.camellia.tools.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,39 @@ public class FileNames {

private static final Logger logger = LoggerFactory.getLogger(FileNames.class);

public static String createKeyManifestFileIfNotExists(String dir) throws IOException {
public static void createKeyManifestFileIfNotExists(String dir) throws IOException {
String fileName = keyManifestFile(dir);
File file = new File(fileName);
if (!file.exists()) {
boolean result = file.createNewFile();
logger.info("create key manifest file, file = {}, result = {}", fileName, result);
}
return fileName;
}

public static String keyManifestFile(String dir) {
return dir + "/" + "key.manifest";
}

public static String createKeyFile(String dir, long fileId) throws IOException {
public static void createKeyFile(String dir, long fileId) throws IOException {
String fileName = keyFile(dir, fileId);
File file = new File(fileName);
if (!file.exists()) {
boolean result = file.createNewFile();
logger.info("create key file, file = {}, result = {}", file, result);
}
return fileName;
}

public static String keyFile(String dir, long fileId) {
return dir + "/" + fileId + ".key";
}

public static String createStringDataFileIfNotExists(String dir, BlockType blockType, long fileId) throws IOException {
public static void createStringDataFileIfNotExists(String dir, BlockType blockType, long fileId) throws IOException {
String fileName = stringBlockFile(dir, blockType, fileId);
File file = new File(fileName);
if (!file.exists()) {
boolean result = file.createNewFile();
logger.info("create string block file, file = {}, result = {}", file, result);
logger.info("create string data file, file = {}, result = {}", file, result);
}
return fileName;
}

public static String stringBlockFile(String dir, BlockType blockType, long fileId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,14 @@ public byte[] read(String file, long offset, int size) throws IOException {
fileChannel.read(buffer, offset);
return buffer.array();
}

public int readInt(String file, long offset) throws IOException {
FileChannel fileChannel = getFileChannel(file);
if (offset >= fileChannel.size()) {
return -1;
}
ByteBuffer buffer = ByteBuffer.allocate(4);
fileChannel.read(buffer, offset);
return buffer.getInt();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush;

/**
* Created by caojiajun on 2025/1/3
*/
public enum FlushResult {
OK,
ERROR,
SKIP,
;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush;

/**
* Created by caojiajun on 2025/1/6
*/
public enum FlushStatus {
PREPARE,
FLUSHING,
FLUSH_OK,
;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.ValueWrapper;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.EstimateSizeValueCalculator;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.LRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.persist.KeyFlushExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.block.KeyBlockReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot.SlotKeyReadWrite;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.persist;

import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.KeyCodec;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot.IKeyManifest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.ValueWrapper;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.CacheType;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushStatus;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushStatus;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.block.KeyBlockReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.persist.KeyFlushExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueCodec;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueEncodeResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.ValueWrapper;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValue;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushStatus;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush.FlushStatus;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.persist.StringValueFlushTask;
Expand Down
Loading

0 comments on commit 6dec13b

Please sign in to comment.