Skip to content

Commit

Permalink
Merge pull request #18 from hackmdio/fix/recovery-mechanism-for-orpha…
Browse files Browse the repository at this point in the history
…n-streams

fix/recovery mechanism for orphan streams
  • Loading branch information
Yukaii authored Jan 20, 2025
2 parents 23494c9 + 6f73173 commit b361280
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
45 changes: 45 additions & 0 deletions src/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,41 @@ export class Api {
url,
// scripting: https://github.com/redis/node-redis/#lua-scripts
scripts: {
checkAndRecoverWorkerStream: redis.defineScript({
NUMBER_OF_KEYS: 1,
SCRIPT: `
local found = false
local messages = redis.call("XREAD", "COUNT", 0, "STREAMS", "${this.redisWorkerStreamName}", "0-0")
if messages and #messages > 0 then
local entries = messages[1][2]
for _, entry in ipairs(entries) do
-- Each entry is an array where entry[2] is the message fields
if entry[2][2] == KEYS[1] then
found = true
break
end
end
end
-- If stream not found in y:worker and the stream exists, add it
if not found and redis.call("TYPE", KEYS[1]).ok == "stream" then
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
end
`,
/**
* @param {string} key
*/
transformArguments (key) {
return [key]
},
/**
* @param {null} x
*/
transformReply (x) {
return x
}
}),
addMessage: redis.defineScript({
NUMBER_OF_KEYS: 1,
SCRIPT: `
Expand Down Expand Up @@ -211,6 +246,16 @@ export class Api {
return this.redis.addMessage(computeRedisRoomStreamName(room, docid, this.prefix), m)
}

/**
* @param {string} room
* @param {string} docid
*/
async checkAndRecoveryWorkerStream (room, docid) {
await this.redis.checkAndRecoverWorkerStream(
computeRedisRoomStreamName(room, docid, this.prefix)
)
}

/**
* @param {string} room
* @param {string} docid
Expand Down
1 change: 1 addition & 0 deletions src/y-socket-io/y-socket-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ export class YSocketIO {
this.initAwarenessListeners(socket)
this.initSocketListeners(socket)

await this.client.checkAndRecoveryWorkerStream(namespace, 'index')
const doc = await this.client.getDoc(namespace, 'index')

if (
Expand Down

0 comments on commit b361280

Please sign in to comment.