Skip to content

Commit

Permalink
Status & local proxy fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ostenbom committed May 11, 2023
1 parent e1f5f35 commit 89c1dc3
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 82 deletions.
24 changes: 24 additions & 0 deletions e2e/src/boot_cfworker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,30 @@ pub fn boot_worker() -> Result<Child> {
Ok(cmd)
}

pub fn wait_worker_started() -> Result<()> {
let mut count = 0;

loop {
let output = Command::new("bash")
.arg("-c")
.arg("lsof -i tcp:8787")
.output()
.expect("Failed to execute command");

if output.status.success() {
println!("Worker started.");
break;
} else if count == 20 {
return Err(anyhow::anyhow!("Command failed after 20 retries"));
} else {
count += 1;
thread::sleep(Duration::from_millis(500));
}
}

Ok(())
}

pub fn kill_worker() -> Result<()> {
// Run pgrep to find the process ID of the wrangler process
let pgrep_output = Command::new("pgrep").arg("wrangler").output()?;
Expand Down
5 changes: 3 additions & 2 deletions e2e/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use boot_cfworker::{boot_worker, kill_worker};
use boot_server::boot_background_web_server;
use run_cli::{build_cli_project, run_cli_binary};

use crate::boot_cfworker::wait_worker_started;

type CleanupFunc = Box<dyn FnOnce() -> Result<()> + 'static + Send>;

struct Cleanup {
Expand Down Expand Up @@ -83,8 +85,7 @@ fn run_with_cleanup() -> Result<()> {

boot_worker()?;
cleanup.add(kill_worker);

thread::sleep(Duration::from_secs(5));
wait_worker_started()?;

let (out, err) = run_cli_binary(vec!["start", "-c", "e2e_conf.yml"])?;
println!("out: {}", out);
Expand Down
57 changes: 43 additions & 14 deletions linkup-cli/src/local_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{collections::HashMap, io};

use actix_web::{middleware, web, App, HttpRequest, HttpResponse, HttpServer, Responder};
use actix_web::{
http::header, middleware, web, App, HttpRequest, HttpResponse, HttpServer, Responder,
};
use thiserror::Error;

use linkup::*;
Expand All @@ -21,7 +23,11 @@ async fn linkup_config_handler(

let input_json_conf = match String::from_utf8(req_body.to_vec()) {
Ok(input_json_conf) => input_json_conf,
Err(_) => return HttpResponse::BadRequest().body("Invalid request body encoding"),
Err(_) => {
return HttpResponse::BadRequest()
.append_header(header::ContentType::plaintext())
.body("Invalid request body encoding - local server")
}
};

match update_session_req_from_json(input_json_conf) {
Expand All @@ -32,10 +38,16 @@ async fn linkup_config_handler(
match session_name {
Ok(session_name) => HttpResponse::Ok().body(session_name),
Err(e) => HttpResponse::InternalServerError()
.append_header(header::ContentType::plaintext())
.body(format!("Failed to store server config: {}", e)),
}
}
Err(e) => HttpResponse::BadRequest().body(format!("Failed to parse server config: {}", e)),
Err(e) => HttpResponse::BadRequest()
.append_header(header::ContentType::plaintext())
.body(format!(
"Failed to parse server config: {} - local server",
e
)),
}
}

Expand All @@ -59,14 +71,22 @@ async fn linkup_request_handler(

let (session_name, config) = match session_result {
Ok(result) => result,
Err(_) => return HttpResponse::UnprocessableEntity().body("Unprocessable Content"),
Err(_) => {
return HttpResponse::UnprocessableEntity()
.append_header(header::ContentType::plaintext())
.body("Unprocessable Content - local server")
}
};

let destination_url =
match get_target_url(url.clone(), headers.clone(), &config, &session_name) {
Some(result) => result,
None => return HttpResponse::NotFound().body("Not target url for request"),
};
let destination_url = match get_target_url(url.clone(), headers.clone(), &config, &session_name)
{
Some(result) => result,
None => {
return HttpResponse::NotFound()
.append_header(header::ContentType::plaintext())
.body("Not target url for request - local server")
}
};

let extra_headers = get_additional_headers(url, &headers, &session_name);

Expand All @@ -79,14 +99,23 @@ async fn linkup_request_handler(
.send()
.await;

let response = match response_result {
Ok(response) => response,
Err(_) => return HttpResponse::BadGateway().finish(),
};
let response =
match response_result {
Ok(response) => response,
Err(_) => return HttpResponse::BadGateway()
.append_header(header::ContentType::plaintext())
.body(
"Bad Gateway from local server, could you have forgotten to start the server?",
),
};

convert_reqwest_response(response)
.await
.unwrap_or_else(|_| HttpResponse::InternalServerError().finish())
.unwrap_or_else(|_| {
HttpResponse::InternalServerError()
.append_header(header::ContentType::plaintext())
.body("Could not convert response from reqwest - local server")
})
}

fn merge_headers(
Expand Down
105 changes: 54 additions & 51 deletions linkup-cli/src/status.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;

use crate::{
Expand All @@ -11,18 +10,18 @@ use crate::{
#[derive(Deserialize, Serialize)]
struct Status {
session: SessionStatus,
services: HashMap<String, ServiceStatus>,
services: Vec<ServiceStatus>,
}

#[derive(Deserialize, Serialize)]
struct SessionStatus {
session_name: String,
session_token: String,
domains: Vec<String>,
}

#[derive(Deserialize, Serialize)]
struct ServiceStatus {
name: String,
status: String,
component_kind: String,
location: String,
Expand All @@ -35,14 +34,25 @@ pub fn status(json: bool) -> Result<(), CliError> {
let service_statuses = service_status(&state)?;
services.extend(service_statuses);

// Filter out domains that are subdomains of other domains
let filtered_domains = state
.domains
.iter()
.filter(|&d| {
!state
.domains
.iter()
.any(|other| other.domain != d.domain && d.domain.ends_with(&other.domain))
})
.map(|d| d.domain.clone())
.collect::<Vec<String>>();

let status = Status {
session: SessionStatus {
session_name: state.linkup.session_name.clone(),
session_token: state.linkup.session_token,
domains: state
.domains
domains: filtered_domains
.iter()
.map(|d| format!("{}.{}", state.linkup.session_name.clone(), d.domain.clone()))
.map(|d| format!("{}.{}", state.linkup.session_name.clone(), d.clone()))
.collect(),
},
services,
Expand All @@ -54,7 +64,6 @@ pub fn status(json: bool) -> Result<(), CliError> {
// Display session information
println!("Session Information:");
println!(" Session Name: {}", status.session.session_name);
println!(" Session Token: {}", status.session.session_token);
println!(" Domains: ");
for domain in &status.session.domains {
println!(" {}", domain);
Expand All @@ -67,10 +76,10 @@ pub fn status(json: bool) -> Result<(), CliError> {
"{:<15} {:<15} {:<15} {:<15}",
"Service Name", "Component Kind", "Status", "Location"
);
for (name, status) in &status.services {
for status in &status.services {
println!(
"{:<15} {:<15} {:<15} {:<15}",
name, status.component_kind, status.status, status.location
status.name, status.component_kind, status.status, status.location
);
}
println!();
Expand All @@ -79,42 +88,38 @@ pub fn status(json: bool) -> Result<(), CliError> {
Ok(())
}

fn linkup_status(state: &LocalState) -> HashMap<String, ServiceStatus> {
let mut linkup_status_map: HashMap<String, ServiceStatus> = HashMap::new();
fn linkup_status(state: &LocalState) -> Vec<ServiceStatus> {
let mut linkup_statuses: Vec<ServiceStatus> = Vec::new();

let local_url = format!("http://localhost:{}", LINKUP_LOCALSERVER_PORT);
linkup_status_map.insert(
"local server".to_string(),
ServiceStatus {
component_kind: "linkup".to_string(),
location: local_url.to_string(),
status: server_status(local_url),
},
);

linkup_status_map.insert(
"remote server".to_string(),
ServiceStatus {
component_kind: "linkup".to_string(),
location: state.linkup.remote.to_string(),
status: server_status(state.linkup.remote.to_string()),
},
);

linkup_status_map.insert(
"tunnel".to_string(),
ServiceStatus {
component_kind: "linkup".to_string(),
location: state.linkup.tunnel.to_string(),
status: server_status(state.linkup.tunnel.to_string()),
},
);

linkup_status_map
linkup_statuses.push(ServiceStatus {
name: "local_server".to_string(),
component_kind: "linkup".to_string(),
location: local_url.to_string(),
status: server_status(local_url),
});

// linkup_statuses.append(local_status);

linkup_statuses.push(ServiceStatus {
name: "remote_server".to_string(),
component_kind: "linkup".to_string(),
location: state.linkup.remote.to_string(),
status: server_status(state.linkup.remote.to_string()),
});

linkup_statuses.push(ServiceStatus {
name: "tunnel".to_string(),
component_kind: "linkup".to_string(),
location: state.linkup.tunnel.to_string(),
status: server_status(state.linkup.tunnel.to_string()),
});

linkup_statuses
}

fn service_status(state: &LocalState) -> Result<HashMap<String, ServiceStatus>, CliError> {
let mut service_status_map: HashMap<String, ServiceStatus> = HashMap::new();
fn service_status(state: &LocalState) -> Result<Vec<ServiceStatus>, CliError> {
let mut service_statuses: Vec<ServiceStatus> = Vec::new();

for service in state.services.iter().cloned() {
let url = match service.current {
Expand All @@ -124,17 +129,15 @@ fn service_status(state: &LocalState) -> Result<HashMap<String, ServiceStatus>,

let status = server_status(url.to_string());

service_status_map.insert(
service.name,
ServiceStatus {
location: url.to_string(),
component_kind: service.current.to_string(),
status,
},
);
service_statuses.push(ServiceStatus {
name: service.name,
location: url.to_string(),
component_kind: service.current.to_string(),
status,
});
}

Ok(service_status_map)
Ok(service_statuses)
}

fn server_status(url: String) -> String {
Expand Down
24 changes: 18 additions & 6 deletions linkup-cli/src/stop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};

use crate::signal::{send_sigint, PidError};
use crate::start::get_state;
Expand Down Expand Up @@ -39,9 +40,13 @@ pub fn stop() -> Result<(), CliError> {

let state = get_state()?;
for service in &state.services {
match &service.directory {
Some(d) => remove_service_env(d.clone())?,
None => {}
let remove_res = match &service.directory {
Some(d) => remove_service_env(d.clone(), state.linkup.config_path.clone()),
None => Ok(()),
};

if let Err(e) = remove_res {
println!("Could not remove env for service {}: {}", service.name, e);
}
}

Expand All @@ -63,9 +68,16 @@ fn get_pid(file_name: &str) -> Result<String, PidError> {
}
}

fn remove_service_env(directory: String) -> Result<(), CliError> {
let env_path = format!("{}/.env", directory);
let temp_env_path = format!("{}/.env.temp", directory);
fn remove_service_env(directory: String, config_path: String) -> Result<(), CliError> {
let config_dir = Path::new(&config_path).parent().ok_or_else(|| {
CliError::SetServiceEnv(
directory.clone(),
"config_path does not have a parent directory".to_string(),
)
})?;

let env_path = PathBuf::from(config_dir).join(&directory).join(".env");
let temp_env_path = PathBuf::from(config_dir).join(&directory).join(".env.temp");

let input_file = File::open(&env_path).map_err(|e| {
CliError::RemoveServiceEnv(directory.clone(), format!("could not open env file: {}", e))
Expand Down
Loading

0 comments on commit 89c1dc3

Please sign in to comment.