extract to lib:

This commit is contained in:
Robin Appelman 2021-03-27 21:23:47 +01:00
commit 0b77b8d6a3
6 changed files with 262 additions and 264 deletions

7
Cargo.lock generated
View file

@ -752,6 +752,12 @@ dependencies = [
"want",
]
[[package]]
name = "iai"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71a816c97c42258aa5834d07590b718b4c9a598944cd39a52dc25b351185d678"
[[package]]
name = "idna"
version = "0.2.2"
@ -1034,6 +1040,7 @@ dependencies = [
"futures-lite",
"futures-util",
"heim",
"iai",
"once_cell",
"parse-display",
"regex",

View file

@ -15,4 +15,7 @@ futures-util = "0.3"
futures-lite = "1"
parse-display = "0.4"
regex = { version = "1", default-features = false }
once_cell = "1"
once_cell = "1"
[dev-dependencies]
iai = "0.1"

View file

@ -38,102 +38,96 @@ pub struct DiskUsage {
pub free: u64,
}
#[derive(Default)]
pub struct Heim {}
pub async fn temperatures() -> Result<HashMap<TemperatureLabel, f32>> {
// ugly workaround problems between async-fs and tokio
let results = tokio::task::spawn_blocking(|| {
futures_lite::future::block_on(
heim::sensors::temperatures().collect::<Vec<Result<TemperatureSensor, heim::Error>>>(),
)
})
.await
.wrap_err("Failed to resolve future")?
.into_iter()
.filter_map(|result| result.ok())
.filter_map(|sensor| match (sensor.unit(), sensor.label()) {
("k10temp", Some("Tdie")) => Some((
TemperatureLabel::CPU,
sensor
.current()
.get::<thermodynamic_temperature::degree_celsius>(),
)),
_ => None,
});
Ok(results.collect())
}
impl Heim {
pub async fn temperatures(&self) -> Result<HashMap<TemperatureLabel, f32>> {
// ugly workaround problems between async-fs and tokio
let results = tokio::task::spawn_blocking(|| {
futures_lite::future::block_on(
heim::sensors::temperatures()
.collect::<Vec<Result<TemperatureSensor, heim::Error>>>(),
pub async fn memory() -> Result<Memory> {
let memory = heim::memory::memory().await?;
Ok(Memory {
total: memory.total().get::<information::byte>(),
free: memory.free().get::<information::byte>(),
available: memory.available().get::<information::byte>(),
})
}
pub async fn cpu_time() -> Result<f64> {
let time = time().await?;
Ok(time.user().get::<time::second>() + time.system().get::<time::second>())
}
pub async fn network_stats() -> Result<impl Stream<Item = IOStats>> {
let networks = heim::net::io_counters().await?;
Ok(networks
.filter_map(|network| future::ready(network.ok()))
.filter(|network| future::ready(network.interface().starts_with("enp")))
.map(|network| IOStats {
interface: network.interface().into(),
bytes_sent: network.bytes_sent().get::<information::byte>(),
bytes_received: network.bytes_recv().get::<information::byte>(),
}))
}
pub async fn hostname() -> Result<String> {
Ok(heim::host::platform().await?.hostname().to_string())
}
pub async fn disk_stats() -> Result<impl Stream<Item = IOStats>> {
static DISK_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new(r"^([sv]d[a-z]+|nvme\dn\d)$").unwrap());
let disks = heim::disk::io_counters().await?;
Ok(disks
.filter_map(|disk| future::ready(disk.ok()))
.filter_map(|disk| {
future::ready(
disk.device_name()
.to_str()
.map(str::to_string)
.map(|name| (disk, name)),
)
})
.await
.wrap_err("Failed to resolve future")?
.into_iter()
.filter_map(|result| result.ok())
.filter_map(|sensor| match (sensor.unit(), sensor.label()) {
("k10temp", Some("Tdie")) => Some((
TemperatureLabel::CPU,
sensor
.current()
.get::<thermodynamic_temperature::degree_celsius>(),
)),
_ => None,
});
Ok(results.collect())
}
.filter(|(_disk, name)| future::ready(DISK_REGEX.is_match(&name)))
.map(|(disk, name)| IOStats {
interface: name,
bytes_sent: disk.write_bytes().get::<information::byte>(),
bytes_received: disk.read_bytes().get::<information::byte>(),
}))
}
pub async fn memory(&self) -> Result<Memory> {
let memory = heim::memory::memory().await?;
Ok(Memory {
total: memory.total().get::<information::byte>(),
free: memory.free().get::<information::byte>(),
available: memory.available().get::<information::byte>(),
pub async fn disk_usage() -> Result<impl Stream<Item = DiskUsage>> {
Ok(heim::disk::partitions_physical()
.await?
.filter_map(|result| future::ready(result.ok()))
.filter(|partition: &Partition| {
future::ready(!partition.file_system().eq(&FileSystem::Zfs))
})
}
pub async fn cpu_time(&self) -> Result<f64> {
let time = time().await?;
Ok(time.user().get::<time::second>() + time.system().get::<time::second>())
}
pub async fn network_stats(&self) -> Result<impl Stream<Item = IOStats>> {
let networks = heim::net::io_counters().await?;
Ok(networks
.filter_map(|network| future::ready(network.ok()))
.filter(|network| future::ready(network.interface().starts_with("enp")))
.map(|network| IOStats {
interface: network.interface().into(),
bytes_sent: network.bytes_sent().get::<information::byte>(),
bytes_received: network.bytes_recv().get::<information::byte>(),
}))
}
pub async fn hostname(&self) -> Result<String> {
Ok(heim::host::platform().await?.hostname().to_string())
}
pub async fn disk_stats(&self) -> Result<impl Stream<Item = IOStats>> {
static DISK_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new(r"^([sv]d[a-z]+|nvme\dn\d)$").unwrap());
let disks = heim::disk::io_counters().await?;
Ok(disks
.filter_map(|disk| future::ready(disk.ok()))
.filter_map(|disk| {
future::ready(
disk.device_name()
.to_str()
.map(str::to_string)
.map(|name| (disk, name)),
)
})
.filter(|(_disk, name)| future::ready(DISK_REGEX.is_match(&name)))
.map(|(disk, name)| IOStats {
interface: name,
bytes_sent: disk.write_bytes().get::<information::byte>(),
bytes_received: disk.read_bytes().get::<information::byte>(),
}))
}
pub async fn disk_usage(&self) -> Result<impl Stream<Item = DiskUsage>> {
Ok(heim::disk::partitions_physical()
.await?
.filter_map(|result| future::ready(result.ok()))
.filter(|partition: &Partition| {
future::ready(!partition.file_system().eq(&FileSystem::Zfs))
})
.filter_map(|partition: Partition| async move {
let name = partition.mount_point().to_string_lossy().to_string();
partition.usage().await.ok().map(|usage| (name, usage))
})
.filter(|(mount_point, _usage)| future::ready(!mount_point.contains("/snap/")))
.map(|(mount_point, usage)| DiskUsage {
name: mount_point,
size: usage.total().get::<information::byte>(),
free: usage.free().get::<information::byte>(),
}))
}
.filter_map(|partition: Partition| async move {
let name = partition.mount_point().to_string_lossy().to_string();
partition.usage().await.ok().map(|usage| (name, usage))
})
.filter(|(mount_point, _usage)| future::ready(!mount_point.contains("/snap/")))
.map(|(mount_point, usage)| DiskUsage {
name: mount_point,
size: usage.total().get::<information::byte>(),
free: usage.free().get::<information::byte>(),
}))
}

133
src/lib.rs Normal file
View file

@ -0,0 +1,133 @@
pub mod heim;
mod zfs;
use crate::heim::*;
use crate::zfs::pools;
use color_eyre::Result;
use futures_util::stream::StreamExt;
use futures_util::{pin_mut, try_join};
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
pub async fn get_metrics() -> Result<String> {
let (hostname, pools, cpu, memory, network, temperatures, disks, disk_usage): (
String,
Vec<DiskUsage>,
f64,
Memory,
_,
HashMap<TemperatureLabel, f32>,
_,
_,
) = try_join! {
hostname(),
pools(),
cpu_time(),
memory(),
network_stats(),
temperatures(),
disk_stats(),
disk_usage(),
}?;
pin_mut!(network);
pin_mut!(disks);
pin_mut!(disk_usage);
let mut result = String::with_capacity(256);
writeln!(&mut result, "cpu_time{{host=\"{}\"}} {:.1}", hostname, cpu).ok();
writeln!(
&mut result,
"memory_total{{host=\"{}\"}} {}",
hostname, memory.total
)
.ok();
writeln!(
&mut result,
"memory_available{{host=\"{}\"}} {}",
hostname, memory.available
)
.ok();
writeln!(
&mut result,
"memory_free{{host=\"{}\"}} {}",
hostname, memory.free
)
.ok();
for pool in pools {
writeln!(
&mut result,
"zfs_pool_size{{host=\"{}\", pool=\"{}\"}} {}",
hostname, pool.name, pool.size
)
.ok();
writeln!(
&mut result,
"zfs_pool_free{{host=\"{}\", pool=\"{}\"}} {}",
hostname, pool.name, pool.free
)
.ok();
}
while let Some(network) = network.next().await {
let network: IOStats = network;
if network.bytes_received > 0 || network.bytes_sent > 0 {
writeln!(
&mut result,
"net_sent{{host=\"{}\", network=\"{}\"}} {}",
hostname, network.interface, network.bytes_sent
)
.ok();
writeln!(
&mut result,
"net_received{{host=\"{}\", network=\"{}\"}} {}",
hostname, network.interface, network.bytes_received
)
.ok();
}
}
while let Some(disk) = disks.next().await {
let disk: IOStats = disk;
if disk.bytes_received > 0 && disk.bytes_sent > 0 {
writeln!(
&mut result,
"disk_sent{{host=\"{}\", disk=\"{}\"}} {}",
hostname, disk.interface, disk.bytes_sent
)
.ok();
writeln!(
&mut result,
"disk_received{{host=\"{}\", disk=\"{}\"}} {}",
hostname, disk.interface, disk.bytes_received
)
.ok();
}
}
let mut found_sizes = HashSet::new();
while let Some(disk) = disk_usage.next().await {
let disk: DiskUsage = disk;
if disk.size > 0 {
if found_sizes.insert((disk.size, disk.free)) {
writeln!(
&mut result,
"disk_size{{host=\"{}\", disk=\"{}\"}} {}",
hostname, disk.name, disk.size
)
.ok();
writeln!(
&mut result,
"disk_free{{host=\"{}\", disk=\"{}\"}} {}",
hostname, disk.name, disk.free
)
.ok();
}
}
}
for (label, temp) in temperatures {
writeln!(
&mut result,
"temperature{{host=\"{}\", sensor=\"{}\"}} {:.1}",
hostname, label, temp
)
.ok();
}
Ok(result)
}

View file

@ -1,13 +1,5 @@
pub mod heim;
mod zfs;
use crate::heim::{DiskUsage, Heim, IOStats, Memory, TemperatureLabel};
use crate::zfs::ZFS;
use color_eyre::{Report, Result};
use futures_util::stream::StreamExt;
use futures_util::{pin_mut, try_join};
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use palantir::get_metrics;
use warp::reject::Reject;
use warp::{Filter, Rejection};
@ -23,131 +15,11 @@ impl From<Report> for ReportRejection {
impl Reject for ReportRejection {}
async fn get_metrics(heim: Heim, zfs: ZFS) -> Result<String, ReportRejection> {
let (hostname, pools, cpu, memory, network, temperatures, disks, disk_usage): (
String,
Vec<DiskUsage>,
f64,
Memory,
_,
HashMap<TemperatureLabel, f32>,
_,
_,
) = try_join! {
heim.hostname(),
zfs.pools(),
heim.cpu_time(),
heim.memory(),
heim.network_stats(),
heim.temperatures(),
heim.disk_stats(),
heim.disk_usage(),
}?;
pin_mut!(network);
pin_mut!(disks);
pin_mut!(disk_usage);
let mut result = String::with_capacity(256);
writeln!(&mut result, "cpu_time{{host=\"{}\"}} {:.1}", hostname, cpu).ok();
writeln!(
&mut result,
"memory_total{{host=\"{}\"}} {}",
hostname, memory.total
)
.ok();
writeln!(
&mut result,
"memory_available{{host=\"{}\"}} {}",
hostname, memory.available
)
.ok();
writeln!(
&mut result,
"memory_free{{host=\"{}\"}} {}",
hostname, memory.free
)
.ok();
for pool in pools {
writeln!(
&mut result,
"zfs_pool_size{{host=\"{}\", pool=\"{}\"}} {}",
hostname, pool.name, pool.size
)
.ok();
writeln!(
&mut result,
"zfs_pool_free{{host=\"{}\", pool=\"{}\"}} {}",
hostname, pool.name, pool.free
)
.ok();
}
while let Some(network) = network.next().await {
let network: IOStats = network;
if network.bytes_received > 0 || network.bytes_sent > 0 {
writeln!(
&mut result,
"net_sent{{host=\"{}\", network=\"{}\"}} {}",
hostname, network.interface, network.bytes_sent
)
.ok();
writeln!(
&mut result,
"net_received{{host=\"{}\", network=\"{}\"}} {}",
hostname, network.interface, network.bytes_received
)
.ok();
}
}
while let Some(disk) = disks.next().await {
let disk: IOStats = disk;
if disk.bytes_received > 0 && disk.bytes_sent > 0 {
writeln!(
&mut result,
"disk_sent{{host=\"{}\", disk=\"{}\"}} {}",
hostname, disk.interface, disk.bytes_sent
)
.ok();
writeln!(
&mut result,
"disk_received{{host=\"{}\", disk=\"{}\"}} {}",
hostname, disk.interface, disk.bytes_received
)
.ok();
}
}
let mut found_sizes = HashSet::new();
while let Some(disk) = disk_usage.next().await {
let disk: DiskUsage = disk;
if disk.size > 0 {
if found_sizes.insert((disk.size, disk.free)) {
writeln!(
&mut result,
"disk_size{{host=\"{}\", disk=\"{}\"}} {}",
hostname, disk.name, disk.size
)
.ok();
writeln!(
&mut result,
"disk_free{{host=\"{}\", disk=\"{}\"}} {}",
hostname, disk.name, disk.free
)
.ok();
}
}
}
for (label, temp) in temperatures {
writeln!(
&mut result,
"temperature{{host=\"{}\", sensor=\"{}\"}} {:.1}",
hostname, label, temp
)
.ok();
}
Result::<_, ReportRejection>::Ok(result)
}
async fn serve_metrics(heim: Heim, zfs: ZFS) -> Result<String, Rejection> {
get_metrics(heim, zfs).await.map_err(warp::reject::custom)
async fn serve_metrics() -> Result<String, Rejection> {
get_metrics()
.await
.map_err(ReportRejection::from)
.map_err(warp::reject::custom)
}
#[tokio::main]
@ -163,13 +35,7 @@ async fn main() -> Result<()> {
})
.expect("Error setting Ctrl-C handler");
let heim = warp::any().map(|| Heim::default());
let zfs = warp::any().map(|| ZFS::default());
let metrics = warp::path!("metrics")
.and(heim)
.and(zfs)
.and_then(serve_metrics);
let metrics = warp::path!("metrics").and_then(serve_metrics);
warp::serve(metrics).run(([0, 0, 0, 0], host_port)).await;
Ok(())

View file

@ -3,34 +3,29 @@ use color_eyre::Result;
use std::process::Command;
use tokio::task::spawn_blocking;
#[derive(Default)]
pub struct ZFS;
impl ZFS {
pub async fn pools(self) -> Result<Vec<DiskUsage>> {
spawn_blocking(move || {
let mut z = Command::new("zpool");
z.args(&["list", "-p", "-H", "-o", "name,size,free"]);
let out = match z.output() {
Ok(out) => out,
Err(_) => return Ok(Vec::new()),
};
if out.status.success() {
let output = String::from_utf8(out.stdout)?;
Ok(output
.lines()
.flat_map(|line| {
let mut parts = line.split_ascii_whitespace();
let name = parts.next()?.to_string();
let size = parts.next()?.parse().ok()?;
let free = parts.next()?.parse().ok()?;
Some(DiskUsage { name, size, free })
})
.collect())
} else {
Ok(Vec::new())
}
})
.await?
}
pub async fn pools() -> Result<Vec<DiskUsage>> {
spawn_blocking(move || {
let mut z = Command::new("zpool");
z.args(&["list", "-p", "-H", "-o", "name,size,free"]);
let out = match z.output() {
Ok(out) => out,
Err(_) => return Ok(Vec::new()),
};
if out.status.success() {
let output = String::from_utf8(out.stdout)?;
Ok(output
.lines()
.flat_map(|line| {
let mut parts = line.split_ascii_whitespace();
let name = parts.next()?.to_string();
let size = parts.next()?.parse().ok()?;
let free = parts.next()?.parse().ok()?;
Some(DiskUsage { name, size, free })
})
.collect())
} else {
Ok(Vec::new())
}
})
.await?
}