allow authentication against mqtt server

This commit is contained in:
Robin Appelman 2020-12-16 20:12:14 +01:00
commit cc82cc88c7
2 changed files with 39 additions and 12 deletions

View file

@ -7,10 +7,10 @@ use crate::config::Config;
use crate::device::{format_device_state, Device, DeviceState};
use crate::mqtt::mqtt_stream;
use crate::topic::Topic;
use color_eyre::{eyre::WrapErr, Report, Result};
use color_eyre::{eyre::WrapErr, Result};
use dashmap::DashMap;
use pin_utils::pin_mut;
use rumqttc::{MqttOptions, QoS};
use rumqttc::QoS;
use std::sync::Arc;
use tokio::stream::StreamExt;
use tokio::time::Duration;
@ -31,11 +31,10 @@ async fn main() -> Result<()> {
.expect("Error setting Ctrl-C handler");
let states = device_states.clone();
let mqtt_host = config.mqtt_host;
let mqtt_port = config.mqtt_port;
let mi_temp_names = config.mi_temp_names.clone();
tokio::task::spawn(async move {
loop {
if let Err(e) = mqtt_client(&mqtt_host, mqtt_port, states.clone()).await {
if let Err(e) = mqtt_client(&config, states.clone()).await {
eprintln!("lost mqtt collection: {:#}", e);
}
eprintln!("reconnecting after 1s");
@ -45,7 +44,6 @@ async fn main() -> Result<()> {
let state = warp::any().map(move || device_states.clone());
let mi_temp_names = config.mi_temp_names;
let metrics = warp::path!("metrics")
.and(state)
.map(move |state: DeviceStates| {
@ -66,12 +64,8 @@ async fn main() -> Result<()> {
Ok(())
}
async fn mqtt_client(host: &str, port: u16, device_states: DeviceStates) -> Result<()> {
let hostname = hostname::get()?
.into_string()
.map_err(|_| Report::msg("invalid hostname"))?;
let mut mqtt_options = MqttOptions::new(format!("taspromto-{}", hostname), host, port);
mqtt_options.set_keep_alive(5);
async fn mqtt_client(config: &Config, device_states: DeviceStates) -> Result<()> {
let mqtt_options = config.mqtt()?;
let (client, stream) = mqtt_stream(mqtt_options)
.await