1
0
Fork 0
mirror of https://codeberg.org/icewind/haze.git synced 2026-06-03 09:04:12 +02:00
haze/src/proxy.rs

243 lines
7.2 KiB
Rust

use crate::service::ServiceTrait;
use crate::Result;
use crate::{Cloud, HazeConfig};
use axum::http::header::HOST;
use axum::http::HeaderValue;
use axum::{
body::Body,
extract::{Request, State},
response::{IntoResponse, Response},
Router,
};
use bollard::Docker;
use hyper::StatusCode;
use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor};
use miette::{miette, IntoDiagnostic};
use std::collections::HashMap;
use std::fs::{create_dir_all, set_permissions};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::UnixListener;
use tokio::signal::ctrl_c;
use tokio::spawn;
use tokio::time::sleep;
use tracing::{debug, error, info};
struct ActiveInstances {
known: Mutex<HashMap<String, SocketAddr>>,
last: Mutex<Option<SocketAddr>>,
docker: Docker,
config: HazeConfig,
}
impl ActiveInstances {
pub fn new(docker: Docker, config: HazeConfig) -> Self {
ActiveInstances {
known: Mutex::default(),
last: Mutex::default(),
docker,
config,
}
}
pub async fn get(&self, name: &str) -> Option<SocketAddr> {
if let Some(ip) = self.known.lock().unwrap().get(name).cloned() {
return Some(ip);
}
// service proxy
let addr = if name.matches('-').count() == 2 {
let (name, service_name) = name.rsplit_once('-').unwrap();
let cloud = Cloud::get_by_filter(&self.docker, Some(name.into()), &self.config)
.await
.ok()?;
let service = cloud
.services()
.find(|service| service.name() == service_name)?;
let ip = service
.get_ips(&self.docker, &cloud.id)
.await
.ok()?
.next()?;
SocketAddr::new(ip, service.proxy_port())
} else {
SocketAddr::new(
Cloud::get_by_filter(&self.docker, Some(name.into()), &self.config)
.await
.ok()?
.ip?,
80,
)
};
println!("{name} => {addr}");
self.known.lock().unwrap().insert(name.into(), addr);
Some(addr)
}
pub fn last(&self) -> Option<SocketAddr> {
*self.last.lock().unwrap()
}
async fn update_last(&self) {
let last = Cloud::get_by_filter(&self.docker, None, &self.config)
.await
.ok()
.and_then(|cloud| Some(SocketAddr::new(cloud.ip?, 80)));
let mut old = self.last.lock().unwrap();
if old.as_ref() != last.as_ref() {
info!(instance = ?last, "Found new instance");
*old = last;
}
}
}
pub async fn proxy(docker: Docker, config: HazeConfig) -> Result<()> {
if config.proxy.listen.is_empty() {
return Err(miette!("Proxy not configured"));
}
let listen = config.proxy.listen.clone();
let base_address = config.proxy.address.clone();
let instances = ActiveInstances::new(docker, config);
serve(instances, listen, base_address).await
}
#[derive(Clone)]
struct AppState {
instances: Arc<ActiveInstances>,
base_address: Arc<String>,
proxy_client: Arc<Client>,
}
async fn serve(instances: ActiveInstances, listen: String, base_address: String) -> Result<()> {
let instances = Arc::new(instances);
let base_address = Arc::new(base_address);
let last_instances = instances.clone();
let proxy_client: Client =
hyper_util::client::legacy::Client::<(), ()>::builder(TokioExecutor::new())
.build(HttpConnector::new());
spawn(async move {
loop {
sleep(Duration::from_secs(1)).await;
last_instances.update_last().await;
}
});
let cancel = async {
ctrl_c().await.ok();
};
let app = Router::new().fallback(handler).with_state(AppState {
instances: instances.clone(),
base_address: base_address.clone(),
proxy_client: Arc::new(proxy_client),
});
if !listen.starts_with('/') {
let addr: SocketAddr = listen.parse().into_diagnostic()?;
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
println!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, app)
.with_graceful_shutdown(cancel)
.await
.unwrap();
} else {
let listen: PathBuf = listen.into();
if let Some(parent) = listen.parent() {
if !parent.exists() {
create_dir_all(parent).into_diagnostic()?;
set_permissions(parent, PermissionsExt::from_mode(0o755)).into_diagnostic()?;
}
}
let _ = tokio::fs::remove_file(&listen).await;
let uds = UnixListener::bind(&listen).unwrap();
set_permissions(&listen, PermissionsExt::from_mode(0o666)).into_diagnostic()?;
axum::serve(uds, app)
.with_graceful_shutdown(cancel)
.await
.unwrap();
}
Ok(())
}
async fn get_remote(
host: Option<&HeaderValue>,
instances: &ActiveInstances,
base_address: &str,
) -> Result<SocketAddr, String> {
let host = match host.and_then(|host| host.to_str().ok()) {
Some(host) => host,
None => return Err("No or invalid hostname provided".into()),
};
let ip = if host == base_address {
instances
.last()
.ok_or_else(|| String::from("No running instance known"))
} else {
let requested_instance = host.split('.').next().unwrap();
if requested_instance == "host-push" {
Ok((IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7867).into())
} else {
instances
.get(requested_instance)
.await
.ok_or_else(|| format!("Error {} has no known ip", requested_instance))
}
};
match ip {
Ok(ip) => Ok(ip),
Err(e) => {
eprintln!("{}", e);
Err(e)
}
}
}
type Client = hyper_util::client::legacy::Client<HttpConnector, Body>;
async fn handler(State(state): State<AppState>, mut req: Request) -> Result<Response, StatusCode> {
let host = req.headers().get(HOST).cloned();
let remote = match get_remote(host.as_ref(), &state.instances, &state.base_address).await {
Ok(remote) => remote,
Err(e) => {
return Ok(hyper::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(e.into())
.unwrap())
}
};
let uri = format!("http://{remote}");
debug!(target = uri, "proxying request");
// fix weird duplicate host header
req.headers_mut().remove(HOST);
if let Some(host) = host {
req.headers_mut().insert(HOST, host.clone());
}
match hyper_reverse_proxy::call(
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
&uri,
req,
&state.proxy_client,
)
.await
{
Ok(response) => Ok(response.map(Body::new)),
Err(error) => {
error!(%error, "error while proxying request");
Ok(StatusCode::BAD_REQUEST.into_response())
}
}
}