rate limited and cache controll attempt

This commit is contained in:
Robin Appelman 2024-12-26 00:25:35 +01:00
commit 988f2e4603
7 changed files with 544 additions and 199 deletions

View file

@ -1,4 +1,3 @@
use color_eyre::{eyre::WrapErr, Result};
use reqwest::header::{HeaderValue, InvalidHeaderValue};
use secretfile::{load, SecretError};
use serde::de::Error;
@ -9,6 +8,7 @@ use std::convert::{TryFrom, TryInto};
use std::fs::read_to_string;
use std::path::Path;
use tokio::time::Duration;
use crate::error::ConfigError;
#[derive(Debug, Deserialize)]
pub struct Config {
@ -27,10 +27,12 @@ pub struct FeedConfig {
}
impl Config {
pub fn from_file(path: &str) -> Result<Self> {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
let path = path.as_ref();
let file = read_to_string(path)
.wrap_err_with(|| format!("Failed to open config file {}", path))?;
toml::from_str(&file).wrap_err_with(|| format!("Failed to open config file {}", path))
.map_err(|error| ConfigError::Read {error, path: path.into()})?;
toml::from_str(&file)
.map_err(|error| ConfigError::Parse {error, path: path.into()})
}
pub fn interval(&self) -> Duration {
@ -55,7 +57,7 @@ impl<'de> Deserialize<'de> for HeaderVal {
impl TryFrom<&HeaderVal> for HeaderValue {
type Error = InvalidHeaderValue;
fn try_from(header: &HeaderVal) -> std::result::Result<Self, Self::Error> {
fn try_from(header: &HeaderVal) -> Result<Self, Self::Error> {
header.0.as_str().try_into()
}
}

62
src/error.rs Normal file
View file

@ -0,0 +1,62 @@
use std::path::PathBuf;
use std::str::FromStr;
use reqwest::StatusCode;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ParseFeedError {
#[error("{0}")]
Parse(<syndication::Feed as FromStr>::Err),
#[error("Empty feed")]
Empty,
#[error("No guid, pubDate or link set on feed item")]
MissingKey,
}
#[derive(Debug, Error)]
pub enum FetchFeedError {
#[error("Error while fetching feed: {0:#}")]
Network(#[from] reqwest::Error),
#[error("Error while parsing feed: {0:#}")]
Parse(#[from] ParseFeedError),
#[error("Docker hub returned a server error {0}")]
ServerError(StatusCode),
#[error("Docker hub returned a client error {0}")]
ClientError(StatusCode),
}
#[derive(Debug, Error)]
pub enum ConfigError {
#[error("Error while reading config file {}: {:#}", path.display(), error)]
Read {
error: std::io::Error,
path: PathBuf,
},
#[error("Error while parse config file {}: {:#}", path.display(), error)]
Parse {
error: toml::de::Error,
path: PathBuf,
},
}
#[derive(Debug, Error)]
pub enum HubError {
#[error("Error while fetching docker hub info: {0:#}")]
Network(#[from] reqwest::Error),
#[error("Error while parsing hub response: {0:#}")]
Parse(#[from] serde_json::Error),
#[error("Docker hub returned a server error {0}")]
ServerError(StatusCode),
#[error("Docker hub returned a client error {0}")]
ClientError(StatusCode),
#[error("Invalid hub url format")]
InvalidFormat,
}
#[derive(Debug, Error)]
pub enum FetchError {
#[error(transparent)]
Feed(#[from] FetchFeedError),
#[error(transparent)]
Hub(#[from] HubError),
}

263
src/fetcher.rs Normal file
View file

@ -0,0 +1,263 @@
use reqwest::header::{
HeaderMap, HeaderValue, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RETRY_AFTER,
};
use reqwest::{Response, StatusCode};
use std::future::Future;
use std::time::{Duration, Instant};
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
/// waiting 6 hours after a 429 should be slow enough for everyone
const DEFAULT_BACKOFF: Duration = Duration::from_secs(6 * 60 * 60);
const ONE_SEC: Duration = Duration::from_secs(1);
pub enum FetchPlanInput {
Retry {
time: Instant,
headers: CacheHeaders,
},
WithCache {
headers: CacheHeaders,
},
}
impl FetchPlanInput {
#[allow(dead_code)]
pub fn into_cache_headers(self) -> CacheHeaders {
match self {
FetchPlanInput::Retry { headers, .. } => headers,
FetchPlanInput::WithCache { headers } => headers,
}
}
}
#[derive(Default, Debug)]
pub struct CacheHeaders {
etag: Option<String>,
last_modified: Option<OffsetDateTime>,
}
impl CacheHeaders {
pub fn from_headers(headers: &HeaderMap) -> CacheHeaders {
let etag = headers
.get(ETAG)
.and_then(|header| header.to_str().ok())
.map(String::from);
let last_modified = headers
.get(LAST_MODIFIED)
.and_then(|header| header.to_str().ok())
.and_then(|s| OffsetDateTime::parse(s, &Rfc2822).ok());
CacheHeaders {
etag,
last_modified,
}
}
pub fn set_headers(&self, headers: &mut HeaderMap) {
match (&self.last_modified, &self.etag) {
(_, Some(etag)) => {
headers.insert(
IF_NONE_MATCH,
HeaderValue::from_str(etag).expect("malformed etag"),
);
}
(Some(last_modified), None) => {
headers.insert(
IF_MODIFIED_SINCE,
HeaderValue::from_str(&last_modified.format(&Rfc2822).unwrap()).unwrap(),
);
}
_ => {}
}
}
pub fn headers(&self) -> HeaderMap {
let mut headers = HeaderMap::new();
self.set_headers(&mut headers);
headers
}
}
pub struct FetchPlan {
pub time: Instant,
pub headers: CacheHeaders,
}
impl FetchPlan {
pub fn elapsed(&self) -> bool {
Instant::now() > self.time
}
}
impl Default for FetchPlan {
fn default() -> FetchPlan {
FetchPlan {
time: Instant::now(),
headers: CacheHeaders::default(),
}
}
}
/// plan the next fetch, either on startup or right after we finished the previous fetch
pub fn next_fetch(base_interval: Duration, last_result: Option<FetchPlanInput>) -> FetchPlan {
let now = Instant::now();
match last_result {
Some(FetchPlanInput::Retry { time, headers }) => FetchPlan {
time: now.max(time),
headers,
},
Some(FetchPlanInput::WithCache { headers }) => FetchPlan {
time: now + base_interval,
headers,
},
None => FetchPlan {
time: now + base_interval,
headers: CacheHeaders::default(),
},
}
}
pub enum FetchResponse<T, E> {
Retry {
time: Instant,
headers: CacheHeaders,
},
Ok {
headers: CacheHeaders,
response: T,
},
Error {
headers: CacheHeaders,
error: E,
},
}
impl<T, E> FetchResponse<T, E> {
#[allow(dead_code)]
pub fn plan(self) -> FetchPlanInput {
match self {
FetchResponse::Retry { time, headers } => FetchPlanInput::Retry { time, headers },
FetchResponse::Ok { headers, .. } => FetchPlanInput::WithCache { headers },
FetchResponse::Error { headers, .. } => FetchPlanInput::WithCache { headers },
}
}
pub fn into_result(self) -> Result<(Option<T>, FetchPlanInput), (E, FetchPlanInput)> {
match self {
FetchResponse::Retry { time, headers } => {
Ok((None, FetchPlanInput::Retry { time, headers }))
}
FetchResponse::Ok { headers, response } => {
Ok((Some(response), FetchPlanInput::WithCache { headers }))
}
FetchResponse::Error { headers, error } => {
Err((error, FetchPlanInput::WithCache { headers }))
}
}
}
}
impl<E> FetchResponse<Response, E> {
pub fn from_result(result: Result<Response, E>) -> FetchResponse<Response, E> {
match result {
Ok(response) => {
let cache_header = CacheHeaders::from_headers(response.headers());
if response.status() == StatusCode::TOO_MANY_REQUESTS {
let after = response
.headers()
.get(RETRY_AFTER)
.and_then(|header| header.to_str().ok())
.and_then(|str| str.parse::<u64>().ok())
.map(Duration::from_secs)
.unwrap_or(DEFAULT_BACKOFF);
FetchResponse::Retry {
time: Instant::now() + after + ONE_SEC,
headers: cache_header,
}
} else {
FetchResponse::Ok {
headers: cache_header,
response,
}
}
}
Err(err) => FetchResponse::Error {
error: err,
headers: CacheHeaders::default(),
},
}
}
pub fn check_status_code<Fc, Fs>(self, client_error: Fc, server_error: Fs) -> Self
where
Fc: Fn(StatusCode) -> E,
Fs: Fn(StatusCode) -> E,
{
match self {
FetchResponse::Ok { headers, response } => {
let status = response.status();
if status.is_client_error() {
FetchResponse::Error {
error: client_error(status),
headers,
}
} else if status.is_server_error() {
FetchResponse::Error {
error: server_error(status),
headers,
}
} else {
FetchResponse::Ok { headers, response }
}
}
rest => rest,
}
}
}
impl<T, E> FetchResponse<T, E> {
pub async fn map<U, Fut, F>(self, f: F) -> FetchResponse<U, E>
where
Fut: Future<Output = U>,
F: Fn(T) -> Fut,
{
match self {
FetchResponse::Retry { time, headers } => FetchResponse::Retry { time, headers },
FetchResponse::Ok { headers, response } => FetchResponse::Ok {
headers,
response: f(response).await,
},
FetchResponse::Error { error, headers } => FetchResponse::Error { error, headers },
}
}
pub fn map_err<U, F>(self, f: F) -> FetchResponse<T, U>
where
F: Fn(E) -> U,
{
match self {
FetchResponse::Retry { time, headers } => FetchResponse::Retry { time, headers },
FetchResponse::Ok { headers, response } => FetchResponse::Ok { headers, response },
FetchResponse::Error { error, headers } => FetchResponse::Error {
error: f(error),
headers,
},
}
}
}
impl<T, E> FetchResponse<Result<T, E>, E> {
pub fn flatten(self) -> FetchResponse<T, E> {
match self {
FetchResponse::Retry { time, headers } => FetchResponse::Retry { time, headers },
FetchResponse::Ok {
headers,
response: Ok(response),
} => FetchResponse::Ok { headers, response },
FetchResponse::Ok {
headers,
response: Err(error),
} => FetchResponse::Error { error, headers },
FetchResponse::Error { error, headers } => FetchResponse::Error { error, headers },
}
}
}

View file

@ -1,33 +1,37 @@
use color_eyre::eyre::WrapErr;
use color_eyre::{eyre::ensure, Result};
use crate::error::HubError;
use crate::fetcher::{CacheHeaders, FetchResponse};
use reqwest::Client;
use serde::Deserialize;
use time::OffsetDateTime;
use tracing::instrument;
#[instrument(skip(client))]
pub async fn tags(client: &Client, user: &str, repo: &str) -> Result<Vec<HubTag>> {
pub async fn tags(
client: &Client,
user: &str,
repo: &str,
cache_headers: &CacheHeaders,
) -> FetchResponse<Vec<HubTag>, HubError> {
let result = client
.get(format!(
"https://hub.docker.com/v2/repositories/{}/{}/tags",
user, repo
))
.headers(cache_headers.headers())
.send()
.await
.wrap_err("error with sending docker hub request")?;
ensure!(
!result.status().is_client_error(),
"error with sending docker hub request {}/{}: {}", user, repo, result.status()
);
ensure!(
!result.status().is_server_error(),
"docker hub request returned an error {}/{}: {}", user, repo, result.status()
);
Ok(result
.json::<HubTagResponse>()
.await
.wrap_err("failed to parse hub response")?
.results)
.await;
FetchResponse::from_result(result)
.map_err(HubError::Network)
.check_status_code(HubError::ClientError, HubError::ServerError)
.map(|response| async {
response
.text()
.await
.map_err(HubError::Network)
.and_then(|text| serde_json::from_str::<HubTagResponse>(&text).map_err(HubError::Parse))
.map(|result| result.results)
}).await.flatten()
}
#[derive(Debug, Deserialize)]

View file

@ -1,24 +1,27 @@
mod config;
mod error;
mod fetcher;
mod hub;
use crate::config::{Config, FeedConfig};
use color_eyre::{
eyre::{eyre, WrapErr},
Result,
};
use reqwest::Client;
use syndication::Feed;
use crate::error::{FetchError, FetchFeedError, HubError, ParseFeedError};
use crate::fetcher::{next_fetch, CacheHeaders, FetchPlan, FetchResponse};
use main_error::MainResult;
use reqwest::{Client, Response};
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::future::ready;
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use tokio::time::sleep;
use tokio::signal::ctrl_c;
use std::time::{Duration};
use syndication::Feed;
use tokio::select;
use tracing::{debug, error, info, instrument};
use tokio::signal::ctrl_c;
use tokio::time::sleep;
use tracing::{debug, error, info, instrument, warn};
#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> MainResult {
tracing_subscriber::fmt::init();
let mut args = std::env::args();
let bin = args.next().unwrap();
@ -33,9 +36,11 @@ async fn main() -> Result<()> {
let config = Config::from_file(&file)?;
println!("Running rss trigger for {} feeds", config.feed.len());
info!("Running rss trigger for {} feeds", config.feed.len());
let ctrl_c = async { ctrl_c().await.ok(); };
let ctrl_c = async {
ctrl_c().await.ok();
};
select! {
_ = ctrl_c => {},
@ -45,7 +50,7 @@ async fn main() -> Result<()> {
}
async fn main_loop(config: Config) {
let mut fetcher = FeedFetcher::default();
let mut fetcher = FeedFetcher::new(config.interval());
loop {
for feed in config.feed.iter() {
@ -65,7 +70,9 @@ async fn main_loop(config: Config) {
#[instrument(skip_all, fields(feed = feed.feed))]
async fn trigger(client: &Client, feed: &FeedConfig) {
info!("Triggering hook");
let mut req = client.post(&feed.hook).header("user-agent", "rss-webhook-trigger");
let mut req = client
.post(&feed.hook)
.header("user-agent", "rss-webhook-trigger");
for (key, value) in &feed.headers {
req = req.header(key, value);
}
@ -78,93 +85,151 @@ async fn trigger(client: &Client, feed: &FeedConfig) {
}
}
#[derive(Default)]
pub struct FeedFetcher {
client: Client,
base_interval: Duration,
cache: HashMap<String, u64>,
fetch_plans: HashMap<String, FetchPlan>,
}
impl FeedFetcher {
#[instrument(skip(self))]
pub async fn check_feed_updated(&mut self, feed: &str) -> Result<bool> {
let new_key = self.get_feed_key(feed).await?;
pub fn new(interval: Duration) -> Self {
FeedFetcher {
client: Client::default(),
base_interval: interval,
cache: HashMap::default(),
fetch_plans: HashMap::default(),
}
}
Ok(match self.cache.get_mut(feed) {
Some(cached) => {
pub fn should_update(&self, feed: &str) -> bool {
self.fetch_plans.get(feed).filter(|plan| FetchPlan::elapsed(plan)).is_some()
}
#[instrument(skip(self))]
pub async fn check_feed_updated(&mut self, feed: &str) -> Result<bool, FetchError> {
if !self.should_update(feed) {
warn!("skipping feed util rate limited expires");
return Ok(false);
}
let plan = self.fetch_plans.remove(feed).unwrap_or_default();
let fetch_result = self.get_feed_key(feed, &plan.headers).await;
let new_key = match fetch_result.into_result() {
Ok((new_key, new_plan)) => {
self.fetch_plans.insert(feed.into(), next_fetch(self.base_interval, Some(new_plan)));
new_key
}
Err((err, new_plan)) => {
self.fetch_plans.insert(feed.into(), next_fetch(self.base_interval, Some(new_plan)));
return Err(err);
}
};
Ok(match (self.cache.get_mut(feed), new_key) {
(Some(cached), Some(new_key)) => {
debug!(cached, new_key, "checked existing feed");
if *cached != new_key {
if new_key != *cached {
*cached = new_key;
true
} else {
false
}
}
None => {
(None, Some(new_key)) => {
debug!(feed, "new feed");
self.cache.insert(feed.into(), new_key);
// don't trigger the actions on start
false
}
(_, None) => {
warn!("rate limited by server");
false
}
})
}
#[instrument(skip(self))]
async fn get_feed_key(&self, feed: &str) -> Result<u64> {
async fn get_feed_key(
&self,
feed: &str,
cache_headers: &CacheHeaders,
) -> FetchResponse<u64, FetchError> {
if let Some(hub) = feed.strip_prefix("docker-hub://") {
if let Some((user, repo)) = hub.split_once('/') {
let tags = hub::tags(&self.client, user, repo).await?;
let mut hasher = DefaultHasher::new();
for tag in tags {
tag.id.hash(&mut hasher);
tag.last_updated.hash(&mut hasher);
}
Ok(hasher.finish())
hub::tags(&self.client, user, repo, cache_headers)
.await
.map(|tags| {
let mut hasher = DefaultHasher::new();
for tag in tags {
tag.id.hash(&mut hasher);
tag.last_updated.hash(&mut hasher);
}
ready(hasher.finish())
}).await
.map_err(FetchError::Hub)
} else {
Err(eyre!("Invalid hub format {}", feed))
FetchResponse::Error {
error: HubError::InvalidFormat.into(),
headers: CacheHeaders::default(),
}
}
} else {
self.get_rss_feed_key(feed).await
self.get_rss_feed_key(feed, cache_headers)
.await
.map_err(FetchError::Feed)
}
}
#[instrument(skip(self))]
async fn get_rss_feed_key(&self, feed: &str) -> Result<u64> {
let content = self
async fn get_rss_feed_key(
&self,
feed: &str,
cache_headers: &CacheHeaders,
) -> FetchResponse<u64, FetchFeedError> {
let response = self
.client
.get(feed)
.headers(cache_headers.headers())
.send()
.await;
let plan_result = FetchResponse::from_result(response);
plan_result
.map_err(FetchFeedError::Network)
.check_status_code(FetchFeedError::ClientError, FetchFeedError::ServerError)
.map(parse_rss_response)
.await
.wrap_err_with(|| eyre!("Failed to load feed {}", feed))?
.text()
.await
.wrap_err_with(|| eyre!("Failed to load feed {}", feed))?;
let channel = Feed::from_str(&content)
.map_err(|_| eyre!("Failed to parse feed {}", feed))?;
let mut hasher = DefaultHasher::new();
match channel {
Feed::RSS(channel) => {
let item = channel.items.first().ok_or(eyre!("Empty feed"))?;
if let Some(guid) = item.guid() {
guid.value.hash(&mut hasher);
} else if let Some(date) = item.pub_date() {
date.hash(&mut hasher);
} else if let Some(link) = item.link() {
link.hash(&mut hasher);
} else {
return Err(eyre!("No guid, pubDate or link set on feed item"));
}
}
Feed::Atom(channel) => {
let item = channel.entries().first().ok_or(eyre!("Empty feed"))?;
item.id().hash(&mut hasher);
}
}
Ok(hasher.finish())
.flatten()
}
}
async fn parse_rss_response(response: Response) -> Result<u64, FetchFeedError> {
let content = response.text().await?;
let channel = Feed::from_str(&content).map_err(ParseFeedError::Parse)?;
let mut hasher = DefaultHasher::new();
match channel {
Feed::RSS(channel) => {
let item = channel.items.first().ok_or(ParseFeedError::Empty)?;
if let Some(guid) = item.guid() {
guid.value.hash(&mut hasher);
} else if let Some(date) = item.pub_date() {
date.hash(&mut hasher);
} else if let Some(link) = item.link() {
link.hash(&mut hasher);
} else {
return Err(ParseFeedError::MissingKey.into());
}
}
Feed::Atom(channel) => {
let item = channel.entries().first().ok_or(ParseFeedError::Empty)?;
item.id().hash(&mut hasher);
}
}
Ok(hasher.finish())
}