Skip to content

Commit

Permalink
reverted the data update to use json again
Browse files Browse the repository at this point in the history
  • Loading branch information
CommanderStorm committed Aug 4, 2024
1 parent fe6360d commit 91c5110
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 35 deletions.
3 changes: 2 additions & 1 deletion server/main-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ actix-governor = { version = "0.5.0", features = ["logger"] }
tempfile = "3.10.1"
base64 = "0.22.1"
time = "0.3.36"
polars = { git = "https://github.com/CommanderStorm/polars.git", branch = "serialisation-experiment", features = ["parquet", "serde", "dtype-full"] }
polars = { version = "0.41.3", features = ["parquet", "dtype-struct"] }
#polars = { git = "https://github.com/CommanderStorm/polars.git", branch = "serialisation-experiment", features = ["parquet", "serde", "dtype-full"] }

[dev-dependencies]
insta = { version = "1.39.0", features = ["yaml", "json", "redactions"] }
Expand Down
47 changes: 16 additions & 31 deletions server/main-api/src/setup/database/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,28 +125,19 @@ impl DelocalisedValues {
}
}
#[tracing::instrument]
pub async fn download_updates() -> Result<LimitedVec<DelocalisedValues>, crate::BoxedError> {
pub async fn download_updates(
keys_which_need_updating: &LimitedVec<String>,
) -> Result<LimitedVec<DelocalisedValues>, crate::BoxedError> {
let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://nav.tum.de/cdn".to_string());
let body = reqwest::get(format!("{cdn_url}/api_data.parquet"))
let tasks = reqwest::get(format!("{cdn_url}/api_data.json"))
.await?
.error_for_status()?
.bytes()
.await?;
let mut file = tempfile()?;
file.write_all(&body)?;
let df = ParquetReader::new(&mut file).finish().unwrap();
let mut vals = Vec::<DelocalisedValues>::new();
let col_names = df.get_column_names().clone();
for index in 0..df.get_columns()[0].len() {
let row = df.get_row(index)?;
let mut hm = HashMap::new();
for (i, a) in row.0.into_iter().enumerate() {
let v = serde_json::to_value(a)?;
hm.insert(col_names[i].to_string(), v);
}
vals.push(DelocalisedValues::from(hm));
}
Ok(LimitedVec(vals))
.json::<Vec<HashMap<String, Value>>>()
.await?
.into_iter()
.map(DelocalisedValues::from)
.filter(|d| keys_which_need_updating.0.contains(&d.key))
.collect::<LimitedVec<DelocalisedValues>>();
Ok(tasks)
}
#[tracing::instrument(skip(tx))]
pub(super) async fn load_all_to_db(
Expand All @@ -159,7 +150,7 @@ pub(super) async fn load_all_to_db(
Ok(())
}
#[tracing::instrument]
pub async fn download_status() -> Result<LimitedVec<(String, i64)>, crate::BoxedError> {
pub async fn download_status() -> Result<(LimitedVec<String>,LimitedVec<i64>), crate::BoxedError> {
let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://nav.tum.de/cdn".to_string());
let body = reqwest::get(format!("{cdn_url}/status_data.parquet"))
.await?
Expand All @@ -170,14 +161,8 @@ pub async fn download_status() -> Result<LimitedVec<(String, i64)>, crate::Boxed
file.write_all(&body)?;
let df = ParquetReader::new(&mut file).finish().unwrap();
let id_col = Vec::from(df.column("id")?.str()?);
let hash_col = Vec::from(df.column("id")?.i64()?);
let tasks = id_col
.into_iter()
.zip(hash_col)
.flat_map(|(id, hash)| match (id, hash) {
(Some(id), Some(hash)) => Some((id.to_string(), hash)),
_ => None,
})
.collect();
Ok(LimitedVec(tasks))
let id_col=id_col.into_iter().filter_map(|s| s.map(String::from)).collect();
let hash_col = Vec::from(df.column("hash")?.i64()?);
let hash_col = hash_col.into_iter().flatten().collect();
Ok((LimitedVec(id_col), LimitedVec(hash_col)))
}
15 changes: 12 additions & 3 deletions server/main-api/src/setup/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tracing::{debug, debug_span, info};
use tracing::{debug, debug_span, info, info_span};

use crate::limited::vec::LimitedVec;

Expand All @@ -14,13 +14,22 @@ pub async fn setup(pool: &sqlx::PgPool) -> Result<(), crate::BoxedError> {
}
#[tracing::instrument(skip(pool))]
pub async fn load_data(pool: &sqlx::PgPool) -> Result<(), crate::BoxedError> {
let (new_keys,new_hashes) = data::download_status().await?;
{
let data = data::download_updates().await?;
let _ = info_span!("deleting old data").enter();
let mut tx = pool.begin().await?;
cleanup_deleted(&new_keys, &mut tx).await?;
tx.commit().await?;
}
let keys_which_need_updating =
find_keys_which_need_updating(pool, &new_keys, &new_hashes).await?;
if !keys_which_need_updating.is_empty() {
let _ = info_span!("loading changed data").enter();
let data = data::download_updates(&keys_which_need_updating).await?;
let mut tx = pool.begin().await?;
data::load_all_to_db(data, &mut tx).await?;
tx.commit().await?;
}

{
let aliases = alias::download_updates().await?;
let mut tx = pool.begin().await?;
Expand Down

0 comments on commit 91c5110

Please sign in to comment.