cleanup not-modified handling

This commit is contained in:
Robin Appelman 2024-12-26 15:28:21 +01:00
commit 3b430e3ab1
3 changed files with 44 additions and 38 deletions

View file

@ -29,6 +29,10 @@ impl FetchPlanInput {
FetchPlanInput::WithCache { headers } => headers, FetchPlanInput::WithCache { headers } => headers,
} }
} }
pub fn is_retry(&self) -> bool {
matches!(self, FetchPlanInput::Retry { .. })
}
} }
#[derive(Default, Debug)] #[derive(Default, Debug)]
@ -126,6 +130,9 @@ pub enum FetchResponse<T, E> {
headers: CacheHeaders, headers: CacheHeaders,
response: T, response: T,
}, },
NotModified {
headers: CacheHeaders,
},
Error { Error {
headers: CacheHeaders, headers: CacheHeaders,
error: E, error: E,
@ -138,20 +145,24 @@ impl<T, E> FetchResponse<T, E> {
match self { match self {
FetchResponse::Retry { time, headers } => FetchPlanInput::Retry { time, headers }, FetchResponse::Retry { time, headers } => FetchPlanInput::Retry { time, headers },
FetchResponse::Ok { headers, .. } => FetchPlanInput::WithCache { headers }, FetchResponse::Ok { headers, .. } => FetchPlanInput::WithCache { headers },
FetchResponse::NotModified { headers, .. } => FetchPlanInput::WithCache { headers },
FetchResponse::Error { headers, .. } => FetchPlanInput::WithCache { headers }, FetchResponse::Error { headers, .. } => FetchPlanInput::WithCache { headers },
} }
} }
pub fn into_result(self) -> Result<(Option<T>, FetchPlanInput), (E, FetchPlanInput)> { pub fn into_result(self) -> (Result<Option<T>, E>, FetchPlanInput) {
match self { match self {
FetchResponse::Retry { time, headers } => { FetchResponse::Retry { time, headers } => {
Ok((None, FetchPlanInput::Retry { time, headers })) (Ok(None), FetchPlanInput::Retry { time, headers })
} }
FetchResponse::Ok { headers, response } => { FetchResponse::Ok { headers, response } => {
Ok((Some(response), FetchPlanInput::WithCache { headers })) (Ok(Some(response)), FetchPlanInput::WithCache { headers })
}
FetchResponse::NotModified { headers } => {
(Ok(None), FetchPlanInput::WithCache { headers })
} }
FetchResponse::Error { headers, error } => { FetchResponse::Error { headers, error } => {
Err((error, FetchPlanInput::WithCache { headers })) (Err(error), FetchPlanInput::WithCache { headers })
} }
} }
} }
@ -174,6 +185,10 @@ impl<E> FetchResponse<Response, E> {
time: Instant::now() + after + ONE_SEC, time: Instant::now() + after + ONE_SEC,
headers: cache_header, headers: cache_header,
} }
} else if response.status() == StatusCode::NOT_MODIFIED {
FetchResponse::NotModified {
headers: cache_header,
}
} else { } else {
FetchResponse::Ok { FetchResponse::Ok {
headers: cache_header, headers: cache_header,
@ -227,6 +242,7 @@ impl<T, E> FetchResponse<T, E> {
headers, headers,
response: f(response).await, response: f(response).await,
}, },
FetchResponse::NotModified { headers } => FetchResponse::NotModified { headers },
FetchResponse::Error { error, headers } => FetchResponse::Error { error, headers }, FetchResponse::Error { error, headers } => FetchResponse::Error { error, headers },
} }
} }
@ -236,7 +252,8 @@ impl<T, E> FetchResponse<T, E> {
{ {
match self { match self {
FetchResponse::Retry { time, headers } => FetchResponse::Retry { time, headers }, FetchResponse::Retry { time, headers } => FetchResponse::Retry { time, headers },
FetchResponse::Ok { headers, response } => FetchResponse::Ok { headers, response }, FetchResponse::Ok { response, headers } => FetchResponse::Ok { response, headers },
FetchResponse::NotModified { headers } => FetchResponse::NotModified { headers },
FetchResponse::Error { error, headers } => FetchResponse::Error { FetchResponse::Error { error, headers } => FetchResponse::Error {
error: f(error), error: f(error),
headers, headers,
@ -257,7 +274,11 @@ impl<T, E> FetchResponse<Result<T, E>, E> {
headers, headers,
response: Err(error), response: Err(error),
} => FetchResponse::Error { error, headers }, } => FetchResponse::Error { error, headers },
FetchResponse::Error { error, headers } => FetchResponse::Error { error, headers }, FetchResponse::NotModified { headers } => FetchResponse::NotModified { headers },
FetchResponse::Error { error, headers } => FetchResponse::Error {
error,
headers,
},
} }
} }
} }

View file

@ -1,6 +1,6 @@
use crate::error::HubError; use crate::error::HubError;
use crate::fetcher::{CacheHeaders, FetchResponse}; use crate::fetcher::{CacheHeaders, FetchResponse};
use reqwest::{Client, StatusCode}; use reqwest::{Client};
use reqwest::header::{HeaderValue, USER_AGENT}; use reqwest::header::{HeaderValue, USER_AGENT};
use serde::Deserialize; use serde::Deserialize;
use time::OffsetDateTime; use time::OffsetDateTime;
@ -28,9 +28,6 @@ pub async fn tags(
.map_err(HubError::Network) .map_err(HubError::Network)
.check_status_code(HubError::ClientError, HubError::ServerError) .check_status_code(HubError::ClientError, HubError::ServerError)
.map(|response| async { .map(|response| async {
if response.status() == StatusCode::NOT_MODIFIED {
return Ok(Vec::new());
}
response response
.text() .text()
.await .await

View file

@ -7,7 +7,7 @@ use crate::config::{Config, FeedConfig};
use crate::error::{FetchError, FetchFeedError, HubError, ParseFeedError}; use crate::error::{FetchError, FetchFeedError, HubError, ParseFeedError};
use crate::fetcher::{next_fetch, CacheHeaders, FetchPlan, FetchResponse}; use crate::fetcher::{next_fetch, CacheHeaders, FetchPlan, FetchResponse};
use main_error::MainResult; use main_error::MainResult;
use reqwest::{Client, Response, StatusCode}; use reqwest::{Client, Response};
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::ready; use std::future::ready;
@ -121,40 +121,35 @@ impl FeedFetcher {
let plan = self.fetch_plans.remove(feed).unwrap_or_default(); let plan = self.fetch_plans.remove(feed).unwrap_or_default();
let fetch_result = self.get_feed_key(feed, &plan.headers).await; let fetch_result = self.get_feed_key(feed, &plan.headers).await;
let new_key = match fetch_result.into_result() { let (result, new_plan) = fetch_result.into_result();
Ok((new_key, new_plan)) => { let is_retry = new_plan.is_retry();
self.fetch_plans.insert(feed.into(), next_fetch(self.base_interval, Some(new_plan))); self.fetch_plans.insert(feed.into(), next_fetch(self.base_interval, Some(new_plan)));
new_key let new_key = result?;
}
Err((err, new_plan)) => {
self.fetch_plans.insert(feed.into(), next_fetch(self.base_interval, Some(new_plan)));
return Err(err);
}
};
Ok(match (self.cache.get_mut(feed), new_key) { Ok(match (self.cache.get_mut(feed), new_key) {
(Some(cached), Some(Some(new_key))) => { (Some(cached), Some(new_key)) => {
debug!(cached, new_key, "checked existing feed"); debug!(cached, new_key, "checked existing feed");
if new_key != *cached { if new_key != *cached {
*cached = new_key; *cached = new_key;
info!("feed updated");
true true
} else { } else {
false false
} }
} }
(None, Some(Some(new_key))) => { (None, Some(new_key)) => {
debug!(feed, "new feed"); debug!(feed, "new feed");
self.cache.insert(feed.into(), new_key); self.cache.insert(feed.into(), new_key);
// don't trigger the actions on start // don't trigger the actions on start
false false
} }
(_, Some(None)) => { (_, None) if is_retry => {
debug!("not modified response"); warn!("rate limited by server");
false false
} }
(_, None) => { (_, None) => {
warn!("rate limited by server"); debug!("not modified");
false false
} }
}) })
@ -165,21 +160,18 @@ impl FeedFetcher {
&self, &self,
feed: &str, feed: &str,
cache_headers: &CacheHeaders, cache_headers: &CacheHeaders,
) -> FetchResponse<Option<u64>, FetchError> { ) -> FetchResponse<u64, FetchError> {
if let Some(hub) = feed.strip_prefix("docker-hub://") { if let Some(hub) = feed.strip_prefix("docker-hub://") {
if let Some((user, repo)) = hub.split_once('/') { if let Some((user, repo)) = hub.split_once('/') {
hub::tags(&self.client, user, repo, cache_headers) hub::tags(&self.client, user, repo, cache_headers)
.await .await
.map(|tags| { .map(|tags| {
if tags.is_empty() {
return ready(None);
}
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
for tag in tags { for tag in tags {
tag.id.hash(&mut hasher); tag.id.hash(&mut hasher);
tag.last_updated.hash(&mut hasher); tag.last_updated.hash(&mut hasher);
} }
ready(Some(hasher.finish())) ready(hasher.finish())
}).await }).await
.map_err(FetchError::Hub) .map_err(FetchError::Hub)
} else { } else {
@ -200,7 +192,7 @@ impl FeedFetcher {
&self, &self,
feed: &str, feed: &str,
cache_headers: &CacheHeaders, cache_headers: &CacheHeaders,
) -> FetchResponse<Option<u64>, FetchFeedError> { ) -> FetchResponse<u64, FetchFeedError> {
let response = self let response = self
.client .client
.get(feed) .get(feed)
@ -219,11 +211,7 @@ impl FeedFetcher {
} }
} }
async fn parse_rss_response(response: Response) -> Result<Option<u64>, FetchFeedError> { async fn parse_rss_response(response: Response) -> Result<u64, FetchFeedError> {
if response.status() == StatusCode::NOT_MODIFIED {
return Ok(None);
}
let content = response.text().await?; let content = response.text().await?;
let channel = Feed::from_str(&content).map_err(ParseFeedError::Parse)?; let channel = Feed::from_str(&content).map_err(ParseFeedError::Parse)?;
@ -249,5 +237,5 @@ async fn parse_rss_response(response: Response) -> Result<Option<u64>, FetchFeed
} }
} }
Ok(Some(hasher.finish())) Ok(hasher.finish())
} }