From 81ec03a6ecfd9ad7f7bee576395f906142d58c1b Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Fri, 27 Mar 2026 21:57:07 +0100 Subject: [PATCH] fix unix listener --- Cargo.lock | 23 ++++++++++++++++++++++- Cargo.toml | 2 +- src/main.rs | 25 +++++++++++++++++-------- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2394be8..ccb0439 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -380,6 +380,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "eyre" version = "0.6.12" @@ -1179,6 +1189,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "slab" version = "0.4.12" @@ -1294,6 +1314,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index 80aa917..dbc3c60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ rust-version = "1.85.0" [dependencies] rumqttc = "0.25.1" -tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread", "signal"] } dashmap = "6.1.0" jzon = "0.12.5" warp = { version = "0.4.2", features = ["server"] } diff --git a/src/main.rs b/src/main.rs index 4458180..3e03473 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,13 +5,13 @@ mod topic; use crate::config::{Config, ListenConfig}; use crate::device::{ - format_device_state, format_dsmr_state, format_mi_temp_state, format_rf_temp_state, Device, - DeviceStates, + Device, DeviceStates, format_device_state, format_dsmr_state, format_mi_temp_state, + format_rf_temp_state, }; use crate::mqtt::mqtt_stream; use crate::topic::Topic; use clap::Parser; -use color_eyre::{eyre::WrapErr, Result}; +use color_eyre::{Result, eyre::WrapErr}; use pin_utils::pin_mut; use rumqttc::{AsyncClient, Publish, QoS}; @@ -20,9 +20,9 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::time::Instant; use tokio::net::UnixListener; +use tokio::signal::ctrl_c; use tokio::task::spawn; -use tokio::time::{sleep, Duration}; -use tokio_stream::wrappers::UnixListenerStream; +use tokio::time::{Duration, sleep}; use tokio_stream::{Stream, StreamExt}; use warp::Filter; @@ -97,14 +97,23 @@ async fn serve(device_states: Arc>, config: Config) { response }); + let cancel = async { ctrl_c().await.unwrap() }; match config.listen { ListenConfig::Ip { address, port } => { - warp::serve(metrics).run((address, port)).await; + warp::serve(metrics) + .bind((address, port)) + .await + .graceful(cancel) + .run() + .await; } ListenConfig::Unix { socket: path } => { let listener = UnixListener::bind(path).unwrap(); - let incoming = UnixListenerStream::new(listener); - warp::serve(metrics).incoming(incoming); + warp::serve(metrics) + .incoming(listener) + .graceful(cancel) + .run() + .await; } } }