mirror of
https://codeberg.org/icewind/prometheus-edge-trigger.git
synced 2026-06-03 10:14:12 +02:00
initial implementation
This commit is contained in:
parent
3ee69f7557
commit
6ff31cc389
7 changed files with 1585 additions and 5 deletions
1267
Cargo.lock
generated
Normal file
1267
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
14
Cargo.toml
14
Cargo.toml
|
|
@ -4,6 +4,16 @@ version = "0.1.0"
|
||||||
authors = ["Robin Appelman <robin@icewind.nl>"]
|
authors = ["Robin Appelman <robin@icewind.nl>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
prometheus-edge-detector = { version = "0.1", path = "../prometheus-edge-detector" }
|
||||||
|
mdns = { version = "0.3", git = "https://github.com/icewind1991/mdns", branch = "async_await" }
|
||||||
|
tokio = { version = "0.2.4", features = ["macros", "time", "fs"] }
|
||||||
|
main_error = "0.1.0"
|
||||||
|
futures-util = "0.3.1"
|
||||||
|
reqwest = "0.10.0"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1.0.44"
|
||||||
|
err-derive = "0.2.1"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
maplit = "1.0.2"
|
||||||
31
config.json
Normal file
31
config.json
Normal file
|
|
@ -0,0 +1,31 @@
|
||||||
|
{
|
||||||
|
"prometheus_url": "http://astoria:9291",
|
||||||
|
"triggers": [
|
||||||
|
{
|
||||||
|
"trigger": {
|
||||||
|
"params": {
|
||||||
|
"instance": {
|
||||||
|
"type": "mdns",
|
||||||
|
"service": "_switch-http._tcp.local",
|
||||||
|
"name": "lighthouse1"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"query": "switch_state{instance=\"$instance\"}",
|
||||||
|
"from": 1,
|
||||||
|
"to": 0
|
||||||
|
},
|
||||||
|
"action": {
|
||||||
|
"method": "PUT",
|
||||||
|
"params": {
|
||||||
|
"host": {
|
||||||
|
"type": "mdns",
|
||||||
|
"service": "_switch-http._tcp.local",
|
||||||
|
"host": "vr_switch"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"url": "http://$host/off",
|
||||||
|
"delay": 100
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
70
src/config.rs
Normal file
70
src/config.rs
Normal file
|
|
@ -0,0 +1,70 @@
|
||||||
|
use serde::Deserialize;
|
||||||
|
use err_derive::Error;
|
||||||
|
use crate::mdns::resolve_mdns;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum ParameterError {
|
||||||
|
#[error(display = "error while resolving mdns: {}", _0)]
|
||||||
|
MdnsError(#[error(source)] mdns::Error),
|
||||||
|
#[error(display = "requested mdns host not found")]
|
||||||
|
MdnsHostNotFound,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(tag = "type")]
|
||||||
|
#[serde(rename_all = "lowercase")]
|
||||||
|
pub enum Parameter {
|
||||||
|
Mdns {
|
||||||
|
service: String,
|
||||||
|
name: String,
|
||||||
|
},
|
||||||
|
Value {
|
||||||
|
value: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Parameter {
|
||||||
|
pub async fn get_value(&self) -> Result<String, ParameterError> {
|
||||||
|
match self {
|
||||||
|
Parameter::Mdns {
|
||||||
|
service,
|
||||||
|
name
|
||||||
|
} => {
|
||||||
|
match resolve_mdns(service, name).await? {
|
||||||
|
Some(service) => Ok(format!("{}:{}", service.addr, service.port)),
|
||||||
|
None => Err(ParameterError::MdnsHostNotFound)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Parameter::Value { value } => Ok(value.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
pub struct Condition {
|
||||||
|
pub params: HashMap<String, Parameter>,
|
||||||
|
pub query: String,
|
||||||
|
pub from: u64,
|
||||||
|
pub to: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
pub struct Action {
|
||||||
|
pub method: String,
|
||||||
|
pub params: HashMap<String, Parameter>,
|
||||||
|
pub url: String,
|
||||||
|
pub delay: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
pub struct Config {
|
||||||
|
pub prometheus_url: String,
|
||||||
|
pub triggers: Vec<Trigger>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
pub struct Trigger {
|
||||||
|
pub trigger: Condition,
|
||||||
|
pub action: Action
|
||||||
|
}
|
||||||
35
src/main.rs
35
src/main.rs
|
|
@ -1,3 +1,34 @@
|
||||||
fn main() {
|
use main_error::MainError;
|
||||||
println!("Hello, world!");
|
use tokio::fs::File;
|
||||||
|
use crate::config::Config;
|
||||||
|
use crate::trigger::TriggerManager;
|
||||||
|
use tokio::time::delay_for;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::prelude::*;
|
||||||
|
|
||||||
|
mod config;
|
||||||
|
mod mdns;
|
||||||
|
mod trigger;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), MainError> {
|
||||||
|
let args: Vec<String> = std::env::args().collect();
|
||||||
|
|
||||||
|
if args.len() < 2 {
|
||||||
|
println!("Usage {} config.json", args[0]);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut file = File::open("foo.txt").await?;
|
||||||
|
|
||||||
|
let mut contents = vec![];
|
||||||
|
file.read_to_end(&mut contents).await?;
|
||||||
|
let config: Config = serde_json::from_slice(&contents)?;
|
||||||
|
let trigger_manager = TriggerManager::new(config);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
trigger_manager.poll_triggers().await?;
|
||||||
|
|
||||||
|
delay_for(Duration::from_secs(60)).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
87
src/mdns.rs
Normal file
87
src/mdns.rs
Normal file
|
|
@ -0,0 +1,87 @@
|
||||||
|
use tokio::time::timeout;
|
||||||
|
use futures_util::{pin_mut, stream::StreamExt};
|
||||||
|
use mdns::{Record, RecordKind};
|
||||||
|
use std::{net::IpAddr, time::Duration};
|
||||||
|
|
||||||
|
const INTERVAL: Duration = Duration::from_secs(3);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct MdnsService {
|
||||||
|
pub id: String,
|
||||||
|
pub name: String,
|
||||||
|
pub addr: IpAddr,
|
||||||
|
pub port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn resolve_mdns(service: &str, search_name: &str) -> Result<Option<MdnsService>, mdns::Error> {
|
||||||
|
let stream = mdns::discover::all(service, INTERVAL)?.listen();
|
||||||
|
pin_mut!(stream);
|
||||||
|
|
||||||
|
let process = async move {
|
||||||
|
while let Some(Ok(response)) = stream.next().await {
|
||||||
|
let id = response.records().find_map(to_id);
|
||||||
|
let addr = response.records().find_map(to_ip_addr);
|
||||||
|
let port = response.records().find_map(to_port);
|
||||||
|
let name = response.records().find_map(to_name);
|
||||||
|
|
||||||
|
if let (Some(id), Some(addr), Some(name), Some(port)) = (id, addr, name, port) {
|
||||||
|
let service = MdnsService {
|
||||||
|
id,
|
||||||
|
name,
|
||||||
|
addr,
|
||||||
|
port,
|
||||||
|
};
|
||||||
|
|
||||||
|
if service.id == search_name {
|
||||||
|
return Some(service);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
match timeout(Duration::from_secs(5), process).await {
|
||||||
|
Err(_) => Ok(None),
|
||||||
|
Ok(res) => Ok(res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_id(record: &Record) -> Option<String> {
|
||||||
|
match &record.kind {
|
||||||
|
RecordKind::PTR(id) => {
|
||||||
|
id.split('.').next().map(|s| s.to_string())
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_ip_addr(record: &Record) -> Option<IpAddr> {
|
||||||
|
match record.kind {
|
||||||
|
RecordKind::A(addr) => Some(addr.into()),
|
||||||
|
RecordKind::AAAA(addr) => Some(addr.into()),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_port(record: &Record) -> Option<u16> {
|
||||||
|
match record.kind {
|
||||||
|
RecordKind::SRV { port, .. } => Some(port),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_name(record: &Record) -> Option<String> {
|
||||||
|
if let RecordKind::TXT(txt) = &record.kind {
|
||||||
|
txt.iter()
|
||||||
|
.find_map(|pair| {
|
||||||
|
let mut parts = pair.split('=');
|
||||||
|
if let (Some("name"), Some(value)) = (parts.next(), parts.next()) {
|
||||||
|
Some(value.to_string())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
84
src/trigger.rs
Normal file
84
src/trigger.rs
Normal file
|
|
@ -0,0 +1,84 @@
|
||||||
|
use main_error::MainError;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use crate::config::{Parameter, ParameterError, Trigger, Config, Action};
|
||||||
|
use prometheus_edge_detector::EdgeDetector;
|
||||||
|
use futures_util::future::try_join_all;
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
use reqwest::Client;
|
||||||
|
|
||||||
|
|
||||||
|
pub struct TriggerManager {
|
||||||
|
http_client: Client,
|
||||||
|
edge_detector: EdgeDetector,
|
||||||
|
triggers: Vec<Trigger>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TriggerManager {
|
||||||
|
pub fn new(config: Config) -> TriggerManager {
|
||||||
|
let edge_detector = EdgeDetector::new(&config.prometheus_url);
|
||||||
|
|
||||||
|
TriggerManager {
|
||||||
|
http_client: Client::new(),
|
||||||
|
edge_detector,
|
||||||
|
triggers: config.triggers,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn poll_triggers(&self) -> Result<(), MainError> {
|
||||||
|
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||||
|
|
||||||
|
for trigger in &self.triggers {
|
||||||
|
let delay = trigger.action.delay;
|
||||||
|
let query = interpolate_params(&trigger.trigger.query, &trigger.trigger.params).await?;
|
||||||
|
let edge = self.edge_detector.get_last_edge(&query, trigger.trigger.from, trigger.trigger.to, Duration::from_secs(delay * 2)).await?;
|
||||||
|
|
||||||
|
if let Some(edge) = edge {
|
||||||
|
let edge_from_now = now - edge;
|
||||||
|
if edge_from_now > delay && (edge_from_now - delay) < 60 {
|
||||||
|
run_action(&trigger.action, &self.http_client).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn interpolate_params(input: &str, params: &HashMap<String, Parameter>) -> Result<String, ParameterError> {
|
||||||
|
let futures = params.values().map(|definition| {
|
||||||
|
Box::pin(definition.get_value())
|
||||||
|
});
|
||||||
|
|
||||||
|
let resolved_params: Vec<String> = try_join_all(futures).await?;
|
||||||
|
let mut result = input.to_string();
|
||||||
|
|
||||||
|
for (name, value) in params.keys().zip(resolved_params.into_iter()) {
|
||||||
|
result = result.replace(&format!("${}", name), &value);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_interpolate() {
|
||||||
|
use maplit::hashmap;
|
||||||
|
|
||||||
|
let result = interpolate_params("foo_$param", &hashmap! {
|
||||||
|
"param".to_string() => Parameter::Value{value: "bar".to_string()}
|
||||||
|
}).await;
|
||||||
|
assert_eq!("foo_bar".to_string(), result.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_action(action: &Action, client: &Client) -> Result<(), MainError> {
|
||||||
|
let url = interpolate_params(&action.url, &action.params).await?;
|
||||||
|
|
||||||
|
let req = match action.method.to_ascii_lowercase().as_str() {
|
||||||
|
"put" => client.put(&url),
|
||||||
|
"post" => client.post(&url),
|
||||||
|
"get" => client.get(&url),
|
||||||
|
_ => unimplemented!()
|
||||||
|
};
|
||||||
|
req.send().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue