Ability to add_stream() to only receive new messages #13

Open
Boscop opened this issue Apr 17, 2017 · 8 comments
@Boscop

Boscop commented Apr 17, 2017

I'm using multiqueue::broadcast_queue in a situation where I spawn a new thread for each websocket client and call add_stream() for each client before spawning its thread, so the new stream gets moved into that thread. (I'm sending messages from my main thread to the browser clients.)
The problem is that a thread receives old messages that were sent before .add_stream() was called. I think it would be useful to have a way to add a stream such that the new stream only receives new messages.

E.g. this is an example of such a scenario:
http://dpaste.com/1CRPJYN
In line 37 of that paste I'm emptying the receiver before I call add_stream(), as a workaround, so the new stream doesn't get old messages.
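Roughly, the workaround looks like this (a simplified sketch; the capacity, message type, and the exact try_send/try_recv/recv calls here are illustrative, not taken from my real code):

```rust
use multiqueue::broadcast_queue;
use std::thread;

fn main() {
    let (tx, rx) = broadcast_queue::<u64>(16);

    // Messages sent before the client's stream exists.
    for i in 0..4 {
        tx.try_send(i).unwrap();
    }

    // Workaround: drain the prototype receiver so the stream added next
    // starts (roughly) at the current write position instead of replaying
    // the old messages above.
    while rx.try_recv().is_ok() {}

    // The new stream gets moved into the per-client thread.
    let client_rx = rx.add_stream();
    let client = thread::spawn(move || {
        while let Ok(msg) = client_rx.recv() {
            println!("client got {}", msg);
        }
    });

    tx.try_send(42).unwrap();
    drop(tx); // no senders left: recv() errors out and the client thread exits
    client.join().unwrap();
}
```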

Also, it would be useful to have a queue with unbounded capacity.

Btw, my assumption is that I should use add_stream() when I want messages to be delivered to both receivers, and clone() when I want both receivers to compete for the messages (the first one that asks for a message gets it, the other one doesn't). Is this assumption correct?
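For reference, this is the behavior I'm assuming (same illustrative API calls as in the sketch above):

```rust
use multiqueue::broadcast_queue;

fn main() {
    let (tx, rx) = broadcast_queue::<&str>(8);

    let fanned_out = rx.add_stream(); // separate stream: sees every message
    let competitor = rx.clone();      // same stream as rx: competes with rx

    tx.try_send("hello").unwrap();

    // Both streams receive the message...
    assert_eq!(rx.recv().unwrap(), "hello");
    assert_eq!(fanned_out.recv().unwrap(), "hello");

    // ...but within rx's stream only one of the clones gets it,
    // so the competing clone sees nothing.
    assert!(competitor.try_recv().is_err());
}
```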

@schets schets self-assigned this Apr 18, 2017
@schets
Owner

schets commented Apr 18, 2017

Yeah, your assumption about clone vs add_stream is correct.

It's pretty simple to make a best-effort attempt to create a new reader at the current write position, but this will either:

  • Be inherently racy
  • Loop indefinitely while writers keep advancing the write position

On the unbounded queue: Multiqueue is never going to be unbounded. For the things I intended it for (high-throughput, low-latency messaging), unboundedness would incur some pretty undesirable performance costs.

@Boscop
Author

Boscop commented Apr 18, 2017

The problem is that, even with this workaround, it only works if a new client connects before the receiver queue is full (since the receiver only gets emptied when a client connects, in line 37 of the paste). Otherwise it crashes.
Is there a way I can take the sent value back out right after I send it in line 18 of the paste?
The problem is that the ui_state thread then owns the receiver, so it can't be moved into the closure below.
What's the best way to solve this?
I need to keep a receiver around so that I can clone it and pass it to the threads spawned for the clients.

@sschetterer

The problem is that, even with this workaround, it only works if a new client connects before the receiver queue is full (since the receiver only gets emptied when a client connects, in line 37 of the paste). Otherwise it crashes.

There are ways to deal with this other than crashing on a full queue, but with a bounded queue you fundamentally have this issue: what happens when the UI thread can't communicate back to the main thread? The program should handle that case instead of crashing or just piling up messages. A quick way to deal with it while prototyping is to spin on send (see the sketch below). I'm going to add a proper blocking API, and the futures API already supports blocking sends.
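A minimal sketch of that spin-on-send approach, assuming try_send returns an Err while the bounded queue is full (the helper name send_spinning is just for illustration):

```rust
use multiqueue::BroadcastSender;
use std::thread;

// Retry until the bounded queue has room. The message is cloned per attempt so
// the sketch doesn't depend on the exact error type; a real version should also
// distinguish "queue full" from "all receivers gone" rather than spinning
// forever in the latter case.
fn send_spinning<T: Clone>(tx: &BroadcastSender<T>, msg: T) {
    while tx.try_send(msg.clone()).is_err() {
        thread::yield_now();
    }
}
```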

I need to keep a receiver around so that I can clone it and pass it to the threads spawned for the clients.

I've been thinking of a way to do this without requiring an existing reader. Using an existing reader provides a rudimentary 'pin' of sorts, so that writers can't overrun the index of that reader. I think I've figured out a way around this, though I'm not sure it works.

@Boscop
Author

Boscop commented Apr 18, 2017

Is your solution available somewhere on a branch?
Or do you know how I can solve this problem in my use case in the meantime? :)

@schets
Owner

schets commented Apr 18, 2017

I'm not even sure the solution is correct; at this point it only exists in my head. If you want to send without crashing, for now you can spin until the send succeeds.

I've got a whole lot on my plate and probably won't be able to look at this in depth until the weekend.

@Boscop
Author

Boscop commented Apr 18, 2017

By spin, do you mean trying to send in a loop?

Then it won't crash, but the problem is that the receiver (the one that gets cloned and passed to the threads) only gets emptied when a new client connects.
In this application I only have one active client at a time (it's a web UI for my Rust app), and it only reconnects when I refresh the tab. So the queue goes a long time without being emptied, while a lot of messages are sent back and forth between the UI and the application.
So how can I empty the queue more frequently to avoid blocking on send? The receiver has to be moved into the closure that is passed to listen(), so I can't e.g. empty it in the ui_state thread whenever I send from there...

@schets
Owner

schets commented Oct 25, 2017

I'm going to make an API change that supports such behavior in a best-effort sense, as I've realized the other approach leads to some edge-case race conditions.

@Boscop
Author

Boscop commented Oct 2, 2019

@schets Any update on this? :)
