diff --git a/e2e/src/boot_cfworker.rs b/e2e/src/boot_cfworker.rs index a80cb69..09eddd0 100644 --- a/e2e/src/boot_cfworker.rs +++ b/e2e/src/boot_cfworker.rs @@ -32,6 +32,30 @@ pub fn boot_worker() -> Result { Ok(cmd) } +pub fn wait_worker_started() -> Result<()> { + let mut count = 0; + + loop { + let output = Command::new("bash") + .arg("-c") + .arg("lsof -i tcp:8787") + .output() + .expect("Failed to execute command"); + + if output.status.success() { + println!("Worker started."); + break; + } else if count == 20 { + return Err(anyhow::anyhow!("Command failed after 20 retries")); + } else { + count += 1; + thread::sleep(Duration::from_millis(500)); + } + } + + Ok(()) +} + pub fn kill_worker() -> Result<()> { // Run pgrep to find the process ID of the wrangler process let pgrep_output = Command::new("pgrep").arg("wrangler").output()?; diff --git a/e2e/src/main.rs b/e2e/src/main.rs index 98f8b18..14fbd35 100644 --- a/e2e/src/main.rs +++ b/e2e/src/main.rs @@ -21,6 +21,8 @@ use boot_cfworker::{boot_worker, kill_worker}; use boot_server::boot_background_web_server; use run_cli::{build_cli_project, run_cli_binary}; +use crate::boot_cfworker::wait_worker_started; + type CleanupFunc = Box Result<()> + 'static + Send>; struct Cleanup { @@ -83,8 +85,7 @@ fn run_with_cleanup() -> Result<()> { boot_worker()?; cleanup.add(kill_worker); - - thread::sleep(Duration::from_secs(5)); + wait_worker_started()?; let (out, err) = run_cli_binary(vec!["start", "-c", "e2e_conf.yml"])?; println!("out: {}", out); diff --git a/linkup-cli/src/local_server.rs b/linkup-cli/src/local_server.rs index bf4d076..1523182 100644 --- a/linkup-cli/src/local_server.rs +++ b/linkup-cli/src/local_server.rs @@ -1,6 +1,8 @@ use std::{collections::HashMap, io}; -use actix_web::{middleware, web, App, HttpRequest, HttpResponse, HttpServer, Responder}; +use actix_web::{ + http::header, middleware, web, App, HttpRequest, HttpResponse, HttpServer, Responder, +}; use thiserror::Error; use linkup::*; @@ -21,7 +23,11 @@ async fn linkup_config_handler( let input_json_conf = match String::from_utf8(req_body.to_vec()) { Ok(input_json_conf) => input_json_conf, - Err(_) => return HttpResponse::BadRequest().body("Invalid request body encoding"), + Err(_) => { + return HttpResponse::BadRequest() + .append_header(header::ContentType::plaintext()) + .body("Invalid request body encoding - local server") + } }; match update_session_req_from_json(input_json_conf) { @@ -32,10 +38,16 @@ async fn linkup_config_handler( match session_name { Ok(session_name) => HttpResponse::Ok().body(session_name), Err(e) => HttpResponse::InternalServerError() + .append_header(header::ContentType::plaintext()) .body(format!("Failed to store server config: {}", e)), } } - Err(e) => HttpResponse::BadRequest().body(format!("Failed to parse server config: {}", e)), + Err(e) => HttpResponse::BadRequest() + .append_header(header::ContentType::plaintext()) + .body(format!( + "Failed to parse server config: {} - local server", + e + )), } } @@ -59,14 +71,22 @@ async fn linkup_request_handler( let (session_name, config) = match session_result { Ok(result) => result, - Err(_) => return HttpResponse::UnprocessableEntity().body("Unprocessable Content"), + Err(_) => { + return HttpResponse::UnprocessableEntity() + .append_header(header::ContentType::plaintext()) + .body("Unprocessable Content - local server") + } }; - let destination_url = - match get_target_url(url.clone(), headers.clone(), &config, &session_name) { - Some(result) => result, - None => return HttpResponse::NotFound().body("Not target url for request"), - }; + let destination_url = match get_target_url(url.clone(), headers.clone(), &config, &session_name) + { + Some(result) => result, + None => { + return HttpResponse::NotFound() + .append_header(header::ContentType::plaintext()) + .body("Not target url for request - local server") + } + }; let extra_headers = get_additional_headers(url, &headers, &session_name); @@ -79,14 +99,23 @@ async fn linkup_request_handler( .send() .await; - let response = match response_result { - Ok(response) => response, - Err(_) => return HttpResponse::BadGateway().finish(), - }; + let response = + match response_result { + Ok(response) => response, + Err(_) => return HttpResponse::BadGateway() + .append_header(header::ContentType::plaintext()) + .body( + "Bad Gateway from local server, could you have forgotten to start the server?", + ), + }; convert_reqwest_response(response) .await - .unwrap_or_else(|_| HttpResponse::InternalServerError().finish()) + .unwrap_or_else(|_| { + HttpResponse::InternalServerError() + .append_header(header::ContentType::plaintext()) + .body("Could not convert response from reqwest - local server") + }) } fn merge_headers( diff --git a/linkup-cli/src/status.rs b/linkup-cli/src/status.rs index ea4e537..1c4fede 100644 --- a/linkup-cli/src/status.rs +++ b/linkup-cli/src/status.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::time::Duration; use crate::{ @@ -11,18 +10,18 @@ use crate::{ #[derive(Deserialize, Serialize)] struct Status { session: SessionStatus, - services: HashMap, + services: Vec, } #[derive(Deserialize, Serialize)] struct SessionStatus { session_name: String, - session_token: String, domains: Vec, } #[derive(Deserialize, Serialize)] struct ServiceStatus { + name: String, status: String, component_kind: String, location: String, @@ -35,14 +34,25 @@ pub fn status(json: bool) -> Result<(), CliError> { let service_statuses = service_status(&state)?; services.extend(service_statuses); + // Filter out domains that are subdomains of other domains + let filtered_domains = state + .domains + .iter() + .filter(|&d| { + !state + .domains + .iter() + .any(|other| other.domain != d.domain && d.domain.ends_with(&other.domain)) + }) + .map(|d| d.domain.clone()) + .collect::>(); + let status = Status { session: SessionStatus { session_name: state.linkup.session_name.clone(), - session_token: state.linkup.session_token, - domains: state - .domains + domains: filtered_domains .iter() - .map(|d| format!("{}.{}", state.linkup.session_name.clone(), d.domain.clone())) + .map(|d| format!("{}.{}", state.linkup.session_name.clone(), d.clone())) .collect(), }, services, @@ -54,7 +64,6 @@ pub fn status(json: bool) -> Result<(), CliError> { // Display session information println!("Session Information:"); println!(" Session Name: {}", status.session.session_name); - println!(" Session Token: {}", status.session.session_token); println!(" Domains: "); for domain in &status.session.domains { println!(" {}", domain); @@ -67,10 +76,10 @@ pub fn status(json: bool) -> Result<(), CliError> { "{:<15} {:<15} {:<15} {:<15}", "Service Name", "Component Kind", "Status", "Location" ); - for (name, status) in &status.services { + for status in &status.services { println!( "{:<15} {:<15} {:<15} {:<15}", - name, status.component_kind, status.status, status.location + status.name, status.component_kind, status.status, status.location ); } println!(); @@ -79,42 +88,38 @@ pub fn status(json: bool) -> Result<(), CliError> { Ok(()) } -fn linkup_status(state: &LocalState) -> HashMap { - let mut linkup_status_map: HashMap = HashMap::new(); +fn linkup_status(state: &LocalState) -> Vec { + let mut linkup_statuses: Vec = Vec::new(); let local_url = format!("http://localhost:{}", LINKUP_LOCALSERVER_PORT); - linkup_status_map.insert( - "local server".to_string(), - ServiceStatus { - component_kind: "linkup".to_string(), - location: local_url.to_string(), - status: server_status(local_url), - }, - ); - - linkup_status_map.insert( - "remote server".to_string(), - ServiceStatus { - component_kind: "linkup".to_string(), - location: state.linkup.remote.to_string(), - status: server_status(state.linkup.remote.to_string()), - }, - ); - - linkup_status_map.insert( - "tunnel".to_string(), - ServiceStatus { - component_kind: "linkup".to_string(), - location: state.linkup.tunnel.to_string(), - status: server_status(state.linkup.tunnel.to_string()), - }, - ); - - linkup_status_map + linkup_statuses.push(ServiceStatus { + name: "local_server".to_string(), + component_kind: "linkup".to_string(), + location: local_url.to_string(), + status: server_status(local_url), + }); + + // linkup_statuses.append(local_status); + + linkup_statuses.push(ServiceStatus { + name: "remote_server".to_string(), + component_kind: "linkup".to_string(), + location: state.linkup.remote.to_string(), + status: server_status(state.linkup.remote.to_string()), + }); + + linkup_statuses.push(ServiceStatus { + name: "tunnel".to_string(), + component_kind: "linkup".to_string(), + location: state.linkup.tunnel.to_string(), + status: server_status(state.linkup.tunnel.to_string()), + }); + + linkup_statuses } -fn service_status(state: &LocalState) -> Result, CliError> { - let mut service_status_map: HashMap = HashMap::new(); +fn service_status(state: &LocalState) -> Result, CliError> { + let mut service_statuses: Vec = Vec::new(); for service in state.services.iter().cloned() { let url = match service.current { @@ -124,17 +129,15 @@ fn service_status(state: &LocalState) -> Result, let status = server_status(url.to_string()); - service_status_map.insert( - service.name, - ServiceStatus { - location: url.to_string(), - component_kind: service.current.to_string(), - status, - }, - ); + service_statuses.push(ServiceStatus { + name: service.name, + location: url.to_string(), + component_kind: service.current.to_string(), + status, + }); } - Ok(service_status_map) + Ok(service_statuses) } fn server_status(url: String) -> String { diff --git a/linkup-cli/src/stop.rs b/linkup-cli/src/stop.rs index 6fbbfa3..2752b4e 100644 --- a/linkup-cli/src/stop.rs +++ b/linkup-cli/src/stop.rs @@ -1,5 +1,6 @@ use std::fs::{self, File, OpenOptions}; use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; use crate::signal::{send_sigint, PidError}; use crate::start::get_state; @@ -39,9 +40,13 @@ pub fn stop() -> Result<(), CliError> { let state = get_state()?; for service in &state.services { - match &service.directory { - Some(d) => remove_service_env(d.clone())?, - None => {} + let remove_res = match &service.directory { + Some(d) => remove_service_env(d.clone(), state.linkup.config_path.clone()), + None => Ok(()), + }; + + if let Err(e) = remove_res { + println!("Could not remove env for service {}: {}", service.name, e); } } @@ -63,9 +68,16 @@ fn get_pid(file_name: &str) -> Result { } } -fn remove_service_env(directory: String) -> Result<(), CliError> { - let env_path = format!("{}/.env", directory); - let temp_env_path = format!("{}/.env.temp", directory); +fn remove_service_env(directory: String, config_path: String) -> Result<(), CliError> { + let config_dir = Path::new(&config_path).parent().ok_or_else(|| { + CliError::SetServiceEnv( + directory.clone(), + "config_path does not have a parent directory".to_string(), + ) + })?; + + let env_path = PathBuf::from(config_dir).join(&directory).join(".env"); + let temp_env_path = PathBuf::from(config_dir).join(&directory).join(".env.temp"); let input_file = File::open(&env_path).map_err(|e| { CliError::RemoveServiceEnv(directory.clone(), format!("could not open env file: {}", e)) diff --git a/linkup/src/lib.rs b/linkup/src/lib.rs index e87acd7..830bc84 100644 --- a/linkup/src/lib.rs +++ b/linkup/src/lib.rs @@ -90,12 +90,18 @@ pub fn get_additional_headers( pub fn common_response_headers() -> HashMap { let mut headers = HashMap::new(); - headers.insert("Access-Control-Allow-Methods".to_string(), "GET, POST, PUT, PATCH, DELETE, HEAD, CONNECT, TRACE, OPTIONS".to_string()); + headers.insert( + "Access-Control-Allow-Methods".to_string(), + "GET, POST, PUT, PATCH, DELETE, HEAD, CONNECT, TRACE, OPTIONS".to_string(), + ); headers.insert("Access-Control-Allow-Origin".to_string(), "*".to_string()); headers.insert("Access-Control-Allow-Headers".to_string(), "*".to_string()); headers.insert("Access-Control-Max-Age".to_string(), "86400".to_string()); // This can be discussed / tweaked later, probably PR-only sessions should respect cache headers, but dev ones not. - headers.insert("Cache-Control".to_string(), "no-store, must-revalidate".to_string()); + headers.insert( + "Cache-Control".to_string(), + "no-store, must-revalidate".to_string(), + ); headers } @@ -111,6 +117,15 @@ pub fn get_target_url( let path = target.path(); let url_target = config.domains.get(&get_target_domain(&url, session_name)); + + // Forwarded hosts persist over the tunnel + let forwarded_host_target = config.domains.get( + headers + .get("x-forwarded-host") + .unwrap_or(&"does-not-exist".to_string()), + ); + + // This is more for e2e tests to work let referer_target = config.domains.get(&get_target_domain( headers .get("referer") @@ -120,6 +135,8 @@ pub fn get_target_url( let target_domain = if url_target.is_some() { url_target + } else if forwarded_host_target.is_some() { + forwarded_host_target } else { referer_target }; @@ -386,7 +403,7 @@ mod tests { &name ) .unwrap(), - "http://localhost:8000/?a=b".to_string(), + "http://localhost:8000/?a=b".to_string(), ); // With path assert_eq!( @@ -397,7 +414,7 @@ mod tests { &name ) .unwrap(), - "http://localhost:8000/a/b/c/?a=b".to_string(), + "http://localhost:8000/a/b/c/?a=b".to_string(), ); // Test rewrites assert_eq!( @@ -408,7 +425,7 @@ mod tests { &name ) .unwrap(), - "http://localhost:8000/bar/b/c/?a=b".to_string(), + "http://localhost:8000/bar/b/c/?a=b".to_string(), ); // Test domain routes assert_eq!( @@ -419,7 +436,7 @@ mod tests { &name ) .unwrap(), - "http://localhost:8001/api/v1/?a=b".to_string(), + "http://localhost:8001/api/v1/?a=b".to_string(), ); // Test no named subdomain assert_eq!( @@ -430,7 +447,7 @@ mod tests { &name ) .unwrap(), - "http://localhost:8001/api/v1/?a=b".to_string(), + "http://localhost:8001/api/v1/?a=b".to_string(), ); } } diff --git a/linkup/src/session.rs b/linkup/src/session.rs index 76b0dd6..7bbac90 100644 --- a/linkup/src/session.rs +++ b/linkup/src/session.rs @@ -28,13 +28,13 @@ pub struct Rewrite { pub target: String, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Domain { pub default_service: String, pub routes: Vec, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Route { pub path: Regex, pub service: String, diff --git a/worker/src/lib.rs b/worker/src/lib.rs index dac9f63..749ef4b 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -107,6 +107,35 @@ async fn linkup_request_handler(mut req: Request, sessions: SessionAllocator) -> convert_reqwest_response_to_cf(response, common_response_headers()).await } +async fn linkup_ws_handler(req: Request, sessions: SessionAllocator) -> Result { + let url = match req.url() { + Ok(url) => url.to_string(), + Err(_) => return plaintext_error("Bad or missing request url", 400), + }; + + let headers = req + .headers() + .clone() + .entries() + .collect::>(); + + let (session_name, config) = + match sessions.get_request_session(url.clone(), headers.clone()).await { + Ok(result) => result, + Err(_) => return plaintext_error("Could not find a linkup session for this request. Use a linkup subdomain or context headers like Referer/tracestate", 422), + }; + + let destination_url = match get_target_url(url.clone(), headers.clone(), &config, &session_name) + { + Some(result) => result, + None => return plaintext_error("No target URL for request", 422), + }; + + let redirect_dest = Url::parse(&destination_url)?; + + Response::redirect(redirect_dest) +} + #[event(fetch)] pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result { log_request(&req); @@ -123,6 +152,10 @@ pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result