diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 66d668c..797fd9d 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -20,4 +20,5 @@ jobs: # - run: cargo fmt -- --check - run: cargo clippy --all-targets --all-features - run: cargo test --all-features --no-fail-fast + - run: cargo doc --all-features - run: cargo run --example local_federation axum diff --git a/Cargo.toml b/Cargo.toml index 7ea5c40..488abe5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,13 @@ documentation = "https://docs.rs/activitypub_federation/" [features] default = ["axum"] -axum = ["dep:axum", "dep:tower", "dep:hyper", "dep:http-body-util"] +axum = [ + "dep:axum", + "dep:axum-extra", + "dep:tower", + "dep:hyper", + "dep:http-body-util", +] diesel = ["dep:diesel"] [lints.rust] @@ -36,18 +42,18 @@ serde = { version = "1.0.204", features = ["derive"] } async-trait = "0.1.81" url = { version = "2.5.2", features = ["serde"] } serde_json = { version = "1.0.120", features = ["preserve_order"] } -reqwest = { version = "0.11.27", default-features = false, features = [ +reqwest = { version = "0.12.5", default-features = false, features = [ "json", "stream", "rustls-tls", ] } -reqwest-middleware = "0.2.5" +reqwest-middleware = "0.3.2" tracing = "0.1.40" base64 = "0.22.1" rand = "0.8.5" rsa = "0.9.6" once_cell = "1.19.0" -http = "0.2.12" +http = "1.1.0" sha2 = { version = "0.10.8", features = ["oid"] } thiserror = "1.0.62" derive_builder = "0.20.0" @@ -55,7 +61,7 @@ itertools = "0.13.0" dyn-clone = "1.0.17" enum_delegate = "0.2.0" httpdate = "1.0.3" -http-signature-normalization-reqwest = { version = "0.10.0", default-features = false, features = [ +http-signature-normalization-reqwest = { version = "0.12.0", default-features = false, features = [ "sha-2", "middleware", "default-spawner", @@ -82,24 +88,24 @@ futures = "0.3.30" moka = { version = "0.12.8", features = ["future"] } # Axum -axum = { version = "0.6.20", features = [ +axum = { version = "0.7.5", features = [ "json", - "headers", ], default-features = false, optional = true } +axum-extra = { version = "0.9.3", features = ["typed-header"], optional = true } tower = { version = "0.4.13", optional = true } -hyper = { version = "0.14", optional = true } +hyper = { version = "1.4.1", optional = true } http-body-util = { version = "0.1.2", optional = true } [dev-dependencies] anyhow = "1.0.86" env_logger = "0.11.3" tower-http = { version = "0.5.2", features = ["map-request-body", "util"] } -axum = { version = "0.6.20", features = [ +axum = { version = "0.7.5", features = [ "http1", "tokio", "query", ], default-features = false } -axum-macros = "0.3.8" +axum-macros = "0.4.1" tokio = { version = "1.38.0", features = ["full"] } [profile.dev] diff --git a/docs/06_http_endpoints_axum.md b/docs/06_http_endpoints_axum.md index 3a33410..2feeb5a 100644 --- a/docs/06_http_endpoints_axum.md +++ b/docs/06_http_endpoints_axum.md @@ -15,9 +15,9 @@ The next step is to allow other servers to fetch our actors and objects. For thi # use activitypub_federation::config::FederationMiddleware; # use axum::routing::get; # use crate::activitypub_federation::traits::Object; -# use axum::headers::ContentType; +# use axum_extra::headers::ContentType; # use activitypub_federation::FEDERATION_CONTENT_TYPE; -# use axum::TypedHeader; +# use axum_extra::TypedHeader; # use axum::response::IntoResponse; # use http::HeaderMap; # async fn generate_user_html(_: String, _: Data) -> axum::response::Response { todo!() } @@ -33,11 +33,11 @@ async fn main() -> Result<(), Error> { .route("/user/:name", get(http_get_user)) .layer(FederationMiddleware::new(data)); - let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); - tracing::debug!("listening on {}", addr); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await?; + let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") + .await + .unwrap(); + tracing::debug!("listening on {}", listener.local_addr().unwrap()); + axum::serve(listener, app).await.unwrap(); Ok(()) } diff --git a/examples/live_federation/main.rs b/examples/live_federation/main.rs index 3fa0b18..0da5ce5 100644 --- a/examples/live_federation/main.rs +++ b/examples/live_federation/main.rs @@ -12,10 +12,8 @@ use axum::{ Router, }; use error::Error; -use std::{ - net::ToSocketAddrs, - sync::{Arc, Mutex}, -}; +use std::sync::{Arc, Mutex}; +use tokio::net::TcpListener; use tracing::log::{info, LevelFilter}; mod activities; @@ -60,13 +58,13 @@ async fn main() -> Result<(), Error> { .route("/.well-known/webfinger", get(webfinger)) .layer(FederationMiddleware::new(config)); - let addr = BIND_ADDRESS - .to_socket_addrs()? - .next() - .expect("Failed to lookup domain name"); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await?; + axum::serve( + TcpListener::bind(BIND_ADDRESS) + .await + .expect("Failed to lookup domain name"), + app.into_make_service(), + ) + .await?; Ok(()) } diff --git a/examples/local_federation/actix_web/http.rs b/examples/local_federation/actix_web/http.rs deleted file mode 100644 index 6298014..0000000 --- a/examples/local_federation/actix_web/http.rs +++ /dev/null @@ -1,97 +0,0 @@ -use crate::{ - error::Error, - instance::DatabaseHandle, - objects::person::{DbUser, PersonAcceptedActivities}, -}; -use activitypub_federation::{ - actix_web::{inbox::receive_activity, signing_actor}, - config::{Data, FederationConfig, FederationMiddleware}, - fetch::webfinger::{build_webfinger_response, extract_webfinger_name}, - protocol::context::WithContext, - traits::{Actor, Object}, - FEDERATION_CONTENT_TYPE, -}; -use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer}; -use anyhow::anyhow; -use serde::Deserialize; -use tracing::info; - -pub fn listen(config: &FederationConfig) -> Result<(), Error> { - let hostname = config.domain(); - info!("Listening with actix-web on {hostname}"); - let config = config.clone(); - let server = HttpServer::new(move || { - App::new() - .wrap(FederationMiddleware::new(config.clone())) - .route("/", web::get().to(http_get_system_user)) - .route("/{user}", web::get().to(http_get_user)) - .route("/{user}/inbox", web::post().to(http_post_user_inbox)) - .route("/.well-known/webfinger", web::get().to(webfinger)) - }) - .bind(hostname)? - .run(); - tokio::spawn(server); - Ok(()) -} - -/// Handles requests to fetch system user json over HTTP -pub async fn http_get_system_user(data: Data) -> Result { - let json_user = data.system_user.clone().into_json(&data).await?; - Ok(HttpResponse::Ok() - .content_type(FEDERATION_CONTENT_TYPE) - .json(WithContext::new_default(json_user))) -} - -/// Handles requests to fetch user json over HTTP -pub async fn http_get_user( - request: HttpRequest, - user_name: web::Path, - data: Data, -) -> Result { - let signed_by = signing_actor::(&request, None, &data).await?; - // here, checks can be made on the actor or the domain to which - // it belongs, to verify whether it is allowed to access this resource - info!( - "Fetch user request is signed by system account {}", - signed_by.id() - ); - - let db_user = data.local_user(); - if user_name.into_inner() == db_user.name { - let json_user = db_user.into_json(&data).await?; - Ok(HttpResponse::Ok() - .content_type(FEDERATION_CONTENT_TYPE) - .json(WithContext::new_default(json_user))) - } else { - Err(anyhow!("Invalid user").into()) - } -} - -/// Handles messages received in user inbox -pub async fn http_post_user_inbox( - request: HttpRequest, - body: Bytes, - data: Data, -) -> Result { - receive_activity::, DbUser, DatabaseHandle>( - request, body, &data, - ) - .await -} - -#[derive(Deserialize)] -pub struct WebfingerQuery { - resource: String, -} - -pub async fn webfinger( - query: web::Query, - data: Data, -) -> Result { - let name = extract_webfinger_name(&query.resource, &data)?; - let db_user = data.read_user(name)?; - Ok(HttpResponse::Ok().json(build_webfinger_response( - query.resource.clone(), - db_user.ap_id.into_inner(), - ))) -} diff --git a/examples/local_federation/actix_web/mod.rs b/examples/local_federation/actix_web/mod.rs deleted file mode 100644 index ae11164..0000000 --- a/examples/local_federation/actix_web/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -use crate::error::Error; -use actix_web::ResponseError; - -pub(crate) mod http; - -impl ResponseError for Error {} diff --git a/examples/local_federation/axum/http.rs b/examples/local_federation/axum/http.rs index f17ea4a..6cd8a29 100644 --- a/examples/local_federation/axum/http.rs +++ b/examples/local_federation/axum/http.rs @@ -17,15 +17,14 @@ use axum::{ extract::{Path, Query}, response::IntoResponse, routing::{get, post}, - Json, - Router, + Json, Router, }; use axum_macros::debug_handler; use serde::Deserialize; -use std::net::ToSocketAddrs; +use tokio::net::TcpListener; use tracing::info; -pub fn listen(config: &FederationConfig) -> Result<(), Error> { +pub async fn listen(config: &FederationConfig) -> Result<(), Error> { let hostname = config.domain(); info!("Listening with axum on {hostname}"); let config = config.clone(); @@ -35,13 +34,17 @@ pub fn listen(config: &FederationConfig) -> Result<(), Error> { .route("/.well-known/webfinger", get(webfinger)) .layer(FederationMiddleware::new(config)); - let addr = hostname - .to_socket_addrs()? - .next() - .expect("Failed to lookup domain name"); - let server = axum::Server::bind(&addr).serve(app.into_make_service()); + let server = axum::serve( + TcpListener::bind(hostname) + .await + .expect("Failed to lookup domain name"), + app.into_make_service(), + ); + + tokio::spawn(async move { + server.await.expect("Failed to start server"); + }); - tokio::spawn(server); Ok(()) } diff --git a/examples/local_federation/instance.rs b/examples/local_federation/instance.rs index 2413d21..efe33c3 100644 --- a/examples/local_federation/instance.rs +++ b/examples/local_federation/instance.rs @@ -76,13 +76,12 @@ impl FromStr for Webserver { } } -pub fn listen( +pub async fn listen( config: &FederationConfig, webserver: &Webserver, ) -> Result<(), Error> { match webserver { - Webserver::Axum => crate::axum::http::listen(config)?, - // Webserver::ActixWeb => crate::actix_web::http::listen(config)?, + Webserver::Axum => crate::axum::http::listen(config).await?, } Ok(()) } diff --git a/examples/local_federation/main.rs b/examples/local_federation/main.rs index 8995cb9..a683b4c 100644 --- a/examples/local_federation/main.rs +++ b/examples/local_federation/main.rs @@ -34,8 +34,8 @@ async fn main() -> Result<(), Error> { let alpha = new_instance("localhost:8001", "alpha".to_string()).await?; let beta = new_instance("localhost:8002", "beta".to_string()).await?; - listen(&alpha, &webserver)?; - listen(&beta, &webserver)?; + listen(&alpha, &webserver).await?; + listen(&beta, &webserver).await?; info!("Local instances started"); info!("Alpha user follows beta user via webfinger"); diff --git a/src/activity_queue.rs b/src/activity_queue.rs index 20852bd..ad25402 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -424,6 +424,7 @@ mod tests { use bytes::Bytes; use http::{HeaderMap, StatusCode}; use std::time::Instant; + use tokio::net::TcpListener; use tracing::debug; // This will periodically send back internal errors to test the retry @@ -451,10 +452,12 @@ mod tests { .route("/", post(dodgy_handler)) .with_state(state); - axum::Server::bind(&"0.0.0.0:8002".parse().unwrap()) - .serve(app.into_make_service()) - .await - .unwrap(); + axum::serve( + TcpListener::bind("0.0.0.0:8002").await.unwrap(), + app.into_make_service(), + ) + .await + .unwrap(); } #[tokio::test(flavor = "multi_thread")] diff --git a/src/activity_sending.rs b/src/activity_sending.rs index f9023ce..bd57f50 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -232,6 +232,7 @@ mod tests { sync::{atomic::AtomicUsize, Arc}, time::Instant, }; + use tokio::net::TcpListener; use tracing::info; // This will periodically send back internal errors to test the retry @@ -251,10 +252,12 @@ mod tests { .route("/", post(dodgy_handler)) .with_state(state); - axum::Server::bind(&"0.0.0.0:8001".parse().unwrap()) - .serve(app.into_make_service()) - .await - .unwrap(); + axum::serve( + TcpListener::bind("0.0.0.0:8001").await.unwrap(), + app.into_make_service(), + ) + .await + .unwrap(); } #[tokio::test(flavor = "multi_thread")] diff --git a/src/axum/inbox.rs b/src/axum/inbox.rs index 5bb147a..e8b89d7 100644 --- a/src/axum/inbox.rs +++ b/src/axum/inbox.rs @@ -11,9 +11,9 @@ use crate::{ }; use axum::{ async_trait, - body::{Bytes, HttpBody}, + body::{Body, HttpBody}, extract::FromRequest, - http::{Request, StatusCode}, + http::StatusCode, response::{IntoResponse, Response}, }; use http::{HeaderMap, Method, Uri}; @@ -59,21 +59,23 @@ pub struct ActivityData { } #[async_trait] -impl FromRequest for ActivityData +impl FromRequest for ActivityData where - Bytes: FromRequest, - B: HttpBody + Send + 'static, + Body: HttpBody + Send + 'static, S: Send + Sync, - ::Error: std::fmt::Display, - ::Data: Send, + ::Error: std::fmt::Display, + ::Data: Send, { type Rejection = Response; - async fn from_request(req: Request, _state: &S) -> Result { + async fn from_request( + req: axum::extract::Request, + _state: &S, + ) -> Result { let (parts, body) = req.into_parts(); // this wont work if the body is an long running stream - let bytes = hyper::body::to_bytes(body) + let bytes = axum::body::to_bytes(body, usize::MAX) .await .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;