delay device queries

This commit is contained in:
Robin Appelman 2020-12-28 20:23:28 +01:00
commit 498627c29f

View file

@ -1,14 +1,15 @@
use crate::config::Config;
use crate::devices::DeviceState;
use crate::devices::{Device, DeviceState};
use crate::mqtt::mqtt_stream;
use crate::topic::Topic;
use color_eyre::{eyre::WrapErr, Result};
use dashmap::DashMap;
use pin_utils::pin_mut;
use rumqttc::QoS;
use rumqttc::{AsyncClient, QoS};
use std::sync::Arc;
use std::time::Duration;
use tokio::stream::StreamExt;
use tokio::time::delay_for;
use warp::hyper::http::uri::Authority;
use warp::Filter;
use warp_reverse_proxy::{extract_request_data_filter, proxy_to_and_forward_response};
@ -67,7 +68,7 @@ async fn main() -> Result<()> {
Err(warp::reject::not_found())
}
} else {
eprintln!("Error {} has no discovered", requested_device);
eprintln!("Error {} has not been discovered", requested_device);
Err(warp::reject::not_found())
}
},
@ -96,44 +97,17 @@ async fn mqtt_client(config: &Config, device_states: DeviceStates) -> Result<()>
let topic = Topic::from(message.topic.as_str());
match topic {
Topic::LWT(device) => {
match payload {
"Online" => {
println!("Discovered {}", device.hostname);
// on discovery, ask the device for it's ip and name
let send_client = client.clone();
tokio::task::spawn(async move {
if let Err(e) = send_client
.publish(
device.get_topic("cmnd", "IPADDRESS"),
QoS::AtLeastOnce,
false,
"",
)
.await
{
eprintln!("Failed to ask for device IP: {:#}", e);
}
if let Err(e) = send_client
.publish(
device.get_topic("cmnd", "DeviceName"),
QoS::AtLeastOnce,
false,
"",
)
.await
{
eprintln!("Failed to ask for device name: {:#}", e);
}
});
}
"Offline" => {
println!("Removing {}", device.hostname);
device_states.remove(&device.hostname);
}
_ => {}
Topic::LWT(device) => match payload {
"Online" => {
println!("Discovered {}", device.hostname);
query_device(client.clone(), device).await;
}
}
"Offline" => {
println!("Removing {}", device.hostname);
device_states.remove(&device.hostname);
}
_ => {}
},
Topic::Result(device) => {
if let Ok(json) = json::parse(payload) {
let mut device_state = device_states.entry(device.hostname).or_default();
@ -145,3 +119,35 @@ async fn mqtt_client(config: &Config, device_states: DeviceStates) -> Result<()>
}
Ok(())
}
async fn query_device(client: AsyncClient, device: Device) {
tokio::task::spawn(async move {
// one device boot, the discovery event can happen before the device is ready to respond to our messages
// thus we wait 5 seconds before asking
delay_for(Duration::from_secs(5)).await;
if let Err(e) = client
.publish(
device.get_topic("cmnd", "IPADDRESS"),
QoS::AtLeastOnce,
false,
"",
)
.await
{
eprintln!("Failed to ask for device IP: {:#}", e);
}
if let Err(e) = client
.publish(
device.get_topic("cmnd", "DeviceName"),
QoS::AtLeastOnce,
false,
"",
)
.await
{
eprintln!("Failed to ask for device name: {:#}", e);
}
});
}