Skip to content

Commit

Permalink
feat: batch db calls when loading candidates
Browse files Browse the repository at this point in the history
  • Loading branch information
stephhuynh18 committed May 21, 2024
1 parent bbca649 commit a476998
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 10 deletions.
48 changes: 41 additions & 7 deletions src/repositories/__tests__/metadata-repository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ import { expectPresent } from '../../__tests__/expect-present.util.js'
let dbConnection: Knex
let repository: MetadataRepository

const FRESH_METADATA: FreshMetadata = {
streamId: randomStreamID(),
metadata: {
controllers: [asDIDString('did:key:controller')],
model: randomStreamID(),
tags: ['hello'],
},
const FRESH_METADATA = createFreshMetadata()

function createFreshMetadata(): FreshMetadata {
return {
streamId: randomStreamID(),
metadata: {
controllers: [asDIDString('did:key:controller')],
model: randomStreamID(),
tags: ['hello'],
},
}
}

beforeAll(async () => {
Expand Down Expand Up @@ -94,3 +98,33 @@ test('touch', async () => {
expectPresent(retrieved2)
expect(retrieved2.usedAt.valueOf()).toBeCloseTo(now1.valueOf(), -2)
})

test('batchRetrieve', async () => {
const metadata1 = createFreshMetadata()
const metadata2 = createFreshMetadata()

await repository.save(metadata1)
await repository.save(metadata2)
const retrieved = await repository.batchRetrieve([
metadata1.streamId,
metadata2.streamId,
FRESH_METADATA.streamId,
])
expect(retrieved.length).toEqual(2)

const retrieved1 = retrieved.find((m) => m.streamId.toString() === metadata1.streamId.toString())
const retrieved2 = retrieved.find((m) => m.streamId.toString() === metadata2.streamId.toString())
const retreived3 = retrieved.find(
(m) => m.streamId.toString() === FRESH_METADATA.streamId.toString()
)

expectPresent(retrieved1)
expect(retrieved1.streamId).toEqual(metadata1.streamId)
expect(retrieved1.metadata).toEqual(metadata1.metadata)

expectPresent(retrieved2)
expect(retrieved2.streamId).toEqual(metadata2.streamId)
expect(retrieved2.metadata).toEqual(metadata2.metadata)

expect(retreived3).toBeUndefined()
})
12 changes: 12 additions & 0 deletions src/repositories/anchor-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Knex } from 'knex'
import type { AnchorWithRequest, IAnchorRepository } from './anchor-repository.type.js'
import { parseCountResult } from './parse-count-result.util.js'
import { decode } from 'codeco'
import { request } from 'https'

const TABLE_NAME = 'anchor'

Expand Down Expand Up @@ -57,4 +58,15 @@ export class AnchorRepository implements IAnchorRepository {
if (!row) return null
return decode(StoredAnchor, row)
}

async findByRequests(requests: Request[]): Promise<AnchorWithRequest[]> {
const rows = await this.table.whereIn(
'requestId',
requests.map((r) => r.id)
)
return rows.map((row) => {
const anchor = decode(StoredAnchor, row)
return { ...anchor, request }
})
}
}
1 change: 1 addition & 0 deletions src/repositories/anchor-repository.type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ export interface IAnchorRepository {
findByRequest(request: Request): Promise<AnchorWithRequest | null>
findByRequestId(id: string): Promise<StoredAnchor | null>
withConnection(connection: Knex): IAnchorRepository
findByRequests(requests: Request[]): Promise<AnchorWithRequest[]>
}
15 changes: 15 additions & 0 deletions src/repositories/metadata-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ export interface IMetadataRepository {
* Mark an entry as used `now`. Return true if touched, i.e. if the entry was in the database.
*/
touch(streamId: StreamID, now?: Date): Promise<boolean>
/**
* Find all entries for the given `streamIds`. Return an empty array if none found.
*/
batchRetrieve(streamIds: StreamID[]): Promise<StoredMetadata[]>
}

/**
Expand Down Expand Up @@ -90,4 +94,15 @@ export class MetadataRepository implements IMetadataRepository {
.update({ usedAt: date.encode(now) })
return rowsTouched > 0
}

/**
* Find all entries for the given `streamIds`. Return an empty array if none found.
*/
async batchRetrieve(streamIds: StreamID[]): Promise<StoredMetadata[]> {
const rows = await this.table.whereIn(
'streamId',
streamIds.map((s) => s.toString())
)
return rows.map((row: any) => decode(StoredMetadata, row))
}
}
23 changes: 20 additions & 3 deletions src/services/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ export class AnchorService {
}

try {
logger.imp(`Anchoring ${batchMessage.data.rids.length} requests from batch ${batchMessage.data.bid}`)
logger.imp(
`Anchoring ${batchMessage.data.rids.length} requests from batch ${batchMessage.data.bid}`
)
const requests = await this.requestRepository.findByIds(batchMessage.data.rids)

const requestsNotReplaced = requests.filter(
Expand Down Expand Up @@ -645,9 +647,15 @@ export class AnchorService {
*/
async _buildCandidates(requests: Request[]): Promise<Array<Candidate>> {
const candidates = []
const metadataByStreamId = await this.metadataService
.batchRetrieve(requests.map((r) => StreamID.fromString(r.streamId)))
.then((metadata) => {
return Object.fromEntries(metadata.map((m) => [m.streamId.toString(), m]))
})

for (const request of requests) {
const streamId = StreamID.fromString(request.streamId)
const metadata = await this.metadataService.retrieve(streamId)
const metadata = metadataByStreamId[request.streamId]
if (metadata) {
const candidate = new Candidate(streamId, request, metadata.metadata)
candidates.push(candidate)
Expand Down Expand Up @@ -680,6 +688,15 @@ export class AnchorService {
candidateLimit = candidates.length
}

// batch load anchor commits for all candidates. If one already exists we can skip that candidate.
const anchorCommitsByRequest = await this.anchorRepository
.findByRequests(candidates.map((candidate) => candidate.request))
.then((anchorCommits) => {
return Object.fromEntries(
anchorCommits.map((anchorCommit) => [anchorCommit.requestId, anchorCommit])
)
})

for (const candidate of candidates) {
if (numSelectedCandidates >= candidateLimit) {
// No need to process this candidate, we've already filled our anchor batch
Expand All @@ -689,7 +706,7 @@ export class AnchorService {

// anchor commit may already exist so check first
const existingAnchorCommit = candidate.shouldAnchor()
? await this.anchorRepository.findByRequest(candidate.request)
? anchorCommitsByRequest[candidate.request.id]
: null

if (existingAnchorCommit) {
Expand Down
5 changes: 5 additions & 0 deletions src/services/metadata-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface IMetadataService {
fill(streamId: StreamID, genesisFields: GenesisFields): Promise<void>
fillFromIpfs(streamId: StreamID, options?: AbortOptions): Promise<GenesisFields>
retrieve(streamId: StreamID): Promise<StoredMetadata | undefined>
batchRetrieve(streamIds: StreamID[]): Promise<StoredMetadata[]>
}

/**
Expand Down Expand Up @@ -103,6 +104,10 @@ export class MetadataService implements IMetadataService {
return this.metadataRepository.retrieve(streamId)
}

async batchRetrieve(streamIds: StreamID[]): Promise<StoredMetadata[]> {
return this.metadataRepository.batchRetrieve(streamIds)
}

/**
* Store genesis header fields in a database.
*/
Expand Down

0 comments on commit a476998

Please sign in to comment.