1
0
Fork 0
mirror of https://codeberg.org/icewind/haze.git synced 2026-06-03 09:04:12 +02:00

improve websocket proxying

This commit is contained in:
Robin Appelman 2026-05-08 20:24:17 +02:00
commit ad999702aa
6 changed files with 164 additions and 43 deletions

49
Cargo.lock generated
View file

@ -147,6 +147,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8"
dependencies = [ dependencies = [
"axum-core", "axum-core",
"axum-macros",
"bytes", "bytes",
"form_urlencoded", "form_urlencoded",
"futures-util", "futures-util",
@ -192,6 +193,17 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "axum-macros"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.74" version = "0.3.74"
@ -921,25 +933,6 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "h2"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2"
dependencies = [
"atomic-waker",
"bytes",
"fnv",
"futures-core",
"futures-sink",
"http",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.15.2" version = "0.15.2"
@ -957,7 +950,7 @@ checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
[[package]] [[package]]
name = "haze" name = "haze"
version = "2.2.1" version = "2.2.2"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"atty", "atty",
@ -991,6 +984,7 @@ dependencies = [
"tar", "tar",
"termion", "termion",
"tokio", "tokio",
"tokio-stream",
"toml", "toml",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
@ -1098,7 +1092,6 @@ dependencies = [
"bytes", "bytes",
"futures-channel", "futures-channel",
"futures-core", "futures-core",
"h2",
"http", "http",
"http-body", "http-body",
"httparse", "httparse",
@ -1129,8 +1122,9 @@ dependencies = [
[[package]] [[package]]
name = "hyper-reverse-proxy" name = "hyper-reverse-proxy"
version = "0.5.2-dev" version = "0.5.2-dev"
source = "git+https://github.com/chpio/hyper-reverse-proxy?rev=6934877eb74465204f605cc1c05ca5a9772db7c0#6934877eb74465204f605cc1c05ca5a9772db7c0" source = "git+https://code.betamike.com/micropelago/hyper-reverse-proxy.git?rev=d5a6f799189360d9449ae47ab3cdde511f02cf39#d5a6f799189360d9449ae47ab3cdde511f02cf39"
dependencies = [ dependencies = [
"http-body-util",
"hyper", "hyper",
"hyper-util", "hyper-util",
"tokio", "tokio",
@ -2741,6 +2735,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.13" version = "0.7.13"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "haze" name = "haze"
version = "2.2.1" version = "2.2.2"
authors = ["Robin Appelman <robin@icewind.nl>"] authors = ["Robin Appelman <robin@icewind.nl>"]
edition = "2021" edition = "2021"
repository = "https://codeberg.org/icewind/haze" repository = "https://codeberg.org/icewind/haze"
@ -12,6 +12,7 @@ bollard = "0.20.1"
maplit = "1.0.2" maplit = "1.0.2"
camino = { version = "1.2.2", features = ["serde1"] } camino = { version = "1.2.2", features = ["serde1"] }
tokio = { version = "1.49.0", features = ["fs", "macros", "signal", "rt-multi-thread"] } tokio = { version = "1.49.0", features = ["fs", "macros", "signal", "rt-multi-thread"] }
tokio-stream = { version = "0.1.18", features = ["net"] }
futures-util = "0.3.32" futures-util = "0.3.32"
termion = "4.0.6" termion = "4.0.6"
opener = "0.8.4" opener = "0.8.4"
@ -41,10 +42,10 @@ base16ct = { version = "1.0.0", features = ["alloc"] }
indicatif = "0.18.4" indicatif = "0.18.4"
rayon = "1.12.0" rayon = "1.12.0"
hyper-reverse-proxy = { version = "0.5.2-dev", git = "https://github.com/chpio/hyper-reverse-proxy", rev = "6934877eb74465204f605cc1c05ca5a9772db7c0" } hyper-reverse-proxy = { version = "0.5.2-dev", git = "https://code.betamike.com/micropelago/hyper-reverse-proxy.git", rev = "d5a6f799189360d9449ae47ab3cdde511f02cf39" }
hyper = "1.8.1" hyper = "1.8.1"
hyper-util = "0.1.20" hyper-util = "0.1.20"
axum = { version = "0.8.8", features = ["tokio"] } axum = { version = "0.8.8", features = ["tokio", "macros"] }
[profile.release] [profile.release]
lto = true lto = true

View file

@ -20,7 +20,7 @@ in
cargoLock = { cargoLock = {
lockFile = ../Cargo.lock; lockFile = ../Cargo.lock;
outputHashes = { outputHashes = {
"hyper-reverse-proxy-0.5.2-dev" = "sha256-+ebi4FVVkiOpf75e8K5oGkHJBYQjLNJhUPNj+78zd7Q="; "hyper-reverse-proxy-0.5.2-dev" = "sha256-awmj5aLFTea+kj81cwmfP1HWlWezwEKfyQSUJWjtamk=";
}; };
}; };
} }

View file

@ -5,26 +5,34 @@ use axum::http::header::HOST;
use axum::http::HeaderValue; use axum::http::HeaderValue;
use axum::{ use axum::{
body::Body, body::Body,
extract::{Request, State}, extract::Request,
response::{IntoResponse, Response}, response::{IntoResponse, Response},
Router,
}; };
use bollard::Docker; use bollard::Docker;
use futures_util::StreamExt;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::StatusCode; use hyper::StatusCode;
use hyper_util::rt::TokioIo;
use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor}; use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor};
use miette::{miette, IntoDiagnostic}; use miette::{miette, IntoDiagnostic};
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::Infallible;
use std::fs::{create_dir_all, set_permissions}; use std::fs::{create_dir_all, set_permissions};
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::pin;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::UnixListener; use tokio::net::UnixListener;
use tokio::signal::ctrl_c; use tokio::signal::ctrl_c;
use tokio::spawn; use tokio::spawn;
use tokio::time::sleep; use tokio::time::sleep;
use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
struct ActiveInstances { struct ActiveInstances {
@ -163,20 +171,26 @@ async fn serve(instances: ActiveInstances, listen: String, base_address: String)
ctrl_c().await.ok(); ctrl_c().await.ok();
}; };
let app = Router::new().fallback(handler).with_state(AppState { let state = AppState {
instances: instances.clone(), instances: instances.clone(),
base_address: base_address.clone(), base_address: base_address.clone(),
proxy_client: Arc::new(proxy_client), proxy_client: Arc::new(proxy_client),
}); };
if !listen.starts_with('/') { if !listen.starts_with('/') {
let addr: SocketAddr = listen.parse().into_diagnostic()?; let addr: SocketAddr = listen.parse().into_diagnostic()?;
let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
println!("listening on {}", listener.local_addr().unwrap()); println!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, app) let mut connections = pin!(TcpListenerStream::new(listener).take_until(cancel));
.with_graceful_shutdown(cancel)
.await while let Some(stream) = connections.next().await {
.unwrap(); match stream {
Ok(stream) => handle_connection(state.clone(), stream),
Err(error) => {
error!(%error, "connection failed");
}
}
}
} else { } else {
let listen: PathBuf = listen.into(); let listen: PathBuf = listen.into();
if let Some(parent) = listen.parent() { if let Some(parent) = listen.parent() {
@ -187,18 +201,42 @@ async fn serve(instances: ActiveInstances, listen: String, base_address: String)
} }
let _ = tokio::fs::remove_file(&listen).await; let _ = tokio::fs::remove_file(&listen).await;
let uds = UnixListener::bind(&listen).unwrap(); let listener = UnixListener::bind(&listen).unwrap();
println!("listening on {}", listen.display());
set_permissions(&listen, PermissionsExt::from_mode(0o666)).into_diagnostic()?; set_permissions(&listen, PermissionsExt::from_mode(0o666)).into_diagnostic()?;
axum::serve(uds, app) let mut connections = pin!(UnixListenerStream::new(listener).take_until(cancel));
.with_graceful_shutdown(cancel)
.await while let Some(stream) = connections.next().await {
.unwrap(); match stream {
Ok(stream) => handle_connection(state.clone(), stream),
Err(error) => {
error!(%error, "connection failed");
}
}
}
} }
Ok(()) Ok(())
} }
fn handle_connection<I: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
state: AppState,
stream: I,
) {
let io = TokioIo::new(stream);
// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(move |req| handler(state.clone(), req)))
.with_upgrades()
.await
{
eprintln!("Error serving connection: {:?}", err);
}
});
}
async fn get_remote( async fn get_remote(
host: Option<&HeaderValue>, host: Option<&HeaderValue>,
instances: &ActiveInstances, instances: &ActiveInstances,
@ -232,9 +270,9 @@ async fn get_remote(
} }
} }
type Client = hyper_util::client::legacy::Client<HttpConnector, Body>; type Client = hyper_util::client::legacy::Client<HttpConnector, Incoming>;
async fn handler(State(state): State<AppState>, mut req: Request) -> Result<Response, StatusCode> { async fn handler(state: AppState, mut req: Request<Incoming>) -> Result<Response, Infallible> {
let host = req.headers().get(HOST).cloned(); let host = req.headers().get(HOST).cloned();
let remote = match get_remote(host.as_ref(), &state.instances, &state.base_address).await { let remote = match get_remote(host.as_ref(), &state.instances, &state.base_address).await {
Ok(remote) => remote, Ok(remote) => remote,
@ -259,13 +297,13 @@ async fn handler(State(state): State<AppState>, mut req: Request) -> Result<Resp
IpAddr::V4(Ipv4Addr::UNSPECIFIED), IpAddr::V4(Ipv4Addr::UNSPECIFIED),
&uri, &uri,
req, req,
&state.proxy_client, state.proxy_client.as_ref(),
) )
.await .await
{ {
Ok(response) => Ok(response.map(Body::new)), Ok(response) => Ok(response.map(Body::new)),
Err(error) => { Err(error) => {
error!(%error, "error while proxying request"); error!(?error, "error while proxying request");
Ok(StatusCode::BAD_REQUEST.into_response()) Ok(StatusCode::BAD_REQUEST.into_response())
} }
} }

View file

@ -14,6 +14,7 @@ mod sftp;
mod redis; mod redis;
mod sharded; mod sharded;
mod smb; mod smb;
mod webhook;
use crate::cloud::CloudOptions; use crate::cloud::CloudOptions;
use crate::config::{HazeConfig, Preset, ProxyConfig}; use crate::config::{HazeConfig, Preset, ProxyConfig};
@ -32,6 +33,7 @@ use crate::service::redis::Redis;
use crate::service::sftp::Sftp; use crate::service::sftp::Sftp;
use crate::service::sharded::{Sharding, ShardingMigrate, ShardingMigrateUnset, SingleShard}; use crate::service::sharded::{Sharding, ShardingMigrate, ShardingMigrateUnset, SingleShard};
use crate::service::smb::Smb; use crate::service::smb::Smb;
use crate::service::webhook::Webhook;
use bollard::models::ContainerState; use bollard::models::ContainerState;
use bollard::Docker; use bollard::Docker;
use enum_dispatch::enum_dispatch; use enum_dispatch::enum_dispatch;
@ -296,6 +298,8 @@ pub enum ServiceType {
RedisTls, RedisTls,
/// Use FrankenPHP instead of PHP-FPM /// Use FrankenPHP instead of PHP-FPM
FrankenPhp, FrankenPhp,
/// Webhook test listener
Webhook,
} }
#[enum_dispatch] #[enum_dispatch]
@ -326,6 +330,7 @@ pub enum Service {
Redis(Redis), Redis(Redis),
RedisTls(RedisTls), RedisTls(RedisTls),
FrankenPhp(FrankenPhp), FrankenPhp(FrankenPhp),
Webhook(Webhook),
Preset(PresetService), Preset(PresetService),
} }
@ -369,6 +374,7 @@ impl Service {
ServiceType::Redis => Some(vec![Service::Redis(Redis)]), ServiceType::Redis => Some(vec![Service::Redis(Redis)]),
ServiceType::RedisTls => Some(vec![Service::RedisTls(RedisTls)]), ServiceType::RedisTls => Some(vec![Service::RedisTls(RedisTls)]),
ServiceType::FrankenPhp => Some(vec![Service::FrankenPhp(FrankenPhp)]), ServiceType::FrankenPhp => Some(vec![Service::FrankenPhp(FrankenPhp)]),
ServiceType::Webhook => Some(vec![Service::Webhook(Webhook)]),
} }
} else { } else {
presets presets

71
src/service/webhook.rs Normal file
View file

@ -0,0 +1,71 @@
use crate::cloud::CloudOptions;
use crate::config::HazeConfig;
use crate::image::pull_image;
use crate::service::ServiceTrait;
use crate::Result;
use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkingConfig};
use bollard::query_parameters::CreateContainerOptions;
use bollard::Docker;
use maplit::hashmap;
use miette::IntoDiagnostic;
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Webhook;
#[async_trait::async_trait]
impl ServiceTrait for Webhook {
fn name(&self) -> &str {
"webhook"
}
async fn spawn(
&self,
docker: &Docker,
cloud_id: &str,
network: &str,
_config: &HazeConfig,
_options: &CloudOptions,
) -> Result<Vec<String>> {
let image = "ghcr.io/tarampampam/webhook-tester";
pull_image(docker, image).await?;
let options = Some(CreateContainerOptions {
name: self.container_name(cloud_id),
..CreateContainerOptions::default()
});
let config = ContainerCreateBody {
image: Some(image.into()),
host_config: Some(HostConfig {
network_mode: Some(network.to_string()),
..Default::default()
}),
labels: Some(hashmap! {
"haze-type".into() => self.name().into(),
"haze-cloud-id".into() => cloud_id.into(),
}),
networking_config: Some(NetworkingConfig {
endpoints_config: Some(hashmap! {
network.into() => EndpointSettings {
aliases: Some(vec![self.name().to_string()]),
..Default::default()
}
}),
}),
..Default::default()
};
let id = docker
.create_container(options, config)
.await
.into_diagnostic()?
.id;
docker.start_container(&id, None).await.into_diagnostic()?;
Ok(vec![id])
}
fn container_name(&self, cloud_id: &str) -> Option<String> {
Some(format!("{}-webhook", cloud_id))
}
fn proxy_port(&self) -> u16 {
8080
}
}