This commit is contained in:
Robin Appelman 2025-10-30 23:30:00 +01:00
commit ec6c3a0a8b
7 changed files with 404 additions and 40 deletions

View file

@ -6,7 +6,7 @@ use std::str::FromStr;
#[derive(Debug, PartialEq, Clone, Hash, Eq)]
pub struct ForwardDestination {
addr: SocketAddr,
pub addr: SocketAddr,
}
impl From<ForwardDestination> for SocketAddr {

View file

@ -1,20 +1,23 @@
mod namespace;
mod proxy;
use crate::config::{Config, NamespaceConfig, NamespaceName};
use crate::config::{Config, ForwardConfig, NamespaceConfig, NamespaceName};
use crate::daemon::namespace::{NamespaceError, NetNs};
use futures::FutureExt;
use futures::StreamExt;
use futures_concurrency::stream::Merge;
use main_error::MainResult;
use sd_notify::{NotifyState, notify};
use std::io::Error as IoError;
use std::pin::pin;
use futures::FutureExt;
use humansize::{SizeFormatter, BINARY};
use thiserror::Error;
use tokio::runtime::Runtime;
use tokio::signal::ctrl_c;
use tokio::signal::unix::{SignalKind, signal};
use futures::StreamExt;
use futures_concurrency::stream::Merge;
use tokio_stream::wrappers::SignalStream;
use tracing::{debug, error, info};
use crate::daemon::proxy::{ActiveProxy, ProxyError};
pub fn daemon(config: Config) -> MainResult {
let rt = Runtime::new()?;
@ -22,24 +25,21 @@ pub fn daemon(config: Config) -> MainResult {
}
async fn daemon_async(mut config: Config) -> Result<(), DaemonError> {
for namespace in &config.namespaces {
println!("{}:", namespace.name);
for forward in &namespace.forward {
println!(" {} => {}", forward.source, forward.destination);
}
}
let mut state = State::default();
state.update(&config)?;
// now the namespaces are setup, we can tell systemd to start any service depending on them
notify(true, &[NotifyState::Ready]).map_err(DaemonError::Notify)?;
let reload_signal = signal(SignalKind::hangup()).map_err(DaemonError::ReloadSignal)?;
let reload_signal = signal(SignalKind::hangup()).map_err(DaemonError::Signal)?;
let reload_signal = SignalStream::new(reload_signal).map(|_| Event::Reload);
let info_signal = signal(SignalKind::user_defined1()).map_err(DaemonError::Signal)?;
let info_signal = SignalStream::new(info_signal).map(|_| Event::Info);
let quit_signal = ctrl_c().into_stream().map(|_| Event::Quit);
let events = (reload_signal, quit_signal).merge();
let events = (reload_signal, info_signal, quit_signal).merge();
let mut events = pin!(events);
while let Some(event) = events.next().await {
@ -53,18 +53,19 @@ async fn daemon_async(mut config: Config) -> Result<(), DaemonError> {
match NotifyState::monotonic_usec_now() {
Ok(notify_time) => {
notify(true, &[NotifyState::Reloading, notify_time]).map_err(DaemonError::Notify)?;
notify(true, &[NotifyState::Reloading, notify_time])
.map_err(DaemonError::Notify)?;
}
Err(error) => {
error!(%error, "failed to get current time, not sending reload signal");
}
}
match config.reload() {
Ok(new_config) => {
state.update(&new_config)?;
config = new_config;
},
}
Err(error) => {
error!(%error, "Failed to load new config");
}
@ -72,6 +73,22 @@ async fn daemon_async(mut config: Config) -> Result<(), DaemonError> {
notify(true, &[NotifyState::Ready]).map_err(DaemonError::Notify)?;
}
Event::Info => {
for namespace in &state.namespaces {
println!("{}:", namespace.name());
for proxy in &namespace.proxies {
println!(
" {} => {} {} connections ({} active), {} sent to namespace, {} received from namespace",
proxy.source,
proxy.destination,
proxy.stats.total_connections(),
proxy.stats.open_connections(),
SizeFormatter::new(proxy.stats.written(), BINARY),
SizeFormatter::new(proxy.stats.read(), BINARY),
);
}
}
}
}
}
@ -82,6 +99,7 @@ async fn daemon_async(mut config: Config) -> Result<(), DaemonError> {
enum Event {
Reload,
Quit,
Info,
}
#[derive(Default)]
@ -91,11 +109,19 @@ struct State {
impl State {
pub fn update(&mut self, config: &Config) -> Result<(), DaemonError> {
self.namespaces.retain(|existing| {
config
.namespaces
.iter()
.any(|new| &new.name == existing.name())
self.namespaces.retain_mut(|existing| {
if let Some(new_config) =
config
.namespaces
.iter()
.find(|new| &new.name == existing.name()) {
if let Err(error) = existing.update_proxies(new_config) {
error!(%error, "Failed to update proxies for namespace");
}
true
} else {
false
}
});
for new in &config.namespaces {
@ -103,12 +129,12 @@ impl State {
self.namespaces.push(ActiveNamespace::new(new)?);
}
}
Ok(())
}
fn has_namespace(&self, name: &NamespaceName) -> bool {
self
.namespaces
self.namespaces
.iter()
.any(|existing| existing.name() == name)
}
@ -116,12 +142,36 @@ impl State {
struct ActiveNamespace {
ns: NetNs,
proxies: Vec<ActiveProxy>,
}
impl ActiveNamespace {
pub fn new(config: &NamespaceConfig) -> Result<Self, DaemonError> {
let ns = NetNs::new(&config.name)?;
Ok(ActiveNamespace { ns })
let mut namespace = ActiveNamespace {
ns,
proxies: Vec::default(),
};
namespace.update_proxies(config)?;
Ok(namespace)
}
pub fn update_proxies(&mut self, config: &NamespaceConfig) -> Result<(), DaemonError> {
self.proxies.retain(|existing| config.forward.iter().any(|new| existing == new));
for new in &config.forward {
if !self.has_forward(new) {
self.proxies.push(ActiveProxy::new(new)?);
}
}
Ok(())
}
fn has_forward(&self, config: &ForwardConfig) -> bool {
self.proxies.iter().any(|existing| existing == config)
}
pub fn name(&self) -> &NamespaceName {
@ -135,6 +185,8 @@ pub enum DaemonError {
Namespace(#[from] NamespaceError),
#[error("Error sending notification to systemd: {0:#}")]
Notify(IoError),
#[error("Error setting up reload signal listener: {0:#}")]
ReloadSignal(IoError),
#[error("Error setting up signal handler: {0:#}")]
Signal(IoError),
#[error(transparent)]
Proxy(#[from] ProxyError),
}

View file

@ -23,8 +23,16 @@ impl NetNs {
let path = parent.join(name);
let mount_path = path.clone();
let _ =
File::create_new(&path).map_err(|error| NamespaceError::from_create(name, error))?;
match File::create_new(&path) {
Ok(_) => {}
Err(e) if e.kind() == ErrorKind::AlreadyExists => {
return Ok(NetNs {
name: name.clone(),
path,
});
}
Err(e) => return Err(NamespaceError::from_create(name, e)),
}
let handle: JoinHandle<Result<(), NamespaceError>> = spawn(move || {
unshare(CloneFlags::CLONE_NEWNET).map_err(NamespaceError::Unshare)?;
@ -41,7 +49,7 @@ impl NetNs {
handle.join().unwrap()?;
Ok(NetNs {
name: name.clone(),
path
path,
})
}
}
@ -69,8 +77,6 @@ impl Drop for NetNs {
pub enum NamespaceError {
#[error("Failed to create parent directory for namespaces (/var/run/netns): {0:#}")]
Parent(IoError),
#[error("Network namespace {0} already exists")]
AlreadyExists(NamespaceName),
#[error("Failed to create namespace file {}: {error:#}", path.display())]
Create { path: PathBuf, error: IoError },
#[error("Unexpected error while creating new network namespace: {0:}")]
@ -86,12 +92,9 @@ impl NamespaceError {
}
fn from_create(name: &NamespaceName, error: IoError) -> Self {
match error.kind() {
ErrorKind::AlreadyExists => NamespaceError::AlreadyExists(name.clone()),
_ => NamespaceError::Create {
path: Path::new("/var/run/netns").join(name),
error,
},
NamespaceError::Create {
path: Path::new("/var/run/netns").join(name),
error,
}
}
}

106
src/daemon/proxy/mod.rs Normal file
View file

@ -0,0 +1,106 @@
mod tcp;
use std::fs::remove_file;
use crate::config::{ForwardConfig, ForwardDestination, ForwardSource};
use crate::daemon::proxy::tcp::Proxy;
use futures::future::AbortHandle;
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use thiserror::Error;
use tracing::error;
#[derive(Debug, Error)]
pub enum ProxyError {
#[error("Failed to listen to {address}: {error:#}")]
Bind {
address: ForwardSource,
error: IoError,
},
#[error("Failed to accept proxy connection: {error:#}")]
Accept { error: IoError },
#[error("Failed to connect to proxy destination {destination}: {error:#}")]
Connect {
destination: SocketAddr,
error: IoError,
},
}
pub struct ActiveProxy {
pub source: ForwardSource,
pub destination: ForwardDestination,
abort: AbortHandle,
pub stats: ProxyStats,
}
impl ActiveProxy {
pub fn new(config: &ForwardConfig) -> Result<ActiveProxy, ProxyError> {
let proxy = Proxy::listen(config.source.clone())?;
Ok(proxy.run(config.destination.clone()))
}
}
impl Drop for ActiveProxy {
fn drop(&mut self) {
if let ForwardSource::Unix(path) = &self.source {
if let Err(error) = remove_file(path) {
error!(%error, "failed to remove unix socket");
}
}
self.abort.abort();
}
}
impl PartialEq<ForwardConfig> for ActiveProxy {
fn eq(&self, other: &ForwardConfig) -> bool {
self.source == other.source && self.destination == other.destination
}
}
#[derive(Default, Clone)]
pub struct ProxyStats {
open_connections: Arc<AtomicU64>,
total_connections: Arc<AtomicU64>,
/// Bytes proxied source -> destination
bytes_written: Arc<AtomicU64>,
/// Bytes proxied destination -> source
bytes_read: Arc<AtomicU64>,
}
impl ProxyStats {
pub fn open_connection(&self) {
self.open_connections.fetch_add(1, Ordering::Relaxed);
self.total_connections.fetch_add(1, Ordering::Relaxed);
}
pub fn close_connection(&self) {
self.open_connections.fetch_sub(1, Ordering::Relaxed);
}
pub fn open_connections(&self) -> u64 {
self.open_connections.load(Ordering::Relaxed)
}
pub fn total_connections(&self) -> u64 {
self.total_connections.load(Ordering::Relaxed)
}
pub fn add_read(&self, bytes: u64) {
self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
}
pub fn add_written(&self, bytes: u64) {
self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
}
/// Bytes proxied destination -> source
pub fn read(&self) -> u64 {
self.bytes_read.load(Ordering::Relaxed)
}
/// Bytes proxied source -> destination
pub fn written(&self) -> u64 {
self.bytes_written.load(Ordering::Relaxed)
}
}

168
src/daemon/proxy/tcp.rs Normal file
View file

@ -0,0 +1,168 @@
use std::fs::remove_file;
/// Loosely based on https://github.com/fooker/netns-proxy/blob/main/src/tcp.rs
use crate::config::{ForwardDestination, ForwardSource};
use crate::daemon::proxy::{ActiveProxy, ProxyError, ProxyStats};
use futures::TryStreamExt;
use futures::stream::{AbortHandle, AbortRegistration, Abortable};
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::pin::pin;
use tokio::io::{AsyncRead, AsyncWrite, copy_bidirectional};
use tokio::net::{TcpListener, TcpSocket, TcpStream, UnixListener};
use tokio::spawn;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream};
use tracing::{Level, debug, error, instrument, span, trace, warn};
#[derive(Debug)]
pub struct Proxy {
source: ForwardSource,
socket: ProxyListener,
}
#[derive(Debug)]
enum ProxyListener {
Tcp(TcpListener),
Unix(UnixListener),
}
fn bind_tcp(addr: SocketAddr) -> Result<TcpListener, IoError> {
let socket = if addr.is_ipv4() {
TcpSocket::new_v4()
} else {
TcpSocket::new_v6()
}?;
socket.bind(addr)?;
socket.listen(1024)
}
impl Proxy {
pub fn listen(bind: ForwardSource) -> Result<Self, ProxyError> {
let socket = match &bind {
ForwardSource::Unix(path) => {
let _ = remove_file(path);
UnixListener::bind(path).map(ProxyListener::Unix)
},
ForwardSource::Ip(addr) => bind_tcp(*addr).map(ProxyListener::Tcp),
}
.map_err(|error| ProxyError::Bind {
address: bind.clone(),
error,
})?;
debug!("Created TCP socket");
Ok(Self {
source: bind,
socket,
})
}
pub fn run(self, target: ForwardDestination) -> ActiveProxy {
let (abort_handle, abort) = AbortHandle::new_pair();
let destination = target.clone();
let stats = ProxyStats::default();
let proxy_stats = stats.clone();
spawn(async move {
match self.socket {
ProxyListener::Tcp(socket) => {
run_tcp(socket, target.addr, abort, proxy_stats).await
}
ProxyListener::Unix(socket) => {
run_unix(socket, target.addr, abort, proxy_stats).await
}
}
});
ActiveProxy {
source: self.source,
destination,
abort: abort_handle,
stats,
}
}
}
async fn run_tcp(
socket: TcpListener,
target: SocketAddr,
abort: AbortRegistration,
stats: ProxyStats,
) {
let accepts = TcpListenerStream::new(socket).map_err(|error| ProxyError::Accept { error });
let mut accepts = pin!(Abortable::new(accepts, abort));
while let Some(client) = accepts.next().await {
stats.open_connection();
let result: Result<(), ProxyError> = async {
let client = client?;
let remote = client.peer_addr().ok();
proxy_stream(client, target, remote, stats.clone()).await
}
.await;
if let Err(err) = result {
error!("Error in tcp proxy: {err:?}");
}
}
}
async fn run_unix(
socket: UnixListener,
target: SocketAddr,
abort: AbortRegistration,
stats: ProxyStats,
) {
let accepts = UnixListenerStream::new(socket).map_err(|error| ProxyError::Accept { error });
let mut accepts = pin!(Abortable::new(accepts, abort));
while let Some(client) = accepts.next().await {
stats.open_connection();
let result: Result<(), ProxyError> = async {
let client = client?;
proxy_stream(client, target, None, stats.clone()).await
}
.await;
if let Err(err) = result {
error!("Error in tcp proxy: {err:?}");
}
}
}
#[instrument(skip(stream, proxy))]
async fn proxy_stream<Stream>(
mut stream: Stream,
target: SocketAddr,
remote: Option<SocketAddr>,
proxy: ProxyStats,
) -> Result<(), ProxyError>
where
Stream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
{
let _ = span!(Level::TRACE, "Client connection", connection = ?remote).entered();
trace!("connected");
let mut upstream = TcpStream::connect(target)
.await
.map_err(|error| ProxyError::Connect {
error,
destination: target,
})?;
trace!("Upstream connection established");
spawn(async move {
match copy_bidirectional(&mut stream, &mut upstream).await {
Ok((written, read)) => {
proxy.add_written(written);
proxy.add_read(read);
proxy.close_connection();
trace!("Upstream connection closed");
}
Err(err) => {
warn!("Upstream connection failed: {}", err);
}
}
});
Ok(())
}