Merge pull request #41 from emeraldpay/fix/block-with-tx
splix authored Jul 21, 2020
2 parents 9d752fc + c80eb14 commit 74312e1
Showing 15 changed files with 375 additions and 128 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -104,6 +104,7 @@ dependencies {
implementation 'org.apache.commons:commons-collections4:4.3'
implementation 'javax.annotation:javax.annotation-api:1.3.2'
implementation 'org.bouncycastle:bcprov-jdk15on:1.61'
implementation 'com.github.ben-manes.caffeine:caffeine:2.8.5'

implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.11.1"
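The build change adds the Caffeine dependency; the cache classes below use it to replace their hand-rolled ConcurrentHashMap + ConcurrentLinkedQueue eviction with a size-bounded cache. A minimal sketch of the Caffeine calls the new code relies on (illustrative String keys and values, not the project's types):

import com.github.benmanes.caffeine.cache.Caffeine

fun main() {
    // Size-bounded cache: once more than maximumSize entries are present,
    // Caffeine evicts entries, approximately least-recently/frequently used first.
    val cache = Caffeine.newBuilder()
        .maximumSize(64L)
        .build<String, String>()

    cache.put("0xabc", "block-json")
    println(cache.getIfPresent("0xabc")) // block-json
    cache.invalidate("0xabc")
    println(cache.getIfPresent("0xabc")) // null
}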
23 changes: 10 additions & 13 deletions src/main/kotlin/io/emeraldpay/dshackle/cache/BlocksMemCache.kt
@@ -16,36 +16,33 @@
*/
package io.emeraldpay.dshackle.cache

import com.github.benmanes.caffeine.cache.Caffeine
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.data.BlockId
import io.emeraldpay.dshackle.reader.Reader
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue

open class BlocksMemCache(
val maxSize: Int = 64
maxSize: Int = 64
) : Reader<BlockId, BlockContainer> {

private val mapping = ConcurrentHashMap<BlockId, BlockContainer>()
private val queue = ConcurrentLinkedQueue<BlockId>()
private val mapping = Caffeine.newBuilder()
.maximumSize(maxSize.toLong())
.build<BlockId, BlockContainer>()

override fun read(key: BlockId): Mono<BlockContainer> {
return Mono.justOrEmpty(mapping[key])
return Mono.justOrEmpty(get(key))
}

open fun get(key: BlockId): BlockContainer? {
return mapping[key]
return mapping.getIfPresent(key)
}

open fun add(block: BlockContainer) {
mapping.put(block.hash, block)
queue.add(block.hash)

while (queue.size > maxSize) {
val old = queue.remove()
mapping.remove(old)
}
}

open fun purge() {
mapping.cleanUp()
}
}
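The class no longer tracks its own eviction queue; Caffeine enforces maximumSize during its maintenance cycle, which runs lazily, and the new purge() forces that maintenance via cleanUp(). A small standalone sketch of that behaviour (illustrative types, not project code):

import com.github.benmanes.caffeine.cache.Caffeine

fun main() {
    val cache = Caffeine.newBuilder()
        .maximumSize(2L)
        .build<Int, String>()

    (1..10).forEach { cache.put(it, "block-$it") }

    // Eviction is performed lazily, so estimatedSize() may still exceed 2 here.
    cache.cleanUp() // force pending maintenance, as purge() now does
    println(cache.estimatedSize()) // <= 2 after maintenance
}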
57 changes: 34 additions & 23 deletions src/main/kotlin/io/emeraldpay/dshackle/cache/Caches.kt
@@ -97,30 +97,17 @@ open class Caches(
return
}
memTxsByHash.add(tx)
memBlocksByHash.get(tx.blockId)?.let { block ->
redisTxsByHash?.add(tx, block)
}
//TODO move subscription to the caller
getBlocksByHash().read(tx.blockId).flatMap { block ->
redisTxsByHash?.add(tx, block) ?: Mono.empty()
}.subscribe()
}

fun cache(tag: Tag, block: BlockContainer) {
val job = ArrayList<Mono<Void>>()
if (tag == Tag.LATEST) {
//for LATEST data cache in memory, it will be short living so better to avoid Redis
memBlocksByHash.add(block)
val replaced = blocksByHeight.add(block)
//evict cached transactions if an existing block was updated
replaced?.let { replacedBlockHash ->
var evicted = false
redisBlocksByHash?.evict(replacedBlockHash)
memBlocksByHash.get(replacedBlockHash)?.let { block ->
memTxsByHash.evict(block)
redisTxsByHash?.evict(block)
evicted = true
}
if (!evicted) {
memTxsByHash.evict(replacedBlockHash)
}
}
//for LATEST data cache it in memory, it may be short living so better to avoid Redis
memoizeBlock(block)
} else if (tag == Tag.REQUESTED) {
var blockOnlyContainer: BlockContainer? = null
var jsonValue: BlockJson<*>? = null
@@ -132,6 +119,7 @@
} else {
blockOnlyContainer = block
}
memoizeBlock(blockOnlyContainer)
memBlocksByHash.add(blockOnlyContainer)
redisBlocksByHash?.add(blockOnlyContainer)?.let(job::add)

@@ -142,18 +130,41 @@
val transactions = plainTransactions.map { tx ->
TxContainer.from(tx)
}
transactions.forEach {
cache(Tag.REQUESTED, it)
}
if (redisTxsByHash != null) {
job.add(Flux.fromIterable(transactions).flatMap { redisTxsByHash.add(it, block) }.then())
job.add(Flux.fromIterable(transactions)
.doOnNext { memTxsByHash.add(it) }
.flatMap { redisTxsByHash.add(it, block) }
.then())
}
}
}
}
Flux.fromIterable(job).flatMap { it }.subscribe() //TODO move out to a caller
}

/**
* Cache the block only in memory
*/
fun memoizeBlock(block: BlockContainer) {
memBlocksByHash.add(block)
val replaced = blocksByHeight.add(block)
//evict cached transactions if an existing block was updated
replaced?.let { evict(it) }
}

fun evict(blockId: BlockId) {
var evicted = false
redisBlocksByHash?.evict(blockId)
memBlocksByHash.get(blockId)?.let { block ->
memTxsByHash.evict(block)
redisTxsByHash?.evict(block)
evicted = true
}
if (!evicted) {
memTxsByHash.evict(blockId)
}
}

fun getBlocksByHash(): Reader<BlockId, BlockContainer> {
return blocksByHash
}
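The functional fix in cache(Tag.REQUESTED, tx) is that the transaction's block is now resolved through getBlocksByHash(), which can fall back to Redis, instead of only memBlocksByHash, so a transaction whose block has already left memory still reaches the Redis transaction cache. A simplified sketch of that reactive pattern with stand-in String types (the real code passes BlockContainer and TxContainer):

import reactor.core.publisher.Mono

// blockLookup stands in for getBlocksByHash().read(tx.blockId);
// redisAdd stands in for redisTxsByHash?.add(tx, block), which may be unconfigured.
fun cacheTxSketch(blockLookup: Mono<String>, redisAdd: ((String) -> Mono<Void>)?) {
    blockLookup
        .flatMap { block -> redisAdd?.invoke(block) ?: Mono.empty() }
        .subscribe() // as the TODO in the diff notes, subscription should eventually move to the caller
}

fun main() {
    cacheTxSketch(Mono.just("block-0xabc"), { Mono.empty() }) // Redis configured
    cacheTxSketch(Mono.just("block-0xabc"), null)             // no Redis: completes empty
}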
32 changes: 13 additions & 19 deletions src/main/kotlin/io/emeraldpay/dshackle/cache/HeightCache.kt
@@ -15,41 +15,35 @@
*/
package io.emeraldpay.dshackle.cache

import com.github.benmanes.caffeine.cache.Caffeine
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.data.BlockId
import io.emeraldpay.dshackle.reader.Reader
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentHashMap

/**
* Memory cache for blocks heights, keeps mapping height->hash.
*/
open class HeightCache(
val maxSize: Int = 256
maxSize: Int = 512
) : Reader<Long, BlockId> {

companion object {
private val log = LoggerFactory.getLogger(HeightCache::class.java)
}

private val heights = ConcurrentHashMap<Long, BlockId>()
private val heights = Caffeine.newBuilder()
.maximumSize(maxSize.toLong())
.build<Long, BlockId>()

override fun read(key: Long): Mono<BlockId> {
return Mono.justOrEmpty(heights[key])
return Mono.justOrEmpty(heights.getIfPresent(key))
}

open fun add(block: BlockContainer): BlockId? {
val existing = heights[block.height]
heights[block.height] = block.hash

// evict old numbers if full
var dropHeight = block.height - maxSize
while (heights.size > maxSize && dropHeight < block.height) {
heights.remove(dropHeight)
dropHeight++
}
val previousId = heights.getIfPresent(block.height)
heights.put(block.height, block.hash)
return previousId
}

return existing
fun purge() {
heights.cleanUp()
}

}
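With Caffeine handling size bounds, add() shrinks to one job: record the hash for the height and report which hash was previously stored there, so Caches.memoizeBlock() can evict the replaced block's transactions after a reorg. A simplified, self-contained model of that contract (illustrative names, not the project's API):

// Height -> hash index that reports what it replaced, mirroring HeightCache.add().
class HeightIndexSketch {
    private val byHeight = HashMap<Long, String>()

    /** Remember [hash] at [height]; return the hash previously stored there, if any. */
    fun add(height: Long, hash: String): String? {
        val previous = byHeight[height]
        byHeight[height] = hash
        return previous
    }
}

fun main() {
    val index = HeightIndexSketch()
    println(index.add(1_000_000L, "0xaaa")) // null - first block seen at this height
    println(index.add(1_000_000L, "0xbbb")) // 0xaaa - caller evicts its cached transactions
}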
src/main/kotlin/io/emeraldpay/dshackle/cache/OnTxRedisCache.kt
@@ -117,7 +117,7 @@ abstract class OnTxRedisCache<T>(
}
}

fun add(id: TxId, value: T, block: BlockContainer?, blockHeight: Long?): Mono<Void> {
open fun add(id: TxId, value: T, block: BlockContainer?, blockHeight: Long?): Mono<Void> {
return Mono.just(id)
.flatMap {
val key = key(it)
29 changes: 13 additions & 16 deletions src/main/kotlin/io/emeraldpay/dshackle/cache/TxMemCache.kt
@@ -15,15 +15,14 @@
*/
package io.emeraldpay.dshackle.cache

import com.github.benmanes.caffeine.cache.Caffeine
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.data.BlockId
import io.emeraldpay.dshackle.data.TxContainer
import io.emeraldpay.dshackle.data.TxId
import io.emeraldpay.dshackle.reader.Reader
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue

/**
* Memory cache for transactions
@@ -37,24 +36,25 @@ open class TxMemCache(
private val log = LoggerFactory.getLogger(TxMemCache::class.java)
}

private val mapping = ConcurrentHashMap<TxId, TxContainer>()
private val queue = ConcurrentLinkedQueue<TxId>()
private val mapping = Caffeine.newBuilder()
.maximumSize(maxSize.toLong())
.build<TxId, TxContainer>()

override fun read(key: TxId): Mono<TxContainer> {
return Mono.justOrEmpty(mapping[key])
return Mono.justOrEmpty(mapping.getIfPresent(key))
}

open fun evict(block: BlockContainer) {
block.transactions.forEach {
mapping.remove(it)
mapping.invalidate(it)
}
}

open fun evict(block: BlockId) {
val ids = mapping.filter { it.value.blockId == block }
ids.forEach {
mapping.remove(it.key)
}
val ids = mapping.asMap()
.filter { it.value.blockId == block }
.map { it.key }
mapping.invalidateAll(ids)
}

open fun add(tx: TxContainer) {
@@ -63,12 +63,9 @@
return
}
mapping.put(tx.hash, tx)
queue.add(tx.hash)

while (queue.size > maxSize) {
val old = queue.remove()
mapping.remove(old)
}
}

open fun purge() {
mapping.cleanUp()
}
}
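Eviction by block id now goes through Caffeine's asMap() view: collect the keys whose cached value points at the evicted block, then drop them with invalidateAll(). A standalone sketch of that pattern (plain String/Pair types for illustration):

import com.github.benmanes.caffeine.cache.Caffeine

fun main() {
    // txHash -> (blockId, payload), standing in for TxId -> TxContainer
    val txByHash = Caffeine.newBuilder()
        .maximumSize(100L)
        .build<String, Pair<String, String>>()

    txByHash.put("tx1", "blockA" to "raw-tx-1")
    txByHash.put("tx2", "blockA" to "raw-tx-2")
    txByHash.put("tx3", "blockB" to "raw-tx-3")

    // Same shape as TxMemCache.evict(block: BlockId): filter the map view, invalidate the keys.
    val staleKeys = txByHash.asMap()
        .filter { it.value.first == "blockA" }
        .map { it.key }
    txByHash.invalidateAll(staleKeys)

    println(txByHash.getIfPresent("tx1")) // null
    println(txByHash.getIfPresent("tx3")) // (blockB, raw-tx-3)
}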
4 changes: 2 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/cache/TxRedisCache.kt
@@ -35,7 +35,7 @@ import kotlin.math.min
/**
* Cache transactions in Redis, up to 24 hours.
*/
class TxRedisCache(
open class TxRedisCache(
private val redis: RedisReactiveCommands<String, ByteArray>,
private val chain: Chain
) : Reader<TxId, TxContainer>,
@@ -74,7 +74,7 @@ class TxRedisCache(
)
}

fun add(tx: TxContainer, block: BlockContainer): Mono<Void> {
open fun add(tx: TxContainer, block: BlockContainer): Mono<Void> {
return super.add(tx.hash, tx, block, tx.height)
}

6 changes: 6 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/data/TxContainer.kt
@@ -28,6 +28,12 @@ class TxContainer(
) : SourceContainer(json, parsed) {

companion object {
@JvmStatic
fun from(raw: ByteArray): TxContainer {
val tx = Global.objectMapper.readValue(raw, TransactionJson::class.java)
return from(tx, raw)
}

@JvmStatic
fun from(tx: TransactionJson): TxContainer {
return from(tx, Global.objectMapper.writeValueAsBytes(tx))
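The new from(raw: ByteArray) overload lets raw JSON bytes (for example, a value read back from the Redis cache) be parsed into a TxContainer directly, without an extra serialization round-trip. A hedged sketch of the same Jackson round-trip using a stand-in class (SimpleTx is illustrative, not the project's TransactionJson):

import com.fasterxml.jackson.databind.ObjectMapper

// Stand-in for TransactionJson; all-default parameters give Jackson a no-arg constructor.
data class SimpleTx(var hash: String? = null, var blockNumber: Long? = null)

fun main() {
    val objectMapper = ObjectMapper()

    // Serialize once, as the existing from(tx) overload does via writeValueAsBytes...
    val raw: ByteArray = objectMapper.writeValueAsBytes(SimpleTx("0xabc", 1_000_000L))

    // ...then parse the raw bytes back, as the new from(raw) overload does via readValue.
    val parsed = objectMapper.readValue(raw, SimpleTx::class.java)
    println(parsed.hash) // 0xabc
}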
(The remaining 7 changed files are not shown here.)
