cleanup devices that haven't been seen for a while

This commit is contained in:
Robin Appelman 2021-02-07 19:09:17 +01:00
commit df0fececdb
2 changed files with 83 additions and 22 deletions

View file

@ -3,6 +3,7 @@ use json::JsonValue;
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::fmt::{self, Debug, Display, Formatter, Write};
use std::time::Instant;
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct Device {
@ -16,7 +17,7 @@ impl Device {
}
}
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct DeviceState {
pub state: Option<bool>,
pub name: String,
@ -26,10 +27,29 @@ pub struct DeviceState {
pub co2: Option<f32>,
pub mi_temp_devices: BTreeMap<BDAddr, MiTempState>,
pub pms_state: Option<PMSState>,
pub last_seen: Instant,
}
impl Default for DeviceState {
fn default() -> Self {
DeviceState {
state: Default::default(),
name: Default::default(),
power_watts: Default::default(),
power_yesterday: Default::default(),
power_today: Default::default(),
co2: Default::default(),
mi_temp_devices: Default::default(),
pms_state: Default::default(),
last_seen: Instant::now(),
}
}
}
impl DeviceState {
pub fn update(&mut self, json: JsonValue) {
self.last_seen = Instant::now();
if json["DeviceName"].is_string() && !json["DeviceName"].is_empty() {
self.name = json["DeviceName"].to_string();
}

View file

@ -10,10 +10,13 @@ use crate::topic::Topic;
use color_eyre::{eyre::WrapErr, Result};
use dashmap::DashMap;
use pin_utils::pin_mut;
use rumqttc::QoS;
use rumqttc::{AsyncClient, Publish, QoS};
use std::pin::Pin;
use std::sync::Arc;
use tokio::time::Duration;
use tokio_stream::StreamExt;
use std::time::Instant;
use tokio::task::spawn;
use tokio::time::{sleep, Duration};
use tokio_stream::{Stream, StreamExt};
use warp::Filter;
type DeviceStates = Arc<DashMap<Device, DeviceState>>;
@ -30,17 +33,16 @@ async fn main() -> Result<()> {
})
.expect("Error setting Ctrl-C handler");
let states = device_states.clone();
let mi_temp_names = config.mi_temp_names.clone();
tokio::task::spawn(async move {
loop {
if let Err(e) = mqtt_client(&config, states.clone()).await {
eprintln!("lost mqtt collection: {:#}", e);
}
eprintln!("reconnecting after 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
let mqtt_options = config.mqtt()?;
let (client, stream) = mqtt_stream(mqtt_options)
.await
.wrap_err("Failed to setup mqtt listener")?;
spawn(mqtt_loop(client.clone(), stream, device_states.clone()));
spawn(cleanup(client.clone(), device_states.clone()));
let state = warp::any().map(move || device_states.clone());
@ -64,15 +66,26 @@ async fn main() -> Result<()> {
Ok(())
}
async fn mqtt_client(config: &Config, device_states: DeviceStates) -> Result<()> {
let mqtt_options = config.mqtt()?;
let (client, stream) = mqtt_stream(mqtt_options)
.await
.wrap_err("Failed to setup mqtt listener")?;
async fn mqtt_loop(
client: AsyncClient,
stream: impl Stream<Item = Result<Publish>>,
states: DeviceStates,
) {
pin_mut!(stream);
loop {
if let Err(e) = mqtt_client(client.clone(), &mut stream, states.clone()).await {
eprintln!("lost mqtt collection: {:#}", e);
}
eprintln!("reconnecting after 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
async fn mqtt_client<S: Stream<Item = Result<Publish>>>(
client: AsyncClient,
stream: &mut Pin<&mut S>,
device_states: DeviceStates,
) -> Result<()> {
while let Some(message) = stream.next().await {
let message = message?;
println!(
@ -86,7 +99,7 @@ async fn mqtt_client(config: &Config, device_states: DeviceStates) -> Result<()>
Topic::LWT(device) => {
// on discovery, ask the device for it's power state and name
let send_client = client.clone();
tokio::task::spawn(async move {
spawn(async move {
if let Err(e) = send_client
.publish(
device.get_topic("cmnd", "POWER"),
@ -131,3 +144,31 @@ async fn mqtt_client(config: &Config, device_states: DeviceStates) -> Result<()>
}
Ok(())
}
async fn cleanup(client: AsyncClient, devices: DeviceStates) {
loop {
let ping_time = Instant::now() - Duration::from_secs(10 * 60);
let cleanup_time = Instant::now() - Duration::from_secs(15 * 60);
devices.retain(|device, state| {
if state.last_seen < cleanup_time {
println!("{} hasn't been seen for 15m, removing", device.hostname);
true
} else if state.last_seen < ping_time {
println!("{} hasn't been seen for 10m, pinging", device.hostname);
let send_client = client.clone();
let topic = device.get_topic("cmnd", "DeviceName");
spawn(async move {
if let Err(e) = send_client.publish(topic, QoS::AtMostOnce, false, "").await {
eprintln!("Failed to ping device: {:#}", e);
}
});
false
} else {
false
}
});
sleep(Duration::from_secs(5 * 60)).await;
}
}