Skip to content

Commit

Permalink
Made sure that Meilisearch initialisation does not have gaps (#1279)
Browse files Browse the repository at this point in the history
Implemented Meilisearch data updates with no user-visible downtime
  • Loading branch information
CommanderStorm authored Jul 3, 2024
1 parent 203d929 commit 55a1a8d
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 39 deletions.
9 changes: 6 additions & 3 deletions server/main-api/src/calendar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ pub async fn calendar_handler(
Ok(ids) => ids,
Err(e) => return e,
};
let locations = match get_locations(&data.db, &ids).await {
let locations = match get_locations(&data.pool, &ids).await {
Ok(l) => l.0,
Err(e) => return e,
};
if let Err(e) = validate_locations(&ids, &locations) {
return e;
}
match get_from_db(&data.db, &locations, &args.start_after, &args.end_before).await {
match get_from_db(&data.pool, &locations, &args.start_after, &args.end_before).await {
Ok(events) => HttpResponse::Ok().json(events),
Err(e) => {
error!("could not get entries from the db for {ids:?} because {e:?}");
Expand Down Expand Up @@ -133,6 +133,8 @@ async fn get_from_db(

#[cfg(test)]
mod tests {
use std::sync::Arc;

use actix_web::http::header::ContentType;
use actix_web::test;
use actix_web::App;
Expand Down Expand Up @@ -278,7 +280,8 @@ mod tests {
let app = test::init_service(
App::new()
.app_data(web::Data::new(AppData {
db: pg.pool.clone(),
pool: pg.pool.clone(),
meilisearch_initialised: Arc::new(Default::default()),
}))
.service(calendar_handler),
)
Expand Down
6 changes: 3 additions & 3 deletions server/main-api/src/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ pub async fn get_handler(
let id = params
.into_inner()
.replace(|c: char| c.is_whitespace() || c.is_control(), "");
let Some((probable_id, redirect_url)) = get_alias_and_redirect(&data.db, &id).await else {
let Some((probable_id, redirect_url)) = get_alias_and_redirect(&data.pool, &id).await else {
return HttpResponse::NotFound().body("Not found");
};
let result = if args.should_use_english() {
sqlx::query_scalar!("SELECT data FROM en WHERE key = $1", probable_id)
.fetch_optional(&data.db)
.fetch_optional(&data.pool)
.await
} else {
sqlx::query_scalar!("SELECT data FROM de WHERE key = $1", probable_id)
.fetch_optional(&data.db)
.fetch_optional(&data.pool)
.await
};
match result {
Expand Down
89 changes: 59 additions & 30 deletions server/main-api/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;

use actix_cors::Cors;
use actix_web::{get, middleware, web, App, HttpResponse, HttpServer};
use actix_web_prom::PrometheusMetricsBuilder;
use actix_web_prom::{PrometheusMetrics, PrometheusMetricsBuilder};
use meilisearch_sdk::client::Client;
use sentry::SessionMode;
use sqlx::postgres::PgPoolOptions;
use sqlx::prelude::*;
use sqlx::PgPool;
use tracing::{debug, debug_span, error, info};
use sqlx::{PgPool, Pool, Postgres};
use tokio::sync::{Barrier, RwLock};
use tracing::{debug_span, error, info};
use tracing_actix_web::TracingLogger;

mod calendar;
Expand All @@ -27,9 +29,25 @@ type BoxedError = Box<dyn Error + Send + Sync>;

const MAX_JSON_PAYLOAD: usize = 1024 * 1024; // 1 MB

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct AppData {
db: PgPool,
/// shared [sqlx::PgPool] to connect to postgres
pool: PgPool,
/// necessary, as otherwise we could return empty results during initialisation
meilisearch_initialised: Arc<RwLock<()>>,
}

impl AppData {
    /// Builds the shared application state.
    ///
    /// Connects to postgres (panicking with a helpful message if the
    /// database is unreachable) and creates the meilisearch
    /// initialisation lock in its default (unheld) state.
    async fn new() -> Self {
        let pool = PgPoolOptions::new()
            .connect(&connection_string())
            .await
            .expect("make sure that postgres is running in the background");
        Self {
            pool,
            meilisearch_initialised: Arc::default(),
        }
    }
}

#[get("/api/status")]
Expand All @@ -38,7 +56,7 @@ async fn health_status_handler(data: web::Data<AppData>) -> HttpResponse {
Some(hash) => format!("https://github.com/TUM-Dev/navigatum/tree/{hash}"),
None => "unknown commit hash, probably running in development".to_string(),
};
return match data.db.execute("SELECT 1").await {
return match data.pool.execute("SELECT 1").await {
Ok(_) => HttpResponse::Ok()
.content_type("text/plain")
.body(format!("healthy\nsource_code: {github_link}")),
Expand Down Expand Up @@ -103,20 +121,23 @@ fn main() -> Result<(), BoxedError> {
actix_web::rt::System::new().block_on(async { run().await })?;
Ok(())
}
async fn run_maintenance_work() {
let pool = PgPoolOptions::new()
.connect(&connection_string())
.await
.expect("make sure that postgres is running in the background");
async fn run_maintenance_work(
pool: Pool<Postgres>,
meilisearch_initalised: Arc<RwLock<()>>,
initalisation_started: Arc<Barrier>,
) {
if std::env::var("SKIP_MS_SETUP") != Ok("true".to_string()) {
let _ = debug_span!("updating meilisearch data").enter();
let _ = meilisearch_initalised.write().await;
initalisation_started.wait().await;
let ms_url =
std::env::var("MIELI_URL").unwrap_or_else(|_| "http://localhost:7700".to_string());
let client = Client::new(ms_url, std::env::var("MEILI_MASTER_KEY").ok()).unwrap();
setup::meilisearch::setup(&client).await.unwrap();
setup::meilisearch::load_data(&client).await.unwrap();
} else {
info!("skipping the database setup as SKIP_MS_SETUP=true");
initalisation_started.wait().await;
}
if std::env::var("SKIP_DB_SETUP") != Ok("true".to_string()) {
let _ = debug_span!("updating postgres data").enter();
Expand All @@ -130,25 +151,19 @@ async fn run_maintenance_work() {

/// we split main and run because otherwise sentry could not be properly instrumented
async fn run() -> Result<(), BoxedError> {
let maintenance_thread = tokio::spawn(run_maintenance_work());
let data = AppData::new().await;

debug!("setting up metrics");
let labels = HashMap::from([(
"revision".to_string(),
option_env!("GIT_COMMIT_SHA")
.unwrap_or_else(|| "development")
.to_string(),
)]);
let prometheus = PrometheusMetricsBuilder::new("navigatum_mainapi")
.endpoint("/api/main/metrics")
.const_labels(labels)
.build()
.unwrap();
let pool = PgPoolOptions::new()
.connect(&connection_string())
.await
.expect("make sure that postgres is running in the background");
let shutdown_pool_clone = pool.clone();
// without this barrier an external client might race the RWLock for meilisearch_initialised and gain the read lock before it is allowed
let initialisation_started = Arc::new(Barrier::new(2));
let maintenance_thread = tokio::spawn(run_maintenance_work(
data.pool.clone(),
data.meilisearch_initialised.clone(),
initialisation_started.clone(),
));

let prometheus = build_metrics().expect("specified metrics are valid");
let shutdown_pool_clone = data.pool.clone();
initialisation_started.wait().await;
info!("running the server");
HttpServer::new(move || {
let cors = Cors::default()
Expand All @@ -164,7 +179,7 @@ async fn run() -> Result<(), BoxedError> {
.wrap(middleware::Compress::default())
.wrap(sentry_actix::Sentry::new())
.app_data(web::JsonConfig::default().limit(MAX_JSON_PAYLOAD))
.app_data(web::Data::new(AppData { db: pool.clone() }))
.app_data(web::Data::new(data.clone()))
.service(health_status_handler)
.service(calendar::calendar_handler)
.service(web::scope("/api/preview").configure(maps::configure))
Expand All @@ -179,3 +194,17 @@ async fn run() -> Result<(), BoxedError> {
shutdown_pool_clone.close().await;
Ok(())
}

/// Builds the Prometheus metrics middleware served under `/api/metrics`.
///
/// Every exported metric carries a constant `revision` label set to the
/// git commit SHA the binary was built from, or `"development"` when the
/// `GIT_COMMIT_SHA` build-time variable was not provided.
#[tracing::instrument]
fn build_metrics() -> Result<PrometheusMetrics, BoxedError> {
    let revision = option_env!("GIT_COMMIT_SHA").unwrap_or("development");
    let const_labels = HashMap::from([("revision".to_string(), revision.to_string())]);
    PrometheusMetricsBuilder::new("navigatum_api")
        .endpoint("/api/metrics")
        .const_labels(const_labels)
        .build()
}
4 changes: 2 additions & 2 deletions server/main-api/src/maps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ pub async fn maps_handler(
let id = params
.into_inner()
.replace(|c: char| c.is_whitespace() || c.is_control(), "");
if let Some(redirect_url) = get_possible_redirect_url(&data.db, &id, &args).await {
if let Some(redirect_url) = get_possible_redirect_url(&data.pool, &id, &args).await {
let mut res = HttpResponse::PermanentRedirect();
res.insert_header((LOCATION, redirect_url));
return res.finish();
}
let data = match get_localised_data(&data.db, &id, args.lang.should_use_english()).await {
let data = match get_localised_data(&data.pool, &id, args.lang.should_use_english()).await {
Ok(data) => data,
Err(e) => {
return e;
Expand Down
7 changes: 6 additions & 1 deletion server/main-api/src/search/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Instant;

use crate::AppData;
use actix_web::{get, web, HttpResponse};
use cached::proc_macro::cached;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -78,8 +79,12 @@ impl From<&SearchQueryArgs> for Highlighting {
}
}
#[get("/api/search")]
pub async fn search_handler(web::Query(args): web::Query<SearchQueryArgs>) -> HttpResponse {
pub async fn search_handler(
data: web::Data<AppData>,
web::Query(args): web::Query<SearchQueryArgs>,
) -> HttpResponse {
let start_time = Instant::now();
let _ = data.meilisearch_initialised.read().await; // otherwise we could return empty results during initialisation

let limits = Limits::from(&args);
let highlighting = Highlighting::from(&args);
Expand Down

0 comments on commit 55a1a8d

Please sign in to comment.