Skip to content

Commit

Permalink
Manage existing but incomplete uploads by deleting them and restartin…
Browse files Browse the repository at this point in the history
…g when uploading the same file, split out the delete service from the endpoint to reuse elsewhere
  • Loading branch information
evanjt committed Oct 29, 2024
1 parent b22ddb7 commit 639bf60
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 73 deletions.
60 changes: 34 additions & 26 deletions src/external/tus/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use crate::submissions::db as SubmissionDB;
use crate::uploads::associations::db as AssociationDB;
use crate::uploads::db as InputObjectDB;
use anyhow::Result;
use aws_sdk_s3::Client as S3Client;
use chrono::Utc;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, ModelTrait, QueryFilter, Set};
use std::sync::Arc;
use uuid::Uuid;

pub(super) async fn handle_pre_create(
db: DatabaseConnection,
s3: Arc<S3Client>,
payload: EventPayload,
) -> Result<PreCreateResponse> {
println!("Handling pre-create");
Expand All @@ -28,13 +31,6 @@ pub(super) async fn handle_pre_create(
println!("Filename: {}, Filetype: {}", filename, filetype);

// Check that the submission does not already have that same filename
// let existing_file = InputObjectDB::Entity::find()
// .filter(InputObjectDB::Column::Filename.eq(filename.clone()))
// .find_also_related(AssociationDB::Entity)
// .filter(AssociationDB::Column::SubmissionId.eq(submission_id))
// .one(&db)
// .await?;

let results: Vec<(SubmissionDB::Model, Vec<InputObjectDB::Model>)> =
SubmissionDB::Entity::find()
.filter(SubmissionDB::Column::Id.eq(submission_id))
Expand All @@ -44,36 +40,48 @@ pub(super) async fn handle_pre_create(
.await
.unwrap();

// Unpack the tuples, if there are any input objects, then the filename is already in use
if results.iter().any(|(_, objs)| objs.len() > 0) {
return Ok(PreCreateResponse {
// change_file_info: Some(ChangeFileInfo {
// id: object.last_insert_id.to_string(),
// }),
status: "success".to_string(),
http_response: Some(HttpResponse {
status_code: Some(400),
body: Some(
"File already uploaded with this filename in this submission".to_string(),
),
..Default::default()
}),
reject_upload: true,
..Default::default()
});
// Unpack the tuples to check if filename is already in use
for (_, objs) in results.iter() {
if !objs.is_empty() {
let existing_object = &objs[0];
if existing_object.all_parts_received {
// File upload is complete, return 400 error
return Ok(PreCreateResponse {
status: "failure".to_string(),
http_response: Some(HttpResponse {
status_code: Some(400),
body: Some(
"File already uploaded with this filename in this submission"
.to_string(),
),
..Default::default()
}),
reject_upload: true,
..Default::default()
});
} else {
// File upload is incomplete, delete from S3 and DB
crate::uploads::services::delete_object(&db, &s3, existing_object.id).await?;
match existing_object.clone().delete(&db).await {
Ok(_) => println!("Deleted incomplete file from S3 and database"),
_ => return Err(anyhow::anyhow!("Failed to delete object")),
}
}
}
}

// Proceed if no complete files are found
let allowed_types: Vec<&str> = vec!["application/octet-stream"];
let allowed_file_extensions: Vec<&str> = vec!["pod5"];

if !allowed_types.contains(&filetype.clone().as_str()) {
if !allowed_types.contains(&filetype.as_str()) {
return Err(anyhow::anyhow!("Filetype not allowed"));
}
if !allowed_file_extensions.contains(&filename.split('.').last().unwrap()) {
return Err(anyhow::anyhow!("File extension not allowed"));
}

// Check that the submission does not already have that same filename
// Create new object in DB
let object = InputObjectDB::ActiveModel {
id: Set(Uuid::new_v4()),
created_on: Set(Utc::now().naive_utc()),
Expand Down
13 changes: 9 additions & 4 deletions src/external/tus/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ use axum_keycloak_auth::{
instance::KeycloakAuthInstance, layer::KeycloakAuthLayer, PassthroughMode,
};

use aws_sdk_s3::Client as S3Client;
use sea_orm::DatabaseConnection;
use std::sync::Arc;

pub fn router(db: DatabaseConnection, keycloak_auth_instance: Arc<KeycloakAuthInstance>) -> Router {
pub fn router(
db: DatabaseConnection,
keycloak_auth_instance: Arc<KeycloakAuthInstance>,
s3: Arc<S3Client>,
) -> Router {
Router::new()
.route("/hooks", post(handle_tus_hooks))
.with_state(db)
.with_state((db, s3))
// Add the KeycloakAuthLayer to validate JWT tokens for tus hooks
.layer(
KeycloakAuthLayer::<Role>::builder()
Expand All @@ -33,11 +38,11 @@ pub fn router(db: DatabaseConnection, keycloak_auth_instance: Arc<KeycloakAuthIn
// Example of async function to handle tus hook events
#[axum::debug_handler]
pub async fn handle_tus_hooks(
State(db): State<DatabaseConnection>,
State((db, s3)): State<(DatabaseConnection, Arc<S3Client>)>,
Json(payload): Json<EventPayload>,
) -> (StatusCode, Json<PreCreateResponse>) {
match payload.event_type {
EventType::PreCreate => match handle_pre_create(db, payload).await {
EventType::PreCreate => match handle_pre_create(db, s3, payload).await {
Ok(response) => (StatusCode::CREATED, Json(response)),
Err(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
Expand Down
8 changes: 6 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,15 @@ async fn main() {
)
.nest(
"/api/uploads",
uploads::views::router(db.clone(), keycloak_auth_instance.clone(), s3_client),
uploads::views::router(
db.clone(),
keycloak_auth_instance.clone(),
s3_client.clone(),
),
)
.nest(
"/tus",
external::tus::views::router(db.clone(), keycloak_auth_instance),
external::tus::views::router(db.clone(), keycloak_auth_instance, s3_client),
);

let addr: std::net::SocketAddr = "0.0.0.0:3000".parse().unwrap();
Expand Down
47 changes: 47 additions & 0 deletions src/uploads/services.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use super::associations;
use super::db;
use crate::config::Config;
use anyhow::Error;
use aws_sdk_s3::Client as S3Client;
use sea_orm::entity::prelude::*;
use sea_orm::{DatabaseConnection, EntityTrait};
use std::sync::Arc;
use uuid::Uuid;

pub async fn delete_object(
db: &DatabaseConnection,
s3: &Arc<S3Client>,
id: Uuid,
) -> Result<(), Error> {
// Delete all associations for the object
associations::db::Entity::delete_many()
.filter(associations::db::Column::InputObjectId.eq(id.clone()))
.exec(db)
.await
.map_err(|e| Error::new(e))?;

// Find the object to delete
match db::Entity::find_by_id(id).one(db).await? {
Some(obj) => {
let obj: db::ActiveModel = obj.into();

// Delete from the database
let res = db::Entity::delete(obj).exec(db).await?;
if res.rows_affected == 0 {
return Err(anyhow::anyhow!("Object not found"));
}

// Delete from S3
let config = Config::from_env();
s3.delete_object()
.bucket(config.s3_bucket)
.key(format!("{}/{}", config.s3_prefix, id))
.send()
.await
.map_err(|e| Error::new(e))?;

Ok(())
}
_ => Err(anyhow::anyhow!("Object not found")),
}
}
51 changes: 10 additions & 41 deletions src/uploads/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::common::filter::{apply_filters, parse_range};
use crate::common::models::FilterOptions;
use crate::common::pagination::calculate_content_range;
use crate::common::sort::generic_sort;
use crate::config::Config;
use aws_sdk_s3::Client as S3Client;
use axum::{
extract::{DefaultBodyLimit, Path, Query, State},
Expand All @@ -14,7 +13,7 @@ use axum::{
use axum_keycloak_auth::{
instance::KeycloakAuthInstance, layer::KeycloakAuthLayer, PassthroughMode,
};
use sea_orm::{query::*, ColumnTrait, DatabaseConnection, EntityTrait};
use sea_orm::{query::*, DatabaseConnection, EntityTrait};
use std::sync::Arc;
use uuid::Uuid;

Expand Down Expand Up @@ -122,46 +121,16 @@ pub async fn delete_one(
State((db, s3)): State<(DatabaseConnection, Arc<S3Client>)>,
Path(id): Path<Uuid>,
) -> StatusCode {
// Delete all associations, then delete the object
super::associations::db::Entity::delete_many()
.filter(super::associations::db::Column::InputObjectId.eq(id.clone()))
.exec(&db)
.await
.unwrap();

match super::db::Entity::find_by_id(id).one(&db).await {
Ok(Some(obj)) => {
let obj: super::db::ActiveModel = obj.into();

match super::db::Entity::delete(obj).exec(&db).await {
Ok(res) => {
if res.rows_affected == 0 {
return StatusCode::NOT_FOUND;
}

// Delete from S3
let config = Config::from_env();
let s3_response = s3
.delete_object()
.bucket(config.s3_bucket)
.key(format!("{}/{}", config.s3_prefix, id))
.send()
.await
.unwrap();

println!("S3 response: {:?}", s3_response);
return StatusCode::NO_CONTENT;
}
Err(_) => {
return StatusCode::INTERNAL_SERVER_ERROR;
}
match super::services::delete_object(&db, &s3, id).await {
Ok(_) => StatusCode::NO_CONTENT,
Err(err) => {
// Log the error if needed
eprintln!("Failed to delete object: {:?}", err);
if err.to_string() == "Object not found" {
StatusCode::NOT_FOUND
} else {
StatusCode::INTERNAL_SERVER_ERROR
}
}
Ok(_) => {
return StatusCode::NOT_FOUND;
}
Err(_) => {
return StatusCode::NOT_FOUND;
}
}
}

0 comments on commit 639bf60

Please sign in to comment.