mirror of
https://codeberg.org/icewind/taspromto.git
synced 2026-06-03 08:34:21 +02:00
support rtl433 temp sensors
This commit is contained in:
parent
217298c1c7
commit
347cfe2054
6 changed files with 137 additions and 24 deletions
|
|
@ -1,7 +1,7 @@
|
|||
use crate::device::BDAddr;
|
||||
use crate::device::{BDAddr, RfDeviceId};
|
||||
use color_eyre::{eyre::WrapErr, Report, Result};
|
||||
use rumqttc::MqttOptions;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
@ -11,7 +11,7 @@ pub struct Config {
|
|||
pub mqtt_port: u16,
|
||||
pub host_port: u16,
|
||||
pub mi_temp_names: BTreeMap<BDAddr, String>,
|
||||
pub rf_temp_names: BTreeMap<u8, String>,
|
||||
pub rf_temp_names: HashMap<RfDeviceId<'static>, String>,
|
||||
pub mqtt_credentials: Option<Credentials>,
|
||||
}
|
||||
|
||||
|
|
@ -54,14 +54,15 @@ impl Config {
|
|||
.split(',')
|
||||
.map(|pair| {
|
||||
let mut parts = pair.split('=');
|
||||
if let (Some(mac), Some(name)) = (parts.next().map(u8::from_str), parts.next()) {
|
||||
let channel = mac.wrap_err("Invalid RF_TEMP_NAMES")?;
|
||||
Ok((channel, name.to_string()))
|
||||
if let (Some(channel), Some(name)) = (parts.next(), parts.next()) {
|
||||
let device_id =
|
||||
RfDeviceId::from_str(channel).wrap_err("Invalid RF_TEMP_NAMES")?;
|
||||
Ok((device_id, name.to_string()))
|
||||
} else {
|
||||
Err(Report::msg("Invalid RF_TEMP_NAMES"))
|
||||
}
|
||||
})
|
||||
.collect::<Result<BTreeMap<u8, String>, Report>>()?;
|
||||
.collect::<Result<HashMap<_, _>, Report>>()?;
|
||||
|
||||
let mqtt_credentials = match dotenvy::var("MQTT_USERNAME") {
|
||||
Ok(username) => {
|
||||
|
|
|
|||
104
src/device.rs
104
src/device.rs
|
|
@ -1,9 +1,12 @@
|
|||
use color_eyre::{eyre::WrapErr, Report, Result};
|
||||
use jzon::JsonValue;
|
||||
use rumqttc::{AsyncClient, QoS};
|
||||
use std::borrow::Cow;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt::{self, Debug, Display, Formatter, Write};
|
||||
use std::num::ParseIntError;
|
||||
use std::str::FromStr;
|
||||
use std::time::Instant;
|
||||
use tokio::task::spawn;
|
||||
|
||||
|
|
@ -12,7 +15,8 @@ pub struct DeviceStates {
|
|||
pub devices: HashMap<Device, DeviceState>,
|
||||
pub dsmr_devices: HashMap<Device, DsmrState>,
|
||||
pub mi_temp_devices: BTreeMap<BDAddr, MiTempState>,
|
||||
pub rf_temp_devices: BTreeMap<u8, TempState>,
|
||||
pub rf_temp_devices: HashMap<RfDeviceId<'static>, TempState>,
|
||||
active_rf_temp_id: RfDeviceId<'static>,
|
||||
}
|
||||
|
||||
impl DeviceStates {
|
||||
|
|
@ -59,7 +63,10 @@ impl DeviceStates {
|
|||
|
||||
pub fn update_rf(&mut self, payload: &str) {
|
||||
if let Some(data) = parse_rf_payload(payload) {
|
||||
let state = self.rf_temp_devices.entry(data.channel).or_default();
|
||||
let state = self
|
||||
.rf_temp_devices
|
||||
.entry(data.device_id().to_owned())
|
||||
.or_default();
|
||||
state.humidity = data.humidity;
|
||||
state.temperature = data.temperature;
|
||||
} else {
|
||||
|
|
@ -67,14 +74,42 @@ impl DeviceStates {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn update_rtl(&mut self, device: &str, field: &str, payload: &str) {
|
||||
if self.active_rf_temp_id.name != device {
|
||||
self.active_rf_temp_id = RfDeviceId::default();
|
||||
self.active_rf_temp_id.name = device.to_string().into();
|
||||
}
|
||||
match field {
|
||||
"id" => self.active_rf_temp_id.id = payload.parse().unwrap_or_default(),
|
||||
"channel" => self.active_rf_temp_id.channel = payload.parse().unwrap_or_default(),
|
||||
"temperature_F" | "humidity" => self.update_active_rtl(field, payload),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn update_active_rtl(&mut self, field: &str, payload: &str) {
|
||||
let state = self
|
||||
.rf_temp_devices
|
||||
.entry(self.active_rf_temp_id.to_owned())
|
||||
.or_default();
|
||||
match field {
|
||||
"temperature_F" => {
|
||||
state.temperature = payload
|
||||
.parse()
|
||||
.map(|temp_f: f32| (temp_f - 32.0) * 5.0 / 9.0)
|
||||
.unwrap_or_default()
|
||||
}
|
||||
"humidity" => state.humidity = payload.parse().unwrap_or_default(),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mi_temp(&self) -> impl Iterator<Item = (&BDAddr, &MiTempState)> {
|
||||
self.mi_temp_devices.iter()
|
||||
}
|
||||
|
||||
pub fn rf_temp(&self) -> impl Iterator<Item = (u8, &TempState)> {
|
||||
self.rf_temp_devices
|
||||
.iter()
|
||||
.map(|(channel, state)| (*channel, state))
|
||||
pub fn rf_temp(&self) -> impl Iterator<Item = (&RfDeviceId<'static>, &TempState)> {
|
||||
self.rf_temp_devices.iter()
|
||||
}
|
||||
|
||||
pub fn retain(&mut self, cleanup_time: Instant, ping_time: Instant, client: &AsyncClient) {
|
||||
|
|
@ -481,11 +516,11 @@ pub struct TempState {
|
|||
|
||||
pub fn format_rf_temp_state<W: Write>(
|
||||
mut writer: W,
|
||||
channel: u8,
|
||||
names: &BTreeMap<u8, String>,
|
||||
channel: &RfDeviceId,
|
||||
names: &HashMap<RfDeviceId, String>,
|
||||
state: &TempState,
|
||||
) -> std::fmt::Result {
|
||||
let name = if let Some(name) = names.get(&channel) {
|
||||
let name = if let Some(name) = names.get(channel) {
|
||||
name
|
||||
} else {
|
||||
return Ok(());
|
||||
|
|
@ -494,16 +529,16 @@ pub fn format_rf_temp_state<W: Write>(
|
|||
if state.temperature > 0.0 {
|
||||
writeln!(
|
||||
writer,
|
||||
"sensor_temperature{{channel=\"{}\", name=\"{}\"}} {}",
|
||||
channel, name, state.temperature
|
||||
"sensor_temperature{{model=\"{}\", id=\"{}\", channel=\"{}\", name=\"{}\"}} {}",
|
||||
channel.name, channel.id, channel.channel, name, state.temperature
|
||||
)?;
|
||||
}
|
||||
|
||||
if state.humidity > 0 {
|
||||
writeln!(
|
||||
writer,
|
||||
"sensor_humidity{{channel=\"{}\", name=\"{}\"}} {}",
|
||||
channel, name, state.humidity
|
||||
"sensor_humidity{{model=\"{}\", id=\"{}\", channel=\"{}\", name=\"{}\"}} {}",
|
||||
channel.name, channel.id, channel.channel, name, state.humidity
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
|
|
@ -781,6 +816,49 @@ struct RfPayload<'a> {
|
|||
humidity: u8,
|
||||
}
|
||||
|
||||
impl<'a> RfPayload<'a> {
|
||||
pub fn device_id(&self) -> RfDeviceId<'a> {
|
||||
RfDeviceId {
|
||||
name: Cow::Borrowed(self.name),
|
||||
id: self.id,
|
||||
channel: self.channel,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Hash, PartialEq, Eq, Debug, Clone, Default)]
|
||||
pub struct RfDeviceId<'a> {
|
||||
name: Cow<'a, str>,
|
||||
id: u16,
|
||||
channel: u8,
|
||||
}
|
||||
|
||||
impl RfDeviceId<'_> {
|
||||
pub fn to_owned(&self) -> RfDeviceId<'static> {
|
||||
RfDeviceId {
|
||||
name: Cow::Owned(self.name.to_string()),
|
||||
id: self.id,
|
||||
channel: self.channel,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for RfDeviceId<'static> {
|
||||
type Err = ParseIntError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let mut parts = s.splitn(3, ':');
|
||||
let name = parts.next().unwrap_or_default();
|
||||
let id = parts.next().unwrap_or_default().parse()?;
|
||||
let channel = parts.next().unwrap_or_default().parse()?;
|
||||
Ok(RfDeviceId {
|
||||
name: name.to_string().into(),
|
||||
id,
|
||||
channel,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_rf_payload(payload: &str) -> Option<RfPayload> {
|
||||
let mut parts = payload.split(";").skip(2);
|
||||
let name = parts.next()?;
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ async fn serve(device_states: Arc<Mutex<DeviceStates>>, config: Config) {
|
|||
format_mi_temp_state(&mut response, *addr, &mi_temp_names, state).unwrap()
|
||||
}
|
||||
for (channel, state) in state.rf_temp() {
|
||||
format_rf_temp_state(&mut response, channel, &rf_temp_names, state).unwrap()
|
||||
format_rf_temp_state(&mut response, &channel, &rf_temp_names, state).unwrap()
|
||||
}
|
||||
response
|
||||
});
|
||||
|
|
@ -141,6 +141,11 @@ async fn mqtt_client<S: Stream<Item = Result<Publish>>>(
|
|||
let mut device_states = device_states.lock().unwrap();
|
||||
device_states.update_rf(payload);
|
||||
}
|
||||
Topic::Rtl(device, field) => {
|
||||
let payload = std::str::from_utf8(message.payload.as_ref()).unwrap_or_default();
|
||||
let mut device_states = device_states.lock().unwrap();
|
||||
device_states.update_rtl(&device.hostname, &field, payload);
|
||||
}
|
||||
topic @ (Topic::Water(_)
|
||||
| Topic::Gas(_)
|
||||
| Topic::Energy1(_)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ pub async fn mqtt_stream(
|
|||
client.subscribe("stat/+/+", QoS::AtMostOnce).await?;
|
||||
client.subscribe("tele/+/+", QoS::AtMostOnce).await?;
|
||||
client.subscribe("rflink/msg", QoS::AtMostOnce).await?;
|
||||
client.subscribe("rtl_433/#", QoS::AtMostOnce).await?;
|
||||
client.subscribe("+/water", QoS::AtMostOnce).await?;
|
||||
client.subscribe("+/gas_delivered", QoS::AtMostOnce).await?;
|
||||
client
|
||||
|
|
|
|||
11
src/topic.rs
11
src/topic.rs
|
|
@ -15,6 +15,7 @@ pub enum Topic {
|
|||
Energy1(Device),
|
||||
Energy2(Device),
|
||||
DsmrPower(Device),
|
||||
Rtl(Device, String),
|
||||
}
|
||||
|
||||
impl Topic {
|
||||
|
|
@ -44,6 +45,7 @@ impl Topic {
|
|||
Topic::Energy1(device) => device,
|
||||
Topic::Energy2(device) => device,
|
||||
Topic::DsmrPower(device) => device,
|
||||
Topic::Rtl(device, _) => device,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -56,6 +58,15 @@ impl From<&str> for Topic {
|
|||
};
|
||||
return Topic::Msg(device);
|
||||
}
|
||||
if let Some((device, topic)) = raw
|
||||
.strip_prefix("rtl_433/")
|
||||
.and_then(|topic| topic.split_once('/'))
|
||||
{
|
||||
let device = Device {
|
||||
hostname: device.to_string(),
|
||||
};
|
||||
return Topic::Rtl(device, topic.into());
|
||||
}
|
||||
if let Some(name) = raw.strip_suffix("/water") {
|
||||
let device = Device {
|
||||
hostname: name.to_string(),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue