handle dmsr mqtt messages

This commit is contained in:
Robin Appelman 2024-07-17 20:01:25 +02:00
commit 217298c1c7
6 changed files with 202 additions and 12 deletions

8
flake.lock generated
View file

@ -20,16 +20,16 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1714971268,
"narHash": "sha256-IKwMSwHj9+ec660l+I4tki/1NRoeGpyA2GdtdYpAgEw=",
"lastModified": 1720954236,
"narHash": "sha256-1mEKHp4m9brvfQ0rjCca8P1WHpymK3TOr3v34ydv9bs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "27c13997bf450a01219899f5a83bd6ffbfc70d3c",
"rev": "53e81e790209e41f0c1efa9ff26ff2fd7ab35e27",
"type": "github"
},
"original": {
"id": "nixpkgs",
"ref": "nixos-23.11",
"ref": "nixos-24.05",
"type": "indirect"
}
},

View file

@ -1,6 +1,6 @@
{
inputs = {
nixpkgs.url = "nixpkgs/nixos-23.11";
nixpkgs.url = "nixpkgs/nixos-24.05";
flake-utils.url = "github:numtide/flake-utils";
};

View file

@ -10,6 +10,7 @@ use tokio::task::spawn;
#[derive(Default)]
pub struct DeviceStates {
pub devices: HashMap<Device, DeviceState>,
pub dsmr_devices: HashMap<Device, DsmrState>,
pub mi_temp_devices: BTreeMap<BDAddr, MiTempState>,
pub rf_temp_devices: BTreeMap<u8, TempState>,
}
@ -19,6 +20,10 @@ impl DeviceStates {
self.devices.iter()
}
pub fn dsmr_devices(&self) -> impl Iterator<Item = (&Device, &DsmrState)> {
self.dsmr_devices.iter()
}
pub fn update(&mut self, device: Device, json: JsonValue) {
let device = self.devices.entry(device).or_default();
@ -37,6 +42,21 @@ impl DeviceStates {
device.update(json);
}
pub fn update_dsmr(&mut self, device: Device, ty: DsmrMessageType, payload: &str) {
if let Ok(value) = payload.parse() {
let state = self.dsmr_devices.entry(device).or_default();
match ty {
DsmrMessageType::Water => state.water_total = Some(value),
DsmrMessageType::Gas => state.gas_total = Some(value),
DsmrMessageType::Energy1 => state.power_total_tariff_1 = Some(value),
DsmrMessageType::Energy2 => state.power_total_tariff_2 = Some(value),
DsmrMessageType::Power => state.power = Some(value),
}
state.last_seen = Instant::now();
}
}
pub fn update_rf(&mut self, payload: &str) {
if let Some(data) = parse_rf_payload(payload) {
let state = self.rf_temp_devices.entry(data.channel).or_default();
@ -141,6 +161,37 @@ impl Default for DeviceState {
}
}
pub enum DsmrMessageType {
Water,
Gas,
Energy1,
Energy2,
Power,
}
#[derive(Debug)]
pub struct DsmrState {
pub power: Option<f32>,
pub power_total_tariff_1: Option<f32>,
pub power_total_tariff_2: Option<f32>,
pub gas_total: Option<f32>,
pub water_total: Option<f32>,
pub last_seen: Instant,
}
impl Default for DsmrState {
fn default() -> Self {
DsmrState {
power: None,
power_total_tariff_1: None,
power_total_tariff_2: None,
gas_total: None,
water_total: None,
last_seen: Instant::now(),
}
}
}
impl DeviceState {
pub fn update(&mut self, json: JsonValue) {
self.last_seen = Instant::now();
@ -458,6 +509,56 @@ pub fn format_rf_temp_state<W: Write>(
Ok(())
}
pub fn format_dsmr_state<W: Write>(
mut writer: W,
device: &str,
state: &DsmrState,
) -> std::fmt::Result {
let power_total = state.power_total_tariff_1.unwrap_or_default()
+ state.power_total_tariff_2.unwrap_or_default();
if power_total > 0.0 {
writeln!(
writer,
"power_total_kwh{{name=\"{}\"}} {}",
device, power_total
)?;
}
if let Some(power) = state.power_total_tariff_1 {
writeln!(
writer,
"power_total_low_kwh{{name=\"{}\"}} {}",
device, power
)?;
}
if let Some(power) = state.power_total_tariff_2 {
writeln!(
writer,
"power_total_high_kwh{{name=\"{}\"}} {}",
device, power
)?;
}
if let Some(power) = state.power {
writeln!(
writer,
"power_watts{{name=\"{}\"}} {}",
device,
power * 1000.0
)?;
}
if let Some(gas) = state.gas_total {
writeln!(writer, "gas_total_m3{{name=\"{}\"}} {}", device, gas)?;
}
if let Some(water) = state.water_total {
writeln!(writer, "water_total_m3{{name=\"{}\"}} {}", device, water)?;
}
Ok(())
}
/// Stores the 6 byte address used to identify Bluetooth devices.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Copy, Clone, Hash, Eq, PartialEq, Default, Ord, PartialOrd)]

View file

@ -5,7 +5,8 @@ mod topic;
use crate::config::Config;
use crate::device::{
format_device_state, format_mi_temp_state, format_rf_temp_state, Device, DeviceStates,
format_device_state, format_dsmr_state, format_mi_temp_state, format_rf_temp_state, Device,
DeviceStates,
};
use crate::mqtt::mqtt_stream;
use crate::topic::Topic;
@ -70,6 +71,9 @@ async fn serve(device_states: Arc<Mutex<DeviceStates>>, config: Config) {
for (device, state) in state.devices() {
format_device_state(&mut response, device, state).unwrap();
}
for (device, state) in state.dsmr_devices() {
format_dsmr_state(&mut response, device.hostname.as_str(), state).unwrap();
}
for (addr, state) in state.mi_temp() {
format_mi_temp_state(&mut response, *addr, &mi_temp_names, state).unwrap()
}
@ -137,6 +141,17 @@ async fn mqtt_client<S: Stream<Item = Result<Publish>>>(
let mut device_states = device_states.lock().unwrap();
device_states.update_rf(payload);
}
topic @ (Topic::Water(_)
| Topic::Gas(_)
| Topic::Energy1(_)
| Topic::Energy2(_)
| Topic::DsmrPower(_)) => {
let payload = std::str::from_utf8(message.payload.as_ref()).unwrap_or_default();
let mut device_states = device_states.lock().unwrap();
if let Some(ty) = topic.dsmr_type() {
device_states.update_dsmr(topic.into_device(), ty, payload);
}
}
_ => {}
}
}

View file

@ -7,12 +7,20 @@ pub async fn mqtt_stream(
mqtt_options: MqttOptions,
) -> Result<(AsyncClient, impl Stream<Item = Result<Publish>>)> {
let (client, event_loop) = AsyncClient::new(mqtt_options, 10);
client.subscribe("tele/+/LWT", QoS::AtMostOnce).await?;
client.subscribe("stat/+/POWER", QoS::AtMostOnce).await?;
client.subscribe("tele/+/SENSOR", QoS::AtMostOnce).await?;
client.subscribe("stat/+/RESULT", QoS::AtMostOnce).await?;
client.subscribe("stat/+/STATUS2", QoS::AtMostOnce).await?;
client.subscribe("stat/+/+", QoS::AtMostOnce).await?;
client.subscribe("tele/+/+", QoS::AtMostOnce).await?;
client.subscribe("rflink/msg", QoS::AtMostOnce).await?;
client.subscribe("+/water", QoS::AtMostOnce).await?;
client.subscribe("+/gas_delivered", QoS::AtMostOnce).await?;
client
.subscribe("+/energy_delivered_tariff1", QoS::AtMostOnce)
.await?;
client
.subscribe("+/energy_delivered_tariff2", QoS::AtMostOnce)
.await?;
client
.subscribe("+/power_delivered_l1", QoS::AtMostOnce)
.await?;
let stream = event_loop_to_stream(event_loop).filter_map(|event| match event {
Ok(Event::Incoming(Packet::Publish(message))) => Some(Ok(message)),

View file

@ -1,4 +1,4 @@
use crate::device::Device;
use crate::device::{Device, DsmrMessageType};
#[derive(Debug, Eq, PartialEq)]
pub enum Topic {
@ -10,6 +10,42 @@ pub enum Topic {
Other(String),
Status(Device),
Msg(Device),
Water(Device),
Gas(Device),
Energy1(Device),
Energy2(Device),
DsmrPower(Device),
}
impl Topic {
pub fn dsmr_type(&self) -> Option<DsmrMessageType> {
match self {
Topic::Water(_) => Some(DsmrMessageType::Water),
Topic::Gas(_) => Some(DsmrMessageType::Gas),
Topic::Energy1(_) => Some(DsmrMessageType::Energy1),
Topic::Energy2(_) => Some(DsmrMessageType::Energy2),
Topic::DsmrPower(_) => Some(DsmrMessageType::Power),
_ => None,
}
}
pub fn into_device(self) -> Device {
match self {
Topic::Lwt(device) => device,
Topic::Power(device) => device,
Topic::State(device) => device,
Topic::Sensor(device) => device,
Topic::Result(device) => device,
Topic::Other(device) => Device { hostname: device },
Topic::Status(device) => device,
Topic::Msg(device) => device,
Topic::Water(device) => device,
Topic::Gas(device) => device,
Topic::Energy1(device) => device,
Topic::Energy2(device) => device,
Topic::DsmrPower(device) => device,
}
}
}
impl From<&str> for Topic {
@ -20,6 +56,36 @@ impl From<&str> for Topic {
};
return Topic::Msg(device);
}
if let Some(name) = raw.strip_suffix("/water") {
let device = Device {
hostname: name.to_string(),
};
return Topic::Water(device);
}
if let Some(name) = raw.strip_suffix("/gas_delivered") {
let device = Device {
hostname: name.to_string(),
};
return Topic::Gas(device);
}
if let Some(name) = raw.strip_suffix("/energy_delivered_tariff1") {
let device = Device {
hostname: name.to_string(),
};
return Topic::Energy1(device);
}
if let Some(name) = raw.strip_suffix("/energy_delivered_tariff2") {
let device = Device {
hostname: name.to_string(),
};
return Topic::Energy2(device);
}
if let Some(name) = raw.strip_suffix("/power_delivered_l1") {
let device = Device {
hostname: name.to_string(),
};
return Topic::DsmrPower(device);
}
let mut parts = raw.split('/');
if let (Some(prefix), Some(hostname), Some(cmd)) =