Enable S3-compatible uploads #39

Draft · wants to merge 3 commits into main
759 changes: 740 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions services/silo-api/Cargo.toml
@@ -32,3 +32,6 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
urlencoding = "2.1"
reqwest = { version = "0.12.9", features = ["json"] }
aws-sdk-s3 = "1.66.0"
aws-config = "1.5.11"
aws-types = "1.3.3"
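Note on the three new dependencies: aws-sdk-s3 provides the S3 client itself, aws-config the shared credential/region loader, and aws-types common types such as Region. As a rough sketch of how they typically combine for an S3-compatible endpoint (this PR instead builds an aws_sdk_s3::Config by hand in config.rs below; the endpoint URL here is a placeholder):

use aws_config::BehaviorVersion;
use aws_types::region::Region;

// Sketch only: build an S3 client against an S3-compatible endpoint.
// Credentials come from the standard AWS_* environment variables via the
// default provider chain; the endpoint URL is a placeholder.
async fn build_client() -> aws_sdk_s3::Client {
    let sdk_config = aws_config::defaults(BehaviorVersion::latest())
        .endpoint_url("https://<account-id>.r2.cloudflarestorage.com")
        .region(Region::new("auto"))
        .load()
        .await;
    aws_sdk_s3::Client::new(&sdk_config)
}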
38 changes: 38 additions & 0 deletions services/silo-api/src/app_state.rs
@@ -0,0 +1,38 @@
use std::sync::Arc;

use queue::{PostgresQueue, Queue};
use sqlx::PgPool;

use crate::config::Config;

/// Shared state available to the API
pub struct AppState {
pub db: PgPool,
pub config: Config,
pub queue: Arc<dyn Queue>,
pub s3_client: Option<aws_sdk_s3::Client>,
}

impl AppState {
pub async fn new(config: Config) -> Result<Self, Box<dyn std::error::Error>> {
// Initialize a connection to the database
let db = db::connect_to_database()
.await
.expect("Could not connect to database");
// Initialize the queue
let queue = Arc::new(PostgresQueue::new(db.clone()));

// If there are S3 credentials, construct an S3 client
let s3_client = match config.s3_options.clone() {
None => None,
Some(options) => Some(aws_sdk_s3::Client::from_conf(options)),
};

Ok(Self {
config,
db,
queue,
s3_client,
})
}
}
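Because s3_client is an Option, route handlers have to deal with the disabled case explicitly. A hypothetical guard, not part of this diff (the handler name and response codes are assumptions):

use std::sync::Arc;
use axum::{extract::State, http::StatusCode};

// Hypothetical handler: bail out with 503 when no S3 client is configured.
async fn upload_to_s3(State(state): State<Arc<AppState>>) -> Result<StatusCode, StatusCode> {
    let _client = state
        .s3_client
        .as_ref()
        .ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
    // ... stream the request body to the bucket here ...
    Ok(StatusCode::OK)
}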
82 changes: 68 additions & 14 deletions services/silo-api/src/config.rs
@@ -1,31 +1,85 @@
use aws_sdk_s3::config::{Credentials, Region};

pub const DEFAULT_PORT: &str = "3000"; // This is stored as a string to match environment vars

/// Global Configuration for the API Server
pub struct Config {
pub port: String,
-pub upload_dir: String,
pub upload_dir: Option<String>,
pub s3_options: Option<aws_sdk_s3::Config>,
}

impl Config {
pub fn new() -> Self {
-// Get the port to run the app on
-let port = std::env::var("PORT")
Config {
port: Self::get_port(),
upload_dir: Self::get_upload_dir(),
s3_options: Self::get_s3_config(),
}
}
/// Gets the port from environment variables
pub fn get_port() -> String {
std::env::var("PORT")
.unwrap_or_else(|_| DEFAULT_PORT.to_string())
.parse()
-.expect("PORT must be a number");
-
-// Get and create upload directory if it doesn't exist
-let upload_dir = match std::env::var("UPLOAD_DIR") {
-Ok(dir) => dir,
-Err(_) => "uploads".to_string(),
-};
-
-std::fs::create_dir_all(&upload_dir).expect("Failed to create upload directory");
-
-Config { port, upload_dir }
.expect("PORT must be a number")
}
/// Formats the host and port into an address for a TCPListener to bind to
pub fn get_address(&self) -> String {
format!("{}:{}", "0.0.0.0", &self.port)
}
/// Gets the upload directory from environment variables and initializes it
pub fn get_upload_dir() -> Option<String> {
match std::env::var("UPLOAD_DIR") {
Ok(dir) => {
std::fs::create_dir_all(&dir).expect("Failed to create upload directory");
Some(dir)
}
Err(_) => None,
}
}
/// Gets the S3 configuration from environment variables
pub fn get_s3_config() -> Option<aws_sdk_s3::Config> {
let endpoint = std::env::var("R2_ENDPOINT");
let bucket = std::env::var("R2_BUCKET");
let access_key = std::env::var("R2_ACCESS_KEY_ID");
let secret_key = std::env::var("R2_SECRET_ACCESS_KEY");
let region = std::env::var("R2_REGION");

let config_values = [&endpoint, &bucket, &access_key, &secret_key, &region];
// Check whether any of the variables are set, and whether all of them are
let has_some = config_values.iter().any(|result| result.is_ok());
let has_all = config_values.iter().all(|result| result.is_ok());

// Warn the user that the configuration was ignored due to missing values
if has_some && !has_all {
tracing::warn!(
"Some S3 configuration variables are set but not all - S3 storage will be disabled"
);
return None;
}
// Just return None if nothing was specified
if !has_some {
return None;
}

// Setup creds
let creds = Credentials::new(
access_key.unwrap(),
secret_key.unwrap(),
None,
None,
"farmhand",
);

// Construct config
let config = aws_sdk_s3::Config::builder()
// Pin a behavior major version; SDK 1.x requires one unless the
// behavior-version-latest cargo feature is enabled, and client
// construction panics without it.
.behavior_version(aws_sdk_s3::config::BehaviorVersion::latest())
.credentials_provider(creds)
.endpoint_url(endpoint.unwrap())
.region(Region::new(region.unwrap()))
.build();

Some(config)
}
}
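For reference, get_s3_config is all-or-nothing: S3 support turns on only when every R2_* variable is present. A quick illustration of that behavior (all values are placeholders, including the bucket name):

// Illustration only: with all five variables set, S3 options are produced.
std::env::set_var("R2_ENDPOINT", "https://<account-id>.r2.cloudflarestorage.com");
std::env::set_var("R2_BUCKET", "<bucket-name>");
std::env::set_var("R2_ACCESS_KEY_ID", "<access-key-id>");
std::env::set_var("R2_SECRET_ACCESS_KEY", "<secret-access-key>");
std::env::set_var("R2_REGION", "auto");
assert!(Config::get_s3_config().is_some());

// Removing any one of them logs a warning and disables S3 entirely.
std::env::remove_var("R2_REGION");
assert!(Config::get_s3_config().is_none());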
43 changes: 14 additions & 29 deletions services/silo-api/src/main.rs
@@ -1,19 +1,18 @@
mod app_state;
mod config;
mod jwt;
mod middleware;
mod routes;

pub use app_state::AppState;
use axum::{
extract::DefaultBodyLimit,
middleware as axum_mw,
response::IntoResponse,
routing::{delete, get, post},
Router,
};
use config::Config;
-use queue::{PostgresQueue, Queue};
-use routes::upload::UPLOAD_CHUNK_SIZE;
-use sqlx::PgPool;

use std::sync::Arc;
use tower_http::{
cors::CorsLayer,
@@ -23,13 +22,6 @@ use tower_http::{
use tracing::Level;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

-/// Shared state available to the API
-pub struct AppState {
-db: PgPool,
-config: Config,
-queue: Arc<dyn Queue>,
-}

#[tokio::main]
async fn main() {
// Start the tracer
@@ -47,15 +39,11 @@ async fn main() {
let listener = tokio::net::TcpListener::bind(&config.get_address())
.await
.unwrap();
-// Initialize a connection to the database
-let db = db::connect_to_database()
-.await
-.expect("Could not connect to database");
-// Initialize the queue
-let queue = Arc::new(PostgresQueue::new(db.clone()));
-// Store shared data as state between routes
-let state = Arc::new(AppState { db, config, queue });
routes::upload::init_cleanup().await;
let app_state = AppState::new(config)
.await
.expect("Could not construct app state");
let state = Arc::new(app_state);
// Initialize our router with the shared state and required routes
let app = Router::new()
.route("/", get(index))
@@ -88,16 +76,13 @@
middleware::auth::auth_middleware,
)),
)
-// TODO: Update this to use Backblaze instead
-// .route(
-// "/upload",
-// post(routes::upload::upload_video)
-// .layer(DefaultBodyLimit::max(UPLOAD_CHUNK_SIZE * 8))
-// .layer(axum_mw::from_fn_with_state(
-// state.clone(),
-// middleware::auth::auth_middleware,
-// )),
-// )
.route(
"/upload",
post(routes::upload::on_disk::upload_video).layer(axum_mw::from_fn_with_state(
state.clone(),
middleware::auth::auth_middleware,
)),
)
.nest(
"/video",
Router::new()
Empty file.
1 change: 1 addition & 0 deletions services/silo-api/src/routes/upload/mod.rs
@@ -0,0 +1 @@
pub mod on_disk;
services/silo-api/src/routes/upload/on_disk.rs
@@ -36,6 +36,8 @@ lazy_static::lazy_static! {
static ref UPLOAD_STATES: Mutex<HashMap<String, UploadState>> = Mutex::new(HashMap::new());
}

/// Upload video directly to the server
/// DEPRECATED: favor using Cloudflare R2 instead
pub async fn upload_video(
State(state): State<Arc<AppState>>,
Extension(user): Extension<Option<User>>,
@@ -131,7 +133,13 @@ pub async fn upload_video(
}

// Create paths for temporary and final file locations
-let upload_dir = Path::new(&state.config.upload_dir);
let upload_dir = Path::new(
state
.config
.upload_dir
.as_ref()
.expect("No upload directory specified"),
);
let final_path = upload_dir.join(&filename);
let temp_path = upload_dir.join(format!("{}.temp", Uuid::new_v4()));

@@ -396,6 +404,8 @@ pub async fn upload_video(
}
}

/// Removes temp files that are more than an hour old. This is useful for cleaning up
/// partially uploaded files that were never completed.
async fn cleanup_temp_files() -> Result<(), std::io::Error> {
let upload_dir = std::env::current_dir()?.join("uploads");
let mut read_dir = tokio::fs::read_dir(&upload_dir).await?;
@@ -416,6 +426,7 @@ async fn cleanup_temp_files() -> Result<(), std::io::Error> {
Ok(())
}

/// Spawns a background task that periodically removes temporary upload files older than an hour
pub async fn init_cleanup() {
tokio::spawn(async {
loop {
Empty file added services/silo-api/src/s3.rs
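s3.rs lands empty in this draft. One possible shape for it, sketched under the assumption that uploads go through put_object (the helper name and signature are invented here, not part of the diff):

use aws_sdk_s3::{primitives::ByteStream, Client};

// Hypothetical helper: upload an in-memory buffer to `bucket` under `key`.
pub async fn upload_object(
    client: &Client,
    bucket: &str,
    key: &str,
    bytes: Vec<u8>,
) -> Result<(), aws_sdk_s3::Error> {
    client
        .put_object()
        .bucket(bucket)
        .key(key)
        .body(ByteStream::from(bytes))
        .send()
        .await?;
    Ok(())
}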