allow authentication against mqtt server

This commit is contained in:
Robin Appelman 2020-12-16 20:08:04 +01:00
commit df467a1381
5 changed files with 42 additions and 14 deletions

2
.env
View file

@ -1,2 +0,0 @@
MQTT_HOSTNAME=astoria
PORT=3030

1
.gitignore vendored
View file

@ -1 +1,2 @@
/target /target
.env

View file

@ -16,6 +16,8 @@ Run the binary with the following environment variables
- `MQTT_HOSTNAME`: hostname of the MQTT server to connect to - `MQTT_HOSTNAME`: hostname of the MQTT server to connect to
- `MQTT_PORT`: port of the mqtt server to connect to, defaults to 1883 - `MQTT_PORT`: port of the mqtt server to connect to, defaults to 1883
- `MQTT_USERNAME`: username to authenticate against the mqtt server
- `MQTT_PASSWORD`: password to authenticate against the mqtt server
- `PORT`: port this binary MQTT listen on, defaults to 80 - `PORT`: port this binary MQTT listen on, defaults to 80
Setup dns/hosts/etc to point *.example.com to the server running this binary Setup dns/hosts/etc to point *.example.com to the server running this binary

View file

@ -1,13 +1,20 @@
use color_eyre::{eyre::WrapErr, Result}; use color_eyre::{eyre::WrapErr, Report, Result};
use rumqttc::MqttOptions;
use std::str::FromStr; use std::str::FromStr;
#[derive(Default)] #[derive(Default)]
pub struct Config { pub struct Config {
pub mqtt_host: String, pub mqtt_host: String,
pub mqtt_port: u16, pub mqtt_port: u16,
pub mqtt_credentials: Option<Credentials>,
pub host_port: u16, pub host_port: u16,
} }
pub struct Credentials {
username: String,
password: String,
}
impl Config { impl Config {
pub fn from_env() -> Result<Self> { pub fn from_env() -> Result<Self> {
let mqtt_host = dotenv::var("MQTT_HOSTNAME").wrap_err("MQTT_HOSTNAME not set")?; let mqtt_host = dotenv::var("MQTT_HOSTNAME").wrap_err("MQTT_HOSTNAME not set")?;
@ -20,10 +27,36 @@ impl Config {
.and_then(|port| u16::from_str(&port).ok()) .and_then(|port| u16::from_str(&port).ok())
.unwrap_or(80); .unwrap_or(80);
let mqtt_credentials = match dotenv::var("MQTT_USERNAME") {
Ok(username) => {
let password = dotenv::var("MQTT_PASSWORD")
.wrap_err("MQTT_USERNAME set, but MQTT_PASSWORD not set")?;
Some(Credentials { username, password })
}
Err(_) => None,
};
Ok(Config { Ok(Config {
mqtt_host, mqtt_host,
mqtt_port, mqtt_port,
host_port, host_port,
mqtt_credentials,
}) })
} }
pub fn mqtt(&self) -> Result<MqttOptions> {
let hostname = hostname::get()?
.into_string()
.map_err(|_| Report::msg("invalid hostname"))?;
let mut mqtt_options = MqttOptions::new(
format!("tasproxy-{}", hostname),
&self.mqtt_host,
self.mqtt_port,
);
if let Some(credentials) = self.mqtt_credentials.as_ref() {
mqtt_options.set_credentials(&credentials.username, &credentials.password);
}
mqtt_options.set_keep_alive(5);
Ok(mqtt_options)
}
} }

View file

@ -2,10 +2,10 @@ use crate::config::Config;
use crate::devices::DeviceState; use crate::devices::DeviceState;
use crate::mqtt::mqtt_stream; use crate::mqtt::mqtt_stream;
use crate::topic::Topic; use crate::topic::Topic;
use color_eyre::{eyre::WrapErr, Report, Result}; use color_eyre::{eyre::WrapErr, Result};
use dashmap::DashMap; use dashmap::DashMap;
use pin_utils::pin_mut; use pin_utils::pin_mut;
use rumqttc::{MqttOptions, QoS}; use rumqttc::QoS;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::stream::StreamExt; use tokio::stream::StreamExt;
@ -33,11 +33,9 @@ async fn main() -> Result<()> {
.expect("Error setting Ctrl-C handler"); .expect("Error setting Ctrl-C handler");
let states = device_states.clone(); let states = device_states.clone();
let mqtt_host = config.mqtt_host;
let mqtt_port = config.mqtt_port;
tokio::task::spawn(async move { tokio::task::spawn(async move {
loop { loop {
if let Err(e) = mqtt_client(&mqtt_host, mqtt_port, states.clone()).await { if let Err(e) = mqtt_client(&config, states.clone()).await {
eprintln!("lost mqtt collection: {:#}", e); eprintln!("lost mqtt collection: {:#}", e);
} }
eprintln!("reconnecting after 1s"); eprintln!("reconnecting after 1s");
@ -79,12 +77,8 @@ async fn main() -> Result<()> {
Ok(()) Ok(())
} }
async fn mqtt_client(host: &str, port: u16, device_states: DeviceStates) -> Result<()> { async fn mqtt_client(config: &Config, device_states: DeviceStates) -> Result<()> {
let hostname = hostname::get()? let mqtt_options = config.mqtt()?;
.into_string()
.map_err(|_| Report::msg("invalid hostname"))?;
let mut mqtt_options = MqttOptions::new(format!("tasproxy-{}", hostname), host, port);
mqtt_options.set_keep_alive(5);
let (client, stream) = mqtt_stream(mqtt_options) let (client, stream) = mqtt_stream(mqtt_options)
.await .await