Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream-management: Implement requesting ACKs #1005

Closed
wants to merge 21 commits into from

Conversation

singpolyma
Copy link
Contributor

Queue outgoing stanzas and periodically request ACK. Remove from the queue anything ack'd and notify of the ack so apps can know the stanza has for sure sent.

On resume, anything not ack'd is re-sent. On reconnect, anything not ack'd notify of the failure to send this stanza so apps can know the stanza failed.

Even when there is no traffic, send an at least every 5 minutes to check the connection. If there is no inbound traffic (such as an ) within timeout (default 60 seconds) then consider the connection disconnected.

Copy link
Member

@sonnyp sonnyp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you

Tests are needed, in particular because this implements quite a complex behavior

In general I try to avoid "internal" XMPP events on entity . They also don't appear to be used. Are they needed? What for and in which case they should be documented. Possibly also emitted on the object returned by function streamManagement

packages/xml/index.js Outdated Show resolved Hide resolved
packages/stream-management/index.js Outdated Show resolved Hide resolved
package.json Outdated Show resolved Hide resolved
@singpolyma
Copy link
Contributor Author

In general I try to avoid "internal" XMPP events on entity . They also don't appear to be used. Are they needed?

Yes I am using these events in the snikket SDK to eg know when a message has been acknowledged or not, so I can update my state to know if it made it to the server. Most apps even show this status information (eg one check in conversations means "stream management ack")

Queue outgoing stanzas and periodically request ACK.  Remove from the
queue anything ack'd and notify of the ack so apps can know the stanza
has for sure sent.

On resume, anything not ack'd is re-sent.  On reconnect, anything not
ack'd notify of the failure to send this stanza so apps can know the
stanza failed.

Even when there is no traffic, send an <r/> at least every 5 minutes to check the
connection.  If there is no inbound traffic (such as an <a/>) within
timeout (default 60 seconds) then consider the connection disconnected.
@singpolyma singpolyma force-pushed the implement-sm-for-outgoing branch from 01f2a17 to c4d4bb8 Compare January 7, 2025 17:01
packages/stream-management/index.js Outdated Show resolved Hide resolved
packages/stream-management/index.js Outdated Show resolved Hide resolved
packages/stream-management/index.js Outdated Show resolved Hide resolved
!qStanza.getChild("delay", "urn:xmpp:delay")
) {
qStanza = xml.clone(qStanza);
qStanza.c("delay", {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use append

packages/stream-management/index.js Outdated Show resolved Hide resolved
packages/stream-management/index.js Outdated Show resolved Hide resolved
packages/stream-management/index.js Outdated Show resolved Hide resolved
let q = sm.outbound_q;
sm.outbound_q = [];
for (const item of q) {
await entity.send(item); // This will trigger the middleware and re-add to the queue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if entity.send rejects ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then resumed will fail. What context causes send to reject?

expect(entity.streamManagement.outbound_q).toHaveLength(1);
expect(
entity.streamManagement.outbound_q[0].getChild("delay", "urn:xmpp:delay"),
).not.toBeUndefined();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move this to a new test

@sonnyp
Copy link
Member

sonnyp commented Jan 8, 2025

Also

  • Update stream management README
  • Add references to the spec as comments where relevant
  • Add an e2e test ( see /test/sasl.js for example)

@singpolyma
Copy link
Contributor Author

singpolyma commented Jan 8, 2025

Add references to the spec as comments where relevant

looks like link to xep is already there. was there something else you're looking for?

@sonnyp sonnyp changed the title Implement stream management requesting ACKs stream-management: Implement requesting ACKs Jan 9, 2025
Copy link
Member

@sonnyp sonnyp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like link to xep is already there. was there something else you're looking for?

direct quotes

Comment on lines +187 to +191
// Debounce requests so we send only one after a big run of stanza together
clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xmpp.js has a sendMany method, could we hook to that instead (if not in this PR then later)?

packages/stream-management/index.js Show resolved Hide resolved
test/stream-management.js Show resolved Hide resolved
test/stream-management.js Outdated Show resolved Hide resolved
test/stream-management.js Outdated Show resolved Hide resolved
Comment on lines +105 to +110
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt of

Suggested change
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
}
sm.emit("fail", sm.outbound_q);
sm.outbound_q = [];

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That feels like it would make the library internals very slightly simpler in exchange for making the API slightly more complex? Is there a reason you're interested in this change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it stands xmpp.js is a fairly low level XMPP library. I prefer abstractions to reflect the protocols to leave room for optimizations.

If that changes then we can think of what the API should be like and deal with persistence, tab reload etc

Comment on lines +88 to +95
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt of something like that instead

Suggested change
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
const items = sm.outbound_q.slice(-resumed.attrs.h);
sm.outbound += resumed.attrs.h;
sm.emit("ack", items);

packages/stream-management/index.js Outdated Show resolved Hide resolved
packages/stream-management/index.js Outdated Show resolved Hide resolved
Needs to use force-disconnect especially if we emulate an issue using pause
@sonnyp
Copy link
Member

sonnyp commented Jan 15, 2025

#1054

@sonnyp sonnyp closed this Jan 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

2 participants