diff --git a/packages/connection/index.js b/packages/connection/index.js index 5c1cfcbb..600ea1ff 100644 --- a/packages/connection/index.js +++ b/packages/connection/index.js @@ -285,6 +285,8 @@ class Connection extends EventEmitter { * https://tools.ietf.org/html/rfc7395#section-3.6 */ async _closeStream(timeout = this.timeout) { + await this.#runHooks("close"); + const fragment = this.footer(this.footerElement()); await this.write(fragment); @@ -360,6 +362,49 @@ class Connection extends EventEmitter { // Override socketParameters() {} + + /* Experimental hooks */ + #hooks = new Map(); + #hook_events = new Set(["close"]); + hook(event, handler /*priority = 0 TODO */) { + this.#assertHookEventName(event); + + if (!this.#hooks.has(event)) { + this.#hooks.set(event, new Set()); + } + + this.#hooks.get(event).add([handler]); + } + #assertHookEventName(event) { + if (!this.#hook_events.has(event)) { + throw new Error(`Hook event name "${event}" is unknown.`); + } + } + unhook(event, handler) { + this.#assertHookEventName(event); + const handlers = this.#hooks.get("event"); + const item = [...handlers].find((item) => item.handler === handler); + handlers.remove(item); + } + async #runHooks(event, ...args) { + this.#assertHookEventName(event); + + const hooks = this.#hooks.get(event); + if (!hooks) return; + + // TODO run hooks by priority + // run hooks with the same priority in parallel + + await Promise.all( + [...hooks].map(async ([handler]) => { + try { + await handler(...args); + } catch (err) { + this.emit("error", err); + } + }), + ); + } } // Override diff --git a/packages/middleware/README.md b/packages/middleware/README.md index 612f01a4..3f2f38af 100644 --- a/packages/middleware/README.md +++ b/packages/middleware/README.md @@ -6,7 +6,7 @@ Supports Node.js and browsers. ## Install -``` +```sh npm install @xmpp/middleware ``` diff --git a/packages/stream-management/README.md b/packages/stream-management/README.md index 8bb8bbd5..a5ea5054 100644 --- a/packages/stream-management/README.md +++ b/packages/stream-management/README.md @@ -13,6 +13,19 @@ If the session fails to resume, entity will fallback to regular session establis - Automatically responds to acks. - Periodically request acks. - If server fails to respond, triggers a reconnect. +- On reconnect retry sending the queue + +When a stanza is re-sent, a [delay element](https://xmpp.org/extensions/xep-0203.html) will be added to it. + +- `from` client jid +- `stamp` [date/time](https://xmpp.org/extensions/xep-0082.html) at which the stanza was meant to be sent + +```xml + +``` ## Events diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 5d161d97..6980585e 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -63,9 +63,22 @@ export default function streamManagement({ requestAckInterval: 30_000, }); + async function sendAck() { + try { + await entity.send(xml("a", { xmlns: NS, h: sm.inbound })); + } catch {} + } + entity.on("disconnect", () => { clearTimeout(timeoutTimeout); clearTimeout(requestAckTimeout); + sm.enabled = false; + }); + + // It is RECOMMENDED that initiating entities (usually clients) send an element right before they gracefully close the stream, in order to inform the peer about received stanzas + entity.hook("close", async () => { + if (!sm.enabled) return; + await sendAck(); }); async function resumed(resumed) { @@ -127,14 +140,14 @@ export default function streamManagement({ sm.id = ""; }); - middleware.use((context, next) => { + middleware.use(async (context, next) => { const { stanza } = context; clearTimeout(timeoutTimeout); if (["presence", "message", "iq"].includes(stanza.name)) { sm.inbound += 1; } else if (stanza.is("r", NS)) { // > When an element ("request") is received, the recipient MUST acknowledge it by sending an element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the element. - entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {}); + await sendAck(); } else if (stanza.is("a", NS)) { // > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). ackQueue(+stanza.attrs.h); diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index 69f2fa64..79a07292 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -336,3 +336,18 @@ test("resume - failed with something in queue", async () => { expect(entity.streamManagement.outbound).toBe(0); expect(entity.streamManagement.outbound_q).toBeEmpty(); }); + +test("sends an element before closing", async () => { + const { entity, streamManagement } = mockClient(); + streamManagement.enabled = true; + streamManagement.inbound = 42; + entity.status = "online"; + + const promise_disconnect = entity.disconnect(); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + await promise_disconnect; +});