Skip to content

Commit

Permalink
fix: Ensure loading a stream at a CommitID of an anchor can repair a …
Browse files Browse the repository at this point in the history
…stream with a CACAO error
  • Loading branch information
stbrody committed Sep 24, 2024
1 parent d46f3de commit 5e92e19
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 145 deletions.
51 changes: 36 additions & 15 deletions packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,15 @@ export class Repository {
})
}

/**
* Only used for testing.
* Clears the in-memory cache of stream states.
* @private
*/
_clearCache() {
this.inmemory.clear()
}

/**
* Must be called from within the ExecutionQueue to be safe.
*/
Expand Down Expand Up @@ -487,28 +496,40 @@ export class Repository {
// We also skip CACAO expiration checking during this initial load as its possible
// that the CommitID we are being asked to load may in fact be an anchor commit with
// the timestamp information that will reveal to us that the CACAO didn't actually expire.
const base$ = await this.load(commitId.baseID, opts, false)

return this._atCommit(commitId, base$)
}
let existingState$ = null
try {
existingState$ = await this.load(commitId.baseID, opts, false)
} catch (err) {
this.logger.warn(
`Error loading existing state for stream ${commitId.baseID} while loading stream at commit ${commitId.commit}`
)
}

private async _atCommit(
commitId: CommitID,
existingState$: RunningState
): Promise<SnapshotState> {
return this.executionQ.forStream(commitId).run(async () => {
const stateAtCommit = await this.streamLoader.stateAtCommit(existingState$.state, commitId)
const stateAtCommit = existingState$
? await this.streamLoader.resetStateToCommit(existingState$.state, commitId)
: await this.streamLoader.stateAtCommit(commitId)

// Since we skipped CACAO expiration checking earlier when we loaded the current stream state,
// we need to make sure to do it here.
SignatureUtils.checkForCacaoExpiration(stateAtCommit)

// If the provided CommitID is ahead of what we have in the cache and state store, then we
// should update them to include it.
if (StreamUtils.isStateSupersetOf(stateAtCommit, existingState$.value)) {
existingState$.next(stateAtCommit)
await this._updateStateIfPinned(existingState$)
// Check if we need to update what state is stored in our state store based on any
// information learned from this CommitID.
if (existingState$) {
if (StreamUtils.isStateSupersetOf(stateAtCommit, existingState$.value)) {
// If the provided CommitID is ahead of what we have in the cache and state store, then we
// should update them to include it.
existingState$.next(stateAtCommit)
await this._updateStateIfPinned(existingState$)
}
} else {
// No existing state for this stream exists, so save it now.
const newState$ = new RunningState(stateAtCommit, false)
this._registerRunningState(newState$)
await this._updateStateIfPinned(newState$)
}

return new SnapshotState(stateAtCommit)
})
}
Expand All @@ -522,7 +543,7 @@ export class Repository {
async loadAtTime(streamId: StreamID, opts: LoadOpts): Promise<SnapshotState> {
const base$ = await this.load(streamId.baseID, opts)
const commitId = commitAtTime(base$.state, opts.atTime)
return this._atCommit(commitId, base$)
return this.loadAtCommit(commitId, opts)
}

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/state-management/state-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ export class StateCache<T> implements Iterable<[string, T]> {
this.volatile.delete(key)
}

clear() {
this.durable.clear()
this.volatile.clear()
}

/**
* Set value to durable bucket. Remove from volatile.
*/
Expand Down
31 changes: 20 additions & 11 deletions packages/core/src/stream-loading/__tests__/stream-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,31 +180,31 @@ describeIfV3('Streamloader', () => {
expect(doc.id.toString()).toEqual(commit.baseID.toString())
}

const stateV0 = await streamLoader.stateAtCommit(doc.state, commits[0])
const stateV0 = await streamLoader.resetStateToCommit(doc.state, commits[0])
expect(stateV0.log.length).toEqual(1)
expect(stateV0.content).toEqual(CONTENT0)
expect(stateV0.next).toBeUndefined()
expect(stateV0.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)

const stateV1 = await streamLoader.stateAtCommit(doc.state, commits[1])
const stateV1 = await streamLoader.resetStateToCommit(doc.state, commits[1])
expect(stateV1.log.length).toEqual(2)
expect(stateV1.content).toEqual(CONTENT0)
expect(stateV1.next).toBeUndefined()
expect(stateV1.anchorStatus).toEqual(AnchorStatus.ANCHORED)

const stateV2 = await streamLoader.stateAtCommit(doc.state, commits[2])
const stateV2 = await streamLoader.resetStateToCommit(doc.state, commits[2])
expect(stateV2.log.length).toEqual(3)
expect(stateV2.content).toEqual(CONTENT0)
expect(stateV2.next.content).toEqual(CONTENT1)
expect(stateV2.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)

const stateV3 = await streamLoader.stateAtCommit(doc.state, commits[3])
const stateV3 = await streamLoader.resetStateToCommit(doc.state, commits[3])
expect(stateV3.log.length).toEqual(4)
expect(stateV3.content).toEqual(CONTENT0)
expect(stateV3.next.content).toEqual(CONTENT2)
expect(stateV3.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)

const stateV4 = await streamLoader.stateAtCommit(doc.state, commits[4])
const stateV4 = await streamLoader.resetStateToCommit(doc.state, commits[4])
expect(stateV4.log.length).toEqual(5)
expect(stateV4.content).toEqual(CONTENT2)
expect(stateV4.next).toBeUndefined()
Expand All @@ -218,13 +218,19 @@ describeIfV3('Streamloader', () => {
await CoreUtils.anchorUpdate(ceramic, stream)

// Now load the stream at a commitID ahead of what is currently in the state
const updatedState1 = await streamLoader.stateAtCommit(initialState, stream.allCommitIds[1])
const updatedState1 = await streamLoader.resetStateToCommit(
initialState,
stream.allCommitIds[1]
)
expect(updatedState1.log.length).toEqual(2)
expect(updatedState1.content).toEqual(CONTENT0)
expect(updatedState1.next.content).toEqual(CONTENT1)
expect(updatedState1.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)

const updatedState2 = await streamLoader.stateAtCommit(initialState, stream.allCommitIds[2])
const updatedState2 = await streamLoader.resetStateToCommit(
initialState,
stream.allCommitIds[2]
)
expect(updatedState2.log.length).toEqual(3)
expect(updatedState2.content).toEqual(CONTENT1)
expect(updatedState2.next).toBeUndefined()
Expand All @@ -236,9 +242,9 @@ describeIfV3('Streamloader', () => {

const nonExistentCommitID = CommitID.make(doc.id, TestUtils.randomCID())

await expect(streamLoader.stateAtCommit(doc.state, nonExistentCommitID)).rejects.toThrow(
/Timeout error while loading CID/
)
await expect(
streamLoader.resetStateToCommit(doc.state, nonExistentCommitID)
).rejects.toThrow(/Timeout error while loading CID/)
})

test('throw if commit rejected by conflict resolution', async () => {
Expand All @@ -250,7 +256,10 @@ describeIfV3('Streamloader', () => {
const conflictingUpdateCID = await dispatcher.storeEvent(conflictingUpdate, stream.id)

await expect(
streamLoader.stateAtCommit(stream.state, CommitID.make(stream.id, conflictingUpdateCID))
streamLoader.resetStateToCommit(
stream.state,
CommitID.make(stream.id, conflictingUpdateCID)
)
).rejects.toThrow(/rejected by conflict resolution/)
})
})
Expand Down
21 changes: 16 additions & 5 deletions packages/core/src/stream-loading/stream-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ export class StreamLoader {
async _loadStateFromTip(
streamID: StreamID,
tip: CID,
allowSyncErrors: boolean
allowSyncErrors: boolean,
opts = { throwOnInvalidCommit: false }
): Promise<StreamState | null> {
let logWithoutTimestamps
try {
Expand All @@ -44,9 +45,7 @@ export class StreamLoader {
streamID,
logWithoutTimestamps
)
return this.stateManipulator.applyFullLog(streamID.type, logWithTimestamps, {
throwOnInvalidCommit: false,
})
return this.stateManipulator.applyFullLog(streamID.type, logWithTimestamps, opts)
}

async _applyTips(streamID: StreamID, tipSource$: Observable<CID>): Promise<StreamState> {
Expand Down Expand Up @@ -135,7 +134,7 @@ export class StreamLoader {
* @param initialState
* @param commitId
*/
async stateAtCommit(initialState: StreamState, commitId: CommitID): Promise<StreamState> {
async resetStateToCommit(initialState: StreamState, commitId: CommitID): Promise<StreamState> {
// Throw if any commit fails to apply as we are trying to load at a specific commit and want
// to error if we can't.
const opts = { throwOnInvalidCommit: true, throwOnConflict: true, throwIfStale: false }
Expand All @@ -161,6 +160,18 @@ export class StreamLoader {
})
}

/**
* Return the state of a stream at a specific CommitID.
* @param commitId
*/
async stateAtCommit(commitId: CommitID): Promise<StreamState> {
// Throw if any commit fails to apply as we are trying to load at a specific commit and want
// to error if we can't.
const opts = { throwOnInvalidCommit: true, throwOnConflict: true, throwIfStale: false }

return this._loadStateFromTip(commitId.baseID, commitId.commit, false, opts)
}

/**
* Completely loads the current state of a Stream from the p2p network just from the StreamID.
* TODO(CDB-2761): Delete this method.
Expand Down
Loading

0 comments on commit 5e92e19

Please sign in to comment.