Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fluffy: Implement offer cache #2827

Closed
wants to merge 13 commits into from
20 changes: 18 additions & 2 deletions fluffy/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,30 @@ type
"Size of the in memory local content cache. This is the max number " &
"of content values that can be stored in the cache.",
defaultValue: defaultPortalProtocolConfig.contentCacheSize,
name: "content-cache-size"
name: "debug-content-cache-size"
.}: int

disableContentCache* {.
hidden,
desc: "Disable the in memory local content cache",
defaultValue: defaultPortalProtocolConfig.disableContentCache,
name: "disable-content-cache"
name: "debug-disable-content-cache"
.}: bool

offerCacheSize* {.
hidden,
desc:
"Size of the in memory local offer cache. This is the max number " &
"of content values that can be stored in the cache.",
defaultValue: defaultPortalProtocolConfig.offerCacheSize,
name: "debug-offer-cache-size"
.}: int

disableOfferCache* {.
hidden,
desc: "Disable the in memory local offer cache",
defaultValue: defaultPortalProtocolConfig.disableOfferCache,
name: "debug-disable-offer-cache"
.}: bool

disablePoke* {.
Expand Down
2 changes: 1 addition & 1 deletion fluffy/fluffy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ proc run(
portalProtocolConfig = PortalProtocolConfig.init(
config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.radiusConfig,
config.disablePoke, config.maxGossipNodes, config.contentCacheSize,
config.disableContentCache,
config.disableContentCache, config.offerCacheSize, config.disableOfferCache,
)

portalNodeConfig = PortalNodeConfig(
Expand Down
4 changes: 3 additions & 1 deletion fluffy/network/beacon/beacon_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,9 @@ proc validateContent(
return false

let contentId = contentIdOpt.get()
n.portalProtocol.storeContent(contentKey, contentId, contentItem)
n.portalProtocol.storeContent(
contentKey, contentId, contentItem, cacheOffer = true
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure about whether to cache in the store call. It may not be necessary and might be better to only cache offers received over the network.


debug "Received offered content validated successfully", contentKey
else:
Expand Down
4 changes: 3 additions & 1 deletion fluffy/network/history/history_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,9 @@ proc validateContent(
error "Received offered content with invalid content key", contentKey
return false

n.portalProtocol.storeContent(contentKey, contentId, contentItem)
n.portalProtocol.storeContent(
contentKey, contentId, contentItem, cacheOffer = true
)

debug "Received offered content validated successfully", contentKey
else:
Expand Down
5 changes: 4 additions & 1 deletion fluffy/network/state/state_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ proc processOffer*(
return err("Received offered content with invalid content key")

n.portalProtocol.storeContent(
contentKeyBytes, contentId, contentValue.toRetrievalValue().encode()
contentKeyBytes,
contentId,
contentValue.toRetrievalValue().encode(),
cacheOffer = true,
)

await gossipOffer(
Expand Down
145 changes: 86 additions & 59 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,16 @@ declareCounter portal_gossip_without_lookup,
"Portal wire protocol neighborhood gossip that did not require a node lookup",
labels = ["protocol_id"]
declareCounter portal_content_cache_hits,
"Portal wire protocol local content lookups that hit the cache",
"Portal wire protocol local content lookups that hit the content cache",
labels = ["protocol_id"]
declareCounter portal_content_cache_misses,
"Portal wire protocol local content lookups that don't hit the cache",
"Portal wire protocol local content lookups that don't hit the content cache",
labels = ["protocol_id"]
declareCounter portal_offer_cache_hits,
"Portal wire protocol local content lookups that hit the offer cache",
labels = ["protocol_id"]
declareCounter portal_offer_cache_misses,
"Portal wire protocol local content lookups that don't hit the offer cache",
labels = ["protocol_id"]

declareCounter portal_poke_offers,
Expand Down Expand Up @@ -161,8 +167,16 @@ type

RadiusCache* = LruCache[NodeId, UInt256]

# Caches content fetched from the network during lookups.
# Content outside our radius is also cached in order to improve performance
# of queries which may lookup data outside our radius.
ContentCache = LruCache[ContentId, seq[byte]]

# Caches the most recently received content/offers.
# Content is only stored in this cache if it falls within our radius and similarly
# the cache is only checked if the content id is within our radius.
OfferCache = LruCache[ContentId, seq[byte]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially thought of this as a cache that only holds the content ids or the content keys (and thus could store more of them for the same data size). The cache would only be there to avoid hammering the database on a spam of offers.

After checking the PR I understand that it is similar as the ContentCache, in the sense that it is also used to check for data on requests. But with the difference that it only gets stored via the offer/accept flow (and json-rpc store method). This was not immediately clear to me.

In general the ContentCache is more useful because it foremost avoids re-doing recent network requests. This is something that OfferCache does not avoid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I remember we discussed only storing the content keys/ids and mapping to a bool in order to save space. I'm open to doing it this way which does provide some benefit for rejecting existing offers quickly but I believe that caching the recently received offers provides some other benefits because the cached offers are reusable in the other flows.

The Sqlite in memory caching is limited and having some form of in memory caching per sub-network that is not specifically tied to recent queries will be useful. For example once we have the network fully synced and following the latest blocks, the most recently received offered content is likely to be looked up more frequently during normal usage of portal network by applications.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compare the in memory caching of Sqlite to RocksDb:

  • Sqlite only uses the os page cache
  • RocksDb uses the os page cache, the block cache (cache for recent reads) and MemTable (cache of recent writes)

For RocksDb the block cache improves the performance of recent reads and the MemTable improves the performance of reading recently written values (this is not the only purpose of the MemTable but one of its benefits). For the caching in Fluffy I was thinking we should have something similar in purpose to the RocksDb MemTable where we cache recent writes (we can keep the default size small to avoid increased memory usage).

For Fluffy we have a single database/table shared across multiple logically isolated sub-networks for which the data doesn't normally need to be queried together which ideally should be stored in separate tables (but that leads to compilations with pruning). With the offer cache we at least have a separate write cache per subnetwork to alleviate load on the shared database.

My motivation for this change was partly because I noticed that the performance of returning a value from a large fully populated LRU cache is dramatically faster than even doing a contains check on a small Sqlite db.


ContentKV* = object
contentKey*: ContentKeyByteList
content*: seq[byte]
Expand Down Expand Up @@ -197,6 +211,7 @@ type
radiusCache: RadiusCache
offerQueue: AsyncQueue[OfferRequest]
offerWorkers: seq[Future[void]]
offerCache: OfferCache
pingTimings: Table[NodeId, chronos.Moment]
config*: PortalProtocolConfig

Expand Down Expand Up @@ -401,6 +416,55 @@ proc handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] =
let enrs = List[ByteList[2048], 32](@[])
encodeMessage(NodesMessage(total: 1, enrs: enrs))

proc storeContent*(
p: PortalProtocol,
contentKey: ContentKeyByteList,
contentId: ContentId,
content: seq[byte],
cacheContent = false,
cacheOffer = false,
): bool {.discardable.} =
if cacheContent and not p.config.disableContentCache:
# We cache content regardless of whether it is in our radius or not
p.contentCache.put(contentId, content)

# Always re-check that the key is still in the node range to make sure only
# content in range is stored.
if p.inRange(contentId):
p.dbPut(contentKey, contentId, content)

if cacheOffer and not p.config.disableOfferCache:
p.offerCache.put(contentId, content)

true
else:
false

proc getLocalContent*(
p: PortalProtocol, contentKey: ContentKeyByteList, contentId: ContentId
): Opt[seq[byte]] =
# The cache can contain content that is not in our radius
let maybeContent = p.contentCache.get(contentId)
if maybeContent.isSome():
portal_content_cache_hits.inc(labelValues = [$p.protocolId])
return maybeContent

portal_content_cache_misses.inc(labelValues = [$p.protocolId])

# Check first if content is in range, as this is a cheaper operation
# than the database lookup.
if p.inRange(contentId):
let maybeContent = p.offerCache.get(contentId)
if maybeContent.isSome():
portal_offer_cache_hits.inc(labelValues = [$p.protocolId])
return maybeContent

portal_offer_cache_misses.inc(labelValues = [$p.protocolId])

p.dbGet(contentKey, contentId)
else:
Opt.none(seq[byte])

proc handleFindContent(
p: PortalProtocol, fc: FindContentMessage, srcId: NodeId
): seq[byte] =
Expand All @@ -420,25 +484,21 @@ proc handleFindContent(
int64(logDistance), labelValues = [$p.protocolId]
)

# Check first if content is in range, as this is a cheaper operation
if p.inRange(contentId):
let contentResult = p.dbGet(fc.contentKey, contentId)
if contentResult.isOk():
let content = contentResult.get()
if content.len <= maxPayloadSize:
return encodeMessage(
ContentMessage(
contentMessageType: contentType, content: ByteList[2048](content)
)
let contentResult = p.getLocalContent(fc.contentKey, contentId)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When handling findContent messages we should return the content from the cache if available. When following the latest blocks, we likely will see more of these queries looking up the latest received offers.

if contentResult.isOk():
let content = contentResult.get()
if content.len <= maxPayloadSize:
return encodeMessage(
ContentMessage(
contentMessageType: contentType, content: ByteList[2048](content)
)
else:
let connectionId = p.stream.addContentRequest(srcId, content)
)
else:
let connectionId = p.stream.addContentRequest(srcId, content)

return encodeMessage(
ContentMessage(
contentMessageType: connectionIdType, connectionId: connectionId
)
)
return encodeMessage(
ContentMessage(contentMessageType: connectionIdType, connectionId: connectionId)
)

# Node does not have the content, or content is not even in radius,
# send closest neighbours to the requested content id.
Expand Down Expand Up @@ -479,7 +539,10 @@ proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
)

if p.inRange(contentId):
if not p.dbContains(contentKey, contentId):
# Checking the offer cache first to reduce the load on the database
# for the case when the offer already exists and it was recently accepted
if not p.offerCache.contains(contentId) and
not p.dbContains(contentKey, contentId):
contentKeysBitList.setBit(i)
discard contentKeys.add(contentKey)
else:
Expand Down Expand Up @@ -592,6 +655,8 @@ proc new*(
stream: stream,
radiusCache: RadiusCache.init(256),
offerQueue: newAsyncQueue[OfferRequest](concurrentOffers),
offerCache:
OfferCache.init(if config.disableOfferCache: 0 else: config.offerCacheSize),
pingTimings: Table[NodeId, chronos.Moment](),
config: config,
)
Expand Down Expand Up @@ -955,7 +1020,7 @@ proc offer(
if contentIdResult.isOk():
let
contentId = contentIdResult.get()
contentResult = p.dbGet(contentKey, contentId)
contentResult = p.getLocalContent(contentKey, contentId)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case when offering content via the database, the cache should be checked. The only downside I can think of is that it would influence the order of the content cache which may not be desired. It might be better to just check the offer cache and not the content cache, not sure about this yet.


var output = memoryOutput()
if contentResult.isOk():
Expand Down Expand Up @@ -1600,44 +1665,6 @@ proc randomGossipDiscardPeers*(
): Future[void] {.async: (raises: [CancelledError]).} =
discard await p.randomGossip(srcNodeId, contentKeys, content)

proc storeContent*(
p: PortalProtocol,
contentKey: ContentKeyByteList,
contentId: ContentId,
content: seq[byte],
cacheContent = false,
): bool {.discardable.} =
if cacheContent and not p.config.disableContentCache:
# We cache content regardless of whether it is in our radius or not
p.contentCache.put(contentId, content)

# Always re-check that the key is still in the node range to make sure only
# content in range is stored.
if p.inRange(contentId):
doAssert(p.dbPut != nil)
p.dbPut(contentKey, contentId, content)
true
else:
false

proc getLocalContent*(
p: PortalProtocol, contentKey: ContentKeyByteList, contentId: ContentId
): Opt[seq[byte]] =
# The cache can contain content that is not in our radius
let maybeContent = p.contentCache.get(contentId)
if maybeContent.isSome():
portal_content_cache_hits.inc(labelValues = [$p.protocolId])
return maybeContent

portal_content_cache_misses.inc(labelValues = [$p.protocolId])

# Check first if content is in range, as this is a cheaper operation
# than the database lookup.
if p.inRange(contentId):
p.dbGet(contentKey, contentId)
else:
Opt.none(seq[byte])

proc seedTable*(p: PortalProtocol) =
## Seed the table with specifically provided Portal bootstrap nodes. These are
## nodes that must support the wire protocol for the specific content network.
Expand Down
10 changes: 10 additions & 0 deletions fluffy/network/wire/portal_protocol_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type
maxGossipNodes*: int
contentCacheSize*: int
disableContentCache*: bool
offerCacheSize*: int
disableOfferCache*: bool

const
defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
Expand All @@ -51,6 +53,8 @@ const
defaultMaxGossipNodes* = 4
defaultContentCacheSize* = 100
defaultDisableContentCache* = false
defaultOfferCacheSize* = 100
defaultDisableOfferCache* = false
revalidationTimeout* = chronos.seconds(30)

defaultPortalProtocolConfig* = PortalProtocolConfig(
Expand All @@ -61,6 +65,8 @@ const
maxGossipNodes: defaultMaxGossipNodes,
contentCacheSize: defaultContentCacheSize,
disableContentCache: defaultDisableContentCache,
offerCacheSize: defaultOfferCacheSize,
disableOfferCache: defaultDisableOfferCache,
)

proc init*(
Expand All @@ -73,6 +79,8 @@ proc init*(
maxGossipNodes: int,
contentCacheSize: int,
disableContentCache: bool,
offerCacheSize: int,
disableOfferCache: bool,
): T =
PortalProtocolConfig(
tableIpLimits:
Expand All @@ -83,6 +91,8 @@ proc init*(
maxGossipNodes: maxGossipNodes,
contentCacheSize: contentCacheSize,
disableContentCache: disableContentCache,
offerCacheSize: offerCacheSize,
disableOfferCache: disableOfferCache,
)

func fromLogRadius*(T: type UInt256, logRadius: uint16): T =
Expand Down
2 changes: 1 addition & 1 deletion fluffy/rpc/rpc_portal_beacon_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ proc installPortalBeaconApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
contentId = p.toContentId(key).valueOr:
raise invalidKeyErr()

p.storeContent(key, contentId, contentValueBytes)
p.storeContent(key, contentId, contentValueBytes, cacheOffer = true)

rpcServer.rpc("portal_beaconLocalContent") do(contentKey: string) -> string:
let
Expand Down
2 changes: 1 addition & 1 deletion fluffy/rpc/rpc_portal_history_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ proc installPortalHistoryApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
contentId = p.toContentId(key).valueOr:
raise invalidKeyErr()

p.storeContent(key, contentId, contentValueBytes)
p.storeContent(key, contentId, contentValueBytes, cacheOffer = true)

rpcServer.rpc("portal_historyLocalContent") do(contentKey: string) -> string:
let
Expand Down
4 changes: 2 additions & 2 deletions fluffy/rpc/rpc_portal_state_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
contentValue = validateOfferGetValue(Opt.none(Hash32), key, contentBytes).valueOr:
raise invalidValueErr()

p.storeContent(keyBytes, contentId, contentValue)
p.storeContent(keyBytes, contentId, contentValue, cacheOffer = true)

rpcServer.rpc("portal_stateLocalContent") do(contentKey: string) -> string:
let
Expand All @@ -159,7 +159,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
contentValue = validateOfferGetValue(Opt.none(Hash32), key, contentBytes).valueOr:
raise invalidValueErr()

p.storeContent(keyBytes, contentId, contentValue)
p.storeContent(keyBytes, contentId, contentValue, cacheOffer = true)

await p.neighborhoodGossip(
Opt.none(NodeId), ContentKeysList(@[keyBytes]), @[contentBytes]
Expand Down
Loading