refactor(fetcher): document algorithms used, cleanup code
hannahhoward committed Dec 25, 2024
1 parent 80320a7 commit dcef703
Showing 1 changed file with 99 additions and 85 deletions.
184 changes: 99 additions & 85 deletions src/fetcher/batching.js
@@ -106,7 +106,7 @@ class BatchingFetcher {
break
}
}
lastResolveBlobs = resolveBlobs(fetchRes.ok, pendingReqs)
lastResolveBlobs = resolveRequests(fetchRes.ok, pendingReqs)
}

// await the last call to resolve blobs
@@ -157,129 +157,143 @@ class BatchingFetcher {
*/
export const create = (locator) => new BatchingFetcher(locator)

/** @typedef {{range: API.AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedRange */
/** @typedef {{range: API.AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedBlobs */

/**
* Fetch blobs from the passed locations. The locations MUST share a common
* site to fetch from.
*/
export const fetchBlobs = withResultSpan('fetchBlobs',
export const fetchBlobs = withResultSpan('fetchBlobs', _fetchBlobs)

/**
* @param {URL} url Desired URL to fetch blobs from.
* @param {Array<{ location: API.Location, range?: API.Range }>} locations
* @returns {Promise<API.Result<AsyncGenerator<BlobResult, API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>, API.NotFound|API.Aborted|API.NetworkError>>}
*/
async (url, locations) => {
if (locations.length === 1) {
const res = await fetchBlob(locations[0].location, locations[0].range)
if (res.error) return res
return {
ok: (async function * () {
yield { blob: res.ok, range: locations[0].range }
return { ok: true }
}())
}
async function _fetchBlobs (url, locations) {
if (locations.length === 1) {
const res = await fetchBlob(locations[0].location, locations[0].range)
if (res.error) return res
return {
ok: (async function * () {
yield { blob: res.ok, range: locations[0].range }
return { ok: true }
}())
}
}

// resolve ranges for blobs

/** @type {ResolvedRange[]} */
const ranges = []
for (const { location, range } of locations) {
for (const s of location.site) {
let found = false
for (const l of s.location) {
if (l.toString() === url.toString()) {
/** @type {ResolvedBlobs[]} */
const resolvedBlobs = []
for (const { location, range } of locations) {
for (const s of location.site) {
let found = false
for (const l of s.location) {
if (l.toString() === url.toString()) {
/** @type {API.AbsoluteRange} */
let resolvedRange = [s.range.offset, s.range.offset + s.range.length - 1]
if (range) {
const relRange = resolveRange(range, s.range.length)
resolvedRange = [s.range.offset + relRange[0], s.range.offset + relRange[1]]
}
ranges.push({
digest: location.digest,
range: resolvedRange,
orig: range
})
found = true
break
let resolvedRange = [s.range.offset, s.range.offset + s.range.length - 1]
if (range) {
const relRange = resolveRange(range, s.range.length)
resolvedRange = [s.range.offset + relRange[0], s.range.offset + relRange[1]]
}
resolvedBlobs.push({
digest: location.digest,
range: resolvedRange,
orig: range
})
found = true
break
}
if (found) break
}
if (found) break
}
if (ranges.length !== locations.length) {
throw new Error('no common site')
}
}
if (resolvedBlobs.length !== locations.length) {
throw new Error('no common site')
}

ranges.sort((a, b) => a.range[0] - b.range[0])
const aggregateRangeEnd = ranges.reduce((aggregateEnd, r) => r.range[1] > aggregateEnd ? r.range[1] : aggregateEnd, 0)
const headers = { Range: `bytes=${ranges[0].range[0]}-${aggregateRangeEnd}` }
try {
const res = await fetch(url, { headers })
if (!res.ok) {
return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) }
}
return { ok: consumeMultipartResponse(url, ranges, res) }
} catch (err) {
return { error: new NetworkError(url, { cause: err }) }
// sort blobs by starting byte
resolvedBlobs.sort((a, b) => a.range[0] - b.range[0])
// get last byte to fetch
const aggregateRangeEnd = resolvedBlobs.reduce((aggregateEnd, r) => r.range[1] > aggregateEnd ? r.range[1] : aggregateEnd, 0)
// fetch bytes from the first starting byte to the last byte to fetch
const headers = { Range: `bytes=${resolvedBlobs[0].range[0]}-${aggregateRangeEnd}` }
try {
const res = await fetch(url, { headers })
if (!res.ok) {
return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) }
}
})
return { ok: consumeBatchResponse(url, resolvedBlobs, res) }
} catch (err) {
return { error: new NetworkError(url, { cause: err }) }
}
}
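
The batching algorithm described by the comments in `_fetchBlobs` can be shown in isolation. The sketch below is editor-added and not part of this commit; `coalesceRanges` is a hypothetical helper illustrating how the sorted absolute ranges collapse into the single Range header passed to `fetch`.

// Hypothetical helper (illustration only): given absolute [start, end] byte
// ranges for each blob, sort by start byte and cover them all with one
// HTTP Range header.
const coalesceRanges = (ranges) => {
  const sorted = [...ranges].sort((a, b) => a[0] - b[0])
  const end = sorted.reduce((max, r) => (r[1] > max ? r[1] : max), 0)
  return `bytes=${sorted[0][0]}-${end}`
}

// For example, blobs at [0, 99], [250, 299] and [100, 249] yield a single
// request with `Range: bytes=0-299`.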

/** @typedef {{blob: API.Blob, range: API.Range | undefined}} BlobResult */

/**
* Consumes a multipart range request to create multiple blobs
* Consumes a batch response to create multiple blobs. Breaks up a byte range
* running from the first byte of the first blob to the last byte of the last
* blob into the appropriate range for each blob.
*/
const consumeMultipartResponse = withAsyncGeneratorSpan('consumeMultipartResponse',
const consumeBatchResponse = withAsyncGeneratorSpan('consumeBatchResponse', _consumeBatchResponse)

/**
* @param {URL} url
* @param {ResolvedRange[]} sortedRanges
* @param {ResolvedBlobs[]} sortedBlobs
* @param {Response} res
* @returns {AsyncGenerator<BlobResult, API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>}
*/
async function * (url, sortedRanges, res) {
if (!res.body) {
return { error: new NetworkError(url, { cause: new Error('missing response body') }) }
}
const parts = new Uint8ArrayList()
let farthestRead = sortedRanges[0].range[0]
let farthestConsumed = sortedRanges[0].range[0]
let currentRange = 0
try {
for await (const chunk of res.body) {
// append the chunk to our buffer
parts.append(chunk)
// update the absolute range of what we've read
farthestRead += chunk.byteLength
// read and push any blobs in the current buffer
// note that as long as ranges are sorted ascending by start
// this should be resilient to overlapping ranges
while (farthestRead >= sortedRanges[currentRange].range[1] + 1) {
const blob = new Blob(sortedRanges[currentRange].digest,
parts.subarray(sortedRanges[currentRange].range[0] - farthestConsumed, sortedRanges[currentRange].range[1] + 1 - farthestConsumed))
yield ({ blob, range: sortedRanges[currentRange].orig })
currentRange++
if (currentRange >= sortedRanges.length) {
return { ok: true }
}
let toConsume = sortedRanges[currentRange].range[0] - farthestConsumed
if (toConsume > parts.byteLength) { toConsume = parts.byteLength }
parts.consume(toConsume)
farthestConsumed += toConsume
async function * _consumeBatchResponse (url, sortedBlobs, res) {
if (!res.body) {
return { error: new NetworkError(url, { cause: new Error('missing response body') }) }
}
const parts = new Uint8ArrayList()
// start at first byte of first blob
let farthestRead = sortedBlobs[0].range[0]
let farthestConsumed = sortedBlobs[0].range[0]
let currentBlob = 0
try {
for await (const chunk of res.body) {
// append the chunk to our buffer
parts.append(chunk)
// update the absolute position of how far we've read
farthestRead += chunk.byteLength
// resolve any blobs in the current buffer
// note that as long as blobs are sorted ascending by start
// this should be resilient to overlapping ranges
while (farthestRead >= sortedBlobs[currentBlob].range[1] + 1) {
// generate blob out of the current buffer
const blob = new Blob(sortedBlobs[currentBlob].digest,
parts.subarray(sortedBlobs[currentBlob].range[0] - farthestConsumed, sortedBlobs[currentBlob].range[1] + 1 - farthestConsumed))
yield ({ blob, range: sortedBlobs[currentBlob].orig })
currentBlob++
if (currentBlob >= sortedBlobs.length) {
return { ok: true }
}
// consume any bytes we no longer need
// (they are before the beginning of the current range)
let toConsume = sortedBlobs[currentBlob].range[0] - farthestConsumed
if (toConsume > parts.byteLength) { toConsume = parts.byteLength }
parts.consume(toConsume)
farthestConsumed += toConsume
}
return { error: new NetworkError(url, { cause: new Error('did not resolve all chunks') }) }
} catch (err) {
return { error: new NetworkError(url, { cause: err }) }
}
})
return { error: new NetworkError(url, { cause: new Error('did not resolve all chunks') }) }
} catch (err) {
return { error: new NetworkError(url, { cause: err }) }
}
}
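
For clarity, the splitting that `_consumeBatchResponse` performs incrementally can also be written against a fully buffered body. This is an editor-added sketch, not code from the commit; `sliceBlobs` is hypothetical and ignores streaming and error handling.

// Hypothetical, non-streaming illustration: `body` is the entire Uint8Array
// payload returned for `Range: bytes=<base>-<aggregateEnd>`, and
// `sortedRanges` are the absolute [start, end] ranges sorted ascending by
// start byte.
const sliceBlobs = (body, base, sortedRanges) =>
  sortedRanges.map(([start, end]) => body.subarray(start - base, end + 1 - base))

// Because every range start is at or after `base`, each slice stays inside
// the buffer even when ranges overlap, mirroring the streaming loop above.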

/**
* Resolve pending requests from blobs generated out of the last fetch
*
* @param {AsyncGenerator<BlobResult, API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>} results
* @param {DigestMap<API.MultihashDigest, RangedRequests>} pendingReqs
* @returns {Promise<API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>}
*/
const resolveBlobs = async (results, pendingReqs) => {
const resolveRequests = async (results, pendingReqs) => {
for (;;) {
const { value: result, done } = await results.next()
if (done) {