killstreaks

This commit is contained in:
Robin Appelman 2022-06-04 23:37:43 +02:00
commit 506de3f06e
7 changed files with 696 additions and 306 deletions

867
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -19,6 +19,8 @@ serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
steamid-ng = "0.3.4" steamid-ng = "0.3.4"
test-case = "1.0.0" test-case = "1.0.0"
tracing = "0.1.33"
tracing-subscriber = "0.3.11"
[replace] [replace]
"sqlx-macros:0.3.5" = { git = "https://github.com/icewind1991/sqlx", branch = "type_override-exprgroup-0.3.5" } "sqlx-macros:0.3.5" = { git = "https://github.com/icewind1991/sqlx", branch = "type_override-exprgroup-0.3.5" }

View file

@ -26,7 +26,8 @@ CREATE TABLE logs (
clean_map TEXT GENERATED ALWAYS AS (clean_map_name(map)) STORED, clean_map TEXT GENERATED ALWAYS AS (clean_map_name(map)) STORED,
type map_type NOT NULL, type map_type NOT NULL,
date TIMESTAMP WITHOUT TIME ZONE NOT NULL, date TIMESTAMP WITHOUT TIME ZONE NOT NULL,
winner team GENERATED ALWAYS AS (CASE WHEN red_score > blue_score THEN 'red'::team WHEN blue_score > red_score THEN 'blue'::team ELSE 'other'::team END) STORED winner team GENERATED ALWAYS AS (CASE WHEN red_score > blue_score THEN 'red'::team WHEN blue_score > red_score THEN 'blue'::team ELSE 'other'::team END) STORED,
version SMALLINT NOT NULL
); );
CREATE INDEX logs_map_idx CREATE INDEX logs_map_idx
@ -44,6 +45,9 @@ CREATE INDEX logs_winner_idx
CREATE INDEX logs_date_idx CREATE INDEX logs_date_idx
ON logs USING BTREE (date); ON logs USING BTREE (date);
CREATE INDEX logs_version_idx
ON logs USING BTREE (version);
CREATE TABLE rounds ( CREATE TABLE rounds (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
round INTEGER NOT NULL, round INTEGER NOT NULL,
@ -360,3 +364,23 @@ CREATE MATERIALIZED VIEW user_names AS
CREATE UNIQUE INDEX user_names_steam_id_idx CREATE UNIQUE INDEX user_names_steam_id_idx
ON user_names USING BTREE (steam_id); ON user_names USING BTREE (steam_id);
CREATE TABLE kill_streaks (
id BIGSERIAL PRIMARY KEY,
log_id INTEGER NOT NULL REFERENCES logs(id),
steam_id BIGINT NOT NULL,
time INTEGER NOT NULL,
streak INTEGER NOT NULL
);
CREATE INDEX kill_streaks_id_idx
ON kill_streaks USING BTREE (id);
CREATE INDEX kill_streaks_steam_id_idx
ON kill_streaks USING BTREE (steam_id);
CREATE INDEX kill_streaks_log_id_idx
ON kill_streaks USING BTREE (log_id);
CREATE INDEX kill_streaks_steam_id_streak_idx
ON kill_streaks USING BTREE (steam_id, streak);

View file

@ -5,12 +5,14 @@ use chrono::{DateTime, Utc};
use sqlx::PgPool; use sqlx::PgPool;
use std::collections::HashMap; use std::collections::HashMap;
use steamid_ng::SteamID; use steamid_ng::SteamID;
use tracing::instrument;
#[instrument(skip(pool, log))]
pub async fn store_log(pool: &PgPool, id: i32, log: &NormalizedLog) -> Result<(), sqlx::Error> { pub async fn store_log(pool: &PgPool, id: i32, log: &NormalizedLog) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?; let mut tx = pool.begin().await?;
sqlx::query!( sqlx::query!(
"INSERT INTO logs(id, red_score, blue_score, length, game_mode, map, type, date)\ "INSERT INTO logs(id, red_score, blue_score, length, game_mode, map, type, date, version)\
VALUES($1, $2, $3, $4, $5, $6, $7, $8)", VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)",
id, id,
log.teams.red.score as i32, log.teams.red.score as i32,
log.teams.blue.score as i32, log.teams.blue.score as i32,
@ -18,7 +20,8 @@ pub async fn store_log(pool: &PgPool, id: i32, log: &NormalizedLog) -> Result<()
log.game_mode() as GameMode, log.game_mode() as GameMode,
log.info.map, log.info.map,
log.info.map_type() as MapType, log.info.map_type() as MapType,
log.info.date() as DateTime<Utc> log.info.date() as DateTime<Utc>,
2
) )
.execute(&mut tx) .execute(&mut tx)
.await?; .await?;
@ -272,6 +275,53 @@ pub async fn store_log(pool: &PgPool, id: i32, log: &NormalizedLog) -> Result<()
} }
} }
for kill_streak in &log.kill_streaks {
sqlx::query!(
"INSERT INTO kill_streaks(log_id, steam_id, time, streak)\
VALUES($1, $2, $3, $4)",
id,
u64::from(kill_streak.steamid) as i64,
kill_streak.time,
kill_streak.streak,
)
.execute(&mut tx)
.await?;
}
tx.commit().await?;
Ok(())
}
#[instrument(skip(pool, log))]
pub async fn upgrade(
pool: &PgPool,
id: i32,
log: &NormalizedLog,
from: i16,
to: i16,
) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;
if from <= 1 && to >= 2 {
for kill_streak in &log.kill_streaks {
sqlx::query!(
"INSERT INTO kill_streaks(log_id, steam_id, time, streak)\
VALUES($1, $2, $3, $4)",
id,
u64::from(kill_streak.steamid) as i64,
kill_streak.time,
kill_streak.streak,
)
.execute(&mut tx)
.await?;
}
}
sqlx::query!("UPDATE logs SET version = $1 WHERE id = $2", to, id)
.execute(&mut tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())

View file

@ -3,14 +3,19 @@ mod database;
mod normalized; mod normalized;
mod raw; mod raw;
use crate::database::store_log; use crate::database::{store_log, upgrade};
use crate::normalized::NormalizedLog; use crate::normalized::NormalizedLog;
use main_error::MainError; use main_error::MainError;
use sqlx::{postgres::PgQueryAs, PgPool}; use sqlx::{postgres::PgQueryAs, PgPool};
use tokio::time::{delay_for, Duration}; use tokio::time::{delay_for, Duration};
use tracing::{error, info, instrument};
const OLD_VERSION: i16 = 1;
const VERSION: i16 = 2;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), MainError> { async fn main() -> Result<(), MainError> {
tracing_subscriber::fmt::init();
let database_url = dotenv::var("DATABASE_URL")?; let database_url = dotenv::var("DATABASE_URL")?;
let raw_database_url = dotenv::var("RAW_DATABASE_URL")?; let raw_database_url = dotenv::var("RAW_DATABASE_URL")?;
@ -28,21 +33,41 @@ async fn normalize(database_url: &str, raw_database_url: &str) -> Result<(), Mai
.await?; .await?;
let max = get_max_log(&raw_pool).await?; let max = get_max_log(&raw_pool).await?;
let old = get_min_old_stored_log(&pool, VERSION).await?;
let from = get_max_stored_log(&pool).await?; let from = get_max_stored_log(&pool).await?;
for id in (from + 1)..=max { for id in old..=from {
print!("{} ", id); info!(id = id, from = OLD_VERSION, to = VERSION, "migrating");
if let Some(log) = get_log(&raw_pool, id).await? { if let Some(log) = get_log(&raw_pool, id).await? {
println!("{}", log.info.map); upgrade(&pool, id, &log, OLD_VERSION, VERSION).await?;
} else {
error!(id = id, "invalid");
}
}
for id in (from + 1)..=max {
if let Some(log) = get_log(&raw_pool, id).await? {
info!(id = id, map = display(&log.info.map), "normalizing");
store_log(&pool, id, &log).await?; store_log(&pool, id, &log).await?;
} else { } else {
println!("invalid"); error!(id = id, "invalid");
} }
} }
Ok(()) Ok(())
} }
async fn get_min_old_stored_log(pool: &PgPool, version: i16) -> Result<i32, MainError> {
Ok(sqlx::query!(
r#"SELECT MIN(id) as id from logs WHERE version < $1"#,
version
)
.fetch_one(pool)
.await?
.id
.unwrap_or_default())
}
async fn get_max_stored_log(pool: &PgPool) -> Result<i32, MainError> { async fn get_max_stored_log(pool: &PgPool) -> Result<i32, MainError> {
Ok(sqlx::query!(r#"SELECT MAX(id) as id from logs"#) Ok(sqlx::query!(r#"SELECT MAX(id) as id from logs"#)
.fetch_one(pool) .fetch_one(pool)
@ -58,6 +83,7 @@ async fn get_max_log(pool: &PgPool) -> Result<i32, MainError> {
Ok(row.0) Ok(row.0)
} }
#[instrument(skip(pool))]
async fn get_log(pool: &PgPool, id: i32) -> Result<Option<NormalizedLog>, MainError> { async fn get_log(pool: &PgPool, id: i32) -> Result<Option<NormalizedLog>, MainError> {
let row: (serde_json::Value,) = let row: (serde_json::Value,) =
sqlx::query_as(r#"SELECT json as id from logs_raw where id = $1"#) sqlx::query_as(r#"SELECT json as id from logs_raw where id = $1"#)

View file

@ -2,7 +2,7 @@ pub use crate::data::TeamId;
use crate::data::{GameMode, MapType}; use crate::data::{GameMode, MapType};
use crate::raw::RawLog; use crate::raw::RawLog;
pub use crate::raw::{ pub use crate::raw::{
ChatMessage, ClassNumbers, Event, Player, RoundPlayer, Team, Teams, Uploader, ChatMessage, ClassNumbers, Event, KillStreak, Player, RoundPlayer, Team, Teams, Uploader,
}; };
use chrono::{DateTime, NaiveDateTime, Utc}; use chrono::{DateTime, NaiveDateTime, Utc};
use serde::Deserialize; use serde::Deserialize;
@ -24,6 +24,7 @@ pub struct NormalizedLog {
pub class_kill_assists: HashMap<SteamID, ClassNumbers>, pub class_kill_assists: HashMap<SteamID, ClassNumbers>,
pub chat: Vec<ChatMessage>, pub chat: Vec<ChatMessage>,
pub info: Info, pub info: Info,
pub kill_streaks: Vec<KillStreak>,
} }
impl NormalizedLog { impl NormalizedLog {
@ -149,6 +150,7 @@ impl From<RawLog> for NormalizedLog {
class_kill_assists: raw.class_kill_assists.unwrap_or_default(), class_kill_assists: raw.class_kill_assists.unwrap_or_default(),
chat: raw.chat, chat: raw.chat,
info, info,
kill_streaks: raw.kill_streaks.unwrap_or_default(),
}; };
normalize_stopwatch_events(&mut normalized); normalize_stopwatch_events(&mut normalized);

View file

@ -1,7 +1,7 @@
use crate::data::{Class, Medigun, TeamId}; use crate::data::{Class, Medigun, TeamId};
use serde::export::TryFrom;
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryFrom;
use steamid_ng::SteamID; use steamid_ng::SteamID;
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
@ -24,6 +24,8 @@ pub struct RawLog {
pub class_kill_assists: Option<HashMap<SteamID, ClassNumbers>>, pub class_kill_assists: Option<HashMap<SteamID, ClassNumbers>>,
pub chat: Vec<ChatMessage>, pub chat: Vec<ChatMessage>,
pub info: Info, pub info: Info,
#[serde(rename = "killstreaks")]
pub kill_streaks: Option<Vec<KillStreak>>,
} }
#[derive(Debug, Clone, Deserialize, Default)] #[derive(Debug, Clone, Deserialize, Default)]
@ -345,6 +347,13 @@ pub struct Uploader {
pub info: Option<String>, pub info: Option<String>,
} }
#[derive(Debug, Clone, Deserialize)]
pub struct KillStreak {
pub steamid: SteamID,
pub streak: i32,
pub time: i32,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;