1
0
Fork 0
mirror of https://codeberg.org/demostf/sync.git synced 2026-06-03 16:44:07 +02:00

udp socket support

This commit is contained in:
Robin Appelman 2025-05-10 15:34:52 +02:00
commit 65ff6501f4
3 changed files with 92 additions and 17 deletions

22
Cargo.lock generated
View file

@ -542,6 +542,15 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.9"
@ -590,6 +599,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tracing",
"tracing-subscriber",
@ -636,6 +646,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",
@ -652,6 +663,17 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.26.2"

View file

@ -5,7 +5,8 @@ authors = ["Robin Appelman <robin@icewind.nl>"]
edition = "2021"
[dependencies]
tokio = { version = "1.45.0", features = ["rt-multi-thread", "macros", "sync"] }
tokio = { version = "1.45.0", features = ["rt-multi-thread", "macros", "sync", "signal"] }
tokio-stream = { version = "0.1.17", features = ["net"]}
tokio-tungstenite = "0.26.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"

View file

@ -2,21 +2,26 @@ mod session;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::fs::remove_file;
use crate::session::Session;
use dashmap::DashMap;
use futures_channel::mpsc::{channel, Sender};
use futures_util::future::select;
use futures_util::StreamExt;
use futures_util::{FutureExt, Stream, StreamExt};
use futures_util::TryStreamExt;
use main_error::MainResult;
use real_ip::{real_ip, IpNet};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use tokio::select;
use tokio::signal::ctrl_c;
use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream};
use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error, info, warn};
@ -154,14 +159,12 @@ impl Server {
});
}
async fn handle_connection(&self, raw_stream: TcpStream, addr: SocketAddr) {
async fn handle_connection<S: AsyncRead + AsyncWrite + Unpin>(&self, raw_stream: S, mut remote_ip: IpAddr) {
debug!("incoming connection");
let mut remote_ip = addr.ip();
let ws_stream_res =
tokio_tungstenite::accept_hdr_async(raw_stream, |req: &Request, response: Response| {
if let Some(ip) = real_ip(req.headers(), addr.ip(), TRUSTED_PROXIES) {
if let Some(ip) = real_ip(req.headers(), remote_ip, TRUSTED_PROXIES) {
remote_ip = ip;
}
Ok::<_, ErrorResponse>(response)
@ -221,24 +224,73 @@ async fn main() -> MainResult {
let port: u16 = std::env::var("PORT")
.unwrap_or_else(|_| "80".to_string())
.parse()?;
let listen_address = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));
let socket = std::env::var("SOCKET").ok().map(PathBuf::from);
let state = Arc::new(Server::new());
let shutdown = ctrl_c().map(|_| ());
// Create the event loop and TCP listener we'll accept connections on.
let listener = if let Some(socket) = socket.as_deref() {
if socket.exists() {
remove_file(socket)?;
}
Box::new(listen_unix(socket).await) as Box<dyn Stream<Item=Result<(Box<dyn StreamTrait>, IpAddr), std::io::Error>>>
} else {
let listen_address = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));
Box::new(listen_tcp(listen_address).await)
};
let mut listener = Box::into_pin(listener);
let serve = async {
while let Some(Ok((stream, addr))) = listener.next().await {
let state = state.clone();
tokio::spawn(async move { state.handle_connection(stream, addr).await });
}
};
select! {
_ = serve => {
warn!("socket disconnected");
}
_ = shutdown => {
info!("shutdown requested");
}
}
info!("shutting down");
if let Some(socket) = socket.as_deref() {
remove_file(socket)?;
}
Ok(())
}
trait StreamTrait: AsyncRead + AsyncWrite + Send + Unpin {}
impl StreamTrait for TcpStream{}
impl StreamTrait for UnixStream{}
async fn listen_tcp(listen_address: SocketAddr) -> impl Stream<Item=Result<(Box<dyn StreamTrait>, IpAddr), std::io::Error>> {
let listener = TcpListener::bind(&listen_address)
.await
.expect("Failed to bind");
info!("listening on: {:?}", listen_address);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
let state = state.clone();
tokio::spawn(async move { state.handle_connection(stream, addr).await });
}
TcpListenerStream::new(listener).map_ok(|stream| {
let addr = stream.peer_addr().map(|addr| addr.ip()).unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
(Box::new(stream) as Box::<dyn StreamTrait>, addr)
})
}
Ok(())
async fn listen_unix(path: &Path) -> impl Stream<Item=Result<(Box<dyn StreamTrait>, IpAddr), std::io::Error>> {
let listener = UnixListener::bind(path).expect("Failed to bind");
info!("listening on: {}", path.display());
UnixListenerStream::new(listener).map_ok(|stream| {
let addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
(Box::new(stream) as Box::<dyn StreamTrait>, addr)
})
}
const TRUSTED_PROXIES: &[IpNet] = &[IpNet::new_assert(