Skip to content

Commit

Permalink
feat: string value (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 7, 2025
1 parent 14ace96 commit c35a610
Show file tree
Hide file tree
Showing 32 changed files with 1,025 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.codec;

import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressType;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressUtils;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.*;
import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils;
import com.netease.nim.camellia.tools.utils.Pair;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import static com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants.block_header_len;

/**
* Created by caojiajun on 2025/1/6
*/
public class StringValueCodec {

public static List<BlockInfo> encode(short slot, BlockType blockType, IValueManifest valueManifest, List<Pair<KeyInfo, byte[]>> values) throws IOException {
//
//data to sub-block
List<SubBlock> subBlocks = new ArrayList<>();
{
SubBlock subBlock = new SubBlock();
//new sub-block
ByteBuffer decompressed = ByteBuffer.allocate(blockType.getBlockSize() - block_header_len);
int size = 0;
for (Pair<KeyInfo, byte[]> entry : values) {
//
byte[] data = entry.getSecond();
int length = data.length;
if (decompressed.remaining() < length + blockType.getSizeLen()) {
//sub-block compressed
CompressType compressType = CompressType.zstd;
byte[] compressed = CompressUtils.get(compressType).compress(decompressed.array(), 0, size);
if (compressed.length >= size) {
compressType = CompressType.none;
compressed = new byte[size];
System.arraycopy(decompressed.array(), 0, compressed, 0, compressed.length);
}
subBlock.compressType = compressType;//1
subBlock.decompressLen = size;//4
subBlock.compressLen = compressed.length;//4
subBlock.compressed = compressed;//n
subBlocks.add(subBlock);
//
//new sub-block
decompressed = ByteBuffer.allocate(blockType.getBlockSize() - block_header_len);
size = 0;
subBlock = new SubBlock();
}
subBlock.keyInfos.add(entry.getFirst());
//add data to sub-block
if (blockType == BlockType._4k || blockType == BlockType._32k) {
decompressed.putShort((short) length);
} else {
decompressed.putInt(length);
}
decompressed.put(data);
size += blockType.getSizeLen();
size += length;
}
//sub-block compressed
CompressType compressType = CompressType.zstd;
byte[] compressed = CompressUtils.get(compressType).compress(decompressed.array(), 0, size);
if (compressed.length >= size) {
compressType = CompressType.none;
compressed = new byte[size];
System.arraycopy(decompressed.array(), 0, compressed, 0, compressed.length);
}
subBlock.compressType = compressType;//1
subBlock.decompressLen = size;//4
subBlock.compressLen = compressed.length;//4
subBlock.compressed = compressed;//n
subBlocks.add(subBlock);
}

//sub-block to block
List<BlockInfo> blockInfos = new ArrayList<>();

{
//new block
BlockLocation location = valueManifest.allocate(slot, blockType);
ByteBuffer buffer = ByteBuffer.allocate(blockType.getBlockSize());
int offset = 0;
short subBlockCount = 0;
buffer.putInt(0);//4
buffer.putShort(subBlockCount);//2
for (SubBlock block : subBlocks) {
//
if (buffer.remaining() < block.size()) {
//sub block merge
int crc = RedisClusterCRC16Utils.getCRC16(buffer.array(), 6, buffer.array().length);
buffer.putInt(0, crc);//4
buffer.putShort(4, subBlockCount);//2
blockInfos.add(new BlockInfo(blockType, location, buffer.array()));
//
//new block
location = valueManifest.allocate(slot, blockType);
buffer = ByteBuffer.allocate(blockType.getBlockSize());
subBlockCount = 0;
buffer.putInt(0);//4
buffer.putShort(subBlockCount);//2
}
//add sub-block to block
buffer.put(block.compressType.getType());
buffer.putInt(block.decompressLen);
buffer.putInt(block.compressLen);
buffer.put(block.compressed);
for (KeyInfo keyInfo : block.keyInfos) {
keyInfo.setValueLocation(new ValueLocation(location, offset));
offset++;
}
subBlockCount++;
}
//sub block merge
int crc = RedisClusterCRC16Utils.getCRC16(buffer.array(), 6, buffer.array().length);
buffer.putInt(0, crc);
buffer.putShort(4, subBlockCount);
blockInfos.add(new BlockInfo(blockType, location, buffer.array()));
}
return blockInfos;
}

public static List<byte[]> decode(byte[] data, BlockType blockType) {
ByteBuffer buffer = ByteBuffer.wrap(data);
int crc1 = buffer.getInt();
int crc2 = RedisClusterCRC16Utils.getCRC16(data, 6, data.length);
if (crc1 != crc2) {
return new ArrayList<>();
}

List<byte[]> values = new ArrayList<>();

short subBlockCount = buffer.getShort();
for (int i=0; i<subBlockCount; i++) {
CompressType compressType = CompressType.getByValue(buffer.get());
int decompressLen = buffer.getInt();
int compressLen = buffer.getInt();
byte[] compressed = new byte[compressLen];
buffer.get(compressed);
byte[] decompressed = CompressUtils.get(compressType).decompress(compressed, 0, compressLen, decompressLen);
ByteBuffer subBlockBuffer = ByteBuffer.wrap(decompressed);
while (subBlockBuffer.hasRemaining()) {
int size;
if (blockType == BlockType._4k || blockType == BlockType._32k) {
size = subBlockBuffer.getShort();
} else {
size = subBlockBuffer.getInt();
}
byte[] value = new byte[size];
subBlockBuffer.get(value);
values.add(value);
}
}
return values;
}

private static class SubBlock {
List<KeyInfo> keyInfos = new ArrayList<>();
CompressType compressType;
int decompressLen = 0;
int compressLen = 0;
byte[] compressed;

int size() {
return 1 + 4 + 4 + compressed.length;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.string.StringReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.wal.WalGroup;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* Created by caojiajun on 2025/1/3
*/
Expand Down Expand Up @@ -37,4 +42,17 @@ public abstract class CommandOnEmbeddedStorage {
* @return reply
*/
protected abstract Reply execute(short slot, Command command) throws Exception;

/**
* check and flush
* @param slot slot
* @throws IOException exception
*/
protected void checkAndFlush(short slot) throws IOException {
if (keyReadWrite.needFlush(slot) || stringReadWrite.needFlush(slot)) {
keyReadWrite.flushPrepare(slot);
CompletableFuture<FlushResult> future = stringReadWrite.flush(slot);
future.thenAccept(flushResult -> keyReadWrite.flush(slot));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.CommandOnEmbeddedStorage;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation;
import com.netease.nim.camellia.tools.utils.BytesKey;

import java.io.IOException;

/**
* GET key
* <p>
Expand All @@ -34,10 +35,10 @@ protected Reply execute(short slot, Command command) throws Exception {
byte[][] objects = command.getObjects();
BytesKey key = new BytesKey(objects[1]);
KeyInfo keyInfo = keyReadWrite.get(slot, key);
return execute0(keyInfo);
return execute0(slot, keyInfo);
}

private Reply execute0(KeyInfo keyInfo) {
private Reply execute0(short slot, KeyInfo keyInfo) throws IOException {
if (keyInfo == null) {
return BulkReply.NIL_REPLY;
}
Expand All @@ -47,8 +48,7 @@ private Reply execute0(KeyInfo keyInfo) {
if (keyInfo.containsExtra()) {
return new BulkReply(keyInfo.getExtra());
}
ValueLocation valueLocation = keyInfo.getValueLocation();
byte[] bytes = stringReadWrite.get(valueLocation);
byte[] bytes = stringReadWrite.get(slot, keyInfo);
if (bytes == null) {
return BulkReply.NIL_REPLY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.netease.nim.camellia.redis.proxy.reply.StatusReply;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.CommandOnEmbeddedStorage;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.string.SetCommander;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;
import com.netease.nim.camellia.redis.proxy.util.Utils;
Expand Down Expand Up @@ -117,10 +117,12 @@ protected Reply execute(short slot, Command command) throws Exception {
keyInfo.setExtra(value);
} else {
keyInfo.setExtra(null);
ValueLocation location = stringReadWrite.put(slot, keyInfo, value);
keyInfo.setValueLocation(location);
stringReadWrite.put(slot, keyInfo, value);
}
keyReadWrite.put(slot, keyInfo);

checkAndFlush(slot);

if (get) {
return new BulkReply(oldValue);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class ZstdCompressor implements ICompressor {
private final int compressionLevel;

public ZstdCompressor() {
compressionLevel = Zstd.maxCompressionLevel();
compressionLevel = Zstd.defaultCompressionLevel();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
public class EmbeddedStorageConstants {

public static final int _4k = 4*1024;
public static final int _32k = 32*1024;
public static final int _64k = 64*1024;
public static final int _256k = 256*1024;
public static final int _1024k = 1024*1024;
public static final int bit_size = (int)(16*1024*1024*1024L / _64k);//16Gib
public static final long block_size = 128*1024*1024*1024L;//128Gib
public static final int block_header_len = 4+2+1+4+4;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums;

/**
* Created by caojiajun on 2025/1/6
*/
public enum FlushStatus {
PREPARE,
FLUSHING,
FLUSH_OK,
;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.file;

import java.nio.ByteBuffer;

/**
* Created by caojiajun on 2025/1/6
*/
public class ByteBufferUtil {

public static void writeInt(int value, ByteBuffer buffer) {
while ((value & 0xFFFFFF80) != 0L) {
buffer.put((byte) ((value & 0x7F) | 0x80));
value >>>= 7;
}
buffer.put((byte) (value & 0x7F));
}

public static int readInt(ByteBuffer buffer, int idx) {
int value = 0;
int i = 0;
while ((buffer.get(idx) & 0x80) != 0) {
value |= (buffer.get(idx) & 0x7F) << i;
i += 7;
idx++;
if (i > 21) {
throw new IllegalArgumentException("Variable length quantity is too long");
}
}
return value | (buffer.get(idx) << i);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.flush;


import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Created by caojiajun on 2025/1/7
*/
public class FlushExecutor {

private final ThreadPoolExecutor executor;

public FlushExecutor(int poolSize, int queueSize) {
this.executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize), new FlushThreadFactory("flush", false));
}

public boolean isInFlushThread() {
Thread thread = Thread.currentThread();
return thread instanceof FlushThread;
}

public void submit(Runnable task) {
if (isInFlushThread()) {
task.run();
} else {
executor.submit(task);
}
}
}
Loading

0 comments on commit c35a610

Please sign in to comment.