1
0
Fork 0
mirror of https://codeberg.org/icewind/prometheus-mdns-rs.git synced 2026-06-03 09:54:21 +02:00

async/await

This commit is contained in:
Robin Appelman 2019-12-09 00:34:20 +01:00
commit 31062ff230
3 changed files with 153 additions and 552 deletions

View file

@ -1,14 +1,10 @@
use atomicwrites::{AllowOverwrite, AtomicFile};
use futures::{Future, Stream};
use futures_util::{pin_mut, stream::StreamExt};
use mdns::{Record, RecordKind};
use serde::Serialize;
use std::collections::HashMap;
use std::env;
use std::io::Write;
use std::sync::mpsc;
use std::sync::mpsc::Sender;
use std::thread;
use std::thread::sleep;
use std::time::Instant;
use std::{net::IpAddr, time::Duration};
@ -40,77 +36,57 @@ impl<'a> From<&'a Service> for PrometheusService<'a> {
const TIMEOUT: Duration = Duration::from_secs(60);
const INTERVAL: Duration = Duration::from_secs(15);
fn main() {
#[tokio::main]
async fn main() -> Result<(), main_error::MainError> {
let out = env::args()
.skip(1)
.next()
.map(|path| AtomicFile::new(path, AllowOverwrite));
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
discover(tx);
});
let stream = mdns::discover::all(SERVICE_NAME, INTERVAL)?.listen();
pin_mut!(stream);
let mut services: HashMap<IpAddr, Service> = HashMap::new();
loop {
let start_count = services.len();
while let Some(Ok(response)) = stream.next().await {
let addr = response.records().filter_map(self::to_ip_addr).next();
let port = response.records().filter_map(self::to_port).next();
let labels = response.records().filter_map(self::to_labels).next();
while let Ok(service) = rx.try_recv() {
if let (Some(addr), Some(labels), Some(port)) = (addr, labels, port) {
let service = Service {
labels,
addr,
port,
last_seen: Instant::now(),
};
let start_count = services.len();
services.insert(service.addr, service);
}
let added_count = services.len();
let added_count = services.len();
services.retain(|_, service| Instant::now().duration_since(service.last_seen) < TIMEOUT);
services
.retain(|_, service| Instant::now().duration_since(service.last_seen) < TIMEOUT);
let removed_count = services.len();
let removed_count = services.len();
if start_count != added_count || added_count != removed_count {
let output_services: Vec<PrometheusService> =
services.iter().map(|(_, service)| service.into()).collect();
let output = serde_json::to_string(&output_services).unwrap();
if start_count != added_count || added_count != removed_count {
let output_services: Vec<PrometheusService> =
services.iter().map(|(_, service)| service.into()).collect();
let output = serde_json::to_string(&output_services).unwrap();
match &out {
Some(path) => {
let _ = path.write(|f| f.write_all(output.as_bytes()));
match &out {
Some(path) => {
let _ = path.write(|f| f.write_all(output.as_bytes()));
}
None => println!("{}", output),
}
None => println!("{}", output),
}
}
sleep(INTERVAL);
}
}
fn discover(tx: Sender<Service>) {
tokio::run(
mdns::discover::all(SERVICE_NAME, INTERVAL)
.unwrap()
.for_each(move |response| {
if response
.records()
.any(|record| record.name.as_str() == SERVICE_NAME)
{
let addr = response.records().filter_map(self::to_ip_addr).next();
let port = response.records().filter_map(self::to_port).next();
let labels = response.records().filter_map(self::to_labels).next();
if let (Some(addr), Some(labels), Some(port)) = (addr, labels, port) {
let _ = tx.send(Service {
labels,
addr,
port,
last_seen: Instant::now(),
});
}
}
Ok(())
})
.map_err(|e| eprintln!("{:?}", e)),
);
Ok(())
}
fn to_ip_addr(record: &Record) -> Option<IpAddr> {