initial version

This commit is contained in:
Robin Appelman 2020-11-15 17:09:54 +01:00
commit 0c73695f2b
4 changed files with 1861 additions and 2 deletions

View file

@ -1,3 +1,213 @@
fn main() {
println!("Hello, world!");
use dashmap::DashMap;
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
use std::convert::TryFrom;
use std::fmt::Write;
use std::str::FromStr;
use std::sync::Arc;
use warp::Filter;
type DeviceStates = Arc<DashMap<Device, DeviceState>>;
#[tokio::main]
async fn main() {
let mqtt_host = dotenv::var("MQTT_HOSTNAME").expect("MQTT_HOSTNAME not set");
let mqtt_port = dotenv::var("MQTT_PORT")
.ok()
.and_then(|port| u16::from_str(&port).ok())
.unwrap_or(1883);
let host_port = dotenv::var("PORT")
.ok()
.and_then(|port| u16::from_str(&port).ok())
.unwrap_or(3030);
let device_states = DeviceStates::default();
tokio::task::spawn(mqtt_client(mqtt_host, mqtt_port, device_states.clone()));
let state = warp::any().map(move || device_states.clone());
let metrics = warp::path!("metrics")
.and(state)
.map(|state: DeviceStates| {
let mut response = String::new();
for device in state.iter() {
writeln!(
&mut response,
"switch_state[name=\"{}\"] {}",
device.key().hostname,
if device.state { 1 } else { 0 }
)
.unwrap();
if let Some(power_watts) = device.power_watts {
writeln!(
&mut response,
"power_watts[name=\"{}\"] {}",
device.key().hostname,
power_watts
)
.unwrap();
}
if let Some(power_yesterday) = device.power_yesterday {
writeln!(
&mut response,
"power_yesterday_kwh[name=\"{}\"] {}",
device.key().hostname,
power_yesterday
)
.unwrap();
}
if let Some(power_today) = device.power_today {
writeln!(
&mut response,
"power_today_kwh[name=\"{}\"] {}",
device.key().hostname,
power_today
)
.unwrap();
}
}
response
});
warp::serve(metrics).run(([127, 0, 0, 1], host_port)).await;
}
async fn mqtt_client(host: String, port: u16, device_states: DeviceStates) {
let mut mqttoptions = MqttOptions::new("rumqtt-async", host, port);
mqttoptions.set_keep_alive(5);
let (client, mut event_loop) = AsyncClient::new(mqttoptions, 10);
client
.subscribe("tele/+/+/LWT", QoS::AtMostOnce)
.await
.unwrap();
client
.subscribe("stat/+/+/POWER", QoS::AtMostOnce)
.await
.unwrap();
client
.subscribe("tele/+/+/SENSOR", QoS::AtMostOnce)
.await
.unwrap();
loop {
let notification = event_loop.poll().await.unwrap();
if let Event::Incoming(Packet::Publish(message)) = notification {
let topic = Topic::from(message.topic.as_str());
match topic {
Topic::LWT(device) => {
// on discovery, ask the device for it's power state
client
.publish(
device.get_topic("cmnd", "POWER"),
QoS::AtMostOnce,
false,
"",
)
.await
.unwrap();
}
Topic::Power(device) => {
let state = message.payload.as_ref() == b"ON";
device_states.entry(device).or_default().state = state;
}
Topic::Sensor(device) => {
let payload = std::str::from_utf8(message.payload.as_ref()).unwrap_or_default();
if let Ok(json) = json::parse(payload) {
let mut device_state = device_states.entry(device).or_default();
device_state.power_watts = json["ENERGY"]["Power"]
.as_number()
.map(|num| f32::try_from(num).unwrap_or_default());
device_state.power_yesterday = json["ENERGY"]["Yesterday"]
.as_number()
.map(|num| f32::try_from(num).unwrap_or_default());
device_state.power_today = json["ENERGY"]["Today"]
.as_number()
.map(|num| f32::try_from(num).unwrap_or_default());
}
}
_ => {}
}
}
}
}
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
struct Device {
topic: String,
hostname: String,
}
impl Device {
fn get_topic(&self, prefix: &str, command: &str) -> String {
format!("{}/{}/{}/{}", prefix, self.topic, self.hostname, command)
}
}
#[derive(Debug, Default)]
struct DeviceState {
state: bool,
power_watts: Option<f32>,
power_yesterday: Option<f32>,
power_today: Option<f32>,
}
#[derive(Debug, Eq, PartialEq)]
enum Topic {
LWT(Device),
Power(Device),
State(Device),
Sensor(Device),
Other(String),
}
impl From<&str> for Topic {
fn from(raw: &str) -> Self {
let mut parts = raw.split('/');
if let (Some(prefix), Some(topic), Some(hostname), Some(cmd)) =
(parts.next(), parts.next(), parts.next(), parts.next())
{
let device = Device {
topic: topic.to_string(),
hostname: hostname.to_string(),
};
match (prefix, cmd) {
("tele", "LWT") => Topic::LWT(device),
("tele", "STATE") => Topic::State(device),
("stat", "POWER") => Topic::Power(device),
("tele", "SENSOR") => Topic::Sensor(device),
_ => Topic::Other(raw.to_string()),
}
} else {
Topic::Other(raw.to_string())
}
}
}
#[test]
fn parse_topic() {
let device = Device {
hostname: "hostname".to_string(),
topic: "foo".to_string(),
};
assert_eq!(
Topic::LWT(device.clone()),
Topic::from("tele/foo/hostname/LWT")
);
assert_eq!(
Topic::Power(device.clone()),
Topic::from("stat/foo/hostname/POWER")
);
assert_eq!(
Topic::State(device.clone()),
Topic::from("tele/foo/hostname/STATE")
);
assert_eq!(
Topic::Sensor(device.clone()),
Topic::from("tele/foo/hostname/SENSOR")
);
}