1
0
Fork 0
mirror of https://codeberg.org/icewind/haze.git synced 2026-06-03 17:14:08 +02:00

switch proxy to allow webscokets

This commit is contained in:
Robin Appelman 2023-02-15 21:25:03 +01:00
commit 6aad67473d
4 changed files with 111 additions and 208 deletions

View file

@ -2,31 +2,29 @@ use crate::service::ServiceTrait;
use crate::{Cloud, HazeConfig};
use crate::{Result, Service};
use bollard::Docker;
use futures_util::future::Either;
use futures_util::FutureExt;
use miette::{miette, Context, IntoDiagnostic};
use hyper::http::uri::Builder;
use miette::{miette, IntoDiagnostic};
use std::collections::HashMap;
use std::convert::Infallible;
use std::fs::{create_dir_all, remove_file, set_permissions};
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::UnixListener;
use tokio::net::{UnixListener, UnixStream};
use tokio::signal::ctrl_c;
use tokio::spawn;
use tokio::time::sleep;
use tokio_stream::wrappers::UnixListenerStream;
use tracing::info;
use warp::host::Authority;
use warp::http::{HeaderMap, HeaderValue, Method};
use warp::hyper::body::Bytes;
use warp::path::FullPath;
use warp::Filter;
use warp_reverse_proxy::{
extract_request_data_filter, proxy_to_and_forward_response, QueryParameters,
};
use warp::http::header::HOST;
use warp::http::{HeaderValue, Uri};
use warp::hyper::server::accept::from_stream;
use warp::hyper::server::conn::AddrStream;
use warp::hyper::service::{make_service_fn, service_fn};
use warp::hyper::{Body, Request, Response, Server, StatusCode};
struct ActiveInstances {
known: Mutex<HashMap<String, SocketAddr>>,
@ -126,8 +124,6 @@ async fn serve(instances: ActiveInstances, listen: String, base_address: String)
let instances = Arc::new(instances);
let base_address = Arc::new(base_address);
let last_instances = instances.clone();
let instances = warp::any().map(move || instances.clone());
let base_address = warp::any().map(move || base_address.clone());
spawn(async move {
loop {
@ -136,73 +132,31 @@ async fn serve(instances: ActiveInstances, listen: String, base_address: String)
}
});
let proxy = warp::any()
.and(warp::filters::host::optional())
.and(instances)
.and(base_address)
.and_then(
move |host: Option<Authority>,
instances: Arc<ActiveInstances>,
base_address: Arc<String>| async move {
let host = match host {
Some(host) => host,
None => return Err(warp::reject::not_found()),
};
let ip = if host.as_str() == base_address.as_str() {
instances
.last()
.ok_or_else(|| String::from("No running instance known"))
} else {
let requested_instance = host.as_str().split('.').next().unwrap();
instances
.get(requested_instance)
.await
.ok_or_else(|| format!("Error {} has no known ip", requested_instance))
};
match ip {
Ok(ip) => Ok((format!("http://{}", ip), host.to_string())),
Err(e) => {
eprintln!("{}", e);
Err(warp::reject::not_found())
}
}
},
)
.untuple_one()
.and(extract_request_data_filter())
.and_then(
move |proxy_address: String,
host: String,
uri: FullPath,
params: QueryParameters,
method: Method,
mut headers: HeaderMap,
body: Bytes| {
headers.insert("host", HeaderValue::from_str(&host).unwrap());
proxy_to_and_forward_response(
proxy_address,
String::new(),
uri,
params,
method,
headers,
body,
)
},
);
let cancel = async {
ctrl_c().await.ok();
};
let warp_server = warp::serve(proxy);
let handler = move |remote_addr| {
let instances = instances.clone();
let base_address = base_address.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
handle(remote_addr, req, instances.clone(), base_address.clone())
}))
}
};
let server = if !listen.starts_with('/') {
let listen = SocketAddr::from_str(&listen)
.into_diagnostic()
.wrap_err("Failed to parse proxy listen address")?;
Either::Left(warp_server.bind_with_graceful_shutdown(listen, cancel).1)
if !listen.starts_with('/') {
let make_svc = make_service_fn(|conn: &AddrStream| handler(conn.remote_addr().ip()));
let addr: SocketAddr = listen.parse().into_diagnostic()?;
Server::bind(&addr)
.serve(make_svc)
.with_graceful_shutdown(cancel)
.await
.into_diagnostic()?;
} else {
let make_svc =
make_service_fn(move |_conn: &UnixStream| handler(Ipv4Addr::UNSPECIFIED.into()));
let listen: PathBuf = listen.into();
if let Some(parent) = listen.parent() {
if !parent.exists() {
@ -215,15 +169,70 @@ async fn serve(instances: ActiveInstances, listen: String, base_address: String)
let listener = UnixListener::bind(&listen).into_diagnostic()?;
set_permissions(&listen, PermissionsExt::from_mode(0o666)).into_diagnostic()?;
let stream = UnixListenerStream::new(listener);
Either::Right(
warp_server
.serve_incoming_with_graceful_shutdown(stream, cancel)
.map(move |_| {
remove_file(&listen).ok();
}),
)
};
let acceptor = from_stream(stream);
Server::builder(acceptor)
.serve(make_svc)
.with_graceful_shutdown(cancel)
.await
.into_diagnostic()?;
}
server.await;
Ok(())
}
async fn get_remote(
host: Option<&HeaderValue>,
instances: &ActiveInstances,
base_address: &str,
) -> Result<SocketAddr, String> {
let host = match host.and_then(|host| host.to_str().ok()) {
Some(host) => host,
None => return Err("No or invalid hostname provided".into()),
};
let ip = if host == base_address {
instances
.last()
.ok_or_else(|| String::from("No running instance known"))
} else {
let requested_instance = host.split('.').next().unwrap();
instances
.get(requested_instance)
.await
.ok_or_else(|| format!("Error {} has no known ip", requested_instance))
};
match ip {
Ok(ip) => Ok(ip),
Err(e) => {
eprintln!("{}", e);
Err(e)
}
}
}
async fn handle(
client_ip: IpAddr,
req: Request<Body>,
instances: Arc<ActiveInstances>,
base_address: Arc<String>,
) -> Result<Response<Body>, Infallible> {
let host = req.headers().get(HOST);
let remote = match get_remote(host, &instances, &base_address).await {
Ok(remote) => remote,
Err(e) => {
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(e.into())
.unwrap())
}
};
let forward = format!("http://{}", remote);
let client = hyper::Client::builder().build_http();
match hyper_reverse_proxy::call(client_ip, &forward, req, &client).await {
Ok(response) => Ok(response),
Err(_error) => Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap()),
}
}