discovery wip

This commit is contained in:
Robin Appelman 2024-01-26 22:34:13 +01:00
commit 56dc433854
5 changed files with 128 additions and 4 deletions

14
Cargo.lock generated
View file

@ -795,6 +795,20 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-util"
version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]

View file

@ -11,7 +11,7 @@ thiserror = "1.0.56"
tokio = { version = "1.35.1", features = ["rt-multi-thread", "sync"] }
tracing = "0.1.40"
async-stream = "0.3.5"
tokio-stream = "0.1.14"
tokio-stream = { version = "0.1.14", features = ["sync"] }
dashmap = "5.5.3"
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"

View file

@ -19,7 +19,8 @@ async fn main() -> Result<()> {
&args.hostname,
args.port,
Some((&args.username, &args.password)),
)?;
)
.await?;
let file = client
.download_config(&args.device, &args.device_password)
.await?;

37
examples/discovery.rs Normal file
View file

@ -0,0 +1,37 @@
use clap::Parser;
use std::pin::pin;
use tasmota_mqtt_client::DeviceUpdate;
pub use tasmota_mqtt_client::{Result, TasmotaClient};
use tokio_stream::StreamExt;
#[derive(Debug, Parser)]
struct Args {
hostname: String,
port: u16,
username: String,
password: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
let client = TasmotaClient::connect(
&args.hostname,
args.port,
Some((&args.username, &args.password)),
)
.await?;
let mut discovery = pin!(client.devices());
while let Some(update) = discovery.next().await {
match update {
DeviceUpdate::Added(device) => {
println!("discovered {device}");
}
DeviceUpdate::Removed(device) => {
println!("{device} has gone offline");
}
}
}
Ok(())
}

View file

@ -7,23 +7,95 @@ pub use crate::download::DownloadedFile;
use crate::mqtt::MqttHelper;
pub use error::{Error, Result};
use rumqttc::MqttOptions;
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};
use tokio::spawn;
use tokio::sync::broadcast::{channel, Sender};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::{Stream, StreamExt};
pub struct TasmotaClient {
mqtt: MqttHelper,
known_devices: Arc<Mutex<BTreeSet<String>>>,
device_update: Sender<DeviceUpdate>,
}
#[derive(Debug, Clone)]
pub enum DeviceUpdate {
Added(String),
Removed(String),
}
impl TasmotaClient {
pub fn connect(host: &str, port: u16, credentials: Option<(&str, &str)>) -> Result<Self> {
pub async fn connect(host: &str, port: u16, credentials: Option<(&str, &str)>) -> Result<Self> {
let mut mqtt_opts = MqttOptions::new("tasmota-client", host, port);
if let Some((username, password)) = credentials {
mqtt_opts.set_credentials(username, password);
}
let mqtt = MqttHelper::connect(mqtt_opts)?;
let mut lwt = mqtt.subscribe("tele/+/LWT".into()).await?;
let known_devices = Arc::new(Mutex::new(BTreeSet::new()));
let edit_devices = known_devices.clone();
let (tx, _) = channel(10);
let device_update = tx.clone();
spawn(async move {
while let Some(msg) = lwt.recv().await {
let payload = std::str::from_utf8(msg.payload.as_ref()).unwrap_or_default();
let Some(device) = msg.topic.split('/').nth(1) else {
continue;
};
match payload {
"Online" => {
edit_devices.lock().unwrap().insert(device.into());
let _ = tx.send(DeviceUpdate::Added(device.into()));
}
"Offline" => {
edit_devices.lock().unwrap().remove(device.into());
let _ = tx.send(DeviceUpdate::Removed(device.into()));
}
_ => {}
}
}
});
Ok(TasmotaClient {
mqtt: MqttHelper::connect(mqtt_opts)?,
mqtt,
known_devices,
device_update,
})
}
/// Download the config backup from a device
///
/// The password is the mqtt password used by the device, which might be different from the mqtt password used by this client
pub async fn download_config(&self, client: &str, password: &str) -> Result<DownloadedFile> {
download_config(&self.mqtt, client, password).await
}
/// Get the list of known devices at this point in time
///
/// Due to the asynchronous nature of discovery, calling this directly after creating the client
/// will be unlikely to return all live devices
pub fn current_devices(&self) -> Vec<String> {
self.known_devices.lock().unwrap().iter().cloned().collect()
}
/// Subscribe to device discovery, receiving a [`DeviceUpdate`] whenever a device comes online or goes offline
pub fn devices(&self) -> impl Stream<Item = DeviceUpdate> {
let current = self.current_devices();
let rx = self.device_update.subscribe();
tokio_stream::iter(
current
.into_iter()
.map(|device| DeviceUpdate::Added(device)),
)
.chain(BroadcastStream::new(rx).filter_map(Result::ok))
}
}