diff --git a/Cargo.lock b/Cargo.lock index 67ecc17..a8fa3f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,24 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "libc" +version = "0.2.154" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" + +[[package]] +name = "memmap2" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe751422e4a8caa417e13c3ea66452215d7d63e19e604f4980461212f3ae1322" +dependencies = [ + "libc", +] + [[package]] name = "rust_1brc" version = "0.1.0" +dependencies = [ + "memmap2", +] diff --git a/Cargo.toml b/Cargo.toml index e145f11..5be0aa9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +memmap2 = "0.9.4" diff --git a/src/aggregate.rs b/src/aggregate.rs index 74de21a..1790571 100644 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -1,6 +1,13 @@ -use std::sync::mpsc::Receiver; use std::collections::HashMap; +use std::sync::mpsc::Receiver; use crate::record::Record; -pub fn reduce(rx: Receiver>) {} +pub fn reduce(rx: Receiver>) { + let mut hmap = HashMap::new(); + while let Ok(stats) = rx.recv() { + for (city, rec) in stats { + hmap.entry(city).or_insert(Record::default()).merge(rec); + } + } +} diff --git a/src/compute.rs b/src/compute.rs index 9df962a..2b44f71 100644 --- a/src/compute.rs +++ b/src/compute.rs @@ -1,9 +1,72 @@ +use crate::pre_processing::Chunk; +use crate::record::Record; +use memmap2::Mmap; use std::collections::HashMap; use std::sync::mpsc::Sender; +use std::sync::Arc; -use crate::pre_processing::Chunk; -use crate::record::Record; +pub fn stats(mmap: Arc, chunk: Chunk, tx: Sender>) { + let hmap = calculate(&mmap[chunk.offset as usize..(chunk.offset + chunk.size) as usize]); + tx.send(hmap).unwrap(); +} + +fn calculate(bytes: &[u8]) -> HashMap { + let mut map: HashMap = HashMap::with_capacity(10_000); + for line in bytes.split(|&b| b == b'\n') { + if !line.is_empty() { + let mut splitted = line.split(|&b| b == b';'); + let city = unsafe { std::str::from_utf8_unchecked(splitted.next().unwrap()) }; + let float = parse_float(splitted.next().unwrap()); + if let Some(rec) = map.get_mut(city) { + rec.add(float); + } else { + map.insert(city.to_string(), Record::from(float)); + } + } + } + map +} + +fn parse_float(bytes: &[u8]) -> i32 { + let mut result = 0; + let mut is_negative = false; + for &b in bytes { + match b { + b'0'..=b'9' => { + let digit = (b - b'0') as i32; + result = result * 10 + digit; + } + b'-' => is_negative = true, + _ => {} + } + } + if is_negative { + result *= -1; + } + result +} + +#[cfg(test)] +mod tests { + use super::*; -type Statistics = HashMap; + fn check(data: &str, expected: HashMap) { + let actual = calculate(data.as_bytes()); + assert_eq!(actual, expected); + } -pub fn stats(chunk: Chunk, tx: Sender) {} + #[test] + fn compute() { + let input = "Stockholm;1.5 +New York;2.0 +Oslo;0.0 +Stockholm;11.5 +Oslo;10.2"; + let expected = HashMap::from([ + ("Stockholm".to_string(), Record::new(15, 115, 130, 2)), + ("New York".to_string(), Record::new(20, 20, 20, 1)), + ("Oslo".to_string(), Record::new(0, 102, 102, 2)), + ]); + check(input, expected); + } +} diff --git a/src/main.rs b/src/main.rs index af897d8..4aff3c8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,27 @@ +mod aggregate; mod compute; mod pre_processing; mod record; -mod aggregate; +use memmap2::MmapOptions; use std::fs::File; use std::sync::mpsc; +use std::sync::Arc; use std::thread; fn main() { - let args = std::env::args().collect::>(); - let file = File::open(args[1].clone()).expect("..."); + let file = File::open("./measurements.txt").unwrap(); + let mmap = Arc::new(unsafe { MmapOptions::new().map(&file).unwrap() }); let (tx, rx) = mpsc::channel(); pre_processing::Partition::try_from(file) .unwrap() .chunks .into_iter() .for_each(|chunk| { + let mmap_clone = Arc::clone(&mmap); let tx_clone = tx.clone(); - thread::spawn(move || compute::stats(chunk, tx_clone)); + thread::spawn(move || compute::stats(mmap_clone, chunk, tx_clone)); }); + drop(tx); aggregate::reduce(rx); } diff --git a/src/pre_processing.rs b/src/pre_processing.rs index 62c504d..3fe61af 100644 --- a/src/pre_processing.rs +++ b/src/pre_processing.rs @@ -18,8 +18,8 @@ impl TryFrom for Partition { #[derive(Debug, PartialEq)] pub struct Chunk { - offset: u64, - size: u64, + pub offset: u64, + pub size: u64, } struct Splitter { diff --git a/src/record.rs b/src/record.rs index 37baa6b..45d8dae 100644 --- a/src/record.rs +++ b/src/record.rs @@ -1,54 +1,41 @@ +#[derive(Debug, Default, PartialEq, Eq)] pub struct Record { - min: f32, - max: f32, - sum: f32, + min: i32, + max: i32, + sum: i32, count: usize, } impl Record { - pub fn merge(&mut self, t: f32) { - self.min = if t < self.min { t } else { self.min }; - self.max = if t > self.max { t } else { self.max }; - self.sum += t; - self.count += 1; - } -} - -impl Default for Record { - fn default() -> Self { + pub fn new(min: i32, max: i32, sum: i32, count: usize) -> Self { Self { - min: 0.0, - max: 0.0, - sum: 0.0, - count: 0, + min, max, sum, count } + } -} -impl TryFrom<&str> for Record { - type Error = String; + pub fn merge(&mut self, other: Self) { + self.min = std::cmp::min(self.min, other.min); + self.max = std::cmp::max(self.max, other.max); + self.sum += other.sum; + self.count += other.count; + } - fn try_from(value: &str) -> Result { - let t: f32 = value.split(';').collect::>()[1] - .parse() - .map_err(|_| "Could not parse temprature")?; - Ok(Record { - min: t, - max: t, - sum: t, - count: 1, - }) + pub fn add(&mut self, t: i32) { + self.min = std::cmp::min(self.min, t); + self.max = std::cmp::max(self.max, t); + self.sum += t; + self.count += 1; } } -impl std::fmt::Display for Record { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}/{}/{}", - self.min, - self.max, - self.sum / self.count as f32 - ) +impl From for Record { + fn from(value: i32) -> Self { + Self { + min: value, + max: value, + sum: value, + count: 1, + } } }