Skip to content
This repository has been archived by the owner on Sep 13, 2023. It is now read-only.

Commit

Permalink
fix: unzip and streaming bugs (#29)
Browse files Browse the repository at this point in the history
Also bump compiler because we have to.
  • Loading branch information
Daniel-Bloom-dfinity authored Mar 31, 2022
1 parent 330deb1 commit f289318
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 59 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/fmt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
rust: [ '1.55.0' ]
rust: [ '1.58.1' ]
os: [ ubuntu-latest ]

steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
rust: [ '1.55.0', '1.58.1' ]
rust: [ '1.58.1' ]
os: [ ubuntu-latest ]

steps:
Expand Down
10 changes: 2 additions & 8 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
rust: [ '1.55.0' ]
rust: [ '1.58.1' ]
target: [ x86_64-apple-darwin, x86_64-unknown-linux-musl, x86_64-unknown-linux-gnu ]
include:
- os: macos-latest
Expand Down Expand Up @@ -51,17 +51,11 @@ jobs:
override: true
if: contains(matrix.os, 'macos')

- name: Linux hack (musl only)
run: |
echo "1.58.1" >./rust-toolchain
if: contains(matrix.target, 'linux-musl')

- name: Linux build (musl)
uses: dfinity/rust-musl-action@master
with:
args: |
cargo install cargo-deb --target x86_64-unknown-linux-musl
echo "1.55.0" >./rust-toolchain
rustup target add x86_64-unknown-linux-musl
RUSTFLAGS="--remap-path-prefix=${GITHUB_WORKSPACE}=/builds/dfinity" cargo deb --target x86_64-unknown-linux-musl -- --locked --features=skip_body_verification
if: contains(matrix.target, 'linux-musl')
Expand Down Expand Up @@ -123,7 +117,7 @@ jobs:
strategy:
fail-fast: false
matrix:
rust: [ '1.55.0' ]
rust: [ '1.58.1' ]
target: [ x86_64-apple-darwin, x86_64-unknown-linux-musl, x86_64-unknown-linux-gnu ]
include:
- os: macos-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
rust: [ '1.55.0' ]
rust: [ '1.58.1' ]
os: [ ubuntu-latest, macos-latest, windows-latest ]

steps:
Expand Down
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ garcon = { version = "0.2", features = ["async"] }
hex = "0.4"
hyper = { version = "0.14", features = ["full"] }
hyper-tls = "0.5"
ic-agent = "0.13"
ic-utils = "0.13"
ic-agent = { version = "0.15" }
ic-utils = { version = "0.15", features = ["raw"] }
lazy-regex = "2"
tokio = { version = "1", features = ["full"] }
serde = "1"
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.55.0
1.58.1
77 changes: 38 additions & 39 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::dns_canister_config::DnsCanisterConfig;
use clap::{crate_authors, crate_version, AppSettings, Parser};
use clap::{crate_authors, crate_version, Parser};
use flate2::read::{DeflateDecoder, GzDecoder};
use hyper::{
body,
Expand All @@ -18,8 +18,8 @@ use ic_utils::{
call::AsyncCall,
call::SyncCall,
interfaces::http_request::{
HeaderField, HttpRequestCanister, HttpResponse, StreamingCallbackHttpResponse,
StreamingStrategy,
HeaderField, HttpRequestCanister, HttpRequestStreamingCallbackAny, HttpResponse,
StreamingCallbackHttpResponse, StreamingStrategy, Token,
},
};
use lazy_regex::regex_captures;
Expand All @@ -41,6 +41,8 @@ use std::{
mod config;
mod logging;

type HttpResponseAny = HttpResponse<Token, HttpRequestStreamingCallbackAny>;

// Limit the total number of calls to an HTTP Request loop to 1000 for now.
const MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT: i32 = 1000;

Expand Down Expand Up @@ -290,23 +292,24 @@ async fn forward_request(
&request.version()
);

let method = request.method().to_string();
let uri = request.uri().clone();
let headers = request
.headers()
.into_iter()
let (parts, body) = request.into_parts();
let method = parts.method;
let uri = parts.uri.to_string();
let headers = parts
.headers
.iter()
.filter_map(|(name, value)| {
Some(HeaderField(
name.to_string(),
value.to_str().ok()?.to_string(),
name.as_str().into(),
value.to_str().ok()?.into(),
))
})
.inspect(|HeaderField(name, value)| {
slog::trace!(logger, "<< {}: {}", name, value);
})
.collect::<Vec<_>>();

let entire_body = body::to_bytes(request.into_body()).await?.to_vec();
let entire_body = body::to_bytes(body).await?.to_vec();

slog::trace!(logger, "<<");
if logger.is_trace_enabled() {
Expand All @@ -327,18 +330,18 @@ async fn forward_request(

let canister = HttpRequestCanister::create(agent.as_ref(), canister_id);
let query_result = canister
.http_request(
method.clone(),
uri.to_string(),
headers.clone(),
.http_request_custom(
method.as_str(),
uri.as_str(),
headers.iter().cloned(),
&entire_body,
)
.call()
.await;

fn handle_result(
result: Result<(HttpResponse,), AgentError>,
) -> Result<HttpResponse, Result<Response<Body>, Box<dyn Error>>> {
result: Result<(HttpResponseAny,), AgentError>,
) -> Result<HttpResponseAny, Result<Response<Body>, Box<dyn Error>>> {
// If the result is a Replica error, returns the 500 code and message. There is no information
// leak here because a user could use `dfx` to get the same reply.
match result {
Expand All @@ -365,7 +368,12 @@ async fn forward_request(
.timeout(std::time::Duration::from_secs(15))
.build();
let update_result = canister
.http_request_update(method, uri.to_string(), headers, &entire_body)
.http_request_update_custom(
method.as_str(),
uri.as_str(),
headers.iter().cloned(),
&entire_body,
)
.call_and_wait(waiter)
.await;
let http_response = match handle_result(update_result) {
Expand All @@ -379,7 +387,7 @@ async fn forward_request(

let mut builder = Response::builder().status(StatusCode::from_u16(http_response.status_code)?);
for HeaderField(name, value) in &http_response.headers {
builder = builder.header(name, value);
builder = builder.header(name.as_ref(), value.as_ref());
}

let headers_data = extract_headers_data(&http_response.headers, &logger);
Expand All @@ -396,12 +404,12 @@ async fn forward_request(

match streaming_strategy {
StreamingStrategy::Callback(callback) => {
let streaming_canister_id_id = callback.callback.principal;
let method_name = callback.callback.method;
let streaming_canister_id = callback.callback.0.principal;
let method_name = callback.callback.0.method;
let mut callback_token = callback.token;
let logger = logger.clone();
tokio::spawn(async move {
let canister = HttpRequestCanister::create(&agent, streaming_canister_id_id);
let canister = HttpRequestCanister::create(&agent, streaming_canister_id);
// We have not yet called http_request_stream_callback.
let mut count = 0;
loop {
Expand Down Expand Up @@ -444,7 +452,7 @@ async fn forward_request(
&headers_data,
&canister_id,
&agent,
&uri,
&parts.uri,
&http_response.body,
logger.clone(),
);
Expand Down Expand Up @@ -520,13 +528,8 @@ fn validate(
&body_sha,
logger.clone(),
) {
Ok(valid) => {
if valid {
Ok(())
} else {
Err("Body does not pass verification".to_string())
}
}
Ok(true) => Ok(()),
Ok(false) => Err("Body does not pass verification".to_string()),
Err(e) => Err(format!("Certificate validation failed: {}", e)),
},
(Some(_), _) | (_, Some(_)) => Err("Body does not pass verification".to_string()),
Expand All @@ -552,30 +555,26 @@ fn decode_body_to_sha256(body: &[u8], encoding: Option<String>) -> Option<[u8; 3
let mut decoder = GzDecoder::new(body);
for _ in 0..MAX_CHUNKS_TO_DECOMPRESS {
let bytes = decoder.read(&mut decoded).ok()?;
sha256.update(&decoded[0..bytes]);
if bytes < MAX_CHUNK_SIZE_TO_DECOMPRESS {
if bytes == 0 {
return Some(sha256.finalize().into());
}
sha256.update(&decoded[0..bytes]);
}
if decoder.bytes().next().is_some() {
return None;
} else {
return Some(sha256.finalize().into());
}
}
Some("deflate") => {
let mut decoder = DeflateDecoder::new(body);
for _ in 0..MAX_CHUNKS_TO_DECOMPRESS {
let bytes = decoder.read(&mut decoded).ok()?;
sha256.update(&decoded[0..bytes]);
if bytes < MAX_CHUNK_SIZE_TO_DECOMPRESS {
if bytes == 0 {
return Some(sha256.finalize().into());
}
sha256.update(&decoded[0..bytes]);
}
if decoder.bytes().next().is_some() {
return None;
} else {
return Some(sha256.finalize().into());
}
}
_ => sha256.update(body),
Expand All @@ -601,7 +600,7 @@ fn validate_body(
let tree: HashTree =
serde_cbor::from_slice(certificates.tree).map_err(AgentError::InvalidCborData)?;

if let Err(e) = agent.verify(&cert) {
if let Err(e) = agent.verify(&cert, *canister_id, false) {
slog::trace!(logger, ">> certificate failed verification: {}", e);
return Ok(false);
}
Expand Down

0 comments on commit f289318

Please sign in to comment.