mirror of
https://codeberg.org/icewind/taspromto.git
synced 2026-06-03 16:44:11 +02:00
hopefully fix reconnect
This commit is contained in:
parent
bd955a62cc
commit
b3dbfe58e7
1 changed files with 22 additions and 24 deletions
50
src/main.rs
50
src/main.rs
|
|
@ -23,7 +23,7 @@ use warp::Filter;
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
let config = Config::from_env()?;
|
let config = Config::from_env()?;
|
||||||
let host_port = config.host_port;
|
let mqtt_options = config.mqtt()?;
|
||||||
|
|
||||||
let device_states = <Arc<Mutex<DeviceStates>>>::default();
|
let device_states = <Arc<Mutex<DeviceStates>>>::default();
|
||||||
|
|
||||||
|
|
@ -32,17 +32,31 @@ async fn main() -> Result<()> {
|
||||||
})
|
})
|
||||||
.expect("Error setting Ctrl-C handler");
|
.expect("Error setting Ctrl-C handler");
|
||||||
|
|
||||||
|
spawn(serve(device_states.clone(), config));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (client, stream) = mqtt_stream(mqtt_options.clone())
|
||||||
|
.await
|
||||||
|
.wrap_err("Failed to setup mqtt listener")?;
|
||||||
|
|
||||||
|
let cleanup_task = spawn(cleanup(client.clone(), device_states.clone()));
|
||||||
|
|
||||||
|
pin_mut!(stream);
|
||||||
|
|
||||||
|
if let Err(e) = mqtt_client(client.clone(), &mut stream, device_states.clone()).await {
|
||||||
|
eprintln!("lost mqtt collection: {:#}", e);
|
||||||
|
}
|
||||||
|
eprintln!("reconnecting after 1s");
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
|
||||||
|
cleanup_task.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn serve(device_states: Arc<Mutex<DeviceStates>>, config: Config) {
|
||||||
|
let host_port = config.host_port;
|
||||||
let mi_temp_names = config.mi_temp_names.clone();
|
let mi_temp_names = config.mi_temp_names.clone();
|
||||||
|
|
||||||
let mqtt_options = config.mqtt()?;
|
|
||||||
|
|
||||||
let (client, stream) = mqtt_stream(mqtt_options)
|
|
||||||
.await
|
|
||||||
.wrap_err("Failed to setup mqtt listener")?;
|
|
||||||
|
|
||||||
spawn(mqtt_loop(client.clone(), stream, device_states.clone()));
|
|
||||||
spawn(cleanup(client.clone(), device_states.clone()));
|
|
||||||
|
|
||||||
let state = warp::any().map(move || device_states.clone());
|
let state = warp::any().map(move || device_states.clone());
|
||||||
|
|
||||||
let metrics = warp::path!("metrics")
|
let metrics = warp::path!("metrics")
|
||||||
|
|
@ -60,22 +74,6 @@ async fn main() -> Result<()> {
|
||||||
});
|
});
|
||||||
|
|
||||||
warp::serve(metrics).run(([0, 0, 0, 0], host_port)).await;
|
warp::serve(metrics).run(([0, 0, 0, 0], host_port)).await;
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn mqtt_loop(
|
|
||||||
client: AsyncClient,
|
|
||||||
stream: impl Stream<Item = Result<Publish>>,
|
|
||||||
states: Arc<Mutex<DeviceStates>>,
|
|
||||||
) {
|
|
||||||
pin_mut!(stream);
|
|
||||||
loop {
|
|
||||||
if let Err(e) = mqtt_client(client.clone(), &mut stream, states.clone()).await {
|
|
||||||
eprintln!("lost mqtt collection: {:#}", e);
|
|
||||||
}
|
|
||||||
eprintln!("reconnecting after 1s");
|
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn command(client: &AsyncClient, device: &Device, command: &str, body: &str) -> Result<()> {
|
async fn command(client: &AsyncClient, device: &Device, command: &str, body: &str) -> Result<()> {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue