This commit is contained in:
Robin Appelman 2024-02-12 20:53:04 +01:00
commit 559773598b
10 changed files with 1443 additions and 573 deletions

1520
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -3,21 +3,22 @@ name = "prometheus-edge-trigger"
version = "0.1.0"
authors = ["Robin Appelman <robin@icewind.nl>"]
edition = "2018"
rust-version = "1.71.0"
[dependencies]
prometheus-edge-detector = { version = "0.2.0", default-features = false, features = ["rustls-tls"] }
mdns = "2.0.2"
tokio = { version = "1.20.1", features = ["macros", "time", "fs", "rt-multi-thread"] }
prometheus-edge-detector = { version = "0.3.0", default-features = false, features = ["rustls-tls"] }
mdns = "3.0.0"
tokio = { version = "1.36.0", features = ["macros", "time", "fs", "rt-multi-thread"] }
main_error = "0.1.2"
futures-util = "0.3.21"
reqwest = { version = "0.11.11", default-features = false, features = ["rustls-tls"] }
serde = { version = "1.0.140", features = ["derive"] }
toml = "0.5.9"
log = "0.4.17"
env_logger = "0.9.0"
err-derive = "0.3.1"
serde_json = "1.0.82"
rumqttc = "0.12.0"
futures-util = "0.3.30"
reqwest = { version = "0.11.24", default-features = false, features = ["rustls-tls"] }
serde = { version = "1.0.196", features = ["derive"] }
toml = "0.8.10"
log = "0.4.20"
env_logger = "0.11.1"
thiserror = "1.0.57"
serde_json = "1.0.113"
rumqttc = "0.23.0"
hostname = "0.3.1"
[dev-dependencies]

72
flake.lock generated
View file

@ -1,67 +1,57 @@
{
"nodes": {
"flake-utils": {
"locked": {
"lastModified": 1656928814,
"narHash": "sha256-RIFfgBuKz6Hp89yRr7+NR5tzIAbn52h8vT6vXkYjZoM=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "7e2a3b3dfd9af950a856d66b0a7d01e3c18aa249",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"naersk": {
"inputs": {
"nixpkgs": "nixpkgs"
"systems": "systems"
},
"locked": {
"lastModified": 1655042882,
"narHash": "sha256-9BX8Fuez5YJlN7cdPO63InoyBy7dm3VlJkkmTt6fS1A=",
"owner": "nix-community",
"repo": "naersk",
"rev": "cddffb5aa211f50c4b8750adbec0bbbdfb26bb9f",
"lastModified": 1705309234,
"narHash": "sha256-uNRRNRKmJyCRC/8y1RqBkqWBLM034y4qN7EprSdmgyA=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "1ef2e671c3b0c19053962c07dbda38332dcebf26",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "naersk",
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 0,
"narHash": "sha256-XZ1fKlEkAQvgocuBYq9lXEP3eNW6gF9JHsPVsvkWMhU=",
"path": "/nix/store/2xbq6yjs2lf2j0438h162mf66jvg0wyg-source",
"type": "path"
},
"original": {
"id": "nixpkgs",
"type": "indirect"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 0,
"narHash": "sha256-XZ1fKlEkAQvgocuBYq9lXEP3eNW6gF9JHsPVsvkWMhU=",
"path": "/nix/store/2xbq6yjs2lf2j0438h162mf66jvg0wyg-source",
"type": "path"
"lastModified": 1707650010,
"narHash": "sha256-dOhphIA4MGrH4ElNCy/OlwmN24MsnEqFjRR6+RY7jZw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "809cca784b9f72a5ad4b991e0e7bcf8890f9c3a6",
"type": "github"
},
"original": {
"id": "nixpkgs",
"ref": "nixos-23.11",
"type": "indirect"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"naersk": "naersk",
"nixpkgs": "nixpkgs_2"
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},

201
flake.nix
View file

@ -1,207 +1,46 @@
{
inputs = {
nixpkgs.url = "nixpkgs/nixos-23.11";
flake-utils.url = "github:numtide/flake-utils";
naersk.url = "github:nix-community/naersk";
};
outputs = {
self,
nixpkgs,
flake-utils,
naersk,
}:
flake-utils.lib.eachDefaultSystem (
system: let
pkgs = nixpkgs.legacyPackages."${system}";
naersk-lib = naersk.lib."${system}";
overlays = [
(import ./overlay.nix)
];
pkgs = (import nixpkgs) {
inherit system overlays;
};
in rec {
# `nix build`
packages.prometheus-edge-trigger = naersk-lib.buildPackage {
pname = "prometheus-edge-trigger";
root = ./.;
buildInputs = with pkgs; [gcc];
packages = rec {
prometheus-edge-trigger = pkgs.prometheus-edge-trigger;
default = prometheus-edge-trigger;
};
defaultPackage = packages.prometheus-edge-trigger;
defaultApp = packages.prometheus-edge-trigger;
# `nix develop`
devShell = pkgs.mkShell {
nativeBuildInputs = with pkgs; [rustc cargo bacon cargo-edit cargo-outdated gcc];
nativeBuildInputs = with pkgs; [rustc cargo bacon cargo-edit cargo-outdated clippy cargo-audit];
};
}
)
// {
nixosModule = {
overlays.default = import ./overlay.nix;
nixosModules.default = {
pkgs,
config,
lib,
pkgs,
...
}:
with lib; let
cfg = config.services.prometheus-edge-trigger;
format = pkgs.formats.toml {};
configFile = format.generate "config.toml" {
prometheus.url = cfg.prometheusAddress;
mqtt = {
inherit (cfg.mqtt) host username;
password_file = cfg.mqtt.passwordFile;
};
trigger = map (trigger: {
inherit (trigger) name delay condition;
action = if (trigger.action.method == "MQTT") then {
inherit (trigger.action) method topic payload params;
} else {
inherit (trigger.action) method params url;
};
}) cfg.triggers;
};
in {
options.services.prometheus-edge-trigger = {
enable = mkEnableOption "WiFi prometheus exporter";
prometheusAddress = mkOption {
type = types.str;
description = "address of the prometheus server";
};
logLevel = mkOption {
type = types.str;
default = "INFO";
description = "log level";
};
mqtt = mkOption {
type = types.submodule {
options = {
host = mkOption {
type = types.str;
description = "mqtt server hostname";
};
username = mkOption {
type = types.str;
description = "mqtt username";
};
passwordFile = mkOption {
type = types.str;
description = "path containing the mqtt password";
};
};
};
};
triggers = mkOption {
description = "configured triggers";
type = types.listOf (types.submodule {
options = {
name = mkOption {
type = types.str;
description = "name of the trigger";
};
delay = mkOption {
type = types.int;
description = "delay in secconds";
};
condition = mkOption {
type = types.submodule {
options = {
query = mkOption {
type = types.str;
description = "prometheus query to trigger on";
};
from = mkOption {
type = types.int;
description = "start of the edge";
};
to = mkOption {
type = types.int;
description = "end of the edge";
};
params = mkOption {
type = types.attrs;
default = {};
description = "query substitution parameters";
};
};
};
};
action = mkOption {
type = types.submodule {
options = {
method = mkOption {
type = types.str;
description = "http method or 'MQTT'";
};
topic = mkOption {
type = types.null or types.str;
default = null;
description = "mqtt topic";
};
payload = mkOption {
type = types.null or types.str;
default = null;
description = "mqtt payload";
};
url = mkOption {
type = types.null or types.str;
default = null;
description = "mqtt url";
};
params = mkOption {
type = types.attrs;
default = {};
description = "http url substitution parameters";
};
};
};
};
};
});
};
};
config = mkIf cfg.enable {
systemd.services."prometheus-edge-trigger" = let
pkg = self.defaultPackage.${pkgs.system};
in {
wantedBy = ["multi-user.target"];
script = "${pkg}/bin/prometheus-edge-trigger ${configFile}";
environment = {
RUST_LOG = cfg.logLevel;
};
serviceConfig = {
Restart = "on-failure";
DynamicUser = true;
PrivateTmp = true;
ProtectSystem = "strict";
ProtectHome = true;
NoNewPrivileges = true;
PrivateDevices = true;
ProtectClock = true;
CapabilityBoundingSet = true;
ProtectKernelLogs = true;
ProtectControlGroups = true;
SystemCallArchitectures = "native";
ProtectKernelModules = true;
RestrictNamespaces = true;
MemoryDenyWriteExecute = true;
ProtectHostname = true;
LockPersonality = true;
ProtectKernelTunables = true;
RestrictAddressFamilies = "AF_INET AF_INET6";
RestrictRealtime = true;
ProtectProc = "noaccess";
SystemCallFilter = ["@system-service" "~@resources" "~@privileged"];
IPAddressDeny = "any";
IPAddressAllow = ["192.168.0.0/16" "localhost" "172.0.0.0/8"];
PrivateUsers = true;
ProcSubset = "pid";
};
};
};
}: {
imports = [./module.nix];
config = lib.mkIf config.services.prometheus-edge-trigger.enable {
nixpkgs.overlays = [self.overlays.default];
services.prometheus-edge-trigger.package = lib.mkDefault pkgs.prometheus-edge-trigger;
};
};
};
}

179
module.nix Normal file
View file

@ -0,0 +1,179 @@
{
config,
lib,
pkgs,
...
}:
with lib; let
cfg = config.services.prometheus-edge-trigger;
format = pkgs.formats.toml {};
configFile = format.generate "config.toml" {
prometheus.url = cfg.prometheusAddress;
mqtt = {
inherit (cfg.mqtt) host username;
password_file = cfg.mqtt.passwordFile;
};
trigger =
map (trigger: {
inherit (trigger) name delay condition;
action =
if (trigger.action.method == "MQTT")
then {
inherit (trigger.action) method topic payload params;
}
else {
inherit (trigger.action) method params url;
};
})
cfg.triggers;
};
in {
options.services.prometheus-edge-trigger = {
enable = mkEnableOption "prometheus edgre trigger";
prometheusAddress = mkOption {
type = types.str;
description = "address of the prometheus server";
};
logLevel = mkOption {
type = types.str;
default = "INFO";
description = "log level";
};
package = mkOption {
type = types.package;
description = "package to use";
};
mqtt = mkOption {
type = types.submodule {
options = {
host = mkOption {
type = types.str;
description = "mqtt server hostname";
};
username = mkOption {
type = types.str;
description = "mqtt username";
};
passwordFile = mkOption {
type = types.str;
description = "path containing the mqtt password";
};
};
};
};
triggers = mkOption {
description = "configured triggers";
type = types.listOf (types.submodule {
options = {
name = mkOption {
type = types.str;
description = "name of the trigger";
};
delay = mkOption {
type = types.int;
description = "delay in secconds";
};
condition = mkOption {
type = types.submodule {
options = {
query = mkOption {
type = types.str;
description = "prometheus query to trigger on";
};
from = mkOption {
type = types.int;
description = "start of the edge";
};
to = mkOption {
type = types.int;
description = "end of the edge";
};
params = mkOption {
type = types.attrs;
default = {};
description = "query substitution parameters";
};
};
};
};
action = mkOption {
type = types.submodule {
options = {
method = mkOption {
type = types.str;
description = "http method or 'MQTT'";
};
topic = mkOption {
type = types.null or types.str;
default = null;
description = "mqtt topic";
};
payload = mkOption {
type = types.null or types.str;
default = null;
description = "mqtt payload";
};
url = mkOption {
type = types.null or types.str;
default = null;
description = "mqtt url";
};
params = mkOption {
type = types.attrs;
default = {};
description = "http url substitution parameters";
};
};
};
};
};
});
};
};
config = mkIf cfg.enable {
systemd.services."prometheus-edge-trigger" = {
wantedBy = ["multi-user.target"];
environment = {
RUST_LOG = cfg.logLevel;
};
serviceConfig = {
ExecStart = "${cfg.package}/bin/prometheus-edge-trigger ${configFile}";
Restart = "on-failure";
DynamicUser = true;
PrivateTmp = true;
ProtectSystem = "strict";
ProtectHome = true;
NoNewPrivileges = true;
PrivateDevices = true;
ProtectClock = true;
CapabilityBoundingSet = true;
ProtectKernelLogs = true;
ProtectControlGroups = true;
SystemCallArchitectures = "native";
ProtectKernelModules = true;
RestrictNamespaces = true;
MemoryDenyWriteExecute = true;
ProtectHostname = true;
LockPersonality = true;
ProtectKernelTunables = true;
RestrictAddressFamilies = "AF_INET AF_INET6";
RestrictRealtime = true;
ProtectProc = "noaccess";
SystemCallFilter = ["@system-service" "~@resources" "~@privileged"];
IPAddressDeny = "any";
IPAddressAllow = ["192.168.0.0/16" "localhost" "172.0.0.0/8"];
PrivateUsers = true;
ProcSubset = "pid";
};
};
};
}

3
overlay.nix Normal file
View file

@ -0,0 +1,3 @@
final: prev: {
prometheus-edge-trigger = final.callPackage ./package.nix {};
}

28
package.nix Normal file
View file

@ -0,0 +1,28 @@
{
stdenv,
rustPlatform,
libsodium,
pkg-config,
lib,
}: let
inherit (lib.sources) sourceByRegex;
src = sourceByRegex ./. ["Cargo.*" "(src)(/.*)?"];
in
rustPlatform.buildRustPackage rec {
pname = "prometheus-edge-trigger";
version = "0.1.0";
inherit src;
buildInputs = [
libsodium
];
nativeBuildInputs = [
pkg-config
];
cargoLock = {
lockFile = ./Cargo.lock;
};
}

View file

@ -1,21 +1,21 @@
use crate::mdns::resolve_mdns;
use err_derive::Error;
use serde::Deserialize;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fs::read_to_string;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ParameterError {
#[error(display = "error while resolving mdns: {}", _0)]
MdnsError(#[error(source)] mdns::Error),
#[error(display = "requested mdns host not found")]
#[error("error while resolving mdns: {0}")]
MdnsError(#[from] mdns::Error),
#[error("requested mdns host not found")]
MdnsHostNotFound,
#[error(display = "error reading file: {}", _0)]
FilesystemError(#[error(source)] std::io::Error),
#[error(display = "malformed service file: {}", _0)]
Service(#[error(source)] serde_json::Error),
#[error(display = "requested service not found")]
#[error("error reading file: {0}")]
FilesystemError(#[from] std::io::Error),
#[error("malformed service file: {0}")]
Service(#[from] serde_json::Error),
#[error("requested service not found")]
ServiceNotFound,
}

View file

@ -1,8 +1,7 @@
use crate::config::Config;
use crate::trigger::TriggerManager;
use main_error::MainError;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use std::fs::read_to_string;
mod config;
mod mdns;
@ -16,11 +15,8 @@ async fn main() -> Result<(), MainError> {
let bin = args.next().unwrap();
if let Some(path) = args.next() {
let mut file = File::open(path).await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let config: Config = toml::from_slice(&contents)?;
let contents = read_to_string(path)?;
let config: Config = toml::from_str(&contents)?;
let trigger_manager = TriggerManager::new(config);
Ok(trigger_manager.run_triggers().await?)

View file

@ -1,7 +1,6 @@
use crate::config::{
Action, Condition, Config, Method, MqttConfig, Parameter, ParameterError, Trigger,
};
use err_derive::Error;
use futures_util::future::try_join_all;
use log::{error, info};
use main_error::MainError;
@ -10,6 +9,7 @@ use reqwest::Client;
use rumqttc::{AsyncClient, ClientError, Event, MqttOptions, Outgoing, QoS};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use thiserror::Error;
use tokio::time::sleep;
pub struct TriggerManager {
@ -32,15 +32,15 @@ fn since(time: u64) -> u64 {
#[derive(Debug, Error)]
pub enum TriggerError {
#[error(display = "{}", _0)]
Parameter(#[error(source)] ParameterError),
#[error(display = "{}", _0)]
Edge(#[error(source)] prometheus_edge_detector::Error),
#[error(display = "{}", _0)]
Network(#[error(source)] reqwest::Error),
#[error(display = "{}", _0)]
Mqtt(#[error(source)] rumqttc::ClientError),
#[error(display = "{}", _0)]
#[error("{0}")]
Parameter(#[from] ParameterError),
#[error("{0}")]
Edge(#[from] prometheus_edge_detector::Error),
#[error("{0}")]
Network(#[from] reqwest::Error),
#[error("{0}")]
Mqtt(#[from] rumqttc::ClientError),
#[error("{0}")]
Configuration(String),
}