From dd93f3315c2bdcec8dac6d4e0b19496fc87f81c3 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Mon, 20 Jan 2025 11:32:18 +0800 Subject: [PATCH 1/3] fix: check if stream is orphan on connection --- src/api.js | 45 ++++++++++++++++++++++++++++++++++ src/y-socket-io/y-socket-io.js | 4 +++ 2 files changed, 49 insertions(+) diff --git a/src/api.js b/src/api.js index a3db515..2d4aff0 100644 --- a/src/api.js +++ b/src/api.js @@ -120,6 +120,41 @@ export class Api { url, // scripting: https://github.com/redis/node-redis/#lua-scripts scripts: { + checkAndRecoverWorkerStream: redis.defineScript({ + NUMBER_OF_KEYS: 1, + SCRIPT: ` + local found = false + local messages = redis.call("XREAD", "COUNT", 0, "STREAMS", "${this.redisWorkerStreamName}", "0-0") + + if messages and #messages > 0 then + local entries = messages[1][2] + for _, entry in ipairs(entries) do + -- Each entry is an array where entry[2] is the message fields + if entry[2][2] == KEYS[1] then + found = true + break + end + end + end + + -- If stream not found in y:worker and the stream exists, add it + if not found and redis.call("TYPE", KEYS[1]).ok == "stream" then + redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) + end + `, + /** + * @param {string} key + */ + transformArguments (key) { + return [key] + }, + /** + * @param {null} x + */ + transformReply (x) { + return x + } + }), addMessage: redis.defineScript({ NUMBER_OF_KEYS: 1, SCRIPT: ` @@ -211,6 +246,16 @@ export class Api { return this.redis.addMessage(computeRedisRoomStreamName(room, docid, this.prefix), m) } + /** + * @param {string} room + * @param {string} docid + */ + async checkAndRecoveryWorkerStream (room, docid) { + await this.redis.checkAndRecoverWorkerStream( + computeRedisRoomStreamName(room, docid, this.prefix), + ) + } + /** * @param {string} room * @param {string} docid diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 2993fd0..4f46346 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -157,6 +157,10 @@ export class YSocketIO { this.initAwarenessListeners(socket) this.initSocketListeners(socket) + await this.client.checkAndRecoveryWorkerStream( + this.getNamespaceString(socket.nsp), + 'index', + ) const doc = await this.client.getDoc(namespace, 'index') if ( From 441b251f0b084c2362b703bf5b21f8151b889037 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Mon, 20 Jan 2025 11:34:38 +0800 Subject: [PATCH 2/3] fix: lint --- src/api.js | 2 +- src/y-socket-io/y-socket-io.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api.js b/src/api.js index 2d4aff0..2ca13c9 100644 --- a/src/api.js +++ b/src/api.js @@ -252,7 +252,7 @@ export class Api { */ async checkAndRecoveryWorkerStream (room, docid) { await this.redis.checkAndRecoverWorkerStream( - computeRedisRoomStreamName(room, docid, this.prefix), + computeRedisRoomStreamName(room, docid, this.prefix) ) } diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 4f46346..5e1903e 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -159,7 +159,7 @@ export class YSocketIO { await this.client.checkAndRecoveryWorkerStream( this.getNamespaceString(socket.nsp), - 'index', + 'index' ) const doc = await this.client.getDoc(namespace, 'index') From 6f73173b3654b6ad90f732c3ade8ead4bf1c6a1a Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Mon, 20 Jan 2025 11:43:58 +0800 Subject: [PATCH 3/3] chore: reuse existing vars --- src/y-socket-io/y-socket-io.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 5e1903e..78779c0 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -157,10 +157,7 @@ export class YSocketIO { this.initAwarenessListeners(socket) this.initSocketListeners(socket) - await this.client.checkAndRecoveryWorkerStream( - this.getNamespaceString(socket.nsp), - 'index' - ) + await this.client.checkAndRecoveryWorkerStream(namespace, 'index') const doc = await this.client.getDoc(namespace, 'index') if (