mirror of
https://codeberg.org/icewind/prometheus-edge-trigger.git
synced 2026-06-03 18:24:10 +02:00
switch to toml for config and improve error handling
This commit is contained in:
parent
77c1f24df6
commit
fe92c27bc6
7 changed files with 209 additions and 66 deletions
|
|
@ -51,7 +51,7 @@ pub struct Condition {
|
|||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct Action {
|
||||
pub method: String,
|
||||
pub method: Method,
|
||||
pub params: HashMap<String, Parameter>,
|
||||
pub url: String,
|
||||
pub delay: u64,
|
||||
|
|
@ -59,12 +59,27 @@ pub struct Action {
|
|||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct Config {
|
||||
pub prometheus_url: String,
|
||||
pub prometheus: PrometheusConfig,
|
||||
#[serde(rename = "trigger")]
|
||||
pub triggers: Vec<Trigger>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct PrometheusConfig {
|
||||
pub url: String
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct Trigger {
|
||||
pub trigger: Condition,
|
||||
pub name: String,
|
||||
pub condition: Condition,
|
||||
pub action: Action,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(rename_all = "UPPERCASE")]
|
||||
pub enum Method {
|
||||
Get,
|
||||
Put,
|
||||
Post,
|
||||
}
|
||||
|
|
@ -10,17 +10,19 @@ mod trigger;
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), MainError> {
|
||||
env_logger::init();
|
||||
|
||||
if let Some(path) = std::env::args().nth(1) {
|
||||
let mut file = File::open(path).await?;
|
||||
|
||||
let mut contents = vec![];
|
||||
file.read_to_end(&mut contents).await?;
|
||||
let config: Config = serde_json::from_slice(&contents)?;
|
||||
let config: Config = toml::from_slice(&contents)?;
|
||||
let trigger_manager = TriggerManager::new(config);
|
||||
|
||||
Ok(trigger_manager.run_triggers().await?)
|
||||
} else {
|
||||
println!("Usage {} config.json", std::env::args().next().unwrap());
|
||||
println!("Usage {} config.toml", std::env::args().next().unwrap());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
|
@ -1,11 +1,13 @@
|
|||
use main_error::MainError;
|
||||
use std::collections::HashMap;
|
||||
use crate::config::{Parameter, ParameterError, Trigger, Config, Action};
|
||||
use crate::config::{Parameter, ParameterError, Trigger, Config, Action, Method, Condition};
|
||||
use prometheus_edge_detector::EdgeDetector;
|
||||
use futures_util::future::try_join_all;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use reqwest::Client;
|
||||
use tokio::time::delay_for;
|
||||
use log::{info, warn};
|
||||
use err_derive::Error;
|
||||
|
||||
pub struct TriggerManager {
|
||||
http_client: Client,
|
||||
|
|
@ -21,9 +23,19 @@ fn since(time: u64) -> u64 {
|
|||
now().saturating_sub(time)
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum TriggerError {
|
||||
#[error(display = "{}", _0)]
|
||||
Parameter(#[error(source)] ParameterError),
|
||||
#[error(display = "{}", _0)]
|
||||
Edge(#[error(source)] prometheus_edge_detector::Error),
|
||||
#[error(display = "{}", _0)]
|
||||
Network(#[error(source)] reqwest::Error),
|
||||
}
|
||||
|
||||
impl TriggerManager {
|
||||
pub fn new(config: Config) -> TriggerManager {
|
||||
let edge_detector = EdgeDetector::new(&config.prometheus_url);
|
||||
let edge_detector = EdgeDetector::new(&config.prometheus.url);
|
||||
|
||||
TriggerManager {
|
||||
http_client: Client::new(),
|
||||
|
|
@ -40,34 +52,53 @@ impl TriggerManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run_trigger(&self, trigger: &Trigger) -> Result<(), MainError> {
|
||||
async fn run_trigger(&self, trigger: &Trigger) -> Result<(), MainError> {
|
||||
let delay = trigger.action.delay;
|
||||
let delay_duration = Duration::from_secs(delay);
|
||||
let error_delay = Duration::from_secs(15);
|
||||
loop {
|
||||
let query = interpolate_params(&trigger.trigger.query, &trigger.trigger.params).await?;
|
||||
let edge = self.edge_detector.get_last_edge(&query, trigger.trigger.from, trigger.trigger.to, Duration::from_secs(delay + 60)).await?;
|
||||
if let Some(edge) = edge {
|
||||
let elapsed = since(edge);
|
||||
let wait = delay.saturating_sub(elapsed);
|
||||
println!("Found edge, {}s ago, waiting {}s before triggering", elapsed, wait);
|
||||
let wait_delay = Duration::from_secs(wait);
|
||||
delay_for(wait_delay).await;
|
||||
match self.get_edge(&trigger.condition, delay).await {
|
||||
Ok(Some(edge)) => {
|
||||
let elapsed = since(edge);
|
||||
let wait = delay.saturating_sub(elapsed);
|
||||
info!("[{}] Found edge, {}s ago, waiting {}s before triggering", trigger.name, elapsed, wait);
|
||||
let wait_delay = Duration::from_secs(wait);
|
||||
delay_for(wait_delay).await;
|
||||
|
||||
// verify that the previously found edge is still the most recent
|
||||
let new_edge = self.edge_detector.get_last_edge(&query, trigger.trigger.from, trigger.trigger.to, Duration::from_secs(delay + 60)).await?;
|
||||
if new_edge == Some(edge) {
|
||||
println!("Edge still valid, triggering");
|
||||
run_action(&trigger.action, &self.http_client).await?;
|
||||
// verify that the previously found edge is still the most recent
|
||||
match self.get_edge(&trigger.condition, delay).await {
|
||||
Ok(Some(new_edge)) if new_edge == edge => {
|
||||
info!("[{}] Edge still valid, triggering", trigger.name);
|
||||
if let Err(e) = run_action(&trigger.action, &self.http_client).await {
|
||||
warn!("[{}]: {}", trigger.name, e);
|
||||
}
|
||||
delay_for(delay_duration).await;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("[{}]: {}", trigger.name, e);
|
||||
delay_for(error_delay).await;
|
||||
}
|
||||
_ => {
|
||||
info!("[{}] Edge no longer valid", trigger.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
info!("[{}] No edge found, waiting {}s before looking for new edge", trigger.name, delay);
|
||||
delay_for(delay_duration).await;
|
||||
} else {
|
||||
println!("Edge no longer value");
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("[{}]: {}", trigger.name, e);
|
||||
delay_for(error_delay).await;
|
||||
}
|
||||
} else {
|
||||
println!("No edge found, waiting {}s before looking for new edge", delay);
|
||||
delay_for(delay_duration).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_edge(&self, condition: &Condition, delay: u64) -> Result<Option<u64>, TriggerError> {
|
||||
let query = interpolate_params(&condition.query, &condition.params).await?;
|
||||
Ok(self.edge_detector.get_last_edge(&query, condition.from, condition.to, Duration::from_secs(delay + 60)).await?)
|
||||
}
|
||||
}
|
||||
|
||||
async fn interpolate_params(input: &str, params: &HashMap<String, Parameter>) -> Result<String, ParameterError> {
|
||||
|
|
@ -95,14 +126,14 @@ async fn test_interpolate() {
|
|||
assert_eq!("foo_bar".to_string(), result.unwrap());
|
||||
}
|
||||
|
||||
async fn run_action(action: &Action, client: &Client) -> Result<(), MainError> {
|
||||
|
||||
async fn run_action(action: &Action, client: &Client) -> Result<(), TriggerError> {
|
||||
let url = interpolate_params(&action.url, &action.params).await?;
|
||||
|
||||
let req = match action.method.to_ascii_lowercase().as_str() {
|
||||
"put" => client.put(&url),
|
||||
"post" => client.post(&url),
|
||||
"get" => client.get(&url),
|
||||
_ => unimplemented!()
|
||||
let req = match action.method {
|
||||
Method::Put => client.put(&url),
|
||||
Method::Post => client.post(&url),
|
||||
Method::Get => client.get(&url),
|
||||
};
|
||||
req.send().await?;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue