initial implementation

This commit is contained in:
Robin Appelman 2020-12-15 00:03:44 +01:00
commit 4a11d00024
8 changed files with 2520 additions and 2 deletions

2
.env Normal file
View file

@ -0,0 +1,2 @@
MQTT_HOSTNAME=astoria
PORT=3030

2208
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -7,3 +7,15 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
rumqttc = "0.2.0"
tokio = "0.2"
dashmap = "3.11"
json = "0.12.4"
warp = "0.2.5"
dotenv = "0.15.0"
ctrlc = { version = "3.1.7", features = ["termination"] }
color-eyre = "0.5.7"
async-stream = "0.3.0"
pin-utils = "0.1.0"
hostname = "^0.3"
warp-reverse-proxy = "0.2.0"

29
src/config.rs Normal file
View file

@ -0,0 +1,29 @@
use color_eyre::{eyre::WrapErr, Result};
use std::str::FromStr;
#[derive(Default)]
pub struct Config {
pub mqtt_host: String,
pub mqtt_port: u16,
pub host_port: u16,
}
impl Config {
pub fn from_env() -> Result<Self> {
let mqtt_host = dotenv::var("MQTT_HOSTNAME").wrap_err("MQTT_HOSTNAME not set")?;
let mqtt_port = dotenv::var("MQTT_PORT")
.ok()
.and_then(|port| u16::from_str(&port).ok())
.unwrap_or(1883);
let host_port = dotenv::var("PORT")
.ok()
.and_then(|port| u16::from_str(&port).ok())
.unwrap_or(80);
Ok(Config {
mqtt_host,
mqtt_port,
host_port,
})
}
}

43
src/devices.rs Normal file
View file

@ -0,0 +1,43 @@
use json::JsonValue;
use std::net::IpAddr;
use std::str::FromStr;
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct Device {
pub topic: String,
pub hostname: String,
}
impl Device {
pub fn get_topic(&self, prefix: &str, command: &str) -> String {
format!("{}/{}/{}/{}", prefix, self.topic, self.hostname, command)
}
}
#[derive(Debug, Default)]
pub struct DeviceState {
name: String,
pub ip: Option<IpAddr>,
}
impl DeviceState {
pub fn update(&mut self, json: JsonValue) {
if json["DeviceName"].is_string() && !json["DeviceName"].is_empty() {
self.name = json["DeviceName"].to_string();
}
if !json["IPAddress1"].is_empty() {
let result = json["IPAddress1"].to_string();
if let Some(Ok(ip)) = result
.split(' ')
.map(|part| part.trim_start_matches('(').trim_end_matches(')'))
.rev()
.map(IpAddr::from_str)
.next()
{
self.ip = Some(ip);
} else {
eprintln!("malformed ipaddress result: {}", result);
}
}
}
}

View file

@ -1,3 +1,141 @@
fn main() { use crate::config::Config;
println!("Hello, world!"); use crate::devices::DeviceState;
use crate::mqtt::mqtt_stream;
use crate::topic::Topic;
use color_eyre::{eyre::WrapErr, Report, Result};
use dashmap::DashMap;
use pin_utils::pin_mut;
use rumqttc::{MqttOptions, QoS};
use std::sync::Arc;
use std::time::Duration;
use tokio::stream::StreamExt;
use warp::hyper::http::uri::Authority;
use warp::Filter;
use warp_reverse_proxy::{extract_request_data_filter, proxy_to_and_forward_response};
mod config;
mod devices;
mod mqtt;
mod topic;
type DeviceStates = Arc<DashMap<String, DeviceState>>;
#[tokio::main]
async fn main() -> Result<()> {
let config = Config::from_env()?;
let host_port = config.host_port;
let device_states = DeviceStates::default();
ctrlc::set_handler(move || {
std::process::exit(0);
})
.expect("Error setting Ctrl-C handler");
let states = device_states.clone();
let mqtt_host = config.mqtt_host;
let mqtt_port = config.mqtt_port;
tokio::task::spawn(async move {
loop {
if let Err(e) = mqtt_client(&mqtt_host, mqtt_port, states.clone()).await {
eprintln!("lost mqtt collection: {:#}", e);
}
eprintln!("reconnecting after 1s");
tokio::time::delay_for(Duration::from_secs(1)).await;
}
});
let state = warp::any().map(move || device_states.clone());
let proxy = warp::any()
.and(warp::filters::host::optional())
.and(state)
.and_then(
move |host: Option<Authority>, states: DeviceStates| async move {
let host = match host {
Some(host) => host,
None => return Err(warp::reject::not_found()),
};
let requested_device = host.as_str().split('.').next().unwrap();
if let Some(state) = states.get(requested_device) {
if let Some(ip) = state.ip {
Ok((format!("http://{}", ip), String::new()))
} else {
Err(warp::reject::not_found())
}
} else {
Err(warp::reject::not_found())
}
},
)
.untuple_one()
.and(extract_request_data_filter())
.and_then(proxy_to_and_forward_response);
warp::serve(proxy).run(([0, 0, 0, 0], host_port)).await;
Ok(())
}
async fn mqtt_client(host: &str, port: u16, device_states: DeviceStates) -> Result<()> {
let hostname = hostname::get()?
.into_string()
.map_err(|_| Report::msg("invalid hostname"))?;
let mut mqtt_options = MqttOptions::new(format!("taspromto-{}", hostname), host, port);
mqtt_options.set_keep_alive(5);
let (client, stream) = mqtt_stream(mqtt_options)
.await
.wrap_err("Failed to setup mqtt listener")?;
pin_mut!(stream);
while let Some(message) = stream.next().await {
let message = message?;
println!(
"{} {}",
message.topic,
std::str::from_utf8(message.payload.as_ref()).unwrap_or_default()
);
let topic = Topic::from(message.topic.as_str());
match topic {
Topic::LWT(device) => {
// on discovery, ask the device for it's ip and name
let send_client = client.clone();
tokio::task::spawn(async move {
if let Err(e) = send_client
.publish(
device.get_topic("cmnd", "IPADDRESS"),
QoS::AtMostOnce,
false,
"",
)
.await
{
eprintln!("Failed to ask for power state: {:#}", e);
}
if let Err(e) = send_client
.publish(
device.get_topic("cmnd", "DeviceName"),
QoS::AtMostOnce,
false,
"",
)
.await
{
eprintln!("Failed to ask for device name: {:#}", e);
}
});
}
Topic::Result(device) => {
let payload = std::str::from_utf8(message.payload.as_ref()).unwrap_or_default();
if let Ok(json) = json::parse(payload) {
let mut device_state = device_states.entry(device.hostname).or_default();
device_state.update(json);
}
}
_ => {}
}
}
Ok(())
} }

29
src/mqtt.rs Normal file
View file

@ -0,0 +1,29 @@
use async_stream::try_stream;
use color_eyre::Result;
use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, Publish, QoS};
use tokio::stream::{Stream, StreamExt};
pub async fn mqtt_stream(
mqtt_options: MqttOptions,
) -> Result<(AsyncClient, impl Stream<Item = Result<Publish>>)> {
let (client, event_loop) = AsyncClient::new(mqtt_options, 10);
client.subscribe("tele/+/+/LWT", QoS::AtMostOnce).await?;
client.subscribe("stat/+/+/RESULT", QoS::AtMostOnce).await?;
let stream = event_loop_to_stream(event_loop).filter_map(|event| match event {
Ok(Event::Incoming(Packet::Publish(message))) => Some(Ok(message)),
Ok(_) => None,
Err(e) => Some(Err(e)),
});
Ok((client, stream))
}
fn event_loop_to_stream(mut event_loop: EventLoop) -> impl Stream<Item = Result<Event>> {
try_stream! {
loop {
let event = event_loop.poll().await?;
yield event;
}
}
}

57
src/topic.rs Normal file
View file

@ -0,0 +1,57 @@
use crate::devices::Device;
#[derive(Debug, Eq, PartialEq)]
pub enum Topic {
LWT(Device),
State(Device),
Sensor(Device),
Result(Device),
Other(String),
}
impl From<&str> for Topic {
fn from(raw: &str) -> Self {
let mut parts = raw.split('/');
if let (Some(prefix), Some(topic), Some(hostname), Some(cmd)) =
(parts.next(), parts.next(), parts.next(), parts.next())
{
let device = Device {
topic: topic.to_string(),
hostname: hostname.to_string(),
};
match (prefix, cmd) {
("tele", "LWT") => Topic::LWT(device),
("tele", "STATE") => Topic::State(device),
("tele", "SENSOR") => Topic::Sensor(device),
("stat", "RESULT") => Topic::Result(device),
_ => Topic::Other(raw.to_string()),
}
} else {
Topic::Other(raw.to_string())
}
}
}
#[test]
fn parse_topic() {
let device = Device {
hostname: "hostname".to_string(),
topic: "foo".to_string(),
};
assert_eq!(
Topic::LWT(device.clone()),
Topic::from("tele/foo/hostname/LWT")
);
assert_eq!(
Topic::State(device.clone()),
Topic::from("tele/foo/hostname/STATE")
);
assert_eq!(
Topic::Sensor(device.clone()),
Topic::from("tele/foo/hostname/SENSOR")
);
assert_eq!(
Topic::Result(device.clone()),
Topic::from("stat/foo/hostname/RESULT")
);
}