use mdns 1.1

This commit is contained in:
Robin Appelman 2020-01-31 22:19:16 +01:00
commit cf0a704f21
6 changed files with 124 additions and 167 deletions

View file

@ -1,6 +1,6 @@
use serde::Deserialize;
use err_derive::Error;
use crate::mdns::resolve_mdns;
use err_derive::Error;
use serde::Deserialize;
use std::collections::HashMap;
#[derive(Debug, Error)]
@ -38,27 +38,25 @@ pub enum Parameter {
impl Parameter {
pub async fn get_value(&self) -> Result<String, ParameterError> {
match self {
Parameter::Mdns {
service,
host
} => {
match resolve_mdns(service, host).await? {
Some(service) => Ok(format!("{}:{}", service.addr, service.port)),
None => Err(ParameterError::MdnsHostNotFound)
}
}
Parameter::Mdns { service, host } => match resolve_mdns(service, host).await? {
Some(service) => Ok(service.to_string()),
None => Err(ParameterError::MdnsHostNotFound),
},
Parameter::Value { value } => Ok(value.clone()),
Parameter::Service {
file, key, value
} => {
Parameter::Service { file, key, value } => {
let content = tokio::fs::read(file).await?;
let services: Vec<Service> = serde_json::from_slice(&content)?;
services.into_iter().find_map(|service| {
service.labels.get(key)
.filter(|val| *val == value)
.and_then(|_| service.targets.get(0))
.cloned()
}).ok_or(ParameterError::ServiceNotFound)
services
.into_iter()
.find_map(|service| {
service
.labels
.get(key)
.filter(|val| *val == value)
.and_then(|_| service.targets.get(0))
.cloned()
})
.ok_or(ParameterError::ServiceNotFound)
}
}
}
@ -88,7 +86,7 @@ pub struct Config {
#[derive(Debug, Clone, Deserialize)]
pub struct PrometheusConfig {
pub url: String
pub url: String,
}
#[derive(Debug, Clone, Deserialize)]
@ -111,4 +109,4 @@ pub enum Method {
pub struct Service {
targets: Vec<String>,
labels: HashMap<String, String>,
}
}

View file

@ -1,7 +1,7 @@
use main_error::MainError;
use tokio::fs::File;
use crate::config::Config;
use crate::trigger::TriggerManager;
use main_error::MainError;
use tokio::fs::File;
use tokio::prelude::*;
mod config;
@ -28,4 +28,4 @@ async fn main() -> Result<(), MainError> {
println!("Usage {} config.toml", bin);
return Ok(());
}
}
}

View file

@ -1,87 +1,15 @@
use tokio::time::timeout;
use futures_util::{pin_mut, stream::StreamExt};
use mdns::{Record, RecordKind};
use std::{net::IpAddr, time::Duration};
use std::net::SocketAddr;
use std::time::Duration;
const INTERVAL: Duration = Duration::from_secs(3);
#[derive(Debug)]
pub struct MdnsService {
pub id: String,
pub name: String,
pub addr: IpAddr,
pub port: u16,
pub async fn resolve_mdns(
service: &str,
search_name: &str,
) -> Result<Option<SocketAddr>, mdns::Error> {
Ok(mdns::resolve::one(
service,
&format!("{}.{}", search_name, service),
Duration::from_secs(15),
)
.await?
.and_then(|response| response.socket_address()))
}
pub async fn resolve_mdns(service: &str, search_name: &str) -> Result<Option<MdnsService>, mdns::Error> {
let stream = mdns::discover::all(service, INTERVAL)?.listen();
pin_mut!(stream);
let process = async move {
while let Some(Ok(response)) = stream.next().await {
let id = response.records().find_map(to_id);
let addr = response.records().find_map(to_ip_addr);
let port = response.records().find_map(to_port);
let name = response.records().find_map(to_name);
if let (Some(id), Some(addr), Some(name), Some(port)) = (id, addr, name, port) {
let service = MdnsService {
id,
name,
addr,
port,
};
if service.id == search_name {
return Some(service);
}
}
}
None
};
match timeout(Duration::from_secs(5), process).await {
Err(_) => Ok(None),
Ok(res) => Ok(res)
}
}
fn to_id(record: &Record) -> Option<String> {
match &record.kind {
RecordKind::PTR(id) => {
id.split('.').next().map(|s| s.to_string())
}
_ => None,
}
}
fn to_ip_addr(record: &Record) -> Option<IpAddr> {
match record.kind {
RecordKind::A(addr) => Some(addr.into()),
RecordKind::AAAA(addr) => Some(addr.into()),
_ => None,
}
}
fn to_port(record: &Record) -> Option<u16> {
match record.kind {
RecordKind::SRV { port, .. } => Some(port),
_ => None,
}
}
fn to_name(record: &Record) -> Option<String> {
if let RecordKind::TXT(txt) = &record.kind {
txt.iter()
.find_map(|pair| {
let mut parts = pair.split('=');
if let (Some("name"), Some(value)) = (parts.next(), parts.next()) {
Some(value.to_string())
} else {
None
}
})
} else {
None
}
}

View file

@ -1,13 +1,13 @@
use main_error::MainError;
use std::collections::HashMap;
use crate::config::{Parameter, ParameterError, Trigger, Config, Action, Method, Condition};
use prometheus_edge_detector::EdgeDetector;
use futures_util::future::try_join_all;
use std::time::{Duration, SystemTime};
use reqwest::Client;
use tokio::time::delay_for;
use log::{info, error};
use crate::config::{Action, Condition, Config, Method, Parameter, ParameterError, Trigger};
use err_derive::Error;
use futures_util::future::try_join_all;
use log::{error, info};
use main_error::MainError;
use prometheus_edge_detector::EdgeDetector;
use reqwest::Client;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::time::delay_for;
pub struct TriggerManager {
http_client: Client,
@ -16,7 +16,10 @@ pub struct TriggerManager {
}
fn now() -> u64 {
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
}
fn since(time: u64) -> u64 {
@ -45,9 +48,12 @@ impl TriggerManager {
}
pub async fn run_triggers(&self) -> Result<(), MainError> {
try_join_all(self.triggers.iter().map(|trigger| {
self.run_trigger(trigger)
})).await?;
try_join_all(
self.triggers
.iter()
.map(|trigger| self.run_trigger(trigger)),
)
.await?;
Ok(())
}
@ -61,7 +67,10 @@ impl TriggerManager {
Ok(Some(edge)) => {
let elapsed = since(edge);
let wait = delay.saturating_sub(elapsed);
info!("[{}] Found edge, {}s ago, waiting {}s before triggering", trigger.name, elapsed, wait);
info!(
"[{}] Found edge, {}s ago, waiting {}s before triggering",
trigger.name, elapsed, wait
);
let wait_delay = Duration::from_secs(wait);
delay_for(wait_delay).await;
@ -84,7 +93,10 @@ impl TriggerManager {
}
}
Ok(None) => {
info!("[{}] No edge found, waiting {}s before looking for new edge", trigger.name, delay);
info!(
"[{}] No edge found, waiting {}s before looking for new edge",
trigger.name, delay
);
delay_for(delay_duration).await;
}
Err(e) => {
@ -95,16 +107,31 @@ impl TriggerManager {
}
}
async fn get_edge(&self, condition: &Condition, delay: u64) -> Result<Option<u64>, TriggerError> {
async fn get_edge(
&self,
condition: &Condition,
delay: u64,
) -> Result<Option<u64>, TriggerError> {
let query = interpolate_params(&condition.query, &condition.params).await?;
Ok(self.edge_detector.get_last_edge(&query, condition.from, condition.to, Duration::from_secs(delay + 60)).await?)
Ok(self
.edge_detector
.get_last_edge(
&query,
condition.from,
condition.to,
Duration::from_secs(delay + 60),
)
.await?)
}
}
async fn interpolate_params(input: &str, params: &HashMap<String, Parameter>) -> Result<String, ParameterError> {
let futures = params.values().map(|definition| {
Box::pin(definition.get_value())
});
async fn interpolate_params(
input: &str,
params: &HashMap<String, Parameter>,
) -> Result<String, ParameterError> {
let futures = params
.values()
.map(|definition| Box::pin(definition.get_value()));
let resolved_params: Vec<String> = try_join_all(futures).await?;
let mut result = input.to_string();
@ -120,13 +147,16 @@ async fn interpolate_params(input: &str, params: &HashMap<String, Parameter>) ->
async fn test_interpolate() {
use maplit::hashmap;
let result = interpolate_params("foo_$param", &hashmap! {
"param".to_string() => Parameter::Value{value: "bar".to_string()}
}).await;
let result = interpolate_params(
"foo_$param",
&hashmap! {
"param".to_string() => Parameter::Value{value: "bar".to_string()}
},
)
.await;
assert_eq!("foo_bar".to_string(), result.unwrap());
}
async fn run_action(action: &Action, client: &Client) -> Result<(), TriggerError> {
let url = interpolate_params(&action.url, &action.params).await?;
@ -138,4 +168,4 @@ async fn run_action(action: &Action, client: &Client) -> Result<(), TriggerError
req.send().await?;
Ok(())
}
}