Skip to content

Commit

Permalink
feat: support XCocaineHTTPProtoVersion logic
Browse files Browse the repository at this point in the history
XCocaineHTTPProtoVersion sets when the reply must be sent

Signed-off-by: Anton Tiurin <noxiouz@yandex.ru>
  • Loading branch information
noxiouz committed Jun 20, 2017
1 parent 98b13f3 commit 2c0f6e8
Showing 1 changed file with 72 additions and 1 deletion.
73 changes: 72 additions & 1 deletion src/route/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::borrow::Cow;
use std::error;
use std::fmt::{self, Display, Formatter};
use std::str;
use std::str::FromStr;
use std::sync::Arc;

use byteorder::{ByteOrder, LittleEndian};
Expand All @@ -28,10 +29,12 @@ use cocaine::protocol::{self, Flatten};
use logging::AccessLogger;
use pool::{Event, EventDispatch, Settings};
use route::{Match, Route};
use self::CocaineHttpProtoVersion::{Proto10, Proto11};

header! { (XCocaineService, "X-Cocaine-Service") => [String] }
header! { (XCocaineEvent, "X-Cocaine-Event") => [String] }
header! { (XPoweredBy, "X-Powered-By") => [String] }
header! { (XCocaineHTTPProtoVersion, "X-Cocaine-HTTP-Proto-Version") => [String] }

#[derive(Clone, Copy, Debug, PartialEq)]
struct XRequestId(u64);
Expand Down Expand Up @@ -305,6 +308,7 @@ impl AppWithSafeRetry {
tx: tx,
body: None,
response: Some(Response::new()),
version: None,
}).and_then(move |tx| {
let buf = rmps::to_vec(&request.frame).unwrap();
tx.send(0, &[unsafe { ::std::str::from_utf8_unchecked(&buf) }]);
Expand Down Expand Up @@ -370,6 +374,8 @@ enum Error {
// RetryLimitExceeded(u32),
// Service(cocaine::Error),
InvalidBodyRead(hyper::Error),
/// X-Cocaine-HTTP-Proto-Version is invalid
InvalidCocaineHttpProtoVersion,
Canceled,
}

Expand All @@ -379,6 +385,7 @@ impl Error {
Error::IncompleteHeadersMatch |
Error::InvalidRequestIdHeader(..) => StatusCode::BadRequest,
Error::InvalidBodyRead(..) |
Error::InvalidCocaineHttpProtoVersion |
Error::Canceled => StatusCode::InternalServerError,
}
}
Expand All @@ -392,6 +399,7 @@ impl Display for Error {
write!(fmt, "Invalid `{}` header value", name)
}
Error::InvalidBodyRead(ref err) => write!(fmt, "{}", err),
Error::InvalidCocaineHttpProtoVersion => fmt.write_str("unsupported X-Cocaine-HTTP-Proto-Version"),
Error::Canceled => fmt.write_str("canceled"),
}
}
Expand All @@ -405,15 +413,40 @@ impl error::Error for Error {
}
Error::InvalidRequestIdHeader(..) => "invalid tracing header value",
Error::InvalidBodyRead(..) => "failed to read HTTP body",
Error::InvalidCocaineHttpProtoVersion => "unsupported X-Cocaine-HTTP-Proto-Version",
Error::Canceled => "canceled",
}
}
}

/// App can set it by X-Cocaine-HTTP-Proto-Version header
/// It controls when the proxy determines to close the HTTP request
#[derive(Copy, Clone, Debug)]
enum CocaineHttpProtoVersion {
/// "1.0"
/// Default behavior. Finish HTTP reply when Close arrives from an app
Proto10,
/// "1.1"
/// Finish HTTP reply when Close or empty body arrives from an app
Proto11,
}

impl FromStr for CocaineHttpProtoVersion {
type Err = Error;
fn from_str(s: &str) -> Result<CocaineHttpProtoVersion, Error> {
Ok(match s {
"1.0" => Proto10,
"1.1" => Proto11,
_ => return Err(Error::InvalidCocaineHttpProtoVersion),
})
}
}

struct AppReadDispatch {
tx: oneshot::Sender<Option<(Response, u64)>>,
body: Option<Vec<u8>>,
response: Option<Response>,
version: Option<CocaineHttpProtoVersion>,
}

impl Dispatch for AppReadDispatch {
Expand All @@ -429,12 +462,50 @@ impl Dispatch for AppReadDispatch {
// TODO: Filter headers - https://tools.ietf.org/html/draft-ietf-httpbis-p1-messaging-14#section-7.1.3
resp.headers_mut().set_raw(name, value);
}

match resp.headers().get::<XCocaineHTTPProtoVersion>() {
Some(h) => {
match CocaineHttpProtoVersion::from_str(h) {
Err(err) => {
let body = err.to_string();
let body_len = body.len() as u64;

// TODO: If service is not available - write 503 (or other custom code).
let res = Response::new()
.with_status(StatusCode::InternalServerError)
.with_body(body);
drop(self.tx.send(Some((res, body_len))));
return None
},
Ok(protoversion) => self.version = Some(protoversion),
}
},
None => self.version = Some(Proto10),
}

self.response = Some(resp);
self.body = Some(Vec::with_capacity(64));
} else {
// TODO: If TE: chunked - feed parser. Consume chunks until None and send.
// TODO: Otherwise - just send.
self.body.as_mut().unwrap().extend(data.as_bytes());
let blob = data.as_bytes();
if !blob.is_empty() {
self.body.as_mut().unwrap().extend(blob);
} else {
match self.version {
Some(Proto11) => {
let body = self.body.take().unwrap();
let body_len = body.len() as u64;

let mut res = self.response.take().unwrap();
res.set_body(body);
drop(self.tx.send(Some((res, body_len))));
return None
},
None |
Some(Proto10) => {},
}
}
}
Some(self)
}
Expand Down

0 comments on commit 2c0f6e8

Please sign in to comment.