Skip to content

Commit

Permalink
feat: optimize KeyBlockReadWrite (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 10, 2025
1 parent 3c37f2f commit e6e8afe
Showing 1 changed file with 42 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,66 +63,12 @@ private void warm() {

@Override
public KeyInfo get(short slot, Key key) throws IOException {
SlotInfo slotInfo = keyManifest.get(slot);
if (slotInfo == null) {
return null;
}
long fileId = slotInfo.fileId();
long offset = slotInfo.offset();
int capacity = slotInfo.capacity();
int bucketSize = capacity / _4k;
int bucket = KeyHashUtils.hash(key.key()) % bucketSize;
long bucketOffset = offset + bucket * _4k;

String cacheKey = fileId + "|" + bucketOffset;
byte[] block = readCache.get(cacheKey);
if (block == null) {
block = writeCache.get(cacheKey);
if (block != null) {
readCache.put(cacheKey, block);
writeCache.delete(cacheKey);
}
}
if (block == null) {
block = fileReadWrite.read(file(fileId), bucketOffset, _4k);
readCache.put(cacheKey, block);
}
Map<Key, KeyInfo> map = KeyCodec.decodeBucket(block);
KeyInfo data = map.get(key);
if (data == null) {
return KeyInfo.DELETE;
}
return data;
return get0(slot, key, CacheType.read);
}

@Override
public KeyInfo getForCompact(short slot, Key key) throws IOException {
SlotInfo slotInfo = keyManifest.get(slot);
if (slotInfo == null) {
return null;
}
long fileId = slotInfo.fileId();
long offset = slotInfo.offset();
int capacity = slotInfo.capacity();
int bucketSize = capacity / _4k;
int bucket = KeyHashUtils.hash(key.key()) % bucketSize;
long bucketOffset = offset + bucket * _4k;

String cacheKey = fileId + "|" + bucketOffset;
byte[] block = readCache.get(cacheKey);
if (block == null) {
block = writeCache.get(cacheKey);
}
if (block == null) {
block = fileReadWrite.read(file(fileId), bucketOffset, _4k);
writeCache.put(cacheKey, block);
}
Map<Key, KeyInfo> map = KeyCodec.decodeBucket(block);
KeyInfo data = map.get(key);
if (data == null) {
return KeyInfo.DELETE;
}
return data;
return get0(slot, key, CacheType.write);
}

@Override
Expand Down Expand Up @@ -168,4 +114,44 @@ public void writeBlocks(long fileId, long offset, byte[] data) throws IOExceptio
public byte[] readBlocks(long fileId, long offset, int size) throws IOException {
return fileReadWrite.read(file(fileId), offset, size);
}

private static enum CacheType {
read,
write,
none,
;
}

private KeyInfo get0(short slot, Key key, CacheType cacheType) throws IOException {
SlotInfo slotInfo = keyManifest.get(slot);
if (slotInfo == null) {
return null;
}
long fileId = slotInfo.fileId();
long offset = slotInfo.offset();
int capacity = slotInfo.capacity();
int bucketSize = capacity / _4k;
int bucket = KeyHashUtils.hash(key.key()) % bucketSize;
long bucketOffset = offset + bucket * _4k;

String cacheKey = fileId + "|" + bucketOffset;
byte[] block = readCache.get(cacheKey);
if (block == null) {
block = writeCache.get(cacheKey);
}
if (block == null) {
block = fileReadWrite.read(file(fileId), bucketOffset, _4k);
if (cacheType == CacheType.write) {
writeCache.put(cacheKey, block);
} else if (cacheType == CacheType.read) {
readCache.put(cacheKey, block);
}
}
Map<Key, KeyInfo> map = KeyCodec.decodeBucket(block);
KeyInfo data = map.get(key);
if (data == null) {
return KeyInfo.DELETE;
}
return data;
}
}

0 comments on commit e6e8afe

Please sign in to comment.