Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 9, 2025
1 parent 67b8fe2 commit b9f15aa
Show file tree
Hide file tree
Showing 5 changed files with 691 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package stroom.planb.impl.experiment;

import stroom.bytebuffer.impl6.ByteBufferFactory;
import stroom.lmdb.LmdbConfig;
import stroom.lmdb2.LmdbEnvDir;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;

import org.lmdbjava.Dbi;
import org.lmdbjava.DbiFlags;
import org.lmdbjava.Env;
import org.lmdbjava.EnvFlags;
import org.lmdbjava.Txn;

import java.nio.ByteBuffer;
import java.nio.file.Path;

import static java.nio.charset.StandardCharsets.UTF_8;

abstract class AbstractLmdbWriter2<K, V> implements AutoCloseable {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(AbstractLmdbWriter2.class);

private static final byte[] KEY = "key".getBytes(UTF_8);
private static final byte[] VALUE = "value".getBytes(UTF_8);
private static final byte[] STATE = "state".getBytes(UTF_8);

private final Serde2<K, V> serde;
final ByteBufferFactory byteBufferFactory;
final Env<ByteBuffer> env;
final Dbi<ByteBuffer> keyDb;
final Dbi<ByteBuffer> valueDb;
final Dbi<ByteBuffer> stateDb;
private Txn<ByteBuffer> writeTxn;
final boolean keepFirst;
int commitCount = 0;

public AbstractLmdbWriter2(final Path path,
final ByteBufferFactory byteBufferFactory,
final Serde2<K, V> serde,
final boolean keepFirst) {
final LmdbEnvDir lmdbEnvDir = new LmdbEnvDir(path, true);
this.byteBufferFactory = byteBufferFactory;
this.serde = serde;
this.keepFirst = keepFirst;

LOGGER.info(() -> "Creating: " + path);

final Env.Builder<ByteBuffer> builder = Env.create()
.setMapSize(LmdbConfig.DEFAULT_MAX_STORE_SIZE.getBytes())
.setMaxDbs(3)
.setMaxReaders(1);

env = builder.open(lmdbEnvDir.getEnvDir().toFile(), EnvFlags.MDB_NOTLS);
keyDb = env.openDbi(KEY, DbiFlags.MDB_CREATE);
valueDb = env.openDbi(VALUE, DbiFlags.MDB_CREATE);
stateDb = env.openDbi(STATE, DbiFlags.MDB_CREATE);
}

// public void merge(final Path source) {
// final Env.Builder<ByteBuffer> builder = Env.create()
// .setMaxDbs(1)
// .setMaxReaders(1);
// try (final Env<ByteBuffer> sourceEnv = builder.open(source.toFile(), EnvFlags.MDB_NOTLS)) {
// final Dbi<ByteBuffer> sourceDbi = sourceEnv.openDbi(STATE);
// try (final Txn<ByteBuffer> readTxn = sourceEnv.txnRead()) {
// try (final CursorIterable<ByteBuffer> cursorIterable = sourceDbi.iterate(readTxn)) {
// final Iterator<KeyVal<ByteBuffer>> iterator = cursorIterable.iterator();
// while (iterator.hasNext()) {
// final KeyVal<ByteBuffer> keyVal = iterator.next();
// insert(keyVal.key(), keyVal.val());
// }
// }
// }
// }
// }
//
// public boolean insert(final KV<K, V> kv) {
// return insert(kv.key(), kv.value());
// }
//
// public boolean insert(final K key, final V value) {
// return serde.createKeyByteBuffer(key, keyByteBuffer ->
// serde.createValueByteBuffer(key, value, valueByteBuffer ->
// insert(keyByteBuffer, valueByteBuffer)));
// }
//
// public boolean insert(final ByteBuffer keyByteBuffer,
// final ByteBuffer valueByteBuffer) {
// final Txn<ByteBuffer> writeTxn = getOrCreateWriteTxn();
//
// // If we do not prefix values then we can simply put rows.
//
//
//
// // If the value has no key prefix, i.e. we are not using key hashes then just try to put.
// if (keepFirst) {
// // If we are keeping the first then don't allow overwrite.
// stateDb.put(writeTxn, keyByteBuffer, valueByteBuffer, PutFlags.MDB_NOOVERWRITE);
// } else {
// // Put and overwrite any existing key/value.
// stateDb.put(writeTxn, keyByteBuffer, valueByteBuffer);
// }
//// } else {
//// // Try to put without overwriting existing values.
//// if (!stateDb.put(writeTxn, keyByteBuffer, valueByteBuffer, PutFlags.MDB_NOOVERWRITE)) {
//// serde.createPrefixPredicate(keyByteBuffer, valueByteBuffer, predicate -> {
//// if (keepFirst) {
//// if (!exists(writeTxn, keyByteBuffer, predicate)) {
//// stateDb.put(writeTxn, keyByteBuffer, valueByteBuffer);
//// }
//// } else {
//// // Delete current value if there is one.
//// delete(writeTxn, keyByteBuffer, predicate);
//// // Put new value allowing for duplicate keys as we are only using a hash key.
//// stateDb.put(writeTxn, keyByteBuffer, valueByteBuffer);
//// }
//// return true;
//// });
//// }
//// }
//
// commitCount++;
// if (commitCount > 10000) {
// commit();
// commitCount = 0;
// }
//
// return true;
// }
//
//// private boolean delete(final Txn<ByteBuffer> txn,
//// final ByteBuffer keyByteBuffer,
//// final Predicate<KeyVal<ByteBuffer>> predicate) {
//// final KeyRange<ByteBuffer> keyRange = KeyRange.closed(keyByteBuffer, keyByteBuffer);
//// try (final CursorIterable<ByteBuffer> cursor = stateDb.iterate(txn, keyRange)) {
//// final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();
//// while (iterator.hasNext()) {
//// final KeyVal<ByteBuffer> keyVal = iterator.next();
//// if (predicate.test(keyVal)) {
//// iterator.remove();
//// return true;
//// }
//// }
//// }
//// return false;
//// }
////
//// private boolean exists(final Txn<ByteBuffer> txn,
//// final ByteBuffer keyByteBuffer,
//// final Predicate<KeyVal<ByteBuffer>> predicate) {
//// final KeyRange<ByteBuffer> keyRange = KeyRange.closed(keyByteBuffer, keyByteBuffer);
//// try (final CursorIterable<ByteBuffer> cursor = stateDb.iterate(txn, keyRange)) {
//// final Iterator<KeyVal<ByteBuffer>> iterator = cursor.iterator();
//// while (iterator.hasNext()) {
//// final KeyVal<ByteBuffer> keyVal = iterator.next();
//// if (predicate.test(keyVal)) {
//// return true;
//// }
//// }
//// }
//// return false;
//// }


Txn<ByteBuffer> getOrCreateWriteTxn() {
if (writeTxn == null) {
writeTxn = env.txnWrite();
}
return writeTxn;
}

void commit() {
if (writeTxn != null) {
try {
writeTxn.commit();
} finally {
try {
writeTxn.close();
} finally {
writeTxn = null;
}
}
}
}

@Override
public void close() {
commit();
env.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package stroom.planb.impl.experiment;

import stroom.query.language.functions.FieldIndex;
import stroom.query.language.functions.Val;

import org.lmdbjava.CursorIterable.KeyVal;

import java.nio.ByteBuffer;
import java.util.function.Function;
import java.util.function.Predicate;

public interface Serde2<K, V> {

<R> R createKeyByteBuffer(K key, Function<ByteBuffer, R> function);

<R> R createValueByteBuffer(K key, V value, Function<ByteBuffer, R> function);

<R> R createPrefixPredicate(K key, Function<Predicate<KeyVal<ByteBuffer>>, R> function);

<R> R createPrefixPredicate(ByteBuffer keyByteBuffer,
ByteBuffer valueByteBuffer,
Function<Predicate<KeyVal<ByteBuffer>>, R> function);

Function<KeyVal<ByteBuffer>, Val>[] getValExtractors(FieldIndex fieldIndex);

K getKey(KeyVal<ByteBuffer> keyVal);

V getVal(KeyVal<ByteBuffer> keyVal);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package stroom.planb.impl.experiment;

import stroom.bytebuffer.ByteBufferUtils;
import stroom.bytebuffer.impl6.ByteBufferFactory;
import stroom.planb.impl.io.State.Key;
import stroom.planb.impl.io.StateFields;
import stroom.planb.impl.io.StateValue;
import stroom.planb.impl.io.ValUtil;
import stroom.query.language.functions.FieldIndex;
import stroom.query.language.functions.Val;
import stroom.query.language.functions.ValNull;

import net.openhft.hashing.LongHashFunction;
import org.lmdbjava.CursorIterable.KeyVal;

import java.nio.ByteBuffer;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* KEY = <KEY_HASH><KEY_ID>
* VALUE = <VALUE_TYPE><VALUE_BYTES>
*/
public class StateSerde2 implements Serde2<Key, StateValue> {

private final ByteBufferFactory byteBufferFactory;

public StateSerde2(final ByteBufferFactory byteBufferFactory) {
this.byteBufferFactory = byteBufferFactory;
}

@Override
public <T> T createKeyByteBuffer(final Key key, final Function<ByteBuffer, T> function) {
final ByteBuffer keyByteBuffer = byteBufferFactory.acquire(Long.BYTES);
try {
// Hash the key.
final long keyHash = LongHashFunction.xx3().hashBytes(key.bytes());
keyByteBuffer.putLong(keyHash);
keyByteBuffer.flip();
return function.apply(keyByteBuffer);
} finally {
byteBufferFactory.release(keyByteBuffer);
}
}

@Override
public <R> R createValueByteBuffer(final Key key,
final StateValue value,
final Function<ByteBuffer, R> function) {
final ByteBuffer valueByteBuffer = byteBufferFactory.acquire(Integer.BYTES +
key.bytes().length +
Byte.BYTES +
value.byteBuffer().limit());
try {
putPrefix(valueByteBuffer, key.bytes());
valueByteBuffer.put(value.typeId());
valueByteBuffer.put(value.byteBuffer());
valueByteBuffer.flip();
return function.apply(valueByteBuffer);
} finally {
byteBufferFactory.release(valueByteBuffer);
}
}

@Override
public <R> R createPrefixPredicate(final Key key,
final Function<Predicate<KeyVal<ByteBuffer>>, R> function) {
final ByteBuffer prefixByteBuffer = byteBufferFactory.acquire(Integer.BYTES + key.bytes().length);
try {
putPrefix(prefixByteBuffer, key.bytes());
prefixByteBuffer.flip();

return function.apply(keyVal -> ByteBufferUtils.containsPrefix(keyVal.val(), prefixByteBuffer));
} finally {
byteBufferFactory.release(prefixByteBuffer);
}
}

@Override
public <R> R createPrefixPredicate(final ByteBuffer keyByteBuffer,
final ByteBuffer valueByteBuffer,
final Function<Predicate<KeyVal<ByteBuffer>>, R> function) {
final int keyLength = valueByteBuffer.getInt(0);
final ByteBuffer slice = valueByteBuffer.slice(0, Integer.BYTES + keyLength);
return function.apply(keyVal -> ByteBufferUtils.containsPrefix(keyVal.val(), slice));
}

private void putPrefix(final ByteBuffer byteBuffer, final byte[] keyBytes) {
byteBuffer.putInt(keyBytes.length);
byteBuffer.put(keyBytes);
}

@Override
@SuppressWarnings("unchecked")
public Function<KeyVal<ByteBuffer>, Val>[] getValExtractors(final FieldIndex fieldIndex) {
final Function<KeyVal<ByteBuffer>, Val>[] functions = new Function[fieldIndex.size()];
for (int i = 0; i < fieldIndex.getFields().length; i++) {
final String field = fieldIndex.getField(i);
functions[i] = switch (field) {
case StateFields.KEY -> kv -> {
final int keyLength = kv.val().getInt(0);
return ValUtil.getValue((byte) 0, kv.val().slice(Integer.BYTES, keyLength));
};
case StateFields.VALUE_TYPE -> kv -> {
final int keyLength = kv.val().getInt(0);
final byte typeId = kv.val().get(Integer.BYTES + keyLength);
return ValUtil.getType(typeId);
};
case StateFields.VALUE -> kv -> {
final int keyLength = kv.val().getInt(0);
final byte typeId = kv.val().get(Integer.BYTES + keyLength);
final int valueStart = Integer.BYTES + keyLength + Byte.BYTES;
return ValUtil.getValue(typeId, kv.val().slice(valueStart, kv.val().limit() - valueStart));
};
default -> byteBuffer -> ValNull.INSTANCE;
};
}
return functions;
}

@Override
public Key getKey(final KeyVal<ByteBuffer> keyVal) {
final ByteBuffer byteBuffer = keyVal.val();
final int keyLength = byteBuffer.getInt(0);
final ByteBuffer slice = byteBuffer.slice(Integer.BYTES, keyLength);
final byte[] keyBytes = ByteBufferUtils.toBytes(slice);
return new Key(keyBytes);
}

@Override
public StateValue getVal(final KeyVal<ByteBuffer> keyVal) {
final ByteBuffer byteBuffer = keyVal.val();
final int keyLength = byteBuffer.getInt(0);
final byte typeId = byteBuffer.get(Integer.BYTES + keyLength);
final int valueStart = Integer.BYTES + keyLength + Byte.BYTES;
final ByteBuffer slice = byteBuffer.slice(valueStart, byteBuffer.limit() - valueStart);
final byte[] valueBytes = ByteBufferUtils.toBytes(slice);
return new StateValue(typeId, ByteBuffer.wrap(valueBytes));
}
}
Loading

0 comments on commit b9f15aa

Please sign in to comment.