diff --git a/Cargo.lock b/Cargo.lock index 90f1ad9..ccedb17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,6 +147,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ "axum-core", + "axum-macros", "bytes", "form_urlencoded", "futures-util", @@ -192,6 +193,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -921,25 +933,6 @@ dependencies = [ "url", ] -[[package]] -name = "h2" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2" -dependencies = [ - "atomic-waker", - "bytes", - "fnv", - "futures-core", - "futures-sink", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "hashbrown" version = "0.15.2" @@ -957,7 +950,7 @@ checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" [[package]] name = "haze" -version = "2.2.1" +version = "2.2.2" dependencies = [ "async-trait", "atty", @@ -991,6 +984,7 @@ dependencies = [ "tar", "termion", "tokio", + "tokio-stream", "toml", "tracing", "tracing-subscriber", @@ -1098,7 +1092,6 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2", "http", "http-body", "httparse", @@ -1129,8 +1122,9 @@ dependencies = [ [[package]] name = "hyper-reverse-proxy" version = "0.5.2-dev" -source = "git+https://github.com/chpio/hyper-reverse-proxy?rev=6934877eb74465204f605cc1c05ca5a9772db7c0#6934877eb74465204f605cc1c05ca5a9772db7c0" +source = "git+https://code.betamike.com/micropelago/hyper-reverse-proxy.git?rev=d5a6f799189360d9449ae47ab3cdde511f02cf39#d5a6f799189360d9449ae47ab3cdde511f02cf39" dependencies = [ + "http-body-util", "hyper", "hyper-util", "tokio", @@ -2741,6 +2735,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.13" diff --git a/Cargo.toml b/Cargo.toml index 947de7c..6da76a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "haze" -version = "2.2.1" +version = "2.2.2" authors = ["Robin Appelman "] edition = "2021" repository = "https://codeberg.org/icewind/haze" @@ -12,6 +12,7 @@ bollard = "0.20.1" maplit = "1.0.2" camino = { version = "1.2.2", features = ["serde1"] } tokio = { version = "1.49.0", features = ["fs", "macros", "signal", "rt-multi-thread"] } +tokio-stream = { version = "0.1.18", features = ["net"] } futures-util = "0.3.32" termion = "4.0.6" opener = "0.8.4" @@ -41,10 +42,10 @@ base16ct = { version = "1.0.0", features = ["alloc"] } indicatif = "0.18.4" rayon = "1.12.0" -hyper-reverse-proxy = { version = "0.5.2-dev", git = "https://github.com/chpio/hyper-reverse-proxy", rev = "6934877eb74465204f605cc1c05ca5a9772db7c0" } +hyper-reverse-proxy = { version = "0.5.2-dev", git = "https://code.betamike.com/micropelago/hyper-reverse-proxy.git", rev = "d5a6f799189360d9449ae47ab3cdde511f02cf39" } hyper = "1.8.1" hyper-util = "0.1.20" -axum = { version = "0.8.8", features = ["tokio"] } +axum = { version = "0.8.8", features = ["tokio", "macros"] } [profile.release] lto = true diff --git a/nix/package.nix b/nix/package.nix index fcd6489..34a4388 100644 --- a/nix/package.nix +++ b/nix/package.nix @@ -20,7 +20,7 @@ in cargoLock = { lockFile = ../Cargo.lock; outputHashes = { - "hyper-reverse-proxy-0.5.2-dev" = "sha256-+ebi4FVVkiOpf75e8K5oGkHJBYQjLNJhUPNj+78zd7Q="; + "hyper-reverse-proxy-0.5.2-dev" = "sha256-awmj5aLFTea+kj81cwmfP1HWlWezwEKfyQSUJWjtamk="; }; }; } diff --git a/src/proxy.rs b/src/proxy.rs index d5a3b0e..4784a18 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -5,26 +5,34 @@ use axum::http::header::HOST; use axum::http::HeaderValue; use axum::{ body::Body, - extract::{Request, State}, + extract::Request, response::{IntoResponse, Response}, - Router, }; use bollard::Docker; +use futures_util::StreamExt; +use hyper::body::Incoming; +use hyper::server::conn::http1; +use hyper::service::service_fn; use hyper::StatusCode; +use hyper_util::rt::TokioIo; use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor}; use miette::{miette, IntoDiagnostic}; use std::collections::HashMap; +use std::convert::Infallible; use std::fs::{create_dir_all, set_permissions}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::os::unix::fs::PermissionsExt; use std::path::PathBuf; +use std::pin::pin; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::UnixListener; use tokio::signal::ctrl_c; use tokio::spawn; use tokio::time::sleep; +use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream}; use tracing::{debug, error, info}; struct ActiveInstances { @@ -163,20 +171,26 @@ async fn serve(instances: ActiveInstances, listen: String, base_address: String) ctrl_c().await.ok(); }; - let app = Router::new().fallback(handler).with_state(AppState { + let state = AppState { instances: instances.clone(), base_address: base_address.clone(), proxy_client: Arc::new(proxy_client), - }); + }; if !listen.starts_with('/') { let addr: SocketAddr = listen.parse().into_diagnostic()?; let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); println!("listening on {}", listener.local_addr().unwrap()); - axum::serve(listener, app) - .with_graceful_shutdown(cancel) - .await - .unwrap(); + let mut connections = pin!(TcpListenerStream::new(listener).take_until(cancel)); + + while let Some(stream) = connections.next().await { + match stream { + Ok(stream) => handle_connection(state.clone(), stream), + Err(error) => { + error!(%error, "connection failed"); + } + } + } } else { let listen: PathBuf = listen.into(); if let Some(parent) = listen.parent() { @@ -187,18 +201,42 @@ async fn serve(instances: ActiveInstances, listen: String, base_address: String) } let _ = tokio::fs::remove_file(&listen).await; - let uds = UnixListener::bind(&listen).unwrap(); + let listener = UnixListener::bind(&listen).unwrap(); + println!("listening on {}", listen.display()); set_permissions(&listen, PermissionsExt::from_mode(0o666)).into_diagnostic()?; - axum::serve(uds, app) - .with_graceful_shutdown(cancel) - .await - .unwrap(); + let mut connections = pin!(UnixListenerStream::new(listener).take_until(cancel)); + + while let Some(stream) = connections.next().await { + match stream { + Ok(stream) => handle_connection(state.clone(), stream), + Err(error) => { + error!(%error, "connection failed"); + } + } + } } Ok(()) } +fn handle_connection( + state: AppState, + stream: I, +) { + let io = TokioIo::new(stream); + // Spawn a tokio task to serve multiple connections concurrently + tokio::task::spawn(async move { + if let Err(err) = http1::Builder::new() + .serve_connection(io, service_fn(move |req| handler(state.clone(), req))) + .with_upgrades() + .await + { + eprintln!("Error serving connection: {:?}", err); + } + }); +} + async fn get_remote( host: Option<&HeaderValue>, instances: &ActiveInstances, @@ -232,9 +270,9 @@ async fn get_remote( } } -type Client = hyper_util::client::legacy::Client; +type Client = hyper_util::client::legacy::Client; -async fn handler(State(state): State, mut req: Request) -> Result { +async fn handler(state: AppState, mut req: Request) -> Result { let host = req.headers().get(HOST).cloned(); let remote = match get_remote(host.as_ref(), &state.instances, &state.base_address).await { Ok(remote) => remote, @@ -259,13 +297,13 @@ async fn handler(State(state): State, mut req: Request) -> Result Ok(response.map(Body::new)), Err(error) => { - error!(%error, "error while proxying request"); + error!(?error, "error while proxying request"); Ok(StatusCode::BAD_REQUEST.into_response()) } } diff --git a/src/service.rs b/src/service.rs index 49bb636..490f617 100644 --- a/src/service.rs +++ b/src/service.rs @@ -14,6 +14,7 @@ mod sftp; mod redis; mod sharded; mod smb; +mod webhook; use crate::cloud::CloudOptions; use crate::config::{HazeConfig, Preset, ProxyConfig}; @@ -32,6 +33,7 @@ use crate::service::redis::Redis; use crate::service::sftp::Sftp; use crate::service::sharded::{Sharding, ShardingMigrate, ShardingMigrateUnset, SingleShard}; use crate::service::smb::Smb; +use crate::service::webhook::Webhook; use bollard::models::ContainerState; use bollard::Docker; use enum_dispatch::enum_dispatch; @@ -296,6 +298,8 @@ pub enum ServiceType { RedisTls, /// Use FrankenPHP instead of PHP-FPM FrankenPhp, + /// Webhook test listener + Webhook, } #[enum_dispatch] @@ -326,6 +330,7 @@ pub enum Service { Redis(Redis), RedisTls(RedisTls), FrankenPhp(FrankenPhp), + Webhook(Webhook), Preset(PresetService), } @@ -369,6 +374,7 @@ impl Service { ServiceType::Redis => Some(vec![Service::Redis(Redis)]), ServiceType::RedisTls => Some(vec![Service::RedisTls(RedisTls)]), ServiceType::FrankenPhp => Some(vec![Service::FrankenPhp(FrankenPhp)]), + ServiceType::Webhook => Some(vec![Service::Webhook(Webhook)]), } } else { presets diff --git a/src/service/webhook.rs b/src/service/webhook.rs new file mode 100644 index 0000000..3967483 --- /dev/null +++ b/src/service/webhook.rs @@ -0,0 +1,71 @@ +use crate::cloud::CloudOptions; +use crate::config::HazeConfig; +use crate::image::pull_image; +use crate::service::ServiceTrait; +use crate::Result; +use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkingConfig}; +use bollard::query_parameters::CreateContainerOptions; +use bollard::Docker; +use maplit::hashmap; +use miette::IntoDiagnostic; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Webhook; + +#[async_trait::async_trait] +impl ServiceTrait for Webhook { + fn name(&self) -> &str { + "webhook" + } + + async fn spawn( + &self, + docker: &Docker, + cloud_id: &str, + network: &str, + _config: &HazeConfig, + _options: &CloudOptions, + ) -> Result> { + let image = "ghcr.io/tarampampam/webhook-tester"; + pull_image(docker, image).await?; + let options = Some(CreateContainerOptions { + name: self.container_name(cloud_id), + ..CreateContainerOptions::default() + }); + let config = ContainerCreateBody { + image: Some(image.into()), + host_config: Some(HostConfig { + network_mode: Some(network.to_string()), + ..Default::default() + }), + labels: Some(hashmap! { + "haze-type".into() => self.name().into(), + "haze-cloud-id".into() => cloud_id.into(), + }), + networking_config: Some(NetworkingConfig { + endpoints_config: Some(hashmap! { + network.into() => EndpointSettings { + aliases: Some(vec![self.name().to_string()]), + ..Default::default() + } + }), + }), + ..Default::default() + }; + let id = docker + .create_container(options, config) + .await + .into_diagnostic()? + .id; + docker.start_container(&id, None).await.into_diagnostic()?; + Ok(vec![id]) + } + + fn container_name(&self, cloud_id: &str) -> Option { + Some(format!("{}-webhook", cloud_id)) + } + + fn proxy_port(&self) -> u16 { + 8080 + } +}