This repository has been archived by the owner on Jun 7, 2022. It is now read-only.

world_population - can't check AWS because of credentials error #15

Open · wants to merge 2 commits into base: main
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -23,4 +23,5 @@ slack-hook = "0.7"
sentry = "0.12.0"
log = "0.4"
pretty_env_logger = "0.3"
byte-unit = "2.1.0"
byte-unit = "2.1.0"
futures = "0.1"
144 changes: 144 additions & 0 deletions src/covid19_data.rs
@@ -0,0 +1,144 @@
//use dracula_covid19::*;
use crate::error::DracErr;
//use sentry::integrations::panic::register_panic_handler;

use crate::CovidRecord;
use crate::aws::*;
use crate::cleaner::*;
use crate::parquet_writer::write_records_to_file;

//const CONFIRMED_URL: &str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Confirmed.csv";
const CONFIRMED_URL: &str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv";
//const DEATHS_URL: &str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Confirmed.csv";
const DEATHS_URL: &str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv";
//const RECOVERED_URL: &str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Recovered.csv";
const RECOVERED_URL: &str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv";

#[tokio::main]
pub async fn covid19_data() -> Result<(), DracErr> {
let mut records: Vec<CovidRecord> = Vec::new();

extract_records(CONFIRMED_URL, "confirmed", &mut records)
.await
.unwrap();
extract_records(DEATHS_URL, "deaths", &mut records)
.await
.unwrap();
extract_records(RECOVERED_URL, "recovered", &mut records)
.await
.unwrap();

write_records_to_file("combined.parquet", records);

let bucket = "scientist-datawarehouse".to_string();
let key =
"csse_covid_19_time_series/combined/time_series_19-covid-Combined.parquet".to_string();
let crawler_name = "covid19-combined".to_string();

let key_parts = key.split('/').collect::<Vec<&str>>();
let key_dir = key_parts[0..key_parts.len() - 1].join("/");
let s3_path = format!("s3://{}/{}", bucket, key_dir);

upload_file("combined.parquet", bucket.clone(), key.clone())
.await
.unwrap();
create_crawler(crawler_name.clone(), s3_path).await.unwrap();
start_crawler(crawler_name.clone(), true).await.unwrap();

Ok(())
}

async fn extract_records(
input_url: &str,
status: &str,
records: &mut Vec<CovidRecord>,
) -> Result<(), DracErr> {
let req = reqwest::get(input_url).await?;
let bytes = req.bytes().await?;
let bytes_reader = std::io::Cursor::new(&bytes[..]);

let mut reader = csv::ReaderBuilder::new().from_reader(bytes_reader);

let dates: Vec<chrono::NaiveDateTime> = {
let headers = reader.headers()?;
let mut header_iter = headers.iter();
header_iter.next(); // Province
header_iter.next(); // Country
header_iter.next(); // Lat
header_iter.next(); // Long
header_iter
.map(|date_str| {
// panic!("{}", date_str);
let res = chrono::NaiveDateTime::parse_from_str(
&format!("{} 00:00", date_str),
"%-m/%-d/%Y %H:%M",
);
if let Ok(res) = res {
res
} else {
panic!("could not parse `{}`", date_str)
}
})
.collect()
};

for row in reader.records() {
let row = row?;
let mut row_iter = row.iter();

let province_state = row_iter.next().unwrap().to_string();
let province_state = if province_state == "" {
None
} else {
Some(province_state)
};

let country_region = row_iter.next().unwrap().to_string();

let (city, county, state) = if country_region == "US" {
extract_us_data(&province_state.as_ref().unwrap_or(&"".to_string())[..])
} else {
(None, None, None)
};

let lat = row_iter.next().unwrap().to_string();
let long = row_iter.next().unwrap().to_string();

for date in dates.iter().cloned() {
let date_count_str = row_iter.next().unwrap();

let count: i64 = date_count_str.parse().unwrap_or_default();

let lat = if lat == "" {
None
} else {
Some(lat.parse().unwrap())
};

let lon = if long == "" {
None
} else {
Some(long.parse().unwrap())
};

let mut record = CovidRecord {
status: status.to_string(),
province_state: province_state.clone(),
state: state.clone(),
county: county.clone(),
city: city.clone(),
country_region: country_region.clone(),
lat,
lon,
date,
count,
};

remap_territories(&mut record);

records.push(record)
}
}

Ok(())
}
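
A note on the #[tokio::main] attribute above: because it sits on covid19_data rather than on main, the macro expands the pub async fn into an ordinary blocking function that builds its own Tokio runtime and drives the async body to completion. A minimal sketch of a synchronous caller, assuming only the re-exports shown in src/lib.rs below (the call pattern is an illustration, not part of this diff):

use dracula_covid19::covid19_data;

fn main() {
    // Blocks until the downloads, parquet write, S3 upload, and Glue
    // crawler kickoff all finish; failures inside currently panic via
    // the internal unwrap()s rather than surfacing through the Result.
    covid19_data().expect("covid19 pipeline failed");
}
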
6 changes: 6 additions & 0 deletions src/lib.rs
@@ -19,3 +19,9 @@ pub use parquet_writer::*;

mod cleaner;
pub use cleaner::*;

pub mod covid19_data;
pub use covid19_data::*;

pub mod world_population;
pub use world_population::*;
129 changes: 6 additions & 123 deletions src/main.rs
@@ -1,12 +1,14 @@
use dracula_covid19::*;
use sentry::integrations::panic::register_panic_handler;
//use dracula_covid19::covid19_data::covid19_data;

const CONFIRMED_URL: &str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Confirmed.csv";
const DEATHS_URL: &str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Deaths.csv";
const RECOVERED_URL: &str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Recovered.csv";

#[tokio::main]
async fn main() -> Result<(), DracErr> {
//#[tokio::main]
//async
fn main() -> Result<(), ()> {
let mut records: Vec<CovidRecord> = Vec::new();

let _guard = sentry::init((
@@ -19,127 +21,8 @@ async fn main() -> Result<(), DracErr> {

register_panic_handler();

extract_records(CONFIRMED_URL, "confirmed", &mut records)
.await
.unwrap();
extract_records(DEATHS_URL, "deaths", &mut records)
.await
.unwrap();
extract_records(RECOVERED_URL, "recovered", &mut records)
.await
.unwrap();

write_records_to_file("combined.parquet", records);

let bucket = "scientist-datawarehouse".to_string();
let key =
"csse_covid_19_time_series/combined/time_series_19-covid-Combined.parquet".to_string();
let crawler_name = "covid19-combined".to_string();

let key_parts = key.split('/').collect::<Vec<&str>>();
let key_dir = key_parts[0..key_parts.len() - 1].join("/");
let s3_path = format!("s3://{}/{}", bucket, key_dir);

upload_file("combined.parquet", bucket.clone(), key.clone())
.await
.unwrap();
create_crawler(crawler_name.clone(), s3_path).await.unwrap();
start_crawler(crawler_name.clone(), true).await.unwrap();

Ok(())
}

async fn extract_records(
input_url: &str,
status: &str,
records: &mut Vec<CovidRecord>,
) -> Result<(), DracErr> {
let req = reqwest::get(input_url).await?;
let bytes = req.bytes().await?;
let bytes_reader = std::io::Cursor::new(&bytes[..]);

let mut reader = csv::ReaderBuilder::new().from_reader(bytes_reader);

let dates: Vec<chrono::NaiveDateTime> = {
let headers = reader.headers()?;
let mut header_iter = headers.iter();
header_iter.next(); // Province
header_iter.next(); // Country
header_iter.next(); // Lat
header_iter.next(); // Long
header_iter
.map(|date_str| {
// panic!("{}", date_str);
let res = chrono::NaiveDateTime::parse_from_str(
&format!("{} 00:00", date_str),
"%-m/%-d/%y %H:%M",
);
if let Ok(res) = res {
res
} else {
panic!("could not parse `{}`", date_str)
}
})
.collect()
};

for row in reader.records() {
let row = row?;
let mut row_iter = row.iter();

let province_state = row_iter.next().unwrap().to_string();
let province_state = if province_state == "" {
None
} else {
Some(province_state)
};

let country_region = row_iter.next().unwrap().to_string();

let (city, county, state) = if country_region == "US" {
extract_us_data(&province_state.as_ref().unwrap()[..])
} else {
(None, None, None)
};

let lat = row_iter.next().unwrap().to_string();
let long = row_iter.next().unwrap().to_string();

for date in dates.iter().cloned() {
let date_count_str = row_iter.next().unwrap();

let count: i64 = date_count_str.parse().unwrap_or_default();

let lat = if lat == "" {
None
} else {
Some(lat.parse().unwrap())
};

let lon = if long == "" {
None
} else {
Some(long.parse().unwrap())
};

let mut record = CovidRecord {
status: status.to_string(),
province_state: province_state.clone(),
state: state.clone(),
county: county.clone(),
city: city.clone(),
country_region: country_region.clone(),
lat,
lon,
date,
count,
};

remap_territories(&mut record);

records.push(record)
}
}
covid19_data();

world_pop();
Ok(())
}
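
One behavioral difference between the deleted code above and its replacement in src/covid19_data.rs is worth flagging: the date format changed from %-m/%-d/%y %H:%M (two-digit year) to %-m/%-d/%Y %H:%M (four-digit year), while the JHU CSSE column headers still use dates of the form 1/22/20. A quick sanity-check sketch, using an assumed sample header (not part of the diff):

use chrono::NaiveDateTime;

fn main() {
    let header = "1/22/20"; // assumed sample JHU column header
    let old = NaiveDateTime::parse_from_str(&format!("{} 00:00", header), "%-m/%-d/%y %H:%M");
    let new = NaiveDateTime::parse_from_str(&format!("{} 00:00", header), "%-m/%-d/%Y %H:%M");
    println!("{:?}", old); // Ok(2020-01-22T00:00:00): %y maps 20 -> 2020
    println!("{:?}", new); // likely Ok(0020-01-22T00:00:00): %Y reads "20" literally
}
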
11 changes: 11 additions & 0 deletions src/parquet_writer.rs
Expand Up @@ -9,6 +9,7 @@ use std::fs::File;
use std::rc::Rc;

use crate::CovidRecord;
use crate::Population;

pub fn write_records_to_file(path: &str, records: Vec<CovidRecord>) {
let mut parquet_writer = parquet_writer::<CovidRecord>(path).unwrap();
@@ -30,3 +31,13 @@ pub fn parquet_writer<R: RecordSchema>(

SerializedFileWriter::new(file, Rc::new(schema), props)
}

pub fn write_records_to_file_population(path: &str, records: Vec<Population>) {
let mut parquet_writer = parquet_writer::<Population>(path).unwrap();

let mut row_group = parquet_writer.next_row_group().unwrap();
(&records[..]).write_to_row_group(&mut row_group).unwrap();
parquet_writer.close_row_group(row_group).unwrap();

parquet_writer.close().unwrap();
}
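
For completeness, a sketch of driving the new population writer; load_population_records is a hypothetical stand-in for whatever src/world_population.rs (not shown in this diff) actually provides:

use dracula_covid19::{write_records_to_file_population, Population};

fn main() {
    // Hypothetical loader: the real source of Population records is
    // the world_population module this PR adds.
    let records: Vec<Population> = load_population_records();
    write_records_to_file_population("world_population.parquet", records);
}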