diff --git a/server/main-api/Cargo.toml b/server/main-api/Cargo.toml
index 6500d5d81..30cbc7e05 100644
--- a/server/main-api/Cargo.toml
+++ b/server/main-api/Cargo.toml
@@ -69,7 +69,8 @@ actix-governor = { version = "0.5.0", features = ["logger"] }
 tempfile = "3.10.1"
 base64 = "0.22.1"
 time = "0.3.36"
-polars = { git = "https://github.com/CommanderStorm/polars.git", branch = "serialisation-experiment", features = ["parquet", "serde", "dtype-full"] }
+polars = { version = "0.41.3", features = ["parquet", "dtype-struct"] }
+#polars = { git = "https://github.com/CommanderStorm/polars.git", branch = "serialisation-experiment", features = ["parquet", "serde", "dtype-full"] }
 
 [dev-dependencies]
 insta = { version = "1.39.0", features = ["yaml", "json", "redactions"] }
diff --git a/server/main-api/src/setup/database/data.rs b/server/main-api/src/setup/database/data.rs
index ca8258ee2..efc9754db 100644
--- a/server/main-api/src/setup/database/data.rs
+++ b/server/main-api/src/setup/database/data.rs
@@ -125,28 +125,19 @@ impl DelocalisedValues {
     }
 }
 #[tracing::instrument]
-pub async fn download_updates() -> Result<LimitedVec<DelocalisedValues>, crate::BoxedError> {
+pub async fn download_updates(
+    keys_which_need_updating: &LimitedVec<String>,
+) -> Result<LimitedVec<DelocalisedValues>, crate::BoxedError> {
     let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://nav.tum.de/cdn".to_string());
-    let body = reqwest::get(format!("{cdn_url}/api_data.parquet"))
+    let tasks = reqwest::get(format!("{cdn_url}/api_data.json"))
         .await?
-        .error_for_status()?
-        .bytes()
-        .await?;
-    let mut file = tempfile()?;
-    file.write_all(&body)?;
-    let df = ParquetReader::new(&mut file).finish().unwrap();
-    let mut vals = Vec::<DelocalisedValues>::new();
-    let col_names = df.get_column_names().clone();
-    for index in 0..df.get_columns()[0].len() {
-        let row = df.get_row(index)?;
-        let mut hm = HashMap::new();
-        for (i, a) in row.0.into_iter().enumerate() {
-            let v = serde_json::to_value(a)?;
-            hm.insert(col_names[i].to_string(), v);
-        }
-        vals.push(DelocalisedValues::from(hm));
-    }
-    Ok(LimitedVec(vals))
+        .json::<Vec<HashMap<String, serde_json::Value>>>()
+        .await?
+        .into_iter()
+        .map(DelocalisedValues::from)
+        .filter(|d| keys_which_need_updating.0.contains(&d.key))
+        .collect::<LimitedVec<DelocalisedValues>>();
+    Ok(tasks)
 }
 #[tracing::instrument(skip(tx))]
 pub(super) async fn load_all_to_db(
@@ -159,7 +150,7 @@ pub(super) async fn load_all_to_db(
     Ok(())
 }
 #[tracing::instrument]
-pub async fn download_status() -> Result<LimitedVec<(String, i64)>, crate::BoxedError> {
+pub async fn download_status() -> Result<(LimitedVec<String>, LimitedVec<i64>), crate::BoxedError> {
     let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://nav.tum.de/cdn".to_string());
     let body = reqwest::get(format!("{cdn_url}/status_data.parquet"))
         .await?
@@ -170,14 +161,8 @@ pub async fn download_status() -> Result<LimitedVec<(String, i64)>, crate::Boxed
     file.write_all(&body)?;
     let df = ParquetReader::new(&mut file).finish().unwrap();
     let id_col = Vec::from(df.column("id")?.str()?);
-    let hash_col = Vec::from(df.column("id")?.i64()?);
-    let tasks = id_col
-        .into_iter()
-        .zip(hash_col)
-        .flat_map(|(id, hash)| match (id, hash) {
-            (Some(id), Some(hash)) => Some((id.to_string(), hash)),
-            _ => None,
-        })
-        .collect();
-    Ok(LimitedVec(tasks))
+    let id_col = id_col.into_iter().filter_map(|s| s.map(String::from)).collect();
+    let hash_col = Vec::from(df.column("hash")?.i64()?);
+    let hash_col = hash_col.into_iter().flatten().collect();
+    Ok((LimitedVec(id_col), LimitedVec(hash_col)))
 }
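Note: the new `download_updates` body collects straight into `LimitedVec`, reaches into it via `.0`, and `load_data` below calls `is_empty()` on it. A minimal sketch of the wrapper this implies follows; the real definition lives in `crate::limited::vec` and is not part of this patch, so treat the exact shape as an assumption.

```rust
// Sketch only: the actual type is defined in crate::limited::vec.
// The tuple-struct field must be public for `keys_which_need_updating.0`
// to compile, and FromIterator is what `.collect::<LimitedVec<_>>()` needs.
pub struct LimitedVec<T>(pub Vec<T>);

impl<T> LimitedVec<T> {
    /// Used by `load_data` to skip the download when nothing changed.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

impl<T> FromIterator<T> for LimitedVec<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        LimitedVec(iter.into_iter().collect())
    }
}
```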
diff --git a/server/main-api/src/setup/database/mod.rs b/server/main-api/src/setup/database/mod.rs
index 5d92ebedb..9f18d4439 100644
--- a/server/main-api/src/setup/database/mod.rs
+++ b/server/main-api/src/setup/database/mod.rs
@@ -1,4 +1,4 @@
-use tracing::{debug, debug_span, info};
+use tracing::{debug, debug_span, info, info_span};
 
 use crate::limited::vec::LimitedVec;
 
@@ -14,13 +14,22 @@ pub async fn setup(pool: &sqlx::PgPool) -> Result<(), crate::BoxedError> {
 }
 
 #[tracing::instrument(skip(pool))]
 pub async fn load_data(pool: &sqlx::PgPool) -> Result<(), crate::BoxedError> {
+    let (new_keys, new_hashes) = data::download_status().await?;
     {
-        let data = data::download_updates().await?;
+        let _guard = info_span!("deleting old data").enter();
+        let mut tx = pool.begin().await?;
+        cleanup_deleted(&new_keys, &mut tx).await?;
+        tx.commit().await?;
+    }
+    let keys_which_need_updating =
+        find_keys_which_need_updating(pool, &new_keys, &new_hashes).await?;
+    if !keys_which_need_updating.is_empty() {
+        let _guard = info_span!("loading changed data").enter();
+        let data = data::download_updates(&keys_which_need_updating).await?;
         let mut tx = pool.begin().await?;
         data::load_all_to_db(data, &mut tx).await?;
         tx.commit().await?;
     }
-    {
         let aliases = alias::download_updates().await?;
         let mut tx = pool.begin().await?;
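Note: the new `load_data` flow leans on two helpers that this excerpt does not include, `cleanup_deleted` and `find_keys_which_need_updating`. A minimal sketch of what they could look like follows, assuming a single `en` table with `key` and `hash` columns; the table and column names are placeholders for illustration, not the repository's actual schema.

```rust
use std::collections::HashMap;

use crate::limited::vec::LimitedVec;

// Hypothetical sketch of the helper called above but not shown in this
// excerpt: compare the CDN's hashes against what the database holds and
// return the keys whose data is new or changed.
async fn find_keys_which_need_updating(
    pool: &sqlx::PgPool,
    keys: &LimitedVec<String>,
    hashes: &LimitedVec<i64>,
) -> Result<LimitedVec<String>, crate::BoxedError> {
    // Hashes currently stored in the database, keyed by entry id.
    let stored: Vec<(String, i64)> = sqlx::query_as("SELECT key, hash FROM en")
        .fetch_all(pool)
        .await?;
    let stored: HashMap<String, i64> = stored.into_iter().collect();
    // An entry needs updating when it is absent or its CDN hash differs.
    let changed = keys
        .0
        .iter()
        .zip(hashes.0.iter())
        .filter(|&(key, hash)| stored.get(key) != Some(hash))
        .map(|(key, _)| key.clone())
        .collect();
    Ok(LimitedVec(changed))
}

// Hypothetical sketch of the deletion pass: drop rows whose key no longer
// appears in the freshly downloaded status data.
async fn cleanup_deleted(
    keys: &LimitedVec<String>,
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), crate::BoxedError> {
    sqlx::query("DELETE FROM en WHERE key != ALL($1)")
        .bind(&keys.0)
        .execute(&mut **tx)
        .await?;
    Ok(())
}
```

With this split, an unchanged entry costs one `status_data.parquet` download plus a hash comparison instead of a full `api_data` fetch and reinsert, which is the point of threading `keys_which_need_updating` into `download_updates`.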