dedup mitemp

This commit is contained in:
Robin Appelman 2021-11-10 00:02:03 +01:00
commit b6624fd763
3 changed files with 120 additions and 80 deletions

View file

@ -4,29 +4,28 @@ mod mqtt;
mod topic;
use crate::config::Config;
use crate::device::{format_device_state, Device, DeviceState};
use crate::device::{format_device_state, format_mi_temp_state, Device, DeviceStates};
use crate::mqtt::mqtt_stream;
use crate::topic::Topic;
use color_eyre::{eyre::WrapErr, Result};
use dashmap::DashMap;
use pin_utils::pin_mut;
use rumqttc::{AsyncClient, Publish, QoS};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::task::spawn;
use tokio::time::{sleep, Duration};
use tokio_stream::{Stream, StreamExt};
use warp::Filter;
type DeviceStates = Arc<DashMap<Device, DeviceState>>;
#[tokio::main]
async fn main() -> Result<()> {
let config = Config::from_env()?;
let host_port = config.host_port;
let device_states = DeviceStates::default();
let device_states = <Arc<Mutex<DeviceStates>>>::default();
ctrlc::set_handler(move || {
std::process::exit(0);
@ -48,16 +47,14 @@ async fn main() -> Result<()> {
let metrics = warp::path!("metrics")
.and(state)
.map(move |state: DeviceStates| {
.map(move |state: Arc<Mutex<DeviceStates>>| {
let state = state.lock().unwrap();
let mut response = String::new();
for device in state.iter() {
format_device_state(
&mut response,
&device.key(),
&device.value(),
&mi_temp_names,
)
.unwrap();
for (device, state) in state.devices() {
format_device_state(&mut response, device, state).unwrap();
}
for (addr, state) in state.mi_temp() {
format_mi_temp_state(&mut response, *addr, &mi_temp_names, state).unwrap()
}
response
});
@ -69,7 +66,7 @@ async fn main() -> Result<()> {
async fn mqtt_loop(
client: AsyncClient,
stream: impl Stream<Item = Result<Publish>>,
states: DeviceStates,
states: Arc<Mutex<DeviceStates>>,
) {
pin_mut!(stream);
loop {
@ -96,7 +93,7 @@ async fn command(client: &AsyncClient, device: &Device, command: &str) -> Result
async fn mqtt_client<S: Stream<Item = Result<Publish>>>(
client: AsyncClient,
stream: &mut Pin<&mut S>,
device_states: DeviceStates,
device_states: Arc<Mutex<DeviceStates>>,
) -> Result<()> {
while let Some(message) = stream.next().await {
let message = message?;
@ -108,7 +105,7 @@ async fn mqtt_client<S: Stream<Item = Result<Publish>>>(
let topic = Topic::from(message.topic.as_str());
match topic {
Topic::LWT(device) => {
Topic::Lwt(device) => {
// on discovery, ask the device for it's power state and name
let send_client = client.clone();
spawn(async move {
@ -124,15 +121,15 @@ async fn mqtt_client<S: Stream<Item = Result<Publish>>>(
Topic::Result(device) => {
let payload = std::str::from_utf8(message.payload.as_ref()).unwrap_or_default();
if let Ok(json) = json::parse(payload) {
let mut device_state = device_states.entry(device).or_default();
device_state.update(json);
let mut device_states = device_states.lock().unwrap();
device_states.update(device, json);
}
}
Topic::Sensor(device) => {
let payload = std::str::from_utf8(message.payload.as_ref()).unwrap_or_default();
if let Ok(json) = json::parse(payload) {
let mut device_state = device_states.entry(device).or_default();
device_state.update(json);
let mut device_states = device_states.lock().unwrap();
device_states.update(device, json);
}
}
_ => {}
@ -141,32 +138,15 @@ async fn mqtt_client<S: Stream<Item = Result<Publish>>>(
Ok(())
}
async fn cleanup(client: AsyncClient, devices: DeviceStates) {
async fn cleanup(client: AsyncClient, state: Arc<Mutex<DeviceStates>>) {
loop {
let ping_time = Instant::now() - Duration::from_secs(10 * 60);
let cleanup_time = Instant::now() - Duration::from_secs(15 * 60);
devices.retain(|device, state| {
if state.last_seen < cleanup_time {
println!("{} hasn't been seen for 15m, removing", device.hostname);
false
} else if state.last_seen < ping_time || state.name.is_empty() {
println!(
"{} hasn't been seen for 10m or has no name set, pinging",
device.hostname
);
let send_client = client.clone();
let topic = device.get_topic("cmnd", "DeviceName");
spawn(async move {
if let Err(e) = send_client.publish(topic, QoS::AtMostOnce, false, "").await {
eprintln!("Failed to ping device: {:#}", e);
}
});
true
} else {
true
}
});
state
.lock()
.unwrap()
.retain(cleanup_time, ping_time, &client);
sleep(Duration::from_secs(60)).await;
}