use async sleeps to handle delays

This commit is contained in:
Robin Appelman 2020-01-25 13:34:56 +01:00
commit 77c1f24df6
4 changed files with 52 additions and 34 deletions

View file

@ -7,7 +7,7 @@
"instance": { "instance": {
"type": "mdns", "type": "mdns",
"service": "_switch-http._tcp.local", "service": "_switch-http._tcp.local",
"name": "lighthouse1" "host": "lighthouse1"
} }
}, },
"query": "switch_state{instance=\"$instance\"}", "query": "switch_state{instance=\"$instance\"}",

View file

@ -17,7 +17,7 @@ pub enum ParameterError {
pub enum Parameter { pub enum Parameter {
Mdns { Mdns {
service: String, service: String,
name: String, host: String,
}, },
Value { Value {
value: String, value: String,
@ -29,9 +29,9 @@ impl Parameter {
match self { match self {
Parameter::Mdns { Parameter::Mdns {
service, service,
name host
} => { } => {
match resolve_mdns(service, name).await? { match resolve_mdns(service, host).await? {
Some(service) => Ok(format!("{}:{}", service.addr, service.port)), Some(service) => Ok(format!("{}:{}", service.addr, service.port)),
None => Err(ParameterError::MdnsHostNotFound) None => Err(ParameterError::MdnsHostNotFound)
} }
@ -66,5 +66,5 @@ pub struct Config {
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct Trigger { pub struct Trigger {
pub trigger: Condition, pub trigger: Condition,
pub action: Action pub action: Action,
} }

View file

@ -2,8 +2,6 @@ use main_error::MainError;
use tokio::fs::File; use tokio::fs::File;
use crate::config::Config; use crate::config::Config;
use crate::trigger::TriggerManager; use crate::trigger::TriggerManager;
use tokio::time::delay_for;
use std::time::Duration;
use tokio::prelude::*; use tokio::prelude::*;
mod config; mod config;
@ -12,23 +10,17 @@ mod trigger;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), MainError> { async fn main() -> Result<(), MainError> {
let args: Vec<String> = std::env::args().collect(); if let Some(path) = std::env::args().nth(1) {
let mut file = File::open(path).await?;
if args.len() < 2 { let mut contents = vec![];
println!("Usage {} config.json", args[0]); file.read_to_end(&mut contents).await?;
let config: Config = serde_json::from_slice(&contents)?;
let trigger_manager = TriggerManager::new(config);
Ok(trigger_manager.run_triggers().await?)
} else {
println!("Usage {} config.json", std::env::args().next().unwrap());
return Ok(()); return Ok(());
} }
let mut file = File::open("foo.txt").await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let config: Config = serde_json::from_slice(&contents)?;
let trigger_manager = TriggerManager::new(config);
loop {
trigger_manager.poll_triggers().await?;
delay_for(Duration::from_secs(60)).await;
}
} }

View file

@ -5,7 +5,7 @@ use prometheus_edge_detector::EdgeDetector;
use futures_util::future::try_join_all; use futures_util::future::try_join_all;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use reqwest::Client; use reqwest::Client;
use tokio::time::delay_for;
pub struct TriggerManager { pub struct TriggerManager {
http_client: Client, http_client: Client,
@ -13,6 +13,14 @@ pub struct TriggerManager {
triggers: Vec<Trigger>, triggers: Vec<Trigger>,
} }
fn now() -> u64 {
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
}
fn since(time: u64) -> u64 {
now().saturating_sub(time)
}
impl TriggerManager { impl TriggerManager {
pub fn new(config: Config) -> TriggerManager { pub fn new(config: Config) -> TriggerManager {
let edge_detector = EdgeDetector::new(&config.prometheus_url); let edge_detector = EdgeDetector::new(&config.prometheus_url);
@ -24,24 +32,42 @@ impl TriggerManager {
} }
} }
pub async fn poll_triggers(&self) -> Result<(), MainError> { pub async fn run_triggers(&self) -> Result<(), MainError> {
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); try_join_all(self.triggers.iter().map(|trigger| {
self.run_trigger(trigger)
for trigger in &self.triggers { })).await?;
let delay = trigger.action.delay;
let query = interpolate_params(&trigger.trigger.query, &trigger.trigger.params).await?;
let edge = self.edge_detector.get_last_edge(&query, trigger.trigger.from, trigger.trigger.to, Duration::from_secs(delay * 2)).await?;
if let Some(edge) = edge {
let edge_from_now = now - edge;
if edge_from_now > delay && (edge_from_now - delay) < 60 {
run_action(&trigger.action, &self.http_client).await?;
}
}
}
Ok(()) Ok(())
} }
pub async fn run_trigger(&self, trigger: &Trigger) -> Result<(), MainError> {
let delay = trigger.action.delay;
let delay_duration = Duration::from_secs(delay);
loop {
let query = interpolate_params(&trigger.trigger.query, &trigger.trigger.params).await?;
let edge = self.edge_detector.get_last_edge(&query, trigger.trigger.from, trigger.trigger.to, Duration::from_secs(delay + 60)).await?;
if let Some(edge) = edge {
let elapsed = since(edge);
let wait = delay.saturating_sub(elapsed);
println!("Found edge, {}s ago, waiting {}s before triggering", elapsed, wait);
let wait_delay = Duration::from_secs(wait);
delay_for(wait_delay).await;
// verify that the previously found edge is still the most recent
let new_edge = self.edge_detector.get_last_edge(&query, trigger.trigger.from, trigger.trigger.to, Duration::from_secs(delay + 60)).await?;
if new_edge == Some(edge) {
println!("Edge still valid, triggering");
run_action(&trigger.action, &self.http_client).await?;
delay_for(delay_duration).await;
} else {
println!("Edge no longer value");
}
} else {
println!("No edge found, waiting {}s before looking for new edge", delay);
delay_for(delay_duration).await;
}
}
}
} }
async fn interpolate_params(input: &str, params: &HashMap<String, Parameter>) -> Result<String, ParameterError> { async fn interpolate_params(input: &str, params: &HashMap<String, Parameter>) -> Result<String, ParameterError> {