register managed namespaces with symlinks

This commit is contained in:
Robin Appelman 2025-11-01 16:23:41 +01:00
commit d168b4bf4d
4 changed files with 54 additions and 5 deletions

View file

@ -1,191 +0,0 @@
use crate::config::{Config, ForwardConfig, NamespaceConfig, NamespaceName};
use crate::namespace::{NamespaceError, NetNs};
use crate::proxy::{ActiveProxy, ProxyError};
use futures::FutureExt;
use futures::StreamExt;
use futures_concurrency::stream::Merge;
use humansize::{BINARY, SizeFormatter};
use main_error::MainResult;
use sd_notify::{NotifyState, notify};
use std::io::Error as IoError;
use std::pin::pin;
use thiserror::Error;
use tokio::runtime::Runtime;
use tokio::signal::ctrl_c;
use tokio::signal::unix::{SignalKind, signal};
use tokio_stream::wrappers::SignalStream;
use tracing::{debug, error, info};
pub fn daemon(config: Config) -> MainResult {
let rt = Runtime::new()?;
Ok(rt.block_on(daemon_async(config))?)
}
async fn daemon_async(mut config: Config) -> Result<(), DaemonError> {
let mut state = State::default();
state.update(&config)?;
// now the namespaces are setup, we can tell systemd to start any service depending on them
notify(false, &[NotifyState::Ready]).map_err(DaemonError::Notify)?;
let reload_signal = signal(SignalKind::hangup()).map_err(DaemonError::Signal)?;
let reload_signal = SignalStream::new(reload_signal).map(|_| Event::Reload);
let info_signal = signal(SignalKind::user_defined1()).map_err(DaemonError::Signal)?;
let info_signal = SignalStream::new(info_signal).map(|_| Event::Info);
let stop_signal = signal(SignalKind::terminate()).map_err(DaemonError::Signal)?;
let stop_signal = SignalStream::new(stop_signal).map(|_| Event::Quit);
let quit_signal = ctrl_c().into_stream().map(|_| Event::Quit);
let events = (reload_signal, info_signal, stop_signal, quit_signal).merge();
let mut events = pin!(events);
while let Some(event) = events.next().await {
debug!(?event, "handling event");
match event {
Event::Quit => {
break;
}
Event::Reload => {
info!("reloading config");
match NotifyState::monotonic_usec_now() {
Ok(notify_time) => {
notify(false, &[NotifyState::Reloading, notify_time])
.map_err(DaemonError::Notify)?;
}
Err(error) => {
error!(%error, "failed to get current time, not sending reload signal");
}
}
match config.reload() {
Ok(new_config) => {
state.update(&new_config)?;
config = new_config;
}
Err(error) => {
error!(%error, "Failed to load new config");
}
}
notify(false, &[NotifyState::Ready]).map_err(DaemonError::Notify)?;
}
Event::Info => {
for namespace in &state.namespaces {
println!("{}:", namespace.name());
for proxy in &namespace.proxies {
println!(
" {} => {} {} connections ({} active), {} sent to namespace, {} received from namespace",
proxy.source,
proxy.destination,
proxy.stats.total_connections(),
proxy.stats.open_connections(),
SizeFormatter::new(proxy.stats.written(), BINARY),
SizeFormatter::new(proxy.stats.read(), BINARY),
);
}
}
}
}
}
let _ = notify(false, &[NotifyState::Stopping]);
Ok(())
}
#[derive(Debug)]
enum Event {
Reload,
Quit,
Info,
}
#[derive(Default)]
struct State {
namespaces: Vec<ActiveNamespace>,
}
impl State {
pub fn update(&mut self, config: &Config) -> Result<(), DaemonError> {
for removed in self.namespaces.extract_if(.., |namespace| {
config.get_namespace(namespace.name()).is_none()
}) {
removed.ns.delete()?;
}
for new in &config.namespaces {
if !self.has_namespace(&new.name) {
self.namespaces.push(ActiveNamespace::new(new)?);
}
}
for namespace in &mut self.namespaces {
let config = config.get_namespace(namespace.name()).unwrap();
namespace.update_proxies(config)?;
}
Ok(())
}
fn has_namespace(&self, name: &NamespaceName) -> bool {
self.namespaces
.iter()
.any(|existing| existing.name() == name)
}
}
struct ActiveNamespace {
ns: NetNs,
proxies: Vec<ActiveProxy>,
}
impl ActiveNamespace {
pub fn new(config: &NamespaceConfig) -> Result<Self, DaemonError> {
let ns = NetNs::new(config.name.clone())?;
let mut namespace = ActiveNamespace {
ns,
proxies: Vec::default(),
};
namespace.update_proxies(config)?;
Ok(namespace)
}
pub fn update_proxies(&mut self, config: &NamespaceConfig) -> Result<(), DaemonError> {
self.proxies
.retain(|existing| config.forward.iter().any(|new| existing == new));
for new in &config.forward {
if !self.has_forward(new) {
self.proxies.push(ActiveProxy::new(new, &config.name)?);
}
}
Ok(())
}
fn has_forward(&self, config: &ForwardConfig) -> bool {
self.proxies.iter().any(|existing| existing == config)
}
pub fn name(&self) -> &NamespaceName {
self.ns.name()
}
}
#[derive(Debug, Error)]
pub enum DaemonError {
#[error(transparent)]
Namespace(#[from] NamespaceError),
#[error("Error sending notification to systemd: {0:#}")]
Notify(IoError),
#[error("Error setting up signal handler: {0:#}")]
Signal(IoError),
#[error(transparent)]
Proxy(#[from] ProxyError),
}