Skip to content

Commit

Permalink
Reqwest as proxy library in cf worker. Again.
Browse files Browse the repository at this point in the history
  • Loading branch information
ostenbom committed May 10, 2023
1 parent 8b75962 commit 658adc8
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 105 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions linkup-cli/src/local_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ async fn linkup_request_handler(
Err(_) => return HttpResponse::UnprocessableEntity().body("Unprocessable Content"),
};

let (destination_url, service) =
let destination_url =
match get_target_url(url.clone(), headers.clone(), &config, &session_name) {
Some(result) => result,
None => return HttpResponse::NotFound().body("Not target url for request"),
};

let extra_headers = get_additional_headers(url, &headers, &session_name, &service);
let extra_headers = get_additional_headers(url, &headers, &session_name);

// Proxy the request using the destination_url and the merged headers
let client = reqwest::Client::new();
Expand Down
83 changes: 20 additions & 63 deletions linkup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub fn get_additional_headers(
url: String,
headers: &HashMap<String, String>,
session_name: &str,
service: &str,
) -> HashMap<String, String> {
let mut additional_headers = HashMap::new();

Expand All @@ -66,14 +65,13 @@ pub fn get_additional_headers(

let tracestate = headers.get("tracestate");
let linkup_session = format!("linkup-session={}", session_name);
let linkup_service = format!("linkup-service={}", service);
match tracestate {
Some(ts) if !ts.contains(&linkup_session) => {
let new_tracestate = format!("{},{},{}", ts, linkup_session, linkup_service);
let new_tracestate = format!("{},{}", ts, linkup_session);
additional_headers.insert("tracestate".to_string(), new_tracestate);
}
None => {
let new_tracestate = format!("{},{}", linkup_session, linkup_service);
let new_tracestate = linkup_session;
additional_headers.insert("tracestate".to_string(), new_tracestate);
}
_ => {}
Expand All @@ -89,29 +87,29 @@ pub fn get_additional_headers(
additional_headers
}

pub fn common_response_headers() -> HashMap<String, String> {
let mut headers = HashMap::new();

headers.insert("Access-Control-Allow-Methods".to_string(), "GET, POST, PUT, PATCH, DELETE, HEAD, CONNECT, TRACE, OPTIONS".to_string());
headers.insert("Access-Control-Allow-Origin".to_string(), "*".to_string());
headers.insert("Access-Control-Allow-Headers".to_string(), "*".to_string());
headers.insert("Access-Control-Max-Age".to_string(), "86400".to_string());
// This can be discussed / tweaked later, probably PR-only sessions should respect cache headers, but dev ones not.
headers.insert("Cache-Control".to_string(), "no-store, must-revalidate".to_string());

headers
}

// Returns a url for the destination service and the service name, if the request could be served by the config
pub fn get_target_url(
url: String,
headers: HashMap<String, String>,
config: &Session,
session_name: &str,
) -> Option<(String, String)> {
) -> Option<String> {
let target = Url::parse(&url).unwrap();
let tracestate = headers.get("tracestate");
let path = target.path();

// If the request hit linkup before, we shouldn't need to do routing again.
if let Some(tracestate_value) = tracestate {
let service_name = extract_tracestate_service(tracestate_value);
if !service_name.is_empty() {
if let Some(service) = config.services.get(&service_name) {
// We don't want to re-apply rewrites here, they should have been applied already
let target = redirect(target, &service.origin, None);
return Some((String::from(target), service_name));
}
}
}

let url_target = config.domains.get(&get_target_domain(&url, session_name));
let referer_target = config.domains.get(&get_target_domain(
headers
Expand Down Expand Up @@ -152,7 +150,7 @@ pub fn get_target_url(
}

let target = redirect(target, &service.origin, Some(new_path));
return Some((String::from(target), service_name));
return Some(String::from(target));
}
}

Expand Down Expand Up @@ -209,10 +207,6 @@ fn extract_tracestate_session(tracestate: &str) -> String {
extrace_tracestate(tracestate, String::from("linkup-session"))
}

fn extract_tracestate_service(tracestate: &str) -> String {
extrace_tracestate(tracestate, String::from("linkup-service"))
}

fn extrace_tracestate(tracestate: &str, linkup_key: String) -> String {
tracestate
.split(',')
Expand Down Expand Up @@ -311,13 +305,12 @@ mod tests {
"https://tiny-cow.example.com/abc-xyz".to_string(),
&headers,
&session_name,
"frontend",
);

assert_eq!(add_headers.get("traceparent").unwrap().len(), 55);
assert_eq!(
add_headers.get("tracestate").unwrap(),
"linkup-session=tiny-cow,linkup-service=frontend"
"linkup-session=tiny-cow"
);
assert_eq!(add_headers.get("X-Forwarded-Host").unwrap(), "example.com");

Expand All @@ -332,7 +325,6 @@ mod tests {
"https://abc.some-tunnel.com/abc-xyz".to_string(),
&already_headers,
&session_name,
"frontend",
);

assert!(add_headers.get("traceparent").is_none());
Expand All @@ -347,14 +339,13 @@ mod tests {
"https://abc.some-tunnel.com/abc-xyz".to_string(),
&already_headers_two,
&session_name,
"frontend",
);

assert!(add_headers.get("traceparent").is_none());
assert!(add_headers.get("X-Forwarded-Host").is_none());
assert_eq!(
add_headers.get("tracestate").unwrap(),
"other-service=32,linkup-session=tiny-cow,linkup-service=frontend"
"other-service=32,linkup-session=tiny-cow"
);
}

Expand Down Expand Up @@ -395,10 +386,7 @@ mod tests {
&name
)
.unwrap(),
(
"http://localhost:8000/?a=b".to_string(),
"frontend".to_string()
)
);
// With path
assert_eq!(
Expand All @@ -409,10 +397,7 @@ mod tests {
&name
)
.unwrap(),
(
"http://localhost:8000/a/b/c/?a=b".to_string(),
"frontend".to_string()
)
);
// Test rewrites
assert_eq!(
Expand All @@ -423,24 +408,18 @@ mod tests {
&name
)
.unwrap(),
(
"http://localhost:8000/bar/b/c/?a=b".to_string(),
"frontend".to_string()
)
);
// Test domain routes
assert_eq!(
get_target_url(
format!("http://{}.example.com/api/v1/?a=b", &name),
format!("http://{}.example.com/api/v1/abc?a=b", &name),
HashMap::new(),
&config,
&name
)
.unwrap(),
(
"http://localhost:8001/api/v1/?a=b".to_string(),
"backend".to_string()
)
);
// Test no named subdomain
assert_eq!(
Expand All @@ -451,29 +430,7 @@ mod tests {
&name
)
.unwrap(),
(
"http://localhost:8001/api/v1/?a=b".to_string(),
"backend".to_string()
)
);
// Test has already been through another linkup server
let mut service_state_headers: HashMap<String, String> = HashMap::new();
service_state_headers.insert(
"tracestate".to_string(),
"linkup-service=frontend".to_string(),
);
assert_eq!(
get_target_url(
"https://literally-any-url.com/foo/a/b".to_string(),
service_state_headers,
&config,
&name
)
.unwrap(),
(
"http://localhost:8000/foo/a/b".to_string(),
"frontend".to_string()
)
);
}
}
1 change: 1 addition & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ futures = "0.3"
# code size when deploying.
console_error_panic_hook = { version = "0.1.1", optional = true }
http = "0.2.9"
reqwest = "0.11.17"

# [profile.release]
# Tell `rustc` to optimize for small code size.
Expand Down
65 changes: 59 additions & 6 deletions worker/src/http_util.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,71 @@
use std::collections::HashMap;
use reqwest::{Method as ReqwestMethod, Response as ReqwestResponse};
use std::{collections::HashMap, convert::TryFrom};
use worker::{console_log, Headers as CfHeaders, Method as CfMethod, Response as CfResponse};

use worker::{Headers, Result};
pub fn convert_cf_method_to_reqwest(
cf_method: &CfMethod,
) -> Result<ReqwestMethod, http::method::InvalidMethod> {
let method_str = match cf_method {
CfMethod::Get => "GET",
CfMethod::Post => "POST",
CfMethod::Put => "PUT",
CfMethod::Delete => "DELETE",
CfMethod::Options => "OPTIONS",
CfMethod::Head => "HEAD",
CfMethod::Patch => "PATCH",
CfMethod::Connect => "CONNECT",
CfMethod::Trace => "TRACE",
};

ReqwestMethod::try_from(method_str)
}

pub fn merge_headers(
original_headers: HashMap<String, String>,
extra_headers: HashMap<String, String>,
) -> Result<Headers> {
let mut new_headers = Headers::new();
) -> reqwest::header::HeaderMap {
let mut header_map = reqwest::header::HeaderMap::new();
for (key, value) in original_headers
.into_iter()
.chain(extra_headers.into_iter())
{
new_headers.set(&key, &value)?;
if let Ok(header_name) = reqwest::header::HeaderName::from_bytes(key.as_bytes()) {
if let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value) {
header_map.append(header_name, header_value);
}
}
}
header_map
}

pub async fn convert_reqwest_response_to_cf(
response: ReqwestResponse,
extra_headers: HashMap<String, String>,
) -> worker::Result<CfResponse> {
let status = response.status();
let headers = response.headers().clone();

let body_bytes = match response.bytes().await {
Ok(bytes) => bytes,
Err(_) => return CfResponse::error("Error reading response body", 502),
};

let cf_response = match CfResponse::from_bytes(body_bytes.to_vec()) {
Ok(response) => response,
Err(_) => return CfResponse::error("Error creating response body", 500),
};

let mut cf_headers = CfHeaders::from(headers);

for (key, value) in extra_headers {
let header_res = cf_headers.set(&key, &value);
if header_res.is_err() {
console_log!("failed to set response header: {}", header_res.unwrap_err());
}
}

let cf_response = cf_response.with_headers(cf_headers);
let cf_response = cf_response.with_status(status.into());

Ok(new_headers)
Ok(cf_response)
}
Loading

0 comments on commit 658adc8

Please sign in to comment.