Skip to content

Commit

Permalink
bring back tokio::spawn which was removed on #122. I thought it was u…
Browse files Browse the repository at this point in the history
…nnecessary but I was wrong. Subscription request needs to complete with Ok(()) while background task will keep sending updates. (#123)
  • Loading branch information
ermalkaleci authored Nov 1, 2023
1 parent ddd665a commit d34cdd8
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions src/middlewares/subscriptions/merge_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,31 +230,34 @@ impl Middleware<SubscriptionRequest, Result<(), StringError>> for MergeSubscript
.await?;

// broadcast new values
let mut stream = subscribe();

loop {
tokio::select! {
resp = stream.recv() => {
match resp {
Ok(new_value) => {
if let Err(e) = sink.send(new_value).await {
tracing::trace!("subscription sink closed {e:?}");
break;
tokio::spawn(async move {
// create receiver inside task to avoid msg been broadcast before stream.recv() is hit
let mut stream = subscribe();

loop {
tokio::select! {
resp = stream.recv() => {
match resp {
Ok(new_value) => {
if let Err(e) = sink.send(new_value).await {
tracing::trace!("subscription sink closed {e:?}");
break;
}
}
Err(e) => {
// this should never happen
tracing::error!("subscription stream error {e:?}");
unreachable!("subscription stream error {e:?}");
}
}
Err(e) => {
// this should never happen
tracing::error!("subscription stream error {e:?}");
unreachable!("subscription stream error {e:?}");
}
}
}
_ = sink.closed() => {
tracing::trace!("subscription sink closed");
break;
_ = sink.closed() => {
tracing::trace!("subscription sink closed");
break;
}
}
}
}
});

Ok(())
}
Expand Down

0 comments on commit d34cdd8

Please sign in to comment.