auto reconnect to mqtt

This commit is contained in:
Robin Appelman 2020-11-15 18:57:16 +01:00
commit 964ef6e398

View file

@ -4,6 +4,7 @@ use std::convert::TryFrom;
use std::fmt::Write; use std::fmt::Write;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::Duration;
use warp::Filter; use warp::Filter;
type DeviceStates = Arc<DashMap<Device, DeviceState>>; type DeviceStates = Arc<DashMap<Device, DeviceState>>;
@ -27,7 +28,14 @@ async fn main() {
}) })
.expect("Error setting Ctrl-C handler"); .expect("Error setting Ctrl-C handler");
tokio::task::spawn(mqtt_client(mqtt_host, mqtt_port, device_states.clone())); let states = device_states.clone();
tokio::task::spawn(async move {
loop {
mqtt_client(&mqtt_host, mqtt_port, states.clone()).await;
eprintln!("lost mqtt collection, reconnecting after 1s");
tokio::time::delay_for(Duration::from_secs(1)).await;
}
});
let state = warp::any().map(move || device_states.clone()); let state = warp::any().map(move || device_states.clone());
let metrics = warp::path!("metrics") let metrics = warp::path!("metrics")
@ -83,7 +91,7 @@ async fn main() {
warp::serve(metrics).run(([0, 0, 0, 0], host_port)).await; warp::serve(metrics).run(([0, 0, 0, 0], host_port)).await;
} }
async fn mqtt_client(host: String, port: u16, device_states: DeviceStates) { async fn mqtt_client(host: &str, port: u16, device_states: DeviceStates) {
let mut mqttoptions = MqttOptions::new("rumqtt-async", host, port); let mut mqttoptions = MqttOptions::new("rumqtt-async", host, port);
mqttoptions.set_keep_alive(5); mqttoptions.set_keep_alive(5);
@ -105,9 +113,7 @@ async fn mqtt_client(host: String, port: u16, device_states: DeviceStates) {
.await .await
.unwrap(); .unwrap();
loop { while let Ok(notification) = event_loop.poll().await {
let notification = event_loop.poll().await.unwrap();
if let Event::Incoming(Packet::Publish(message)) = notification { if let Event::Incoming(Packet::Publish(message)) = notification {
let topic = Topic::from(message.topic.as_str()); let topic = Topic::from(message.topic.as_str());