Skip to content

Commit

Permalink
code clean up and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Jan 23, 2025
1 parent 98906fd commit 7d150c9
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 18 deletions.
88 changes: 72 additions & 16 deletions src/middleware/withContentClaimsDagula.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import * as dagPb from '@ipld/dag-pb'

/**
* Creates a dagula instance backed by content claims.
* Get operations for DAG Protobuf content are cached if the DAGPB_CONTENT_CACHE is enabled.
*
* @type {(
* Middleware<
Expand All @@ -32,22 +33,9 @@ export function withContentClaimsDagula(handler) {
const fetcher = BatchingFetcher.create(locator, ctx.fetch)
const dagula = new Dagula({
async get(cid) {
if (env.FF_DAGPB_CONTENT_CACHE_ENABLED === 'true' && cid.code === dagPb.code) {
const dagPbBytes = await env.DAGPB_CONTENT_CACHE.get(cid.multihash.digest.toString(), 'arrayBuffer')
if (dagPbBytes) {
return { cid, bytes: new Uint8Array(dagPbBytes) }
}
const res = await fetcher.fetch(cid.multihash)
if (res.ok) {
const bytes = await res.ok.bytes()
if (env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB && bytes.length <= env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB * 1024 * 1024) {
await env.DAGPB_CONTENT_CACHE.put(cid.multihash.digest.toString(), bytes.buffer, {
expirationTtl: env.FF_DAGPB_CONTENT_CACHE_TTL > 60 ? env.FF_DAGPB_CONTENT_CACHE_TTL : undefined,
})
}
return { cid, bytes }
}
return undefined
const dagPbContent = await getDagPbContent(env, fetcher, cid)
if (dagPbContent) {
return dagPbContent
}
const res = await fetcher.fetch(cid.multihash)
return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined
Expand All @@ -64,3 +52,71 @@ export function withContentClaimsDagula(handler) {
return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula })
}
}

/**
* Returns the cached DAG Protobuf bytes if they exist, otherwise fetches the DAG Protobuf bytes
* from the fetcher and caches them in the KV store.
*
* @param {Environment} env
* @param {import('@web3-storage/blob-fetcher').Fetcher} fetcher
* @param {import('multiformats').UnknownLink} cid
* @returns {Promise<{ cid: import('multiformats').UnknownLink, bytes: Uint8Array } | undefined>}
*/
async function getDagPbContent(env, fetcher, cid) {
if (env.FF_DAGPB_CONTENT_CACHE_ENABLED === 'true' && cid.code === dagPb.code) {
const cachedBytes = await getCachedDagPbBytes(env, cid);
if (cachedBytes) {
return { cid, bytes: cachedBytes };
}

const res = await fetcher.fetch(cid.multihash);
if (res.ok) {
const bytes = await res.ok.bytes();
const cachedBytes = await cacheDagPbBytes(env, cid, bytes);
return { cid, bytes: cachedBytes };
}
}
return undefined;
}

/**
* Caches the DAG Protobuf content into the KV store if the content size is less than or equal to the max size.
* The content is cached for the duration of the TTL (seconds), if the TTL is not set, the content is cached indefinitely.
*
* @param {Environment} env
* @param {import('multiformats').UnknownLink} cid
* @param {Uint8Array} bytes
* @returns {Promise<Uint8Array>}
*/
async function cacheDagPbBytes(env, cid, bytes) {

if (env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB && bytes.length <= env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB * 1024 * 1024) {
try {
await env.DAGPB_CONTENT_CACHE.put(cid.multihash.digest.toString(), bytes.buffer, {
expirationTtl: env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS > 60 ? env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS : undefined,
})
} catch (/** @type {any} */ error) {
// TODO retry with exponential backoff?
if (error.message.includes("KV PUT failed: 429 Too Many Requests")) {
return bytes
}
console.error(error)
}
}
return bytes
}

/**
* Returns the cached DAG Protobuf bytes if they exist, otherwise returns null.
*
* @param {Environment} env
* @param {import('multiformats').UnknownLink} cid
* @returns {Promise<Uint8Array | null>}
*/
async function getCachedDagPbBytes(env, cid) {
const dagPbBytes = await env.DAGPB_CONTENT_CACHE.get(cid.multihash.digest.toString(), 'arrayBuffer')
if (dagPbBytes) {
return new Uint8Array(dagPbBytes)
}
return null
}
2 changes: 1 addition & 1 deletion src/middleware/withContentClaimsDagula.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface Environment extends MiddlewareEnvironment {
* The number that represents when to expire the key-value pair in seconds from now.
* The minimum value is 60 seconds. Any value less than 60MB will not be used.
*/
FF_DAGPB_CONTENT_CACHE_TTL: number
FF_DAGPB_CONTENT_CACHE_TTL_SECONDS: number
/**
* The maximum size of the key-value pair in MB.
* The minimum value is 1 MB. Any value less than 1MB will not be used.
Expand Down
2 changes: 1 addition & 1 deletion wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ FF_EGRESS_TRACKER_ENABLED = "true"
FF_TELEMETRY_ENABLED = "true"
FF_DELEGATIONS_STORAGE_ENABLED = "true"
FF_RAMP_UP_PROBABILITY = "0"
FF_DAGPB_CONTENT_CACHE_TTL = 300
FF_DAGPB_CONTENT_CACHE_TTL_SECONDS = 300
FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB = 2
FF_DAGPB_CONTENT_CACHE_ENABLED = "true"
TELEMETRY_RATIO = 1.0
Expand Down

0 comments on commit 7d150c9

Please sign in to comment.