Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream-management: Send ack on close #1059

Merged
merged 2 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions packages/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ class Connection extends EventEmitter {
* https://tools.ietf.org/html/rfc7395#section-3.6
*/
async _closeStream(timeout = this.timeout) {
await this.#runHooks("close");

const fragment = this.footer(this.footerElement());

await this.write(fragment);
Expand Down Expand Up @@ -360,6 +362,49 @@ class Connection extends EventEmitter {

// Override
socketParameters() {}

/* Experimental hooks */
#hooks = new Map();
#hook_events = new Set(["close"]);
hook(event, handler /*priority = 0 TODO */) {
this.#assertHookEventName(event);

if (!this.#hooks.has(event)) {
this.#hooks.set(event, new Set());
}

this.#hooks.get(event).add([handler]);
}
#assertHookEventName(event) {
if (!this.#hook_events.has(event)) {
throw new Error(`Hook event name "${event}" is unknown.`);
}
}
unhook(event, handler) {
this.#assertHookEventName(event);
const handlers = this.#hooks.get("event");
const item = [...handlers].find((item) => item.handler === handler);
handlers.remove(item);
}
async #runHooks(event, ...args) {
this.#assertHookEventName(event);

const hooks = this.#hooks.get(event);
if (!hooks) return;

// TODO run hooks by priority
// run hooks with the same priority in parallel

await Promise.all(
[...hooks].map(async ([handler]) => {
try {
await handler(...args);
} catch (err) {
this.emit("error", err);
}
}),
);
}
}

// Override
Expand Down
2 changes: 1 addition & 1 deletion packages/middleware/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Supports Node.js and browsers.

## Install

```
```sh
npm install @xmpp/middleware
```

Expand Down
13 changes: 13 additions & 0 deletions packages/stream-management/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ If the session fails to resume, entity will fallback to regular session establis
- Automatically responds to acks.
- Periodically request acks.
- If server fails to respond, triggers a reconnect.
- On reconnect retry sending the queue

When a stanza is re-sent, a [delay element](https://xmpp.org/extensions/xep-0203.html) will be added to it.

- `from` client jid
- `stamp` [date/time](https://xmpp.org/extensions/xep-0082.html) at which the stanza was meant to be sent

```xml
<delay xmlns="urn:xmpp:delay"
from="username@example.net/resource"
stamp="1990-01-01T00:00:00Z"
/>
```

## Events

Expand Down
17 changes: 15 additions & 2 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,22 @@ export default function streamManagement({
requestAckInterval: 30_000,
});

async function sendAck() {
try {
await entity.send(xml("a", { xmlns: NS, h: sm.inbound }));
} catch {}
}

entity.on("disconnect", () => {
clearTimeout(timeoutTimeout);
clearTimeout(requestAckTimeout);
sm.enabled = false;
});

// It is RECOMMENDED that initiating entities (usually clients) send an element right before they gracefully close the stream, in order to inform the peer about received stanzas
entity.hook("close", async () => {
if (!sm.enabled) return;
await sendAck();
});

async function resumed(resumed) {
Expand Down Expand Up @@ -127,14 +140,14 @@ export default function streamManagement({
sm.id = "";
});

middleware.use((context, next) => {
middleware.use(async (context, next) => {
const { stanza } = context;
clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
await sendAck();
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
ackQueue(+stanza.attrs.h);
Expand Down
15 changes: 15 additions & 0 deletions packages/stream-management/stream-features.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,18 @@ test("resume - failed with something in queue", async () => {
expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
});

test("sends an <a/> element before closing", async () => {
const { entity, streamManagement } = mockClient();
streamManagement.enabled = true;
streamManagement.inbound = 42;
entity.status = "online";

const promise_disconnect = entity.disconnect();

expect(await entity.catchOutgoing()).toEqual(
<a xmlns="urn:xmpp:sm:3" h={streamManagement.inbound} />,
);

await promise_disconnect;
});
Loading