diff --git a/src/route/app.rs b/src/route/app.rs index 5e04332..d3b7a63 100644 --- a/src/route/app.rs +++ b/src/route/app.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use std::error; use std::fmt::{self, Display, Formatter}; use std::str; +use std::str::FromStr; use std::sync::Arc; use byteorder::{ByteOrder, LittleEndian}; @@ -28,10 +29,12 @@ use cocaine::protocol::{self, Flatten}; use logging::AccessLogger; use pool::{Event, EventDispatch, Settings}; use route::{Match, Route}; +use self::CocaineHttpProtoVersion::{Proto10, Proto11}; header! { (XCocaineService, "X-Cocaine-Service") => [String] } header! { (XCocaineEvent, "X-Cocaine-Event") => [String] } header! { (XPoweredBy, "X-Powered-By") => [String] } +header! { (XCocaineHTTPProtoVersion, "X-Cocaine-HTTP-Proto-Version") => [String] } #[derive(Clone, Copy, Debug, PartialEq)] struct XRequestId(u64); @@ -305,6 +308,7 @@ impl AppWithSafeRetry { tx: tx, body: None, response: Some(Response::new()), + version: None, }).and_then(move |tx| { let buf = rmps::to_vec(&request.frame).unwrap(); tx.send(0, &[unsafe { ::std::str::from_utf8_unchecked(&buf) }]); @@ -370,6 +374,8 @@ enum Error { // RetryLimitExceeded(u32), // Service(cocaine::Error), InvalidBodyRead(hyper::Error), + /// X-Cocaine-HTTP-Proto-Version is invalid + InvalidCocaineHttpProtoVersion, Canceled, } @@ -379,6 +385,7 @@ impl Error { Error::IncompleteHeadersMatch | Error::InvalidRequestIdHeader(..) => StatusCode::BadRequest, Error::InvalidBodyRead(..) | + Error::InvalidCocaineHttpProtoVersion | Error::Canceled => StatusCode::InternalServerError, } } @@ -392,6 +399,7 @@ impl Display for Error { write!(fmt, "Invalid `{}` header value", name) } Error::InvalidBodyRead(ref err) => write!(fmt, "{}", err), + Error::InvalidCocaineHttpProtoVersion => fmt.write_str("unsupported X-Cocaine-HTTP-Proto-Version"), Error::Canceled => fmt.write_str("canceled"), } } @@ -405,15 +413,40 @@ impl error::Error for Error { } Error::InvalidRequestIdHeader(..) => "invalid tracing header value", Error::InvalidBodyRead(..) => "failed to read HTTP body", + Error::InvalidCocaineHttpProtoVersion => "unsupported X-Cocaine-HTTP-Proto-Version", Error::Canceled => "canceled", } } } +/// App can set it by X-Cocaine-HTTP-Proto-Version header +/// It controls when the proxy determines to close the HTTP request +#[derive(Copy, Clone, Debug)] +enum CocaineHttpProtoVersion { + /// "1.0" + /// Default behavior. Finish HTTP reply when Close arrives from an app + Proto10, + /// "1.1" + /// Finish HTTP reply when Close or empty body arrives from an app + Proto11, +} + +impl FromStr for CocaineHttpProtoVersion { + type Err = Error; + fn from_str(s: &str) -> Result { + Ok(match s { + "1.0" => Proto10, + "1.1" => Proto11, + _ => return Err(Error::InvalidCocaineHttpProtoVersion), + }) + } +} + struct AppReadDispatch { tx: oneshot::Sender>, body: Option>, response: Option, + version: Option, } impl Dispatch for AppReadDispatch { @@ -429,12 +462,50 @@ impl Dispatch for AppReadDispatch { // TODO: Filter headers - https://tools.ietf.org/html/draft-ietf-httpbis-p1-messaging-14#section-7.1.3 resp.headers_mut().set_raw(name, value); } + + match resp.headers().get::() { + Some(h) => { + match CocaineHttpProtoVersion::from_str(h) { + Err(err) => { + let body = err.to_string(); + let body_len = body.len() as u64; + + // TODO: If service is not available - write 503 (or other custom code). + let res = Response::new() + .with_status(StatusCode::InternalServerError) + .with_body(body); + drop(self.tx.send(Some((res, body_len)))); + return None + }, + Ok(protoversion) => self.version = Some(protoversion), + } + }, + None => self.version = Some(Proto10), + } + self.response = Some(resp); self.body = Some(Vec::with_capacity(64)); } else { // TODO: If TE: chunked - feed parser. Consume chunks until None and send. // TODO: Otherwise - just send. - self.body.as_mut().unwrap().extend(data.as_bytes()); + let blob = data.as_bytes(); + if !blob.is_empty() { + self.body.as_mut().unwrap().extend(blob); + } else { + match self.version { + Some(Proto11) => { + let body = self.body.take().unwrap(); + let body_len = body.len() as u64; + + let mut res = self.response.take().unwrap(); + res.set_body(body); + drop(self.tx.send(Some((res, body_len)))); + return None + }, + None | + Some(Proto10) => {}, + } + } } Some(self) }