allow listening on unix socket

This commit is contained in:
Robin Appelman 2022-07-24 22:10:10 +02:00
commit 698e082a28
4 changed files with 56 additions and 16 deletions

5
Cargo.lock generated
View file

@ -1133,6 +1133,7 @@ dependencies = [
"pin-utils", "pin-utils",
"rumqttc", "rumqttc",
"tokio", "tokio",
"tokio-stream",
"warp", "warp",
"warp-reverse-proxy", "warp-reverse-proxy",
] ]
@ -1248,9 +1249,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-stream" name = "tokio-stream"
version = "0.1.8" version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",

View file

@ -18,4 +18,5 @@ color-eyre = "0.5"
async-stream = "0.3" async-stream = "0.3"
pin-utils = "0.1" pin-utils = "0.1"
hostname = "0.3" hostname = "0.3"
warp-reverse-proxy = { version = "0.3", default_features = false, features = ["rustls-tls"] } warp-reverse-proxy = { version = "0.3", default_features = false, features = ["rustls-tls"] }
tokio-stream = { version = "0.1.9", features = ["net"] }

View file

@ -3,12 +3,17 @@ use rumqttc::MqttOptions;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
#[derive(Default)]
pub struct Config { pub struct Config {
pub mqtt_host: String, pub mqtt_host: String,
pub mqtt_port: u16, pub mqtt_port: u16,
pub mqtt_credentials: Option<Credentials>, pub mqtt_credentials: Option<Credentials>,
pub host_port: u16, pub listen: Listen,
}
#[derive(Clone)]
pub enum Listen {
Tcp(u16),
Unix(String),
} }
pub struct Credentials { pub struct Credentials {
@ -23,10 +28,16 @@ impl Config {
.ok() .ok()
.and_then(|port| u16::from_str(&port).ok()) .and_then(|port| u16::from_str(&port).ok())
.unwrap_or(1883); .unwrap_or(1883);
let host_port = dotenv::var("PORT") let listen = match dotenv::var("SOCKET") {
.ok() Ok(socket) => Listen::Unix(socket),
.and_then(|port| u16::from_str(&port).ok()) _ => {
.unwrap_or(80); let port = dotenv::var("PORT")
.ok()
.and_then(|port| u16::from_str(&port).ok())
.unwrap_or(80);
Listen::Tcp(port)
}
};
let mqtt_credentials = match dotenv::var("MQTT_USERNAME") { let mqtt_credentials = match dotenv::var("MQTT_USERNAME") {
Ok(username) => { Ok(username) => {
@ -40,8 +51,8 @@ impl Config {
Ok(Config { Ok(Config {
mqtt_host, mqtt_host,
mqtt_port, mqtt_port,
host_port,
mqtt_credentials, mqtt_credentials,
listen,
}) })
} }

View file

@ -1,16 +1,21 @@
use crate::config::Config; use crate::config::{Config, Listen};
use crate::devices::{Device, DeviceState}; use crate::devices::{Device, DeviceState};
use crate::mqtt::mqtt_stream; use crate::mqtt::mqtt_stream;
use crate::topic::Topic; use crate::topic::Topic;
use color_eyre::{eyre::WrapErr, Result}; use color_eyre::{eyre::WrapErr, Result};
use dashmap::DashMap; use dashmap::DashMap;
use futures_util::future::{Either, FutureExt};
use futures_util::stream::StreamExt; use futures_util::stream::StreamExt;
use pin_utils::pin_mut; use pin_utils::pin_mut;
use rumqttc::{AsyncClient, QoS}; use rumqttc::{AsyncClient, QoS};
use std::fs::{remove_file, set_permissions};
use std::os::unix::prelude::PermissionsExt;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::net::UnixListener;
use tokio::signal; use tokio::signal;
use tokio::time::sleep; use tokio::time::sleep;
use tokio_stream::wrappers::UnixListenerStream;
use warp::hyper::http::uri::Authority; use warp::hyper::http::uri::Authority;
use warp::Filter; use warp::Filter;
use warp_reverse_proxy::{extract_request_data_filter, proxy_to_and_forward_response}; use warp_reverse_proxy::{extract_request_data_filter, proxy_to_and_forward_response};
@ -25,7 +30,7 @@ type DeviceStates = Arc<DashMap<String, DeviceState>>;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
let config = Config::from_env()?; let config = Config::from_env()?;
let host_port = config.host_port; let listen = config.listen.clone();
let device_states = DeviceStates::default(); let device_states = DeviceStates::default();
@ -73,10 +78,32 @@ async fn main() -> Result<()> {
.and(extract_request_data_filter()) .and(extract_request_data_filter())
.and_then(proxy_to_and_forward_response); .and_then(proxy_to_and_forward_response);
let (_addr, server) = let cancel = async {
warp::serve(proxy).bind_with_graceful_shutdown(([0, 0, 0, 0], host_port), async { signal::ctrl_c().await.ok();
signal::ctrl_c().await.ok(); };
});
let warp_server = warp::serve(proxy);
let server = match listen {
Listen::Tcp(host_port) => Either::Left(
warp_server
.bind_with_graceful_shutdown(([0, 0, 0, 0], host_port), cancel)
.1,
),
Listen::Unix(socket) => {
remove_file(&socket).ok();
let listener = UnixListener::bind(&socket)?;
set_permissions(&socket, PermissionsExt::from_mode(0o666))?;
let stream = UnixListenerStream::new(listener);
Either::Right(
warp_server
.serve_incoming_with_graceful_shutdown(stream, cancel)
.map(move |_| {
remove_file(&socket).ok();
}),
)
}
};
server.await; server.await;