improved tracing a bit
CommanderStorm committed Jul 11, 2024
1 parent ca90ef5 commit 7c90cb8
Showing 7 changed files with 92 additions and 91 deletions.
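
The recurring change across these files: hand-rolled timing (an Instant::now() at the top of a function plus an elapsed field in the closing debug!) is dropped, and the function is annotated with #[tracing::instrument] instead, so the subscriber tracks a span per call and can report its duration. A minimal before/after sketch of that pattern (function names and bodies are placeholders, not code from this repository):

    use std::time::Instant;
    use tracing::debug;

    // Before: the duration is measured by hand and embedded in the log line.
    async fn refresh_before(id: String) {
        let start = Instant::now();
        // ... fetch and store calendar events ...
        debug!("finished refresh of {id} in {elapsed:?}", elapsed = start.elapsed());
    }

    // After: the span records the `id` argument and its own open/close
    // times, so a subscriber can derive the duration without bookkeeping.
    #[tracing::instrument]
    async fn refresh_after(id: String) {
        // ... fetch and store calendar events ...
        debug!("finished refresh of {id}");
    }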
20 changes: 8 additions & 12 deletions server/main-api/src/calendar/connectum.rs
@@ -10,6 +10,7 @@ use sqlx::PgPool;
use tracing::{debug, error, warn};

use crate::calendar::models::Event;
use crate::limited::vec::LimitedVec;

pub(in crate::calendar) struct APIRequestor {
client: reqwest::Client,
@@ -49,7 +50,6 @@ impl APIRequestor {
#[tracing::instrument]
pub(crate) async fn refresh(&self, id: String) -> Result<(), crate::BoxedError> {
let sync_start = Utc::now();
let start = Instant::now();
let url = format!("https://campus.tum.de/tumonline/co/connectum/api/rooms/{id}/calendars");
let events: Vec<Event> = self
.client
@@ -60,18 +60,17 @@ impl APIRequestor {
.json()
.await?;
debug!(
"finished fetching for {cnt} calendar events of {id} in {elapsed:?}",
"finished fetching for {cnt} calendar events of {id}",
cnt = events.len(),
elapsed = start.elapsed()
);
let events = events
.into_iter()
.map(|mut e| {
e.room_code.clone_from(&id);
e
})
.collect::<Vec<Event>>();
self.store(&events, &sync_start, &id).await?;
.collect::<LimitedVec<Event>>();
self.store(events, &sync_start, &id).await?;
Ok(())
}
fn should_refresh_token(&self) -> bool {
@@ -109,13 +108,13 @@ impl APIRequestor {
}

impl APIRequestor {
#[tracing::instrument]
async fn store(
&self,
events: &[Event],
events: LimitedVec<Event>,
last_calendar_scrape_at: &DateTime<Utc>,
id: &str,
) -> Result<(), crate::BoxedError> {
let start = Instant::now();
// insert into db
let mut tx = self.pool.begin().await?;
if let Err(e) = self.delete_events(&mut tx, id).await {
@@ -124,7 +123,7 @@ impl APIRequestor {
return Err(e.into());
}
let mut failed: Option<(usize, sqlx::Error)> = None;
for event in events {
for event in events.0.iter() {
// conflicts cannot occur because all values for said room were dropped
if let Err(e) = event.store(&mut tx).await {
failed = match failed {
@@ -148,10 +147,7 @@ impl APIRequestor {
return Err(e.into());
}
tx.commit().await?;
debug!(
"finished inserting into the db for {id} in {elapsed:?}",
elapsed = start.elapsed()
);
debug!("finished inserting into the db for {id}");
Ok(())
}

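A note on the store() signature change above: #[tracing::instrument] records every argument via its Debug impl, so one plausible motivation for switching from &[Event] to LimitedVec<Event> is to keep the span from embedding thousands of events. That reading is our assumption, not stated in the commit; the OrMore import in limited/vec.rs below hints that LimitedVec's Debug output is truncated. A sketch of such a bounded Debug, with an assumed cut-off:

    use std::fmt;

    // Hypothetical sketch, not the repository's implementation:
    // a newtype whose Debug output stops after a few elements.
    pub struct LimitedVec<T>(pub Vec<T>);

    impl<T: fmt::Debug> fmt::Debug for LimitedVec<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            const LIMIT: usize = 3; // assumed cut-off
            let mut list = f.debug_list();
            list.entries(self.0.iter().take(LIMIT));
            if self.0.len() > LIMIT {
                list.entry(&format_args!("...and {} more", self.0.len() - LIMIT));
            }
            list.finish()
        }
    }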
96 changes: 53 additions & 43 deletions server/main-api/src/calendar/refresh.rs
@@ -1,13 +1,14 @@
use std::time::Duration;

use cached::instant::Instant;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio::time::sleep;
use tracing::{debug, error};

use crate::calendar::connectum::APIRequestor;
use crate::limited::vec::LimitedVec;

const NUMBER_OF_CONCURRENT_SCRAPES: usize = 3;

@@ -17,20 +18,10 @@ struct LocationKey {
}

#[tracing::instrument(skip(pool))]
pub async fn all_entries(pool: &PgPool) {
if let Err(e) = std::env::var("CONNECTUM_OAUTH_CLIENT_ID") {
error!("Please make sure that CONNECTUM_OAUTH_CLIENT_ID are valid to use calendar features: {e:?}");
return;
}
if let Err(e) = std::env::var("CONNECTUM_OAUTH_CLIENT_SECRET") {
error!("Please make sure that CONNECTUM_OAUTH_CLIENT_SECRET is valid to use calendar features: {e:?}");
return;
}

let mut api = APIRequestor::from(pool);
loop {
let start = Instant::now();
let ids = sqlx::query_as!(LocationKey,r#"
async fn entries_which_need_scraping(
pool: &PgPool,
) -> Result<LimitedVec<LocationKey>, crate::BoxedError> {
let res = sqlx::query_as!(LocationKey,r#"
WITH ENTRIES_TO_SCRAPE AS (SELECT KEY,
CASE WHEN last_calendar_scrape_at IS NULL THEN 100 ELSE 1 END AS priority,
CAST(data -> 'ranking_factors' ->> 'rank_combined' AS INTEGER) AS rank_combined,
@@ -48,44 +39,63 @@ WHERE would_need_scraping AND can_be_scraped
-- seconds_ago: "how long since we last scraped it?" (range null,30*60/3=600..)
ORDER BY priority * rank_combined + priority * coalesce(seconds_ago/6,1) DESC
LIMIT 30"#)
.fetch_all(pool)
.await;
let mut ids = match ids {
.fetch_all(pool)
.await?;
Ok(LimitedVec::from(res))
}

#[tracing::instrument(skip(pool))]
pub async fn all_entries(pool: &PgPool) {
if let Err(e) = std::env::var("CONNECTUM_OAUTH_CLIENT_ID") {
error!("Please make sure that CONNECTUM_OAUTH_CLIENT_ID are valid to use calendar features: {e:?}");
return;
}
if let Err(e) = std::env::var("CONNECTUM_OAUTH_CLIENT_SECRET") {
error!("Please make sure that CONNECTUM_OAUTH_CLIENT_SECRET is valid to use calendar features: {e:?}");
return;
}

let mut api = APIRequestor::from(pool);
loop {
let ids = match entries_which_need_scraping(pool).await {
Ok(ids) => ids,
Err(e) => {
error!("Could not download get LocationKeys from the database because {e:?}");
continue;
}
};
let len = ids.len();
while let Err(e) = api.try_refresh_token().await {
error!("retrying to get oauth token because {e:?}");
let should_sleep_for_more_results = ids.len() < 20;
if should_sleep_for_more_results {
sleep(Duration::from_secs(60)).await;
}
// we want to scrape all ~2k rooms once per hour
// 1 thread is 15..20 per minute => we need at least 2 threads
// this uses a FuturesUnordered which refills itself to be able to work effectively with lagging tasks
let mut work_queue = FuturesUnordered::new();
for _ in 0..NUMBER_OF_CONCURRENT_SCRAPES {
if let Some(id) = ids.pop() {
work_queue.push(api.refresh(id.key));
}

request_events(&mut api, ids).await;
}
}

#[tracing::instrument(skip(api))]
async fn request_events(api: &mut APIRequestor, mut ids: LimitedVec<LocationKey>) {
debug!("Downloading {len} room-calendars", len = ids.len());
while let Err(e) = api.try_refresh_token().await {
error!("retrying to get oauth token because {e:?}");
sleep(Duration::from_secs(10)).await;
}
// we want to scrape all ~2k rooms once per hour
// 1 thread is 15..20 per minute => we need at least 2 threads
// this uses a FuturesUnordered which refills itself to be able to work effectively with lagging tasks
let mut work_queue = FuturesUnordered::new();
for _ in 0..NUMBER_OF_CONCURRENT_SCRAPES {
if let Some(id) = ids.pop() {
work_queue.push(api.refresh(id.key));
}
}

while let Some(res) = work_queue.next().await {
if let Err(e) = res {
error!("Could not download calendar because {e:?}");
}
if let Some(id) = ids.pop() {
work_queue.push(api.refresh(id.key));
}
while let Some(res) = work_queue.next().await {
if let Err(e) = res {
error!("Could not download calendar because {e:?}");
}
debug!(
"Downloaded {len} room-calendars took {elapsed:?}",
elapsed = start.elapsed()
);
let should_sleep_for_more_results = len < 20;
if should_sleep_for_more_results {
tokio::time::sleep(Duration::from_secs(60)).await;
if let Some(id) = ids.pop() {
work_queue.push(api.refresh(id.key));
}
}
}
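
The comment in request_events describes the scheduling idea; distilled into a self-contained sketch (the scrape stub and names are illustrative, not from the repository): keep at most NUMBER_OF_CONCURRENT_SCRAPES futures in flight and push a replacement each time one completes, so a single lagging task never idles the other slots.

    use futures::stream::{FuturesUnordered, StreamExt};

    const CONCURRENT: usize = 3;

    // Stub standing in for APIRequestor::refresh.
    async fn scrape(_id: String) -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }

    async fn drain_with_refill(mut ids: Vec<String>) {
        let mut work_queue = FuturesUnordered::new();
        // Fill the initial slots.
        for _ in 0..CONCURRENT {
            if let Some(id) = ids.pop() {
                work_queue.push(scrape(id));
            }
        }
        // Each completion frees a slot; refill it immediately so the
        // queue stays full even when one request is slow.
        while let Some(res) = work_queue.next().await {
            if let Err(e) = res {
                eprintln!("scrape failed: {e:?}");
            }
            if let Some(id) = ids.pop() {
                work_queue.push(scrape(id));
            }
        }
    }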
6 changes: 5 additions & 1 deletion server/main-api/src/limited/vec.rs
@@ -1,7 +1,8 @@
use serde::{Deserialize, Serialize};
use std::fmt;
use std::vec::IntoIter;

use serde::{Deserialize, Serialize};

use crate::limited::OrMore;

#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)]
@@ -29,6 +30,9 @@ impl<T> LimitedVec<T> {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn pop(&mut self) -> Option<T> {
self.0.pop()
}
}

impl<T> From<Vec<T>> for LimitedVec<T> {
2 changes: 1 addition & 1 deletion server/main-api/src/maps/fetch_tile.rs
@@ -82,7 +82,7 @@ impl FetchTileTask {
Ok(bytes) => match image::load_from_memory(&bytes.0) {
Ok(img) => Some((self.index, img)),
Err(e) => {
error!("Error while parsing image: {e:#?} for {self:?}");
error!("Error while parsing image: {e:?} for {self:?}");
None
}
},
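The only change in this file swaps {e:#?} for {e:?}: :#? is the pretty, multi-line Debug form, while :? keeps the whole record on one line, which suits line-oriented log collection. A tiny illustration with a hypothetical type:

    #[derive(Debug)]
    struct FetchError {
        code: u16,
        msg: &'static str,
    }

    fn main() {
        let e = FetchError { code: 404, msg: "not found" };
        println!("{e:?}");  // FetchError { code: 404, msg: "not found" }
        println!("{e:#?}"); // same data, spread over four lines
    }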
17 changes: 5 additions & 12 deletions server/main-api/src/maps/mod.rs
@@ -6,8 +6,7 @@ use image::{ImageBuffer, Rgba};
use serde::Deserialize;
use sqlx::Error::RowNotFound;
use sqlx::PgPool;
use tokio::time::Instant;
use tracing::{debug, error, warn};
use tracing::{error, warn};
use unicode_truncate::UnicodeTruncateStr;

use crate::limited::vec::LimitedVec;
@@ -66,25 +65,23 @@ async fn construct_image_from_data(
data: Location,
format: PreviewFormat,
) -> Option<LimitedVec<u8>> {
let start_time = Instant::now();
let mut img = match format {
PreviewFormat::OpenGraph => image::RgbaImage::new(1200, 630),
PreviewFormat::Square => image::RgbaImage::new(1200, 1200),
};

// add the map
if !OverlayMapTask::with(&data).draw_onto(&mut img).await {
if !OverlayMapTask::from(&data).draw_onto(&mut img).await {
return None;
}
debug!("map draw {:?}", start_time.elapsed());
draw_pin(&mut img);

draw_bottom(&data, &mut img);
debug!("overlay finish {:?}", start_time.elapsed());
Some(wrap_image_in_response(&img))
}

/// add the location pin image to the center
#[tracing::instrument(skip(img), level = tracing::Level::DEBUG)]
fn draw_pin(img: &mut ImageBuffer<Rgba<u8>, Vec<u8>>) {
let pin = image::load_from_memory(include_bytes!("static/pin.png")).unwrap();
image::imageops::overlay(
@@ -101,6 +98,8 @@ fn wrap_image_in_response(img: &image::RgbaImage) -> LimitedVec<u8> {
LimitedVec(w.into_inner())
}
const WHITE_PIXEL: Rgba<u8> = Rgba([255, 255, 255, 255]);

#[tracing::instrument(skip(img), level = tracing::Level::DEBUG)]
fn draw_bottom(data: &Location, img: &mut image::RgbaImage) {
// draw background white
for x in 0..img.width() {
@@ -197,7 +196,6 @@ pub async fn maps_handler(
web::Query(args): web::Query<QueryArgs>,
data: web::Data<crate::AppData>,
) -> HttpResponse {
let start_time = Instant::now();
let id = params
.into_inner()
.replace(|c: char| c.is_whitespace() || c.is_control(), "");
Expand All @@ -215,10 +213,5 @@ pub async fn maps_handler(
let img = construct_image_from_data(data, args.format)
.await
.unwrap_or_else(load_default_image);

debug!(
"Preview Generation for {id} took {elapsed:?}",
elapsed = start_time.elapsed()
);
HttpResponse::Ok().content_type("image/png").body(img.0)
}
31 changes: 17 additions & 14 deletions server/main-api/src/maps/overlay_map.rs
@@ -13,20 +13,8 @@ pub struct OverlayMapTask {
pub z: u32,
}

impl fmt::Debug for OverlayMapTask {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("OverlayMapTask")
.field(&self.x)
.field(&self.y)
.field(&self.z)
.finish()
}
}

const POSSIBLE_INDEX_RANGE: Range<u32> = 0..7;

impl OverlayMapTask {
pub fn with(entry: &Location) -> Self {
impl From<&Location> for OverlayMapTask {
fn from(entry: &Location) -> Self {
let zoom = match entry.r#type.as_str() {
"campus" => 14,
"area" | "site" => 15,
@@ -40,6 +28,21 @@ impl OverlayMapTask {
let (x, y, z) = lat_lon_z_to_xyz(entry.lat, entry.lon, zoom);
Self { x, y, z }
}
}

impl fmt::Debug for OverlayMapTask {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("OverlayMapTask")
.field(&self.x)
.field(&self.y)
.field(&self.z)
.finish()
}
}

const POSSIBLE_INDEX_RANGE: Range<u32> = 0..7;

impl OverlayMapTask {
#[tracing::instrument(skip(img))]
pub async fn draw_onto(&self, img: &mut image::RgbaImage) -> bool {
// coordinate system is centered around the center of the image
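Replacing the ad-hoc with() constructor by a From<&Location> impl is the idiomatic conversion style, and callers get Into for free through the standard blanket impl. A compressed sketch (the fields and projection are placeholders, not the repository's math):

    struct Location {
        lat: f64,
        lon: f64,
    }

    struct OverlayMapTask {
        x: u32,
        y: u32,
        z: u32,
    }

    impl From<&Location> for OverlayMapTask {
        fn from(entry: &Location) -> Self {
            let z = 16; // zoom selection elided; see the diff above
            // Placeholder projection; the real code maps lat/lon to tile indices.
            let (x, y) = (entry.lon.floor() as u32, entry.lat.floor() as u32);
            Self { x, y, z }
        }
    }

    fn demo(location: &Location) {
        let a = OverlayMapTask::from(location); // explicit form
        let b: OverlayMapTask = location.into(); // via the blanket Into impl
        let _ = (a, b);
    }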
11 changes: 3 additions & 8 deletions server/main-api/src/setup/meilisearch.rs
@@ -95,13 +95,12 @@ pub async fn setup(client: &Client) -> Result<(), crate::BoxedError> {
.wait_for_completion(client, POLLING_RATE, TIMEOUT)
.await?;
if let Task::Failed { content } = res {
panic!("Failed to add settings to Meilisearch: {content:#?}");
panic!("Failed to add settings to Meilisearch: {content:?}");
}
Ok(())
}
#[tracing::instrument(skip(client))]
pub async fn load_data(client: &Client) -> Result<(), crate::BoxedError> {
let start = std::time::Instant::now();
let entries = client.index("entries");
let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://nav.tum.de/cdn".to_string());
let documents = reqwest::get(format!("{cdn_url}/search_data.json"))
@@ -114,13 +113,9 @@ pub async fn load_data(client: &Client) -> Result<(), crate::BoxedError> {
.wait_for_completion(client, POLLING_RATE, TIMEOUT)
.await?;
if let Task::Failed { content } = res {
panic!("Failed to add documents to Meilisearch: {content:#?}");
panic!("Failed to add documents to Meilisearch: {content:?}");
}

info!(
"{cnt} documents added in {elapsed:?}",
elapsed = start.elapsed(),
cnt = documents.len()
);
info!("{cnt} documents added", cnt = documents.len());
Ok(())
}
