From 4923dff268c19558357232587b93d5784bedb937 Mon Sep 17 00:00:00 2001 From: Frank Elsinga Date: Mon, 29 Jul 2024 13:56:09 +0200 Subject: [PATCH] more parquet --- server/Cargo.lock | 2 ++ server/main-api/Cargo.toml | 2 +- server/main-api/src/setup/database/alias.rs | 2 +- server/main-api/src/setup/database/data.rs | 27 +++++++++++++++------ 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/server/Cargo.lock b/server/Cargo.lock index 760b97638..e1b9a0ef9 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -3439,6 +3439,7 @@ dependencies = [ "rand_distr", "rayon", "regex", + "serde", "smartstring", "thiserror", "version_check", @@ -4930,6 +4931,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" dependencies = [ "autocfg", + "serde", "static_assertions", "version_check", ] diff --git a/server/main-api/Cargo.toml b/server/main-api/Cargo.toml index 8bb83a9a6..867c13a42 100644 --- a/server/main-api/Cargo.toml +++ b/server/main-api/Cargo.toml @@ -69,7 +69,7 @@ actix-governor = { version = "0.5.0", features = ["logger"] } tempfile = "3.10.1" base64 = "0.22.1" time = "0.3.36" -polars = { version = "0.41.3", features = ["lazy", "parquet"] } +polars = { version = "0.41.3", features = ["lazy", "parquet", "serde"] } [dev-dependencies] insta = { version = "1.39.0", features = ["yaml", "json", "redactions"] } diff --git a/server/main-api/src/setup/database/alias.rs b/server/main-api/src/setup/database/alias.rs index 08efff366..d605295ad 100644 --- a/server/main-api/src/setup/database/alias.rs +++ b/server/main-api/src/setup/database/alias.rs @@ -44,7 +44,7 @@ pub async fn download_updates() -> Result, crate::BoxedError> file.write_all(&body)?; let df = ParquetReader::new(&mut file) .with_columns(Some(vec!["id".to_string(),"type".to_string(),"visible_id".to_string(),"aliases".to_string()])) - .finish()?; + .finish().unwrap(); let id_col=df.column("id")?.str()?; let type_col=df.column("type")?.str()?; let visible_id_col=df.column("visible_id")?.str()?; diff --git a/server/main-api/src/setup/database/data.rs b/server/main-api/src/setup/database/data.rs index de453aa09..fcd8c7306 100644 --- a/server/main-api/src/setup/database/data.rs +++ b/server/main-api/src/setup/database/data.rs @@ -127,14 +127,25 @@ impl DelocalisedValues { #[tracing::instrument] pub async fn download_updates() -> Result, crate::BoxedError> { let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://nav.tum.de/cdn".to_string()); - let tasks = reqwest::get(format!("{cdn_url}/api_data.parquet")) + let body = reqwest::get(format!("{cdn_url}/api_data.parquet")) .await? - .json::>>() - .await? - .into_iter() - .map(DelocalisedValues::from) - .collect::>(); - Ok(tasks) + .bytes() + .await?; + let mut file = tempfile()?; + file.write_all(&body)?; + let df = ParquetReader::new(&mut file).finish().unwrap(); + let mut vals = Vec::::new(); + let col_names=df.get_column_names().clone(); + for index in 0..df.get_columns()[0].len() { + let row=df.get_row(index)?; + let mut hm=HashMap::new(); + for (i,a) in row.0.into_iter().enumerate(){ + let v=serde_json::to_value(a)?; + hm.insert(col_names[i].to_string(),v); + } + vals.push(DelocalisedValues::from(hm)); + } + Ok(LimitedVec(vals)) } #[tracing::instrument(skip(tx))] pub(super) async fn load_all_to_db( @@ -155,7 +166,7 @@ pub async fn download_status() -> Result, crate::Boxed .await?; let mut file = tempfile()?; file.write_all(&body)?; - let df = ParquetReader::new(&mut file).finish()?; + let df = ParquetReader::new(&mut file).finish().unwrap(); let id_col=Vec::from(df.column("id")?.str()?); let hash_col=Vec::from(df.column("id")?.i64()?); let tasks=id_col.into_iter().zip(hash_col).flat_map(|(id,hash)| match (id,hash) {