diff --git a/package-lock.json b/package-lock.json index 4d65e006..fd364b3e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13861,6 +13861,7 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", + "@xmpp/time": "^0.14.0", "@xmpp/xml": "^0.14.0" }, "engines": { diff --git a/packages/client-core/src/bind2/bind2.test.js b/packages/client-core/src/bind2/bind2.test.js index b0aac358..23c5fddb 100644 --- a/packages/client-core/src/bind2/bind2.test.js +++ b/packages/client-core/src/bind2/bind2.test.js @@ -66,7 +66,6 @@ test("with function resource returning string", async () => { test("with function resource throwing", async () => { const error = new Error("foo"); - function resource() { throw error; } @@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => { test("with function resource returning rejected promise", async () => { const error = new Error("foo"); - async function resource() { throw error; } diff --git a/packages/stream-management/README.md b/packages/stream-management/README.md index 296b8ad4..8bb8bbd5 100644 --- a/packages/stream-management/README.md +++ b/packages/stream-management/README.md @@ -10,7 +10,50 @@ When the session is resumed the `online` event is not emitted as session resumpt However `entity.status` is set to `online`. If the session fails to resume, entity will fallback to regular session establishment in which case `online` event will be emitted. -Automatically responds to acks but does not support requesting acks yet. +- Automatically responds to acks. +- Periodically request acks. +- If server fails to respond, triggers a reconnect. + +## Events + +### resumed + +Indicates that the connection was resumed. When that happens the `online` event is not emitted but `xmpp.status` will be `online`. + +```js +const xmpp = client(...); +const {streamManagement} = xmpp; + +streamManagement.on('resumed', () => { + console.log("session resumed"); +}); +``` + +### fail + +Indicates that a stanza failed to send to the server and will not be retried. + +```js +const xmpp = client(...); +const {streamManagement} = xmpp; + +streamManagement.on('fail', (stanza) => { + console.log("fail to send", stanza.toString()); +}); +``` + +### ack + +Indicates that a stanza has been acknowledged by the server. + +```js +const xmpp = client(...); +const {streamManagement} = xmpp; + +streamManagement.on('ack', (stanza) => { + console.log("stanza acknowledge by the server", stanza.toString()); +}); +``` ## References diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 9f162ce3..78d76f23 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -1,6 +1,7 @@ import XMPPError from "@xmpp/error"; -import { procedure } from "@xmpp/events"; +import { EventEmitter, procedure } from "@xmpp/events"; import xml from "@xmpp/xml"; +import { datetime } from "@xmpp/time"; // https://xmpp.org/extensions/xep-0198.html @@ -45,24 +46,52 @@ export default function streamManagement({ bind2, sasl2, }) { - const sm = { + let timeoutTimeout = null; + let requestAckTimeout = null; + + const sm = new EventEmitter(); + Object.assign(sm, { allowResume: true, preferredMaximum: null, enabled: false, id: "", + outbound_q: [], outbound: 0, inbound: 0, max: null, - }; + timeout: 60_000, + requestAckInterval: 300_000, + debounceAckRequest: 100, + }); + + entity.on("disconnect", () => { + clearTimeout(timeoutTimeout); + clearTimeout(requestAckTimeout); + }); - function resumed() { + async function resumed(resumed) { sm.enabled = true; + const oldOutbound = sm.outbound; + for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { + let item = sm.outbound_q.shift(); + sm.outbound++; + sm.emit("ack", item.stanza); + } + let q = sm.outbound_q; + sm.outbound_q = []; + // This will trigger the middleware and re-add to the queue + await entity.sendMany(q.map((item) => queueToStanza({ entity, item }))); + sm.emit("resumed"); entity._ready(true); } function failed() { sm.enabled = false; sm.id = ""; + let item; + while ((item = sm.outbound_q.shift())) { + sm.emit("fail", item.stanza); + } sm.outbound = 0; } @@ -73,11 +102,20 @@ export default function streamManagement({ } entity.on("online", () => { + if (sm.outbound_q.length > 0) { + throw new Error( + "Stream Management assertion failure, queue should be empty during online", + ); + } sm.outbound = 0; sm.inbound = 0; }); entity.on("offline", () => { + let item; + while ((item = sm.outbound_q.shift())) { + sm.emit("fail", item.stanza); + } sm.outbound = 0; sm.inbound = 0; sm.enabled = false; @@ -86,6 +124,7 @@ export default function streamManagement({ middleware.use((context, next) => { const { stanza } = context; + clearTimeout(timeoutTimeout); if (["presence", "message", "iq"].includes(stanza.name)) { sm.inbound += 1; } else if (stanza.is("r", NS)) { @@ -93,7 +132,12 @@ export default function streamManagement({ entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {}); } else if (stanza.is("a", NS)) { // > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). - sm.outbound = stanza.attrs.h; + const oldOutbound = sm.outbound; + for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { + let item = sm.outbound_q.shift(); + sm.outbound++; + sm.emit("ack", item.stanza); + } } return next(); @@ -105,6 +149,33 @@ export default function streamManagement({ if (sasl2) { setupSasl2({ sasl2, sm, failed, resumed }); } + + function requestAck() { + clearTimeout(timeoutTimeout); + if (sm.timeout) { + timeoutTimeout = setTimeout( + () => entity.disconnect().catch(() => {}), + sm.timeout, + ); + } + entity.send(xml("r", { xmlns: NS })).catch(() => {}); + // Periodically send r to check the connection + // If a stanza goes out it will cancel this and set a sooner timer + requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval); + } + + middleware.filter((context, next) => { + if (!sm.enabled) return next(); + const { stanza } = context; + if (!["presence", "message", "iq"].includes(stanza.name)) return next(); + + sm.outbound_q.push({ stanza, stamp: datetime() }); + // Debounce requests so we send only one after a big run of stanza together + clearTimeout(requestAckTimeout); + requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest); + return next(); + }); + if (streamFeatures) { setupStreamFeature({ streamFeatures, @@ -133,8 +204,8 @@ function setupStreamFeature({ // Resuming if (sm.id) { try { - await resume(entity, sm); - resumed(); + const element = await resume(entity, sm); + await resumed(element); return; // If resumption fails, continue with session establishment } catch { @@ -149,6 +220,12 @@ function setupStreamFeature({ const promiseEnable = enable(entity, sm); + if (sm.outbound_q.length > 0) { + throw new Error( + "Stream Management assertion failure, queue should be empty after enable", + ); + } + // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . sm.outbound = 0; @@ -172,7 +249,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) { }, (element) => { if (element.is("resumed")) { - resumed(); + resumed(element); } else if (element.is(failed)) { // const error = StreamError.fromElement(element) failed(); @@ -198,3 +275,20 @@ function setupBind2({ bind2, sm, failed, enabled }) { }, ); } + +function queueToStanza({ entity, item }) { + const { stanza, stamp } = item; + if ( + stanza.name === "message" && + !stanza.getChild("delay", "urn:xmpp:delay") + ) { + stanza.append( + xml("delay", { + xmlns: "urn:xmpp:delay", + from: entity.jid.toString(), + stamp, + }), + ); + } + return stanza; +} diff --git a/packages/stream-management/package.json b/packages/stream-management/package.json index 245bdece..e0e3c3dd 100644 --- a/packages/stream-management/package.json +++ b/packages/stream-management/package.json @@ -16,7 +16,8 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", - "@xmpp/xml": "^0.14.0" + "@xmpp/xml": "^0.14.0", + "@xmpp/time": "^0.14.0" }, "engines": { "node": ">= 20.10" diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index a8edc437..4728a1a1 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -22,6 +22,7 @@ test("enable - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -73,6 +74,7 @@ test("enable - message - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -112,6 +114,7 @@ test("enable - failed", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); entity.streamManagement.enabled = true; entity.mockInput( @@ -125,6 +128,34 @@ test("enable - failed", async () => { expect(entity.streamManagement.enabled).toBe(false); }); +test("stanza ack", async () => { + const { entity } = mockClient(); + + entity.streamManagement.enabled = true; + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); + // expect(entity.streamManagement.enabled).toBe(true); + + await entity.send(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + + let acks = 0; + entity.streamManagement.on("ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + entity.mockInput(); + await tick(); + + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(1); + expect(entity.streamManagement.outbound_q).toHaveLength(0); +}); + test("resume - resumed", async () => { const { entity } = mockClient(); @@ -138,6 +169,10 @@ test("resume - resumed", async () => { ); entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [ + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + ]; expect(await entity.catchOutgoing()).toEqual( , @@ -147,11 +182,87 @@ test("resume - resumed", async () => { expect(entity.status).toBe("offline"); - entity.mockInput(); + entity.mockInput(); + + let acks = 0; + entity.streamManagement.on("ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + expect(await entity.catchOutgoing()).toEqual( + + + , + ); await tick(); - expect(entity.streamManagement.outbound).toBe(45); + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(46); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + expect(entity.status).toBe("online"); +}); + +test("resumed event", async () => { + const { entity } = mockClient(); + + entity.status = "offline"; + entity.streamManagement.id = "bar"; + + entity.mockInput( + + + , + ); + + entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [ + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + ]; + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + expect(entity.streamManagement.enabled).toBe(false); + + expect(entity.status).toBe("offline"); + + entity.mockInput(); + + let acks = 0; + entity.streamManagement.on("ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + expect(await entity.catchOutgoing()).toEqual( + + + , + ); + + let resumed = false; + entity.streamManagement.on("resumed", () => { + resumed = true; + }); + + await tick(); + + expect(resumed).toBe(true); + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(46); + expect(entity.streamManagement.outbound_q).toHaveLength(1); expect(entity.status).toBe("online"); }); @@ -162,6 +273,7 @@ test("resume - failed", async () => { entity.streamManagement.id = "bar"; entity.streamManagement.enabled = true; entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = []; entity.mockInput( @@ -185,4 +297,46 @@ test("resume - failed", async () => { expect(entity.streamManagement.id).toBe(""); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); +}); + +test("resume - failed with something in queue", async () => { + const { entity } = mockClient(); + + entity.status = "bar"; + entity.streamManagement.id = "bar"; + entity.streamManagement.enabled = true; + entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [{ stanza: "hai" }]; + + entity.mockInput( + + + , + ); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + entity.mockInput( + + + , + ); + + let failures = 0; + entity.streamManagement.on("fail", (failed) => { + failures++; + expect(failed).toBe("hai"); + }); + + await tick(); + + expect(failures).toBe(1); + expect(entity.status).toBe("bar"); + expect(entity.streamManagement.id).toBe(""); + expect(entity.streamManagement.enabled).toBe(false); + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); }); diff --git a/packages/test/mockClient.js b/packages/test/mockClient.js index c7025edd..45eff046 100644 --- a/packages/test/mockClient.js +++ b/packages/test/mockClient.js @@ -5,6 +5,11 @@ import context from "./context.js"; export default function mockClient(options) { const xmpp = client(options); xmpp.send = Connection.prototype.send; + xmpp.sendMany = async (stanzas) => { + for (const stanza of stanzas) { + await xmpp.send(stanza); + } + }; const ctx = context(xmpp); return Object.assign(xmpp, ctx); } diff --git a/test/stream-management.js b/test/stream-management.js new file mode 100644 index 00000000..ef66faed --- /dev/null +++ b/test/stream-management.js @@ -0,0 +1,106 @@ +import { client } from "../packages/client/index.js"; +import { promise } from "../packages/events/index.js"; +import { datetime } from "../packages/time/index.js"; +import debug from "../packages/debug/index.js"; +import server from "../server/index.js"; + +const username = "client"; +const password = "foobar"; +const credentials = { username, password }; +const domain = "localhost"; + +let xmpp; + +afterEach(async () => { + await xmpp?.stop(); + await server.reset(); +}); + +test("client ack stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const elP = promise(xmpp.streamManagement, "ack"); + await xmpp.start(); + await xmpp.send( + + + , + ); + + const el = await elP; + expect(el.attrs.id).toEqual("ping"); +}); + +test("client fail stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const elP = promise(xmpp.streamManagement, "fail"); + await xmpp.start(); + // Expect send but don't actually send to server, so it will fail + await xmpp.streamManagement.outbound_q.push({ + stanza: ( + + + + ), + stamp: datetime(), + }); + await xmpp.stop(); + + const el = await elP; + expect(el.attrs.id).toEqual("ping"); +}); + +test("client retry stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const elP = promise(xmpp.streamManagement, "ack"); + await xmpp.start(); + // Add to queue but don't actually send so it can retry after disconnect + await xmpp.streamManagement.outbound_q.push({ + stanza: ( + + + + ), + stamp: datetime(), + }); + await xmpp.disconnect(); + + const el = await elP; + expect(el.attrs.id).toEqual("ping"); +}); + +test("client reconnects when server fails to ack", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + xmpp.streamManagement.timeout = 10; + xmpp.streamManagement.debounceAckRequest = 1; + debug(xmpp); + + const resumedP = promise(xmpp.streamManagement, "resumed"); + await xmpp.start(); + await xmpp.send( + + + , + ); + xmpp.socket.socket.pause(); + + await resumedP; + expect().pass(); +}); diff --git a/test/stream-management.test.js b/test/stream-management.test.js new file mode 100644 index 00000000..3033ded8 --- /dev/null +++ b/test/stream-management.test.js @@ -0,0 +1,115 @@ +import { client } from "../packages/client/index.js"; +import { promise } from "../packages/events/index.js"; +import { datetime } from "../packages/time/index.js"; +import debug from "../packages/debug/index.js"; +import server from "../server/index.js"; + +const username = "client"; +const password = "foobar"; +const credentials = { username, password }; +const domain = "localhost"; + +let xmpp; + +afterEach(async () => { + await xmpp?.stop(); + await server.reset(); +}); + +test("client ack stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const promise_ack = promise(xmpp.streamManagement, "ack"); + await xmpp.start(); + await xmpp.send( + + + , + ); + + const el = await promise_ack; + expect(el.attrs.id).toEqual("ping"); +}); + +test("client fail stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const promise_fail = promise(xmpp.streamManagement, "fail"); + await xmpp.start(); + // Expect send but don't actually send to server, so it will fail + await xmpp.streamManagement.outbound_q.push({ + stanza: ( + + + + ), + stamp: datetime(), + }); + await xmpp.stop(); + + const el = await promise_fail; + expect(el.attrs.id).toEqual("ping"); +}); + +test("client retry stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const promise_ack = promise(xmpp.streamManagement, "ack"); + await xmpp.start(); + // Add to queue but don't actually send so it can retry after disconnect + await xmpp.streamManagement.outbound_q.push({ + stanza: ( + + + + ), + stamp: datetime(), + }); + // Do not close the stream so that stream resumption can happen + await xmpp._closeSocket(); + await xmpp.disconnect(); + + const el = await promise_ack; + expect(el.attrs.id).toEqual("ping"); +}); + +test( + "client reconnects when server fails to ack stanza", + async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + xmpp.streamManagement.timeout = 10; + xmpp.streamManagement.debounceAckRequest = 1; + debug(xmpp, true); + + const promise_resumed = promise(xmpp.streamManagement, "resumed"); + await xmpp.start(); + xmpp.send( + + + , + ); + + // Pretend we don't receive the ack by removing event listeners + // on the socket + xmpp._detachSocket(); + + await promise_resumed; + expect().pass(); + }, + 1000 * 10, +);