get ip and name

This commit is contained in:
Robin Appelman 2024-01-26 23:05:39 +01:00
commit 054b8e277c
3 changed files with 84 additions and 1 deletions

View file

@ -2,6 +2,7 @@ use clap::Parser;
use std::pin::pin; use std::pin::pin;
use tasmota_mqtt_client::DeviceUpdate; use tasmota_mqtt_client::DeviceUpdate;
pub use tasmota_mqtt_client::{Result, TasmotaClient}; pub use tasmota_mqtt_client::{Result, TasmotaClient};
use tokio::join;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
@ -26,7 +27,8 @@ async fn main() -> Result<()> {
while let Some(update) = discovery.next().await { while let Some(update) = discovery.next().await {
match update { match update {
DeviceUpdate::Added(device) => { DeviceUpdate::Added(device) => {
println!("discovered {device}"); let (ip, name) = join!(client.device_ip(&device), client.device_name(&device));
println!("discovered {}({device}) with ip {}", name?, ip?);
} }
DeviceUpdate::Removed(device) => { DeviceUpdate::Removed(device) => {
println!("{device} has gone offline"); println!("{device} has gone offline");

View file

@ -14,6 +14,10 @@ pub enum Error {
JsonPayload(serde_json::Error), JsonPayload(serde_json::Error),
#[error(transparent)] #[error(transparent)]
Download(#[from] DownloadError), Download(#[from] DownloadError),
#[error("Malformed reply received from device for {0}: {1}")]
MalformedReply(&'static str, String),
#[error("Timeout while waiting for reply from device")]
Timeout,
} }
impl From<serde_json::Error> for Error { impl From<serde_json::Error> for Error {
@ -28,6 +32,8 @@ pub enum MqttError {
Client(ClientError), Client(ClientError),
#[error("transparent")] #[error("transparent")]
Connection(ConnectionError), Connection(ConnectionError),
#[error("connection closed unexpectedly")]
Eof,
} }
impl From<MqttError> for Error { impl From<MqttError> for Error {

View file

@ -4,13 +4,20 @@ mod mqtt;
use crate::download::download_config; use crate::download::download_config;
pub use crate::download::DownloadedFile; pub use crate::download::DownloadedFile;
use crate::error::MqttError;
use crate::mqtt::MqttHelper; use crate::mqtt::MqttHelper;
pub use error::{Error, Result}; pub use error::{Error, Result};
use rumqttc::MqttOptions; use rumqttc::MqttOptions;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::spawn; use tokio::spawn;
use tokio::sync::broadcast::{channel, Sender}; use tokio::sync::broadcast::{channel, Sender};
use tokio::time::timeout;
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::{Stream, StreamExt}; use tokio_stream::{Stream, StreamExt};
@ -18,6 +25,7 @@ pub struct TasmotaClient {
mqtt: MqttHelper, mqtt: MqttHelper,
known_devices: Arc<Mutex<BTreeSet<String>>>, known_devices: Arc<Mutex<BTreeSet<String>>>,
device_update: Sender<DeviceUpdate>, device_update: Sender<DeviceUpdate>,
timeout: Duration,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -68,9 +76,17 @@ impl TasmotaClient {
mqtt, mqtt,
known_devices, known_devices,
device_update, device_update,
timeout: Duration::from_secs(1),
}) })
} }
/// Set the timeout used for one-show commands
///
/// The default timeout is 1 seccond
pub fn set_timeout(&mut self, timeout: Duration) {
self.timeout = timeout;
}
/// Download the config backup from a device /// Download the config backup from a device
/// ///
/// The password is the mqtt password used by the device, which might be different from the mqtt password used by this client /// The password is the mqtt password used by the device, which might be different from the mqtt password used by this client
@ -98,4 +114,63 @@ impl TasmotaClient {
) )
.chain(BroadcastStream::new(rx).filter_map(Result::ok)) .chain(BroadcastStream::new(rx).filter_map(Result::ok))
} }
/// Send a command that expect a single reply message
pub async fn command<T: DeserializeOwned>(
&self,
device: &str,
command: &str,
payload: &str,
) -> Result<T> {
let mut rx = self.mqtt.subscribe(format!("stat/{device}/RESULT")).await?;
self.mqtt
.send_str(&format!("cmnd/{device}/{command}"), payload)
.await?;
let reply = async {
while let Some(msg) = rx.recv().await {
if let Ok(response) = serde_json::from_slice(msg.payload.as_ref()) {
return Ok(response);
}
}
Err(MqttError::Eof.into())
};
timeout(self.timeout, reply)
.await
.map_err(|_| Error::Timeout)?
}
pub async fn device_ip(&self, device: &str) -> Result<IpAddr> {
#[derive(Deserialize)]
struct IpAddressResponse {
#[serde(rename = "IPAddress1")]
ip_address_1: String,
}
let response: IpAddressResponse = self.command(device, "IPADDRESS", "").await?;
let raw = response.ip_address_1;
let Some(Ok(ip)) = raw
.split(' ')
.map(|part| part.trim_start_matches('(').trim_end_matches(')'))
.rev()
.map(IpAddr::from_str)
.next()
else {
return Err(Error::MalformedReply("device ip", raw));
};
Ok(ip)
}
pub async fn device_name(&self, device: &str) -> Result<String> {
#[derive(Deserialize)]
struct NameResponse {
#[serde(rename = "DeviceName")]
device_name: String,
}
let response: NameResponse = self.command(device, "DeviceName", "").await?;
Ok(response.device_name)
}
} }