config export

This commit is contained in:
Robin Appelman 2024-01-26 21:01:50 +01:00
commit 549c533076
7 changed files with 721 additions and 9 deletions

368
Cargo.lock generated
View file

@ -17,6 +17,76 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "anstream"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87"
[[package]]
name = "anstyle-parse"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
dependencies = [
"anstyle",
"windows-sys 0.52.0",
]
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -50,6 +120,15 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array",
]
[[package]]
name = "bytes"
version = "1.5.0"
@ -71,6 +150,52 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
version = "4.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1"
[[package]]
name = "colorchoice"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "core-foundation"
version = "0.9.4"
@ -87,6 +212,39 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
]
[[package]]
name = "flume"
version = "0.11.0"
@ -129,6 +287,16 @@ dependencies = [
"slab",
]
[[package]]
name = "generic-array"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.12"
@ -146,6 +314,36 @@ version = "0.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "hashbrown"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
[[package]]
name = "heck"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "itoa"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
[[package]]
name = "libc"
version = "0.2.152"
@ -168,6 +366,16 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]
[[package]]
name = "memchr"
version = "2.7.1"
@ -194,6 +402,16 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.32.2"
@ -203,12 +421,31 @@ dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "parking_lot_core"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.48.5",
]
[[package]]
name = "pin-project-lite"
version = "0.2.13"
@ -239,6 +476,15 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags",
]
[[package]]
name = "ring"
version = "0.17.7"
@ -320,6 +566,12 @@ dependencies = [
"untrusted",
]
[[package]]
name = "ryu"
version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
[[package]]
name = "schannel"
version = "0.1.23"
@ -368,6 +620,37 @@ dependencies = [
"libc",
]
[[package]]
name = "serde"
version = "1.0.195"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.195"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.111"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "slab"
version = "0.4.9"
@ -377,6 +660,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
[[package]]
name = "socket2"
version = "0.5.5"
@ -396,6 +685,12 @@ dependencies = [
"lock_api",
]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "2.0.48"
@ -411,7 +706,19 @@ dependencies = [
name = "tasmota-mqtt-client"
version = "0.1.0"
dependencies = [
"async-stream",
"bytes",
"clap",
"dashmap",
"hex",
"md-5",
"rumqttc",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
@ -444,6 +751,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"num_cpus",
"pin-project-lite",
"socket2",
"tokio-macros",
@ -471,6 +779,54 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tracing"
version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
]
[[package]]
name = "typenum"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicode-ident"
version = "1.0.12"
@ -483,6 +839,18 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"

View file

@ -7,3 +7,17 @@ rust-version = "1.64.0"
[dependencies]
rumqttc = { version = "0.23.0", features = ["use-rustls"] }
thiserror = "1.0.56"
tokio = { version = "1.35.1", features = ["rt-multi-thread", "sync"] }
tracing = "0.1.40"
async-stream = "0.3.5"
tokio-stream = "0.1.14"
dashmap = "5.5.3"
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
bytes = "1.5.0"
hex = "0.4.3"
md-5 = "0.10.6"
[dev-dependencies]
clap = { version = "4.4.18", features = ["derive"] }

31
examples/backup.rs Normal file
View file

@ -0,0 +1,31 @@
use clap::Parser;
pub use tasmota_mqtt_client::{Result, TasmotaClient};
#[derive(Debug, Parser)]
struct Args {
hostname: String,
port: u16,
username: String,
password: String,
device: String,
device_password: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
let client = TasmotaClient::connect(
&args.hostname,
args.port,
Some((&args.username, &args.password)),
)?;
let file = client
.download_config(&args.device, &args.device_password)
.await?;
println!("downloaded {}", file.name);
if let Err(e) = std::fs::write(&file.name, file.data) {
eprintln!("Error while saving {}: {:#}", file.name, e);
}
Ok(())
}

120
src/download.rs Normal file
View file

@ -0,0 +1,120 @@
use crate::error::DownloadError;
use crate::mqtt::MqttHelper;
use crate::Result;
use bytes::{Bytes, BytesMut};
use md5::{Digest, Md5};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Serialize)]
struct SendDownloadPayload<'a> {
password: &'a str,
#[serde(rename = "type")]
ty: u8,
binary: u8,
}
#[derive(Default, Debug)]
struct DownloadState {
name: String,
size: u32,
ty: u8,
id: u32,
data: BytesMut,
md5: [u8; 16],
}
#[derive(Debug)]
pub struct DownloadedFile {
pub name: String,
pub data: Bytes,
}
pub async fn download_config(
mqtt: &MqttHelper,
client: &str,
password: &str,
) -> Result<DownloadedFile> {
let mut rx = mqtt
.subscribe(format!("stat/{client}/FILEDOWNLOAD"))
.await?;
let topic = format!("cmnd/{client}/FILEDOWNLOAD");
mqtt.send(
&topic,
&SendDownloadPayload {
password,
ty: 2,
binary: 1,
},
)
.await?;
let mut state = DownloadState::default();
loop {
let msg = rx.recv().await.unwrap();
if let Ok(body) = serde_json::from_slice::<Value>(msg.payload.as_ref()) {
if let Some(status) = body.get("FileDownload") {
match status.as_str() {
Some("Started") => {
continue;
}
Some("Aborted") => {
return Err(DownloadError::DownloadAborted.into());
}
Some("Error 1") => {
return Err(DownloadError::InvalidPassword.into());
}
Some("Error 2") => {
return Err(DownloadError::BadChunkSize.into());
}
Some("Error 3") => {
return Err(DownloadError::InvalidFileType.into());
}
Some("Done") => {
break;
}
_ => {}
}
}
if let Some(name) = body.get("File").and_then(|v| v.as_str()) {
state.name = name.to_string();
}
if let Some(size) = body.get("Size").and_then(|v| v.as_u64()) {
state.size = size as u32;
}
if let Some(id) = body.get("Size").and_then(|v| v.as_u64()) {
state.id = id as u32;
}
if let Some(ty) = body.get("Type").and_then(|v| v.as_u64()) {
state.ty = ty as u8;
}
if let Some(md5) = body.get("Md5").and_then(|v| v.as_str()) {
hex::decode_to_slice(md5, &mut state.md5[..]).map_err(DownloadError::from)?;
}
} else {
state.data.extend(msg.payload);
}
mqtt.send_str(&topic, "?").await?;
}
if state.data.len() != state.size as usize {
return Err(DownloadError::MismatchedLength(state.size, state.data.len() as u32).into());
}
let mut hasher = Md5::new();
hasher.update(state.data.as_ref());
let hash = hasher.finalize();
if hash != state.md5.into() {
return Err(DownloadError::MismatchedHash(state.md5, hash.into()).into());
}
Ok(DownloadedFile {
name: state.name,
data: state.data.freeze(),
})
}

75
src/error.rs Normal file
View file

@ -0,0 +1,75 @@
use hex::FromHexError;
use rumqttc::{ClientError, ConnectionError};
use thiserror::Error;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Error)]
pub enum Error {
#[error("Error with mqtt transport: {0:#}")]
Mqtt(MqttError),
#[error("Topic {0} doesn't follow expected format")]
MalformedTopic(String),
#[error("Malformed json payload received: {0:#}")]
JsonPayload(serde_json::Error),
#[error(transparent)]
Download(#[from] DownloadError),
}
impl From<serde_json::Error> for Error {
fn from(value: serde_json::Error) -> Self {
Error::JsonPayload(value)
}
}
#[derive(Debug, Error)]
pub enum MqttError {
#[error("transparent")]
Client(ClientError),
#[error("transparent")]
Connection(ConnectionError),
}
impl From<MqttError> for Error {
fn from(value: MqttError) -> Self {
Error::Mqtt(value)
}
}
impl From<ClientError> for Error {
fn from(value: ClientError) -> Self {
MqttError::Client(value).into()
}
}
impl From<ConnectionError> for Error {
fn from(value: ConnectionError) -> Self {
MqttError::Connection(value).into()
}
}
#[derive(Debug, Error)]
pub enum DownloadError {
#[error("Aborted")]
DownloadAborted,
#[error("Invalid password for device")]
InvalidPassword,
#[error("Bad chunk size")]
BadChunkSize,
#[error("Invalid file type")]
InvalidFileType,
#[error("Received error code: {0}")]
Unknown(u32),
#[error("Mismatched payload length, expected {0} got {1}")]
MismatchedLength(u32, u32),
#[error("Received an invalid md5 hash")]
InvalidHash,
#[error("Received data doesn't match the expected md5 hash, expected {0:x?} got {1:x?}")]
MismatchedHash([u8; 16], [u8; 16]),
}
impl From<FromHexError> for DownloadError {
fn from(_: FromHexError) -> Self {
DownloadError::InvalidHash
}
}

View file

@ -1,14 +1,29 @@
pub fn add(left: usize, right: usize) -> usize {
left + right
mod download;
mod error;
mod mqtt;
use crate::download::download_config;
pub use crate::download::DownloadedFile;
use crate::mqtt::MqttHelper;
pub use error::{Error, Result};
use rumqttc::MqttOptions;
pub struct TasmotaClient {
mqtt: MqttHelper,
}
#[cfg(test)]
mod tests {
use super::*;
impl TasmotaClient {
pub fn connect(host: &str, port: u16, credentials: Option<(&str, &str)>) -> Result<Self> {
let mut mqtt_opts = MqttOptions::new("tasmota-client", host, port);
if let Some((username, password)) = credentials {
mqtt_opts.set_credentials(username, password);
}
Ok(TasmotaClient {
mqtt: MqttHelper::connect(mqtt_opts)?,
})
}
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
pub async fn download_config(&self, client: &str, password: &str) -> Result<DownloadedFile> {
download_config(&self.mqtt, client, password).await
}
}

89
src/mqtt.rs Normal file
View file

@ -0,0 +1,89 @@
use crate::Result;
use async_stream::try_stream;
use rumqttc::{matches, AsyncClient, Event, EventLoop, MqttOptions, Packet, Publish, QoS};
use serde::Serialize;
use std::pin::pin;
use std::sync::Arc;
use tokio::spawn;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio_stream::{Stream, StreamExt};
use tracing::{debug, error};
pub struct MqttHelper {
client: AsyncClient,
listeners: Arc<Mutex<Vec<(String, Sender<Publish>)>>>,
}
impl MqttHelper {
pub fn connect(opts: MqttOptions) -> Result<Self> {
let (client, event_loop) = AsyncClient::new(opts, 10);
let listeners = Arc::<Mutex<Vec<(String, Sender<_>)>>>::default();
let senders = listeners.clone();
spawn(async move {
let stream = event_loop_to_stream(event_loop);
let messages = stream
.filter_map(|event| match event {
Ok(event) => {
debug!(event = ?event, "processing event");
Some(event)
}
Err(e) => {
error!(error = ?e, "error while receiving mqtt message");
None
}
})
.filter_map(|event| match event {
Event::Incoming(Packet::Publish(message)) => Some(message),
_ => None,
});
let mut messages = pin!(messages);
while let Some(message) = messages.next().await {
let message: Publish = message;
let mut listeners_ref = senders.lock().await;
listeners_ref.retain(|(_, sender)| !sender.is_closed());
for (filter, sender) in listeners_ref.iter() {
if matches(&message.topic, filter.as_str()) {
let _ = sender.send(message.clone()).await;
}
}
}
});
Ok(Self { client, listeners })
}
pub async fn send<B: Serialize>(&self, topic: &str, body: &B) -> Result<()> {
self.client
.publish(topic, QoS::AtLeastOnce, false, serde_json::to_vec(body)?)
.await?;
Ok(())
}
pub async fn send_str(&self, topic: &str, body: &str) -> Result<()> {
self.client
.publish(topic, QoS::AtLeastOnce, false, body)
.await?;
Ok(())
}
pub async fn subscribe(&self, topic: String) -> Result<Receiver<Publish>> {
self.client.subscribe(&topic, QoS::AtLeastOnce).await?;
let (tx, rx) = channel(10);
self.listeners.lock().await.push((topic, tx));
Ok(rx)
}
}
fn event_loop_to_stream(mut event_loop: EventLoop) -> impl Stream<Item = Result<Event>> {
try_stream! {
loop {
let event = event_loop.poll().await?;
yield event;
}
}
}