Skip to content
This repository has been archived by the owner on Sep 13, 2023. It is now read-only.

Commit

Permalink
fix: use stream api instead of channel (#40)
Browse files Browse the repository at this point in the history
Spawning a task has some slightly different async properties compared to the stream api, and in this case, we should prefer the stream api.

This allows us to buffer a preset amount and ties the cancellation of the downstream request more closely to the upstream request.
  • Loading branch information
Daniel-Bloom-dfinity authored Jun 17, 2022
1 parent 0cd1a22 commit 76b4159
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 43 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 34 additions & 41 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use axum::{handler::Handler, routing::get, Extension, Router};
use clap::{crate_authors, crate_version, Parser};
use flate2::read::{DeflateDecoder, GzDecoder};
use futures::{future::OptionFuture, try_join, FutureExt};
use futures::{future::OptionFuture, try_join, FutureExt, StreamExt};
use http_body::{LengthLimitError, Limited};
use hyper::{
body,
body::Bytes,
http::{header::CONTENT_TYPE, uri::Parts},
service::{make_service_fn, service_fn},
Body, Client, Request, Response, Server, StatusCode, Uri,
Expand Down Expand Up @@ -54,7 +53,10 @@ use crate::config::dns_canister_config::DnsCanisterConfig;
type HttpResponseAny = HttpResponse<Token, HttpRequestStreamingCallbackAny>;

// Limit the total number of calls to an HTTP Request loop to 1000 for now.
const MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT: i32 = 1000;
const MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT: usize = 1000;

// Limit the number of Stream Callbacks buffered
const STREAM_CALLBACK_BUFFFER: usize = 2;

// The maximum length of a body we should log as tracing.
const MAX_LOG_BODY_SIZE: usize = 100;
Expand Down Expand Up @@ -419,53 +421,44 @@ async fn forward_request(
};
let is_streaming = http_response.streaming_strategy.is_some();
let response = if let Some(streaming_strategy) = http_response.streaming_strategy {
let (mut sender, body) = body::Body::channel();
let agent = agent.as_ref().clone();
sender.send_data(Bytes::from(http_response.body)).await?;

match streaming_strategy {
StreamingStrategy::Callback(callback) => {
let streaming_canister_id = callback.callback.0.principal;
let method_name = callback.callback.0.method;
let mut callback_token = callback.token;
let logger = logger.clone();
tokio::spawn(async move {
let canister = HttpRequestCanister::create(&agent, streaming_canister_id);
// We have not yet called http_request_stream_callback.
let mut count = 0;
loop {
count += 1;
if count > MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT {
sender.abort();
break;
}

let body = http_response.body;
let body = futures::stream::once(async move { Ok(body) });
let body = match streaming_strategy {
StreamingStrategy::Callback(callback) => body::Body::wrap_stream(
body.chain(futures::stream::try_unfold(
(
logger.clone(),
agent,
callback.callback.0,
Some(callback.token),
),
move |(logger, agent, callback, callback_token)| async move {
let callback_token = match callback_token {
Some(callback_token) => callback_token,
None => return Ok(None),
};

let canister = HttpRequestCanister::create(&agent, callback.principal);
match canister
.http_request_stream_callback(&method_name, callback_token)
.http_request_stream_callback(&callback.method, callback_token)
.call()
.await
{
Ok((StreamingCallbackHttpResponse { body, token },)) => {
if sender.send_data(Bytes::from(body)).await.is_err() {
sender.abort();
break;
}
if let Some(next_token) = token {
callback_token = next_token;
} else {
break;
}
Ok(Some((body, (logger, agent, callback, token))))
}
Err(e) => {
slog::debug!(logger, "Error happened during streaming: {}", e);
sender.abort();
break;
slog::warn!(logger, "Error happened during streaming: {}", e);
Err(e)
}
}
}
});
}
}
},
))
.take(MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT)
.map(|x| async move { x })
.buffered(STREAM_CALLBACK_BUFFFER),
),
};

builder.body(body)?
} else {
Expand Down

0 comments on commit 76b4159

Please sign in to comment.