move to api client crate

This commit is contained in:
Robin Appelman 2022-04-30 19:53:18 +02:00
commit 71f1ba6f3d
7 changed files with 759 additions and 205 deletions

View file

@ -1,101 +0,0 @@
use crate::Error;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer};
use std::borrow::Cow;
use std::fmt;
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Demo {
pub id: u32,
pub url: String,
pub name: String,
pub server: String,
pub duration: u16,
pub nick: String,
pub map: String,
#[serde(with = "chrono::serde::ts_seconds")]
pub time: DateTime<Utc>,
pub red: String,
pub blue: String,
pub red_score: u8,
pub blue_score: u8,
pub player_count: u8,
pub uploader: u32,
#[serde(deserialize_with = "hex_to_digest")]
pub hash: [u8; 16],
pub backend: String,
pub path: String,
}
/// Deserializes a lowercase hex string to a `[u8; 16]`.
pub fn hex_to_digest<'de, D>(deserializer: D) -> Result<[u8; 16], D::Error>
where
D: Deserializer<'de>,
{
use hex::FromHex;
use serde::de::Error;
let string = <Cow<str>>::deserialize(deserializer)?;
if string.len() == 0 {
return Ok([0; 16]);
}
<[u8; 16]>::from_hex(string.as_ref()).map_err(|err| Error::custom(err.to_string()))
}
#[derive(Debug)]
pub enum ListOrder {
Ascending,
Descending,
}
impl Default for ListOrder {
fn default() -> Self {
ListOrder::Descending
}
}
impl fmt::Display for ListOrder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ListOrder::Ascending => "ASC".fmt(f),
ListOrder::Descending => "DESC".fmt(f),
}
}
}
#[derive(Debug, Default)]
pub struct ListParams {
order: ListOrder,
backend: Option<String>,
}
impl ListParams {
#[allow(dead_code)]
pub fn with_backend(self, backend: impl ToString) -> Self {
ListParams {
backend: Some(backend.to_string()),
..self
}
}
pub fn with_order(self, order: ListOrder) -> Self {
ListParams { order, ..self }
}
}
pub fn list_demos(params: ListParams, page: u32) -> Result<Vec<Demo>, Error> {
let mut req = ureq::get("https://api.demos.tf/demos")
.query("page", &format!("{}", page))
.query("order", &format!("{}", params.order));
if let Some(backend) = params.backend.as_ref() {
req = req.query("backend", backend);
}
let resp = req.call()?;
Ok(resp.into_json()?)
}

View file

@ -1,41 +1,53 @@
use crate::api::{list_demos, ListOrder, ListParams};
use crate::store::Store;
use crate::Error;
use demostf_client::{ApiClient, Demo, ListOrder, ListParams};
use tracing::{info, instrument};
pub struct Backup {
client: ApiClient,
store: Store,
}
impl Backup {
pub fn new(store: Store) -> Self {
Backup { store }
Backup {
store,
client: ApiClient::new(),
}
}
fn backup_demo(&self, name: &str, url: &str, hash: [u8; 16]) -> Result<(), Error> {
let resp = ureq::get(url).call()?;
#[instrument(skip_all, fields(demo.id = demo.id, demo.name = name))]
async fn backup_demo(&self, name: &str, demo: &Demo) -> Result<(), Error> {
info!("backing up");
let chunks = demo.download(&self.client).await?;
let digest = self.store.store(name, &mut resp.into_reader())?;
let digest = self.store.store(name, chunks).await?;
if digest == hash || digest == [0; 16] {
if digest == demo.hash || digest == [0; 16] {
Ok(())
} else {
let _ = self.store.remove(name);
Err(Error::DigestMismatch {
expected: hash,
expected: demo.hash,
got: digest,
})
}
}
fn backup_page(&self, page: u32) -> Result<usize, Error> {
let demos = list_demos(ListParams::default().with_order(ListOrder::Ascending), page)?;
#[instrument(skip(self))]
async fn backup_page(&self, page: u32) -> Result<usize, Error> {
let demos = self
.client
.list(ListParams::default().with_order(ListOrder::Ascending), page)
.await?;
for demo in demos.iter() {
if !demo.url.is_empty() {
let name = demo.url.rsplit('/').next().unwrap();
println!("{} {}", demo.id, name);
if !self.store.exists(name) {
self.backup_demo(name, &demo.url, demo.hash)?;
self.backup_demo(name, demo).await?;
} else {
info!(demo = demo.id, name, "already backed up");
}
}
}
@ -43,8 +55,8 @@ impl Backup {
Ok(demos.len())
}
pub fn backup_from(&self, mut page: u32) -> Result<u32, Error> {
while self.backup_page(page)? > 0 {
pub async fn backup_from(&self, mut page: u32) -> Result<u32, Error> {
while self.backup_page(page).await? > 0 {
page += 1;
}

View file

@ -8,26 +8,22 @@ use std::cmp::max;
use std::collections::HashMap;
use std::path::PathBuf;
use thiserror::Error;
mod api;
use tracing::info;
#[derive(Debug, Error)]
pub enum Error {
#[error("Request failed: {0}")]
Request(#[from] std::io::Error),
#[error("Request failed: {0}")]
UReq(Box<ureq::Error>),
#[error(transparent)]
Api(#[from] demostf_client::Error),
#[error("MD5 digest mismatch for downloaded demo, expected {expected:?}, received {got:?}")]
DigestMismatch { expected: [u8; 16], got: [u8; 16] },
}
impl From<ureq::Error> for Error {
fn from(e: ureq::Error) -> Self {
Error::UReq(Box::new(e))
}
}
#[tokio::main]
async fn main() -> Result<(), MainError> {
tracing_subscriber::fmt::init();
fn main() -> Result<(), MainError> {
let mut args: HashMap<_, _> = dotenv::vars().collect();
let store = Store::new(args.get("STORAGE_ROOT").expect("no STORAGE_ROOT set"));
let state_path = PathBuf::from(args.remove("STATE_FILE").expect("no STATE_FILE set"));
@ -44,8 +40,9 @@ fn main() -> Result<(), MainError> {
} else {
1u32
};
info!(last_page, "starting backup");
let current_page = backup.backup_from(last_page)?;
let current_page = backup.backup_from(last_page).await?;
std::fs::write(&state_path, format!("{}", current_page))?;

View file

@ -1,9 +1,13 @@
use crate::Error;
use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use md5::Context;
use std::fs;
use std::fs::{File, Permissions};
use std::io::{ErrorKind, Read, Write};
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use tokio::pin;
pub struct Store {
basedir: PathBuf,
@ -16,30 +20,28 @@ impl Store {
}
}
pub fn store(&self, name: &str, data: &mut impl Read) -> std::io::Result<[u8; 16]> {
pub async fn store(
&self,
name: &str,
data: impl Stream<Item = Result<Bytes, demostf_client::Error>>,
) -> Result<[u8; 16], Error> {
let path = self.generate_path(name);
fs::create_dir_all(path.parent().unwrap())?;
let mut file = File::create(&path)?;
let mut context = Context::new();
let mut buf = [0u8; 8 * 1024];
pin!(data);
// copy the file and compute the digest was we go
loop {
let len = match data.read(&mut buf) {
Ok(0) => return Ok(context.compute().0),
Ok(len) => len,
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
let data = &buf[..len];
context.consume(data);
file.write_all(data)?;
file.set_permissions(Permissions::from_mode(0o644))?;
while let Some(chunk) = data.next().await {
let chunk = chunk?;
context.consume(&chunk);
file.write_all(&chunk)?;
}
file.set_permissions(Permissions::from_mode(0o644))?;
Ok(context.compute().0)
}
pub fn exists(&self, name: &str) -> bool {