mirror of
https://codeberg.org/icewind/netnsd.git
synced 2026-06-03 17:14:06 +02:00
move code around
This commit is contained in:
parent
3b7d53f693
commit
9af09c8669
7 changed files with 9 additions and 10 deletions
147
src/proxy/mod.rs
Normal file
147
src/proxy/mod.rs
Normal file
|
|
@ -0,0 +1,147 @@
|
|||
mod tcp;
|
||||
|
||||
use crate::config::{ForwardConfig, ForwardTarget, ForwardSource, NamespaceName};
|
||||
use crate::proxy::tcp::Proxy;
|
||||
use futures::future::AbortHandle;
|
||||
use nix::sched::{CloneFlags, setns};
|
||||
use std::fs::{File, remove_file};
|
||||
use std::io::Error as IoError;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::thread::spawn;
|
||||
use thiserror::Error;
|
||||
use tokio::runtime::Builder;
|
||||
use tracing::error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ProxyError {
|
||||
#[error("Failed to listen to {address}: {error:#}")]
|
||||
Bind {
|
||||
address: ForwardSource,
|
||||
error: IoError,
|
||||
},
|
||||
#[error("Failed to accept proxy connection: {error:#}")]
|
||||
Accept { error: IoError },
|
||||
#[error("Failed to connect to proxy destination {destination}: {error:#}")]
|
||||
Connect {
|
||||
destination: SocketAddr,
|
||||
error: IoError,
|
||||
},
|
||||
#[error("Failed to open namespace file {}: {error:#}", path.display())]
|
||||
OpenNamespace { path: PathBuf, error: IoError },
|
||||
}
|
||||
|
||||
pub struct ActiveProxy {
|
||||
pub source: ForwardSource,
|
||||
pub destination: ForwardTarget,
|
||||
abort: AbortHandle,
|
||||
pub stats: ProxyStats,
|
||||
}
|
||||
|
||||
impl ActiveProxy {
|
||||
pub fn new(
|
||||
config: &ForwardConfig,
|
||||
namespace: &NamespaceName,
|
||||
) -> Result<ActiveProxy, ProxyError> {
|
||||
let proxy = Proxy::listen(config.source.clone())?;
|
||||
let stats = ProxyStats::default();
|
||||
|
||||
let (abort, abort_reg) = AbortHandle::new_pair();
|
||||
|
||||
let destination = config.target.clone();
|
||||
let run_stats = stats.clone();
|
||||
let ns_path = PathBuf::from(format!("/var/run/netns/{namespace}"));
|
||||
let ns_handle = File::open(&ns_path).map_err(|error| ProxyError::OpenNamespace {
|
||||
error,
|
||||
path: ns_path,
|
||||
})?;
|
||||
spawn(move || match setns(ns_handle, CloneFlags::CLONE_NEWNET) {
|
||||
Ok(_) => {
|
||||
let rt = match Builder::new_current_thread().enable_io().build() {
|
||||
Ok(rt) => rt,
|
||||
Err(error) => {
|
||||
error!(%error, "Error setting up tokio runtime");
|
||||
return;
|
||||
}
|
||||
};
|
||||
rt.block_on(proxy.run(destination, abort_reg, run_stats));
|
||||
}
|
||||
Err(error) => {
|
||||
error!(%error, "Failed to join network namespace for proxy");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(ActiveProxy {
|
||||
source: config.source.clone(),
|
||||
destination: config.target.clone(),
|
||||
abort,
|
||||
stats,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ActiveProxy {
|
||||
fn drop(&mut self) {
|
||||
if let ForwardSource::Unix(path) = &self.source {
|
||||
if let Err(error) = remove_file(path) {
|
||||
error!(%error, "failed to remove unix socket");
|
||||
}
|
||||
}
|
||||
self.abort.abort();
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq<ForwardConfig> for ActiveProxy {
|
||||
fn eq(&self, other: &ForwardConfig) -> bool {
|
||||
self.source == other.source && self.destination == other.target
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub struct ProxyStats {
|
||||
open_connections: Arc<AtomicU64>,
|
||||
total_connections: Arc<AtomicU64>,
|
||||
/// Bytes proxied source -> destination
|
||||
bytes_written: Arc<AtomicU64>,
|
||||
/// Bytes proxied destination -> source
|
||||
bytes_read: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl ProxyStats {
|
||||
pub fn open_connection(&self) {
|
||||
self.open_connections.fetch_add(1, Ordering::Relaxed);
|
||||
self.total_connections.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn close_connection(&self) {
|
||||
self.open_connections.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn open_connections(&self) -> u64 {
|
||||
self.open_connections.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn total_connections(&self) -> u64 {
|
||||
self.total_connections.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn add_read(&self, bytes: u64) {
|
||||
self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn add_written(&self, bytes: u64) {
|
||||
self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Bytes proxied destination -> source
|
||||
pub fn read(&self) -> u64 {
|
||||
self.bytes_read.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Bytes proxied source -> destination
|
||||
pub fn written(&self) -> u64 {
|
||||
self.bytes_written.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
159
src/proxy/tcp.rs
Normal file
159
src/proxy/tcp.rs
Normal file
|
|
@ -0,0 +1,159 @@
|
|||
/// Loosely based on https://github.com/fooker/netns-proxy/blob/main/src/tcp.rs
|
||||
use crate::config::{ForwardTarget, ForwardSource};
|
||||
use crate::proxy::{ProxyError, ProxyStats};
|
||||
use futures::TryStreamExt;
|
||||
use futures::stream::{AbortRegistration, Abortable};
|
||||
use std::fs::{remove_file, set_permissions};
|
||||
use std::io::Error as IoError;
|
||||
use std::net::SocketAddr;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::pin::pin;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, copy_bidirectional};
|
||||
use tokio::net::{TcpListener, TcpSocket, TcpStream, UnixListener};
|
||||
use tokio::spawn;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream};
|
||||
use tracing::{Level, debug, error, instrument, span, trace, warn};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Proxy {
|
||||
socket: ProxyListener,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ProxyListener {
|
||||
Tcp(TcpListener),
|
||||
Unix(UnixListener),
|
||||
}
|
||||
|
||||
fn bind_tcp(addr: SocketAddr) -> Result<TcpListener, IoError> {
|
||||
let socket = if addr.is_ipv4() {
|
||||
TcpSocket::new_v4()
|
||||
} else {
|
||||
TcpSocket::new_v6()
|
||||
}?;
|
||||
socket.bind(addr)?;
|
||||
socket.listen(1024)
|
||||
}
|
||||
|
||||
impl Proxy {
|
||||
pub fn listen(bind: ForwardSource) -> Result<Self, ProxyError> {
|
||||
let socket = match &bind {
|
||||
ForwardSource::Unix(path) => {
|
||||
let _ = remove_file(path);
|
||||
UnixListener::bind(path).map(|listener| {
|
||||
if let Err(error) = set_permissions(path, PermissionsExt::from_mode(0o666)) {
|
||||
error!(%error, "failed to set socket permissions");
|
||||
}
|
||||
ProxyListener::Unix(listener)
|
||||
})
|
||||
}
|
||||
ForwardSource::Ip(addr) => bind_tcp(*addr).map(ProxyListener::Tcp),
|
||||
}
|
||||
.map_err(|error| ProxyError::Bind {
|
||||
address: bind,
|
||||
error,
|
||||
})?;
|
||||
debug!("Created TCP socket");
|
||||
|
||||
Ok(Self {
|
||||
socket,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn run(self, target: ForwardTarget, abort: AbortRegistration, stats: ProxyStats) {
|
||||
let proxy_stats = stats.clone();
|
||||
match self.socket {
|
||||
ProxyListener::Tcp(socket) => {
|
||||
run_tcp(socket, target.addr, abort, proxy_stats).await
|
||||
}
|
||||
ProxyListener::Unix(socket) => {
|
||||
run_unix(socket, target.addr, abort, proxy_stats).await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_tcp(
|
||||
socket: TcpListener,
|
||||
target: SocketAddr,
|
||||
abort: AbortRegistration,
|
||||
stats: ProxyStats,
|
||||
) {
|
||||
let accepts = TcpListenerStream::new(socket).map_err(|error| ProxyError::Accept { error });
|
||||
let mut accepts = pin!(Abortable::new(accepts, abort));
|
||||
while let Some(client) = accepts.next().await {
|
||||
stats.open_connection();
|
||||
let result: Result<(), ProxyError> = async {
|
||||
let client = client?;
|
||||
let remote = client.peer_addr().ok();
|
||||
proxy_stream(client, target, remote, stats.clone()).await
|
||||
}
|
||||
.await;
|
||||
|
||||
if let Err(err) = result {
|
||||
error!("Error in tcp proxy: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_unix(
|
||||
socket: UnixListener,
|
||||
target: SocketAddr,
|
||||
abort: AbortRegistration,
|
||||
stats: ProxyStats,
|
||||
) {
|
||||
let accepts = UnixListenerStream::new(socket).map_err(|error| ProxyError::Accept { error });
|
||||
let mut accepts = pin!(Abortable::new(accepts, abort));
|
||||
while let Some(client) = accepts.next().await {
|
||||
stats.open_connection();
|
||||
let result: Result<(), ProxyError> = async {
|
||||
let client = client?;
|
||||
proxy_stream(client, target, None, stats.clone()).await
|
||||
}
|
||||
.await;
|
||||
|
||||
if let Err(err) = result {
|
||||
error!("Error in tcp proxy: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(stream, proxy))]
|
||||
async fn proxy_stream<Stream>(
|
||||
mut stream: Stream,
|
||||
target: SocketAddr,
|
||||
remote: Option<SocketAddr>,
|
||||
proxy: ProxyStats,
|
||||
) -> Result<(), ProxyError>
|
||||
where
|
||||
Stream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
|
||||
{
|
||||
let _ = span!(Level::TRACE, "Client connection", connection = ?remote).entered();
|
||||
|
||||
trace!("connected");
|
||||
|
||||
let mut upstream = TcpStream::connect(target)
|
||||
.await
|
||||
.map_err(|error| ProxyError::Connect {
|
||||
error,
|
||||
destination: target,
|
||||
})?;
|
||||
trace!("Upstream connection established");
|
||||
|
||||
spawn(async move {
|
||||
match copy_bidirectional(&mut stream, &mut upstream).await {
|
||||
Ok((written, read)) => {
|
||||
proxy.add_written(written);
|
||||
proxy.add_read(read);
|
||||
proxy.close_connection();
|
||||
trace!("Upstream connection closed");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Upstream connection failed: {}", err);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue