Skip to content

Commit

Permalink
Compute (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
hesampakdaman authored May 6, 2024
1 parent e0ff840 commit 41b6015
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 51 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
memmap2 = "0.9.4"
11 changes: 9 additions & 2 deletions src/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use std::sync::mpsc::Receiver;
use std::collections::HashMap;
use std::sync::mpsc::Receiver;

use crate::record::Record;

pub fn reduce(rx: Receiver<HashMap<String, Record>>) {}
pub fn reduce(rx: Receiver<HashMap<String, Record>>) {
let mut hmap = HashMap::new();
while let Ok(stats) = rx.recv() {
for (city, rec) in stats {
hmap.entry(city).or_insert(Record::default()).merge(rec);
}
}
}
71 changes: 67 additions & 4 deletions src/compute.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,72 @@
use crate::pre_processing::Chunk;
use crate::record::Record;
use memmap2::Mmap;
use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::Arc;

use crate::pre_processing::Chunk;
use crate::record::Record;
pub fn stats(mmap: Arc<Mmap>, chunk: Chunk, tx: Sender<HashMap<String, Record>>) {
let hmap = calculate(&mmap[chunk.offset as usize..(chunk.offset + chunk.size) as usize]);
tx.send(hmap).unwrap();
}

fn calculate(bytes: &[u8]) -> HashMap<String, Record> {
let mut map: HashMap<String, Record> = HashMap::with_capacity(10_000);
for line in bytes.split(|&b| b == b'\n') {
if !line.is_empty() {
let mut splitted = line.split(|&b| b == b';');
let city = unsafe { std::str::from_utf8_unchecked(splitted.next().unwrap()) };
let float = parse_float(splitted.next().unwrap());
if let Some(rec) = map.get_mut(city) {
rec.add(float);
} else {
map.insert(city.to_string(), Record::from(float));
}
}
}
map
}

fn parse_float(bytes: &[u8]) -> i32 {
let mut result = 0;
let mut is_negative = false;
for &b in bytes {
match b {
b'0'..=b'9' => {
let digit = (b - b'0') as i32;
result = result * 10 + digit;
}
b'-' => is_negative = true,
_ => {}
}
}
if is_negative {
result *= -1;
}
result
}

#[cfg(test)]
mod tests {
use super::*;

type Statistics = HashMap<String, Record>;
fn check(data: &str, expected: HashMap<String, Record>) {
let actual = calculate(data.as_bytes());
assert_eq!(actual, expected);
}

pub fn stats(chunk: Chunk, tx: Sender<Statistics>) {}
#[test]
fn compute() {
let input = "Stockholm;1.5
New York;2.0
Oslo;0.0
Stockholm;11.5
Oslo;10.2";
let expected = HashMap::from([
("Stockholm".to_string(), Record::new(15, 115, 130, 2)),
("New York".to_string(), Record::new(20, 20, 20, 1)),
("Oslo".to_string(), Record::new(0, 102, 102, 2)),
]);
check(input, expected);
}
}
12 changes: 8 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
mod aggregate;
mod compute;
mod pre_processing;
mod record;
mod aggregate;

use memmap2::MmapOptions;
use std::fs::File;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

fn main() {
let args = std::env::args().collect::<Vec<String>>();
let file = File::open(args[1].clone()).expect("...");
let file = File::open("./measurements.txt").unwrap();
let mmap = Arc::new(unsafe { MmapOptions::new().map(&file).unwrap() });
let (tx, rx) = mpsc::channel();
pre_processing::Partition::try_from(file)
.unwrap()
.chunks
.into_iter()
.for_each(|chunk| {
let mmap_clone = Arc::clone(&mmap);
let tx_clone = tx.clone();
thread::spawn(move || compute::stats(chunk, tx_clone));
thread::spawn(move || compute::stats(mmap_clone, chunk, tx_clone));
});
drop(tx);
aggregate::reduce(rx);
}
4 changes: 2 additions & 2 deletions src/pre_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ impl TryFrom<std::fs::File> for Partition {

#[derive(Debug, PartialEq)]
pub struct Chunk {
offset: u64,
size: u64,
pub offset: u64,
pub size: u64,
}

struct Splitter<T: io::Read + io::Seek> {
Expand Down
65 changes: 26 additions & 39 deletions src/record.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,41 @@
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Record {
min: f32,
max: f32,
sum: f32,
min: i32,
max: i32,
sum: i32,
count: usize,
}

impl Record {
pub fn merge(&mut self, t: f32) {
self.min = if t < self.min { t } else { self.min };
self.max = if t > self.max { t } else { self.max };
self.sum += t;
self.count += 1;
}
}

impl Default for Record {
fn default() -> Self {
pub fn new(min: i32, max: i32, sum: i32, count: usize) -> Self {
Self {
min: 0.0,
max: 0.0,
sum: 0.0,
count: 0,
min, max, sum, count
}

}
}

impl TryFrom<&str> for Record {
type Error = String;
pub fn merge(&mut self, other: Self) {
self.min = std::cmp::min(self.min, other.min);
self.max = std::cmp::max(self.max, other.max);
self.sum += other.sum;
self.count += other.count;
}

fn try_from(value: &str) -> Result<Self, Self::Error> {
let t: f32 = value.split(';').collect::<Vec<_>>()[1]
.parse()
.map_err(|_| "Could not parse temprature")?;
Ok(Record {
min: t,
max: t,
sum: t,
count: 1,
})
pub fn add(&mut self, t: i32) {
self.min = std::cmp::min(self.min, t);
self.max = std::cmp::max(self.max, t);
self.sum += t;
self.count += 1;
}
}

impl std::fmt::Display for Record {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}/{}/{}",
self.min,
self.max,
self.sum / self.count as f32
)
impl From<i32> for Record {
fn from(value: i32) -> Self {
Self {
min: value,
max: value,
sum: value,
count: 1,
}
}
}

0 comments on commit 41b6015

Please sign in to comment.