Skip to content

Commit

Permalink
more parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
CommanderStorm committed Jul 29, 2024
1 parent 69a3f33 commit 4923dff
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
2 changes: 2 additions & 0 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/main-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ actix-governor = { version = "0.5.0", features = ["logger"] }
tempfile = "3.10.1"
base64 = "0.22.1"
time = "0.3.36"
polars = { version = "0.41.3", features = ["lazy", "parquet"] }
polars = { version = "0.41.3", features = ["lazy", "parquet", "serde"] }

[dev-dependencies]
insta = { version = "1.39.0", features = ["yaml", "json", "redactions"] }
Expand Down
2 changes: 1 addition & 1 deletion server/main-api/src/setup/database/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub async fn download_updates() -> Result<LimitedVec<Alias>, crate::BoxedError>
file.write_all(&body)?;
let df = ParquetReader::new(&mut file)
.with_columns(Some(vec!["id".to_string(),"type".to_string(),"visible_id".to_string(),"aliases".to_string()]))
.finish()?;
.finish().unwrap();
let id_col=df.column("id")?.str()?;
let type_col=df.column("type")?.str()?;
let visible_id_col=df.column("visible_id")?.str()?;
Expand Down
27 changes: 19 additions & 8 deletions server/main-api/src/setup/database/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,25 @@ impl DelocalisedValues {
#[tracing::instrument]
pub async fn download_updates() -> Result<LimitedVec<DelocalisedValues>, crate::BoxedError> {
let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://nav.tum.de/cdn".to_string());
let tasks = reqwest::get(format!("{cdn_url}/api_data.parquet"))
let body = reqwest::get(format!("{cdn_url}/api_data.parquet"))
.await?
.json::<Vec<HashMap<String, Value>>>()
.await?
.into_iter()
.map(DelocalisedValues::from)
.collect::<LimitedVec<DelocalisedValues>>();
Ok(tasks)
.bytes()
.await?;
let mut file = tempfile()?;
file.write_all(&body)?;
let df = ParquetReader::new(&mut file).finish().unwrap();
let mut vals = Vec::<DelocalisedValues>::new();
let col_names=df.get_column_names().clone();
for index in 0..df.get_columns()[0].len() {
let row=df.get_row(index)?;
let mut hm=HashMap::new();
for (i,a) in row.0.into_iter().enumerate(){
let v=serde_json::to_value(a)?;
hm.insert(col_names[i].to_string(),v);
}
vals.push(DelocalisedValues::from(hm));
}
Ok(LimitedVec(vals))
}
#[tracing::instrument(skip(tx))]
pub(super) async fn load_all_to_db(
Expand All @@ -155,7 +166,7 @@ pub async fn download_status() -> Result<LimitedVec<(String, i64)>, crate::Boxed
.await?;
let mut file = tempfile()?;
file.write_all(&body)?;
let df = ParquetReader::new(&mut file).finish()?;
let df = ParquetReader::new(&mut file).finish().unwrap();
let id_col=Vec::from(df.column("id")?.str()?);
let hash_col=Vec::from(df.column("id")?.i64()?);
let tasks=id_col.into_iter().zip(hash_col).flat_map(|(id,hash)| match (id,hash) {
Expand Down

0 comments on commit 4923dff

Please sign in to comment.