use crate::config::{ Config, DeviceName, ForwardConfig, NamespaceConfig, NamespaceName, RouteConfig, }; use crate::link::{LinkError, LinkManager}; use crate::namespace::{ NamespaceEnterError, NamespaceError, NamespaceHandle, NamespaceHandleError, NetNs, }; use crate::proxy::{ActiveProxy, ProxyError}; use futures::FutureExt; use futures::StreamExt; use futures_concurrency::stream::Merge; use main_error::MainResult; use sd_notify::{NotifyState, notify}; use std::io::Error as IoError; use std::pin::pin; use thiserror::Error; use tokio::runtime::Builder; use tokio::signal::ctrl_c; use tokio::signal::unix::{SignalKind, signal}; use tokio_stream::wrappers::SignalStream; use tracing::{debug, error, info}; pub fn daemon(config: Config) -> MainResult { let rt = Builder::new_current_thread().enable_io().build()?; Ok(rt.block_on(daemon_async(config))?) } async fn daemon_async(mut config: Config) -> Result<(), DaemonError> { let mut state = State::new()?; state.update(&config)?; // now the namespaces are setup, we can tell systemd to start any service depending on them notify(&[NotifyState::Ready]).map_err(DaemonError::Notify)?; let reload_signal = signal(SignalKind::hangup()).map_err(DaemonError::Signal)?; let reload_signal = SignalStream::new(reload_signal).map(|_| Event::Reload); let info_signal = signal(SignalKind::user_defined1()).map_err(DaemonError::Signal)?; let info_signal = SignalStream::new(info_signal).map(|_| Event::Info); let stop_signal = signal(SignalKind::terminate()).map_err(DaemonError::Signal)?; let stop_signal = SignalStream::new(stop_signal).map(|_| Event::Quit); let quit_signal = ctrl_c().into_stream().map(|_| Event::Quit); let events = (reload_signal, info_signal, stop_signal, quit_signal).merge(); let mut events = pin!(events); while let Some(event) = events.next().await { debug!(?event, "handling event"); match event { Event::Quit => { break; } Event::Reload => { info!("reloading config"); match NotifyState::monotonic_usec_now() { Ok(notify_time) => { notify(&[NotifyState::Reloading, notify_time]) .map_err(DaemonError::Notify)?; } Err(error) => { error!(%error, "failed to get current time, not sending reload signal"); } } match config.reload() { Ok(new_config) => { state.update(&new_config)?; config = new_config; } Err(error) => { error!(%error, "Failed to load new config"); } } notify(&[NotifyState::Ready]).map_err(DaemonError::Notify)?; } Event::Info => { for namespace in &state.namespaces { println!("{}:", namespace.name()); for proxy in &namespace.proxies { println!(" {} => {}", proxy.source, proxy.destination,); } } } } } let _ = notify(&[NotifyState::Stopping]); Ok(()) } #[derive(Debug)] enum Event { Reload, Quit, Info, } struct State { namespaces: Vec, } impl State { pub fn new() -> Result { let namespaces = NetNs::existing(false)? .map(ActiveNamespace::new) .collect::, _>>()?; Ok(State { namespaces }) } pub fn update(&mut self, config: &Config) -> Result<(), DaemonError> { for removed in self.namespaces.extract_if(.., |namespace| { config.get_namespace(namespace.name()).is_none() }) { removed.ns.delete()?; } for new in &config.namespaces { if !self.has_namespace(&new.name) { self.namespaces .push(ActiveNamespace::new(new.name.clone())?); } } for namespace in &mut self.namespaces { let config = config.get_namespace(namespace.name()).unwrap(); namespace.update_proxies(config)?; namespace.update_devices(config)?; namespace.update_links(config)?; } Ok(()) } fn has_namespace(&self, name: &NamespaceName) -> bool { self.namespaces .iter() .any(|existing| existing.name() == name) } } struct ActiveNamespace { ns: NetNs, proxies: Vec, devices: Vec, routes: Vec, } impl ActiveNamespace { pub fn new(name: NamespaceName) -> Result { let ns = NetNs::new(name)?; Ok(ActiveNamespace { ns, proxies: Vec::default(), devices: Vec::default(), routes: Vec::default(), }) } pub fn update_proxies(&mut self, config: &NamespaceConfig) -> Result<(), DaemonError> { self.proxies .retain(|existing| config.forward.iter().any(|new| existing == new)); for new in &config.forward { if !self.has_forward(new) { self.proxies.push(ActiveProxy::new(new, &config.name)?); } } Ok(()) } pub fn update_devices(&mut self, config: &NamespaceConfig) -> Result<(), DaemonError> { let parent_namespace = NamespaceHandle::parent()?; let removed: Vec<_> = self .devices .extract_if(.., |existing| { !config.devices.iter().any(|new| existing == new) }) .collect(); self.ns.handle().run_in(move || { let link_manager = LinkManager::new()?; for link in link_manager.get_links()?.flatten() { if removed.iter().any(|name| *name == link.name.as_str()) { info!(namespace = %config.name, link = link.name , "moving link out of namespace"); link_manager.move_link(&link, &parent_namespace)? } } Ok::<_, LinkError>(()) })??; let mut added = Vec::new(); for new in &config.devices { if !self.has_device(new) { added.push(new.clone()); } } let link_manager = LinkManager::new()?; for link in link_manager.get_links()?.flatten() { if added.iter().any(|name| *name == link.name.as_str()) { info!(namespace = %config.name, link = link.name , "moving link into namespace"); link_manager.move_link(&link, self.ns.handle())? } } for new in added { self.devices.push(new); } Ok(()) } pub fn update_links(&mut self, config: &NamespaceConfig) -> Result<(), DaemonError> { let removed: Vec<_> = self .routes .extract_if(.., |existing| { !config.routes.iter().any(|new| existing == new) }) .collect(); let mut added = Vec::new(); for new in &config.routes { if !self.has_route(new) { added.push(new.clone()); } } self.ns.handle().run_in(|| { let link_manager = LinkManager::new()?; for link in link_manager.get_links()?.flatten() { if let Some(route) = removed .iter() .find(|route| route.device == link.name.as_str()) { info!(namespace = %config.name, %route, "deleting route"); link_manager.delete_route(&link, route.destination)?; } } for link in link_manager.get_links()?.flatten() { if let Some(route) = added .iter() .find(|route| route.device == link.name.as_str()) { info!(namespace = %config.name, %route, "adding route"); link_manager.add_route(&link, route.destination)?; } } Ok::<_, DaemonError>(()) })??; for new in added { self.routes.push(new); } Ok(()) } fn has_forward(&self, config: &ForwardConfig) -> bool { self.proxies.iter().any(|existing| existing == config) } fn has_device(&self, name: &DeviceName) -> bool { self.devices.iter().any(|existing| existing == name) } fn has_route(&self, route: &RouteConfig) -> bool { self.routes.iter().any(|existing| existing == route) } pub fn name(&self) -> &NamespaceName { self.ns.name() } } #[derive(Debug, Error)] pub enum DaemonError { #[error(transparent)] Namespace(#[from] NamespaceError), #[error("Error sending notification to systemd: {0:#}")] Notify(IoError), #[error("Error setting up signal handler: {0:#}")] Signal(IoError), #[error(transparent)] Proxy(#[from] ProxyError), #[error(transparent)] Handle(#[from] NamespaceHandleError), #[error(transparent)] Enter(#[from] NamespaceEnterError), #[error(transparent)] Link(#[from] LinkError), }