Parallelize log line parsing and matching with rayon

This commit is contained in:
Robin Appelman 2024-07-28 16:02:00 +02:00
commit e9b5807127
6 changed files with 97 additions and 61 deletions

40
Cargo.lock generated
View file

@ -261,6 +261,25 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@ -561,6 +580,7 @@ dependencies = [
"logsmash-data",
"main_error",
"ratatui",
"rayon",
"regex",
"serde",
"serde_json",
@ -823,6 +843,26 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "rayon"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.5.3"

View file

@ -22,6 +22,7 @@ hdrhistogram = "7.5.4"
ahash = "0.8.11"
base64 = "0.21.7"
derive_more = { version = "1.0.0-beta.6", features = ["from"] }
rayon = "1.10.0"
[profile.dev.package."*"]
opt-level = 3

View file

@ -4,7 +4,6 @@ use crate::matcher::MatchResult;
use crate::timegraph::TimeGraph;
use logsmash_data::StatementList;
use std::collections::BTreeMap;
use std::sync::Mutex;
use time::OffsetDateTime;
pub struct App {
@ -16,7 +15,7 @@ pub struct App {
pub error_count: usize,
pub all: LogMatch,
pub unmatched: LogMatch,
pub log_file: Mutex<LogFile>,
pub log_file: LogFile,
}
impl App {
@ -29,8 +28,8 @@ impl App {
self.matches.len() + 1 + unmatched_line_count
}
pub fn get_line(&self, index: usize) -> Option<String> {
self.log_file.lock().unwrap().nth(index)
pub fn get_line(&self, index: usize) -> Option<&str> {
self.log_file.nth(index)
}
}

View file

@ -1,17 +1,15 @@
use crate::error::ReadError;
use itertools::Either;
use std::fs::File;
use std::io::{BufRead, BufReader, Seek};
use std::io::Read;
use zip::ZipArchive;
pub enum LogFile {
Plain(BufReader<File>),
Zip(ZipArchive<File>),
pub struct LogFile {
content: String,
}
impl LogFile {
pub fn open(path: &str) -> Result<LogFile, ReadError> {
let file = File::open(path)?;
let mut file = File::open(path)?;
if path.ends_with(".zip") {
let mut zip = ZipArchive::new(file)?;
if zip.len() > 1 {
@ -19,40 +17,25 @@ impl LogFile {
} else if zip.is_empty() {
return Err(ReadError::NoFiles);
}
// ensure we can open the file
let _ = zip.by_index(0)?;
Ok(LogFile::Zip(zip))
let mut log = zip.by_index(0)?;
let mut content = String::with_capacity(log.size() as usize);
log.read_to_string(&mut content)?;
Ok(LogFile { content })
} else {
Ok(LogFile::Plain(BufReader::new(file)))
let mut content = String::new();
file.read_to_string(&mut content)?;
Ok(LogFile { content })
}
}
pub fn iter(&mut self) -> impl Iterator<Item = String> + '_ {
match self {
LogFile::Plain(file) => Either::Left(file.lines().flatten()),
LogFile::Zip(zip) => {
let file = zip.by_index(0).expect("failed to open zip content again");
Either::Right(BufReader::new(file).lines().flatten())
}
}
pub fn iter<'a>(&'a self) -> impl Iterator<Item = &'a str> + Send + 'a {
self.content.lines()
}
pub fn nth(&mut self, index: usize) -> Option<String> {
match self {
LogFile::Plain(file) => {
file.rewind().unwrap();
file.lines().nth(index).transpose().ok().flatten()
}
LogFile::Zip(zip) => {
let file = zip.by_index(0).expect("failed to open zip content again");
BufReader::new(file)
.lines()
.nth(index)
.transpose()
.ok()
.flatten()
}
}
pub fn nth(&self, index: usize) -> Option<&str> {
self.iter().nth(index)
}
}

View file

@ -8,10 +8,11 @@ use base64::prelude::*;
use clap::Parser;
use logsmash_data::{default_apps, get_statements, SourceDefinition};
use main_error::MainResult;
use rayon::prelude::ParallelBridge;
use rayon::prelude::*;
use std::borrow::Cow;
use std::collections::HashMap;
use std::iter::once;
use std::sync::Mutex;
mod app;
mod error;
@ -32,7 +33,7 @@ struct Args {
fn main() -> MainResult {
let args = Args::parse();
let mut log_file = LogFile::open(&args.file).map_err(|err| LogError::Read {
let log_file = LogFile::open(&args.file).map_err(|err| LogError::Read {
err,
path: args.file,
})?;
@ -59,32 +60,44 @@ fn main() -> MainResult {
let matcher = Matcher::new(&statements);
let lines = once(first).chain(lines);
let results: Vec<_> = lines
.enumerate()
.par_bridge()
.filter(|(_, line)| line.starts_with('{'))
.map(|(index, line)| {
let mut parsed = serde_json::from_str::<LogLine>(&line)?;
parsed.index = index;
let log_match = matcher.match_log(&parsed);
Result::<_, serde_json::Error>::Ok((parsed, log_match))
})
.collect();
let mut error_count = 0;
let mut unmatched_counts: HashMap<String, Vec<usize>> = HashMap::new();
let mut parsed_lines = Vec::with_capacity(1024);
let mut unmatched_lines = Vec::with_capacity(256);
let mut parsed_index = 0;
for (index, line) in lines.enumerate() {
if line.starts_with('{') {
let mut parsed = match serde_json::from_str::<LogLine>(&line) {
Ok(parsed) => parsed,
Err(_) => {
error_count += 1;
continue;
}
};
parsed.index = index;
if let Some(index) = matcher.match_log(&parsed) {
counts.entry(index).or_default().push(parsed_index);
} else if let Some(entry) = unmatched_counts.get_mut(parsed.app.as_str()) {
entry.push(parsed_index)
} else {
unmatched_lines.push(parsed_index);
for result in results {
let parsed = match result {
Ok((parsed, Some(match_result))) => {
counts.entry(match_result).or_default().push(parsed_index);
parsed
}
parsed_lines.push(parsed);
parsed_index += 1;
}
Ok((parsed, None)) => {
unmatched_lines.push(parsed_index);
parsed
}
Err(_) => {
error_count += 1;
continue;
}
};
parsed_lines.push(parsed);
parsed_index += 1;
}
parsed_lines.sort_by_key(|line| line.index);
let mut matched_lines: Vec<(_, _)> = counts.into_iter().collect();
matched_lines.sort_by_key(|(_, lines)| lines.len());
@ -111,7 +124,7 @@ fn main() -> MainResult {
unmatched,
all,
error_count,
log_file: Mutex::new(log_file),
log_file,
};
if args.profile {

View file

@ -38,7 +38,7 @@ pub struct Matcher {
impl Matcher {
pub fn new(statements: &StatementList) -> Matcher {
let mut matches: Vec<_> = statements
let matches: Vec<_> = statements
.iter()
.enumerate()
.map(|(index, statement)| LogMatch::new(index, statement))