rsc: update read_job transaction hit path #1671
Merged
Changes from all commits (13, all by AbrarQuazi):

6798995  rsc: made read_job transaction smaller
135b673  rsc: added verification logic to hit path in read_job
5f78b59  rsc: fix comment
257128c  rsc: remove unused import
8c26dbd  rsc: get rid of verification queries, actually not needed
ed157cd  rsc: made it so that we resolve all blobs in one query
9fd0467  fix clang errors
a41823b  trying older version of clang
a635a39  using clang version 18.1.3
f126081  rsc: chunk ids in query and apply review comments
4849efc  undo clang changes as we pinned version of ubuntu
fa77708  missed clang file
f6af0e7  rsc: add job_id to error message
Diff:

@@ -5,13 +5,14 @@ use crate::types::{
 };
 use axum::Json;
 use entity::{job, job_use, output_dir, output_file, output_symlink};
+use entity::prelude::Blob;
+use futures::future::join_all;
 use hyper::StatusCode;
 use rand::{thread_rng, Rng};
 use rsc::database;
-use sea_orm::DatabaseTransaction;
 use sea_orm::{
     prelude::Uuid, ActiveModelTrait, ActiveValue::*, ColumnTrait, DatabaseConnection, DbErr,
-    EntityTrait, ModelTrait, QueryFilter, TransactionTrait,
+    EntityTrait, ModelTrait, QueryFilter, TransactionTrait, ConnectionTrait,
 };
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
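A note on the import change above: swapping the concrete sea_orm::DatabaseTransaction for the ConnectionTrait bound is what lets blob resolution move out of the transaction, since in sea-orm both DatabaseConnection and DatabaseTransaction implement ConnectionTrait. A minimal sketch of the pattern; the load_blobs helper is illustrative, not the PR's exact code:

    use sea_orm::{ConnectionTrait, DbErr, EntityTrait};

    // Illustrative helper: because it is generic over ConnectionTrait, callers
    // can pass either a &DatabaseConnection or a &DatabaseTransaction.
    async fn load_blobs<T: ConnectionTrait>(db: &T) -> Result<Vec<entity::blob::Model>, DbErr> {
        entity::blob::Entity::find().all(db).await
    }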
@@ -34,26 +35,55 @@ async fn record_miss(hash: String, conn: Arc<DatabaseConnection>) {
 }

 #[tracing::instrument(skip(db, stores))]
-async fn resolve_blob(
-    id: Uuid,
-    db: &DatabaseTransaction,
+async fn resolve_blobs<T: ConnectionTrait>(
+    ids: &Vec<Uuid>,
+    db: &T,
     stores: &HashMap<Uuid, Arc<dyn blob::DebugBlobStore + Sync + Send>>,
-) -> Result<ResolvedBlob, String> {
-    let Ok(Some(blob)) = entity::prelude::Blob::find_by_id(id).one(db).await else {
-        return Err(format!("Unable to find blob {} by id", id));
-    };
+) -> Result<HashMap<Uuid, ResolvedBlob>, String> {
+    // Postgres has a 65,535 parameter limit, ensuring we chunk IDs below that:
+    // https://www.postgresql.org/docs/current/limits.html
+    const CHUNK_SIZE: usize = 50_000;

-    let Some(store) = stores.get(&blob.store_id) else {
-        return Err(format!(
-            "Unable to find backing store {} for blob {}",
-            blob.store_id, id
-        ));
-    };
+    let mut resolved_map = HashMap::new();

-    return Ok(ResolvedBlob {
-        id: blob.id,
-        url: store.download_url(blob.key).await,
-    });
+    for chunk in ids.chunks(CHUNK_SIZE) {
+        // Fetch chunked blobs in a single query
+        let blob_map: HashMap<Uuid, entity::blob::Model> = Blob::find()
+            .filter(entity::blob::Column::Id.is_in(chunk.to_vec()))
+            .all(db)
+            .await
+            .map_err(|e| format!("Failed to query blobs, database error: {}", e))?
+            .into_iter()
+            .map(|b| (b.id, b))
+            .collect();
+
+        // Ensure we have all requested blobs
+        for &id in chunk {
+            if !blob_map.contains_key(&id) {
+                return Err(format!("Unable to find blob {} by id", id));
+            }
+        }
+
+        // Resolve all download URLs in parallel
+        let futures = blob_map.iter().map(|(id, blob)| {
+            let store_opt = stores.get(&blob.store_id).cloned();
+            let key = blob.key.clone();
+
+            async move {
+                let store = store_opt.ok_or_else(|| {
+                    format!("Unable to find backing store {} for blob {}", blob.store_id, id)
+                })?;
+                let url = store.download_url(key).await;
+                Ok::<(Uuid, ResolvedBlob), String>((*id, ResolvedBlob { id: *id, url }))
+            }
+        });
+
+        let results = join_all(futures).await;
+
+        let partial_map: HashMap<Uuid, ResolvedBlob> = results.into_iter().collect::<Result<_,_>>()?;
+        resolved_map.extend(partial_map);
+    }
+
+    Ok(resolved_map)
 }

 #[tracing::instrument(skip_all)]
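For context on the CHUNK_SIZE constant above: every id in the IS IN list becomes one bind parameter, and Postgres rejects statements with more than 65,535 parameters, so the loop caps each query at 50,000 ids. A standalone sketch of the same chunking arithmetic; fetch_chunk here is a hypothetical stand-in for the single Blob::find() query:

    // Standalone sketch of the chunking pattern; `fetch_chunk` is a
    // hypothetical stand-in for one `SELECT ... WHERE id IN (...)` round trip.
    const CHUNK_SIZE: usize = 50_000; // stays below Postgres's 65,535 bind-parameter cap

    fn fetch_chunk(chunk: &[u64]) -> Vec<u64> {
        chunk.to_vec() // pretend this is the database returning the rows
    }

    fn resolve_all(ids: &[u64]) -> Vec<u64> {
        let mut out = Vec::with_capacity(ids.len());
        // `chunks` yields at most CHUNK_SIZE ids per iteration, so no single
        // query can exceed the parameter limit regardless of job size.
        for chunk in ids.chunks(CHUNK_SIZE) {
            out.extend(fetch_chunk(chunk));
        }
        out
    }

    fn main() {
        let ids: Vec<u64> = (0..120_000).collect();
        // 120,000 ids -> three queries: 50,000 + 50,000 + 20,000 parameters.
        assert_eq!(resolve_all(&ids).len(), 120_000);
    }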
@@ -63,152 +93,139 @@ pub async fn read_job(
     blob_stores: HashMap<Uuid, Arc<dyn blob::DebugBlobStore + Sync + Send>>,
 ) -> (StatusCode, Json<ReadJobResponse>) {
     let hash = payload.hash();
+    let hash_for_spawns = hash.clone();

-    // TODO: This transaction is quite large with a bunch of "serialized" queries. If read_job
-    // becomes a bottleneck it should be rewritten such that joining on promises is delayed for as
-    // long as possible. Another option would be to collect all blob ids ahead of time and make a
-    // single db query to list them all out instead of a query per blob id.
-    let result = conn
+    // Fetch the job and related entities in a single transaction
+    let fetch_result = conn
         .as_ref()
-        .transaction::<_, (Option<Uuid>, ReadJobResponse), DbErr>(|txn| {
-            let hash = hash.clone();
+        .transaction::<_, Option<(job::Model, Vec<output_file::Model>, Vec<output_symlink::Model>, Vec<output_dir::Model>)>, DbErr>(|txn| {
             Box::pin(async move {
                 let Some(matching_job) = job::Entity::find()
                     .filter(job::Column::Hash.eq(hash.clone()))
                     .one(txn)
                     .await?
                 else {
                     tracing::info!(%hash, "Miss");
-                    return Ok((None, ReadJobResponse::NoMatch));
+                    return Ok(None);
                 };

-                tracing::info!(%hash, "Hit");
-                let output_files = matching_job
-                    .find_related(output_file::Entity)
-                    .all(txn)
-                    .await?
-                    .into_iter()
-                    .map(|m| {
-                        let stores_copy = blob_stores.clone();
-                        async move {
-                            let blob = resolve_blob(m.blob_id, txn, &stores_copy).await?;
-
-                            Ok(ResolvedBlobFile {
-                                path: m.path,
-                                mode: m.mode,
-                                blob,
-                            })
-                        }
-                    });
-
-                let output_files: Result<Vec<ResolvedBlobFile>, String> =
-                    futures::future::join_all(output_files)
-                        .await
-                        .into_iter()
-                        .collect();
-
-                let output_files = match output_files {
-                    Err(err) => {
-                        tracing::error! {%err, "Failed to resolve all output files. Resolving job as a cache miss."};
-                        return Ok((None, ReadJobResponse::NoMatch))
-                    },
-                    Ok(files) => files,
-                };
-
-                let output_symlinks = matching_job
-                    .find_related(output_symlink::Entity)
-                    .all(txn)
-                    .await?
-                    .into_iter()
-                    .map(|m| Symlink {
-                        path: m.path,
-                        link: m.link,
-                    })
-                    .collect();
-
-                let output_dirs = matching_job
-                    .find_related(output_dir::Entity)
-                    .all(txn)
-                    .await?
-                    .into_iter()
-                    .map(|m| Dir {
-                        path: m.path,
-                        mode: m.mode,
-                        hidden: Some(m.hidden),
-                    })
-                    .collect();
-
-                let stdout_blob = match resolve_blob(matching_job.stdout_blob_id, txn, &blob_stores).await {
-                    Err(err) => {
-                        tracing::error! {%err, "Failed to resolve stdout blob. Resolving job as a cache miss."};
-                        return Ok((None, ReadJobResponse::NoMatch))
-                    },
-                    Ok(blob) => blob,
-                };
-
-                let stderr_blob = match resolve_blob(matching_job.stderr_blob_id, txn, &blob_stores).await {
-                    Err(err) => {
-                        tracing::error! {%err, "Failed to resolve stderr blob. Resolving job as a cache miss."};
-                        return Ok((None, ReadJobResponse::NoMatch))
-                    },
-                    Ok(blob) => blob,
-                };
-
-                Ok((
-                    Some(matching_job.id),
-                    ReadJobResponse::Match {
-                        output_symlinks,
-                        output_dirs,
-                        output_files,
-                        stdout_blob,
-                        stderr_blob,
-                        status: matching_job.status,
-                        runtime: matching_job.runtime,
-                        cputime: matching_job.cputime,
-                        memory: matching_job.memory as u64,
-                        ibytes: matching_job.i_bytes as u64,
-                        obytes: matching_job.o_bytes as u64,
-                    },
-                ))
+                let output_files = matching_job.find_related(output_file::Entity).all(txn).await?;
+                let output_symlinks = matching_job.find_related(output_symlink::Entity).all(txn).await?;
+                let output_dirs = matching_job.find_related(output_dir::Entity).all(txn).await?;
+
+                Ok(Some((matching_job, output_files, output_symlinks, output_dirs)))
             })
         })
         .await;

-    match result {
-        Ok((Some(job_id), response)) => {
-            // If we get a match we want to record the use but we don't
-            // want to block sending the response on it so we spawn a task
-            // to go do that.
-            let mut status = StatusCode::NOT_FOUND;
-            if let ReadJobResponse::Match { .. } = response {
-                status = StatusCode::OK;
-                let shared_conn = conn.clone();
-                tokio::spawn(async move {
-                    record_hit(job_id, hash, shared_conn).await;
-                });
-            }
-            (status, Json(response))
-        }
-        Ok((None, _)) => {
-            let shared_conn = conn.clone();
-            tokio::spawn(async move {
-                record_miss(hash, shared_conn).await;
-            });
-            (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch))
-        }
-        Err(cause) => {
-            tracing::error! {
-                %cause,
-                "failed to read job"
-            };
-            (
-                StatusCode::INTERNAL_SERVER_ERROR,
-                Json(ReadJobResponse::NoMatch),
-            )
-        }
-    }
+    let hash_copy = hash_for_spawns.clone();
+    let Some((matching_job, output_files, output_symlinks, output_dirs)) = fetch_result.ok().flatten() else {
+        tokio::spawn(async move {
+            record_miss(hash_copy, conn.clone()).await;
+        });
+        return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+    };
+
+    // Collect all the blob IDs we need to resolve
+    let mut blob_ids: Vec<Uuid> = output_files.iter().map(|f| f.blob_id).collect();
+    blob_ids.push(matching_job.stdout_blob_id);
+    blob_ids.push(matching_job.stderr_blob_id);
+
+    // Resolve all needed blobs in one go
+    let resolved_blob_map = match resolve_blobs(&blob_ids, conn.as_ref(), &blob_stores).await {
+        Ok(map) => map,
+        Err(err) => {
+            tracing::error!(%err, "Failed to resolve blobs. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+        }
+    };
+
+    // Construct ResolvedBlobFile for each output file
+    let output_files = output_files
+        .into_iter()
+        .map(|m| {
+            let blob_id = m.blob_id;
+            let job_id = m.job_id;
+            let resolved_blob = resolved_blob_map.get(&blob_id).cloned().ok_or_else(|| {
+                format!("Missing resolved blob for {}, from job_id: {}", blob_id, job_id)
+            })?;
+            Ok(ResolvedBlobFile {
+                path: m.path,
+                mode: m.mode,
+                blob: resolved_blob,
+            })
+        })
+        .collect::<Result<Vec<_>, String>>();
+
+    let output_files = match output_files {
+        Ok(files) => files,
+        Err(err) => {
+            tracing::error!(%err, "Failed to resolve all output files. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+        }
+    };
+
+    // Collect other resolved entities
+    let output_symlinks: Vec<Symlink> = output_symlinks
+        .into_iter()
+        .map(|m| Symlink {
+            path: m.path,
+            link: m.link,
+        })
+        .collect();
+
+    let output_dirs: Vec<Dir> = output_dirs
+        .into_iter()
+        .map(|m| Dir {
+            path: m.path,
+            mode: m.mode,
+            hidden: Some(m.hidden),
+        })
+        .collect();
+
+    // Resolve stdout and stderr blobs from the map
+    let stdout_blob = match resolved_blob_map.get(&matching_job.stdout_blob_id) {
+        Some(blob) => blob.clone(),
+        None => {
+            tracing::error!("Failed to resolve stdout blob. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+        }
+    };
+
+    let stderr_blob = match resolved_blob_map.get(&matching_job.stderr_blob_id) {
+        Some(blob) => blob.clone(),
+        None => {
+            tracing::error!("Failed to resolve stderr blob. Resolving job as a cache miss.");
+            return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
+        }
+    };
+
+    // Construct response
+    let response = ReadJobResponse::Match {
+        output_symlinks,
+        output_dirs,
+        output_files,
+        stdout_blob,
+        stderr_blob,
+        status: matching_job.status,
+        runtime: matching_job.runtime,
+        cputime: matching_job.cputime,
+        memory: matching_job.memory as u64,
+        ibytes: matching_job.i_bytes as u64,
+        obytes: matching_job.o_bytes as u64,
+    };
+
+    let job_id = matching_job.id;
+    let hash_copy = hash_for_spawns.clone();
+
+    tracing::info!(%hash_copy, "Hit");
+    tokio::spawn(async move {
+        record_hit(job_id, hash_copy, conn.clone()).await;
+    });
+
+    (StatusCode::OK, Json(response))
 }

 #[tracing::instrument(skip_all)]
 pub async fn allow_job(
     Json(payload): Json<AllowJobPayload>,
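Two idioms in the new code are worth calling out together: resolve_blobs awaits all download-URL futures at once with join_all, and both it and the output_files mapping fold an iterator of Result values into a single Result, so the first failure short-circuits into the cache-miss path. A minimal standalone sketch of that combination; the URL shape is invented, and it assumes the tokio and futures crates this file already uses:

    use futures::future::join_all;
    use std::collections::HashMap;

    #[tokio::main]
    async fn main() {
        let blobs = vec![(1u32, "key-a"), (2u32, "key-b")];

        // One future per blob, mirroring the `store.download_url(key)` calls.
        let futures = blobs.into_iter().map(|(id, key)| async move {
            let url = format!("https://blobs.example/{key}"); // invented URL shape
            Ok::<(u32, String), String>((id, url))
        });

        // Await everything concurrently, then collect the Result items so the
        // first Err aborts the whole resolution, as in resolve_blobs.
        let resolved: Result<HashMap<u32, String>, String> =
            join_all(futures).await.into_iter().collect();

        assert_eq!(resolved.unwrap().len(), 2);
    }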
Review comments:

On the second "let hash_copy = hash_for_spawns.clone();": Again it feels like we didn't need to create this separate …

Reviewer: Why isn't this just hash.clone()?

AbrarQuazi: The reason I can't use hash.clone() is that Rust complains hash is used by the transaction above: it is moved into the transaction's closure, since String does not implement Copy. Because of that I need to create two clones of hash, one for the transaction and one for the tokio spawns (there are two of them).
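The borrow-checker constraint described in that answer is easy to reproduce in isolation: a String moved into one move closure is gone, so each additional closure (the transaction, plus the spawned tasks) needs its own owned clone. A minimal sketch using plain closures in place of the transaction and tokio::spawn:

    fn main() {
        let hash = String::from("abc123");
        let hash_for_spawns = hash.clone(); // extra clone taken up front

        // `hash` is moved here because String is not Copy; it cannot be
        // used again after this closure is created.
        let transaction = move || println!("transaction sees {hash}");
        transaction();

        // Each spawned task needs its own owned copy as well.
        let hash_copy = hash_for_spawns.clone();
        let spawn_one = move || println!("first spawn sees {hash_copy}");
        spawn_one();

        let spawn_two = move || println!("second spawn sees {hash_for_spawns}");
        spawn_two();
    }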