mirror of
https://codeberg.org/icewind/prometheus-mdns-rs.git
synced 2026-06-03 09:54:21 +02:00
Handle multiple targets per host
This commit is contained in:
parent
0d080ca339
commit
06762cd4fd
1 changed files with 100 additions and 33 deletions
123
src/main.rs
123
src/main.rs
|
|
@ -1,6 +1,6 @@
|
|||
use atomicwrites::{AllowOverwrite, AtomicFile};
|
||||
use futures_util::{pin_mut, stream::StreamExt};
|
||||
use mdns::Response;
|
||||
use mdns::{RecordKind, Response};
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
|
|
@ -8,6 +8,7 @@ use std::io::Write;
|
|||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use mdns::Record;
|
||||
use log::{debug, error, log_enabled, info, Level};
|
||||
|
||||
|
||||
|
|
@ -38,6 +39,48 @@ impl<'a> From<&'a Service> for PrometheusService<'a> {
|
|||
const TIMEOUT: Duration = Duration::from_secs(360);
|
||||
const INTERVAL: Duration = Duration::from_secs(15);
|
||||
|
||||
/// Extract labels from TXT records and add hostname
|
||||
fn build_labels(response: &Response, hostname: &str, name: &String) -> HashMap<String, String> {
|
||||
let mut labels: HashMap<String, String> = response.records()
|
||||
.filter_map(|record| {
|
||||
debug!("Record Name: {} | Name: {} | Record: {:?}", record.name, name, record);
|
||||
if record.name.eq(name) {
|
||||
match record.kind {
|
||||
RecordKind::TXT(ref txt) => Some(txt),
|
||||
_ => None,
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.flat_map(|txt| txt.iter())
|
||||
.map(|txt| txt.as_str())
|
||||
.flat_map(|pair| {
|
||||
debug!("Labels pair: {:?}", pair);
|
||||
let mut parts = pair.split('=');
|
||||
if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
|
||||
Some((key.to_string(), value.to_string()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}).collect();
|
||||
|
||||
if !labels.is_empty() {
|
||||
labels.insert("hostname".to_string(), hostname.to_string());
|
||||
}
|
||||
labels
|
||||
}
|
||||
|
||||
fn group_records_by_name<'a>(records: impl Iterator<Item = &'a Record>) -> HashMap<String, Vec<&'a Record>> {
|
||||
let mut grouped: HashMap<String, Vec<&Record>> = HashMap::new();
|
||||
for record in records {
|
||||
grouped.entry(record.name.to_string())
|
||||
.or_default()
|
||||
.push(record);
|
||||
}
|
||||
grouped
|
||||
}
|
||||
|
||||
#[async_std::main]
|
||||
async fn main() -> Result<(), main_error::MainError> {
|
||||
env_logger::init();
|
||||
|
|
@ -52,34 +95,37 @@ async fn main() -> Result<(), main_error::MainError> {
|
|||
|
||||
while let Some(Ok(response)) = stream.next().await {
|
||||
let response: Response = response;
|
||||
log::trace!("Received Response: {:?}", response);
|
||||
log::trace!("Received Response: {:#?}", response);
|
||||
let addr = response.socket_address();
|
||||
let mut labels: HashMap<String, String> = response
|
||||
.txt_records()
|
||||
.flat_map(|pair| {
|
||||
let mut parts = pair.split('=');
|
||||
if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
|
||||
Some((key.to_string(), value.to_string()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let hostname = response
|
||||
.hostname()
|
||||
.and_then(|host| host.split('.').next().map(|s| s.to_string()));
|
||||
|
||||
if let (Some(addr), Some(hostname)) = (addr, hostname) {
|
||||
labels.insert("hostname".to_string(), hostname.to_string());
|
||||
if let (Some(addr), Some(hostname_str)) = (addr, &hostname) {
|
||||
let start_count = services.len();
|
||||
let grouped = group_records_by_name(response.records());
|
||||
for (name, records) in grouped {
|
||||
debug!("Records for name: {}, with ServiceAddr: {}", name, addr);
|
||||
let labels = build_labels(&response, hostname_str, &name);
|
||||
let port = get_port(&response, name);
|
||||
|
||||
let service_addr = if let Some(port) = port {
|
||||
SocketAddr::new(addr.ip(), port)
|
||||
} else {
|
||||
addr
|
||||
};
|
||||
|
||||
if !labels.is_empty() {
|
||||
let service = Service {
|
||||
labels,
|
||||
addr,
|
||||
addr: service_addr,
|
||||
last_seen: Instant::now(),
|
||||
};
|
||||
|
||||
let start_count = services.len();
|
||||
log::trace!("Adding service: {}", service.addr);
|
||||
log::trace!("Adding {} with labels: {:#?}", service.addr, service.labels);
|
||||
services.insert(service.addr, service);
|
||||
}
|
||||
}
|
||||
|
||||
let added_count = services.len();
|
||||
|
||||
|
|
@ -89,19 +135,40 @@ async fn main() -> Result<(), main_error::MainError> {
|
|||
let removed_count = services.len();
|
||||
|
||||
if start_count != added_count || added_count != removed_count {
|
||||
let output_services: Vec<PrometheusService> =
|
||||
services.values().map(|service| service.into()).collect();
|
||||
let output = serde_json::to_string(&output_services).unwrap();
|
||||
|
||||
match &out {
|
||||
Some(path) => {
|
||||
let _ = path.write(|f| f.write_all(output.as_bytes()));
|
||||
}
|
||||
None => println!("{}", output),
|
||||
}
|
||||
write_to_file(&out, &mut services);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_port(response: &Response, name: String) -> Option<u16> {
|
||||
response.records()
|
||||
.find_map(|record| {
|
||||
if record.name == name {
|
||||
if let RecordKind::SRV { port, .. } = record.kind {
|
||||
Some(port)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn write_to_file(out: &Option<AtomicFile>, services: &mut HashMap<SocketAddr, Service>) {
|
||||
let output_services: Vec<PrometheusService> =
|
||||
services.values().map(|service| service.into()).collect();
|
||||
let output = serde_json::to_string(&output_services).unwrap();
|
||||
debug!("===== Writing file");
|
||||
|
||||
match &out {
|
||||
Some(path) => {
|
||||
// debug!("Writing output to file: {}", );
|
||||
let _ = path.write(|f| f.write_all(output.as_bytes()));
|
||||
}
|
||||
None => println!("{}", output),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue