Skip to content

Commit

Permalink
finish
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Dec 3, 2023
1 parent 8f93242 commit d0da289
Show file tree
Hide file tree
Showing 29 changed files with 219 additions and 134 deletions.
14 changes: 1 addition & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/kitsune-activitypub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ edition.workspace = true
version.workspace = true

[dependencies]
async-recursion = "1.0.5"
async-trait = "0.1.74"
autometrics = { version = "0.6.0", default-features = false }
base64-simd = { version = "0.8.0", features = ["unstable"] }
Expand Down
9 changes: 9 additions & 0 deletions crates/kitsune-activitypub/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ pub enum Error {
#[error(transparent)]
FederationFilter(#[from] kitsune_federation_filter::error::Error),

#[error(transparent)]
FetchAccount(BoxError),

#[error(transparent)]
FetchEmoji(BoxError),

#[error(transparent)]
FetchPost(BoxError),

#[error(transparent)]
Http(#[from] http::Error),

Expand Down
9 changes: 6 additions & 3 deletions crates/kitsune-activitypub/src/fetcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use kitsune_cache::ArcCache;
use kitsune_core::{
consts::USER_AGENT,
error::BoxError,
traits::{fetcher::AccountFetchOptions, Fetcher as FetcherTrait, Resolver},
traits::{
fetcher::{AccountFetchOptions, PostFetchOptions},
Fetcher as FetcherTrait, Resolver,
},
};
use kitsune_db::{
model::{account::Account, custom_emoji::CustomEmoji, post::Post},
Expand Down Expand Up @@ -130,7 +133,7 @@ impl FetcherTrait for Fetcher {
Ok(self.fetch_emoji(url).await?)
}

async fn fetch_post(&self, url: &str) -> Result<Option<Post>, BoxError> {
Ok(self.fetch_object(url).await?)
async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> Result<Option<Post>, BoxError> {
Ok(self.fetch_object(opts.url, opts.call_depth).await?)
}
}
16 changes: 3 additions & 13 deletions crates/kitsune-activitypub/src/fetcher/object.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::Fetcher;
use crate::{error::Result, process_new_object, ProcessNewObject};
use async_recursion::async_recursion;
use autometrics::autometrics;
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
Expand All @@ -13,12 +12,9 @@ use scoped_futures::ScopedFutureExt;
pub const MAX_FETCH_DEPTH: u32 = 30;

impl Fetcher {
#[async_recursion]
pub(crate) async fn fetch_object_inner(
&self,
url: &str,
call_depth: u32,
) -> Result<Option<Post>> {
#[instrument(skip(self))]
#[autometrics(track_concurrency)]
pub(crate) async fn fetch_object(&self, url: &str, call_depth: u32) -> Result<Option<Post>> {
if call_depth > MAX_FETCH_DEPTH {
return Ok(None);
}
Expand Down Expand Up @@ -65,10 +61,4 @@ impl Fetcher {

Ok(Some(post))
}

#[instrument(skip(self))]
#[autometrics(track_concurrency)]
pub(crate) async fn fetch_object(&self, url: &str) -> Result<Option<Post>> {
self.fetch_object_inner(url, 0).await
}
}
26 changes: 19 additions & 7 deletions crates/kitsune-activitypub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use diesel_async::{AsyncPgConnection, RunQueryDsl};
use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
use http::Uri;
use iso8601_timestamp::Timestamp;
use kitsune_core::traits::{fetcher::PostFetchOptions, Fetcher as FetcherTrait};
use kitsune_db::{
model::{
account::Account,
Expand Down Expand Up @@ -79,7 +80,7 @@ async fn handle_mentions(
async fn handle_custom_emojis(
db_conn: &mut AsyncPgConnection,
post_id: Uuid,
fetcher: &Fetcher,
fetcher: &dyn FetcherTrait,
tags: &[Tag],
) -> Result<()> {
let emoji_iter = tags.iter().filter(|tag| tag.r#type == TagType::Emoji);
Expand Down Expand Up @@ -107,7 +108,8 @@ async fn handle_custom_emojis(
emoji_text: emoji_tag.name.clone(),
})
.try_collect::<Vec<PostCustomEmoji>>()
.await?;
.await
.map_err(Error::FetchEmoji)?;

diesel::insert_into(posts_custom_emojis::table)
.values(emojis)
Expand Down Expand Up @@ -171,7 +173,7 @@ pub struct ProcessNewObject<'a> {
db_pool: &'a PgPool,
embed_client: Option<&'a EmbedClient>,
object: Box<Object>,
fetcher: &'a Fetcher,
fetcher: &'a dyn FetcherTrait,
search_backend: &'a AnySearchBackend,
}

Expand All @@ -184,7 +186,7 @@ struct PreprocessedObject<'a> {
content_lang: Language,
db_pool: &'a PgPool,
object: Box<Object>,
fetcher: &'a Fetcher,
fetcher: &'a dyn FetcherTrait,
search_backend: &'a AnySearchBackend,
}

Expand All @@ -208,7 +210,11 @@ async fn preprocess_object(
return Err(Error::InvalidDocument);
}

let Some(author) = fetcher.fetch_actor(attributed_to.into()).await? else {
let Some(author) = fetcher
.fetch_account(attributed_to.into())
.await
.map_err(Error::FetchAccount)?
else {
return Err(Error::NotFound);
};

Expand All @@ -218,8 +224,14 @@ async fn preprocess_object(
let visibility = Visibility::from_activitypub(&user, &object).unwrap();
let in_reply_to_id = if let Some(ref in_reply_to) = object.in_reply_to {
fetcher
.fetch_object_inner(in_reply_to, call_depth + 1)
.await?
.fetch_post(
PostFetchOptions::builder()
.url(in_reply_to)
.call_depth(call_depth + 1)
.build(),
)
.await
.map_err(Error::FetchPost)?
.map(|post| post.id)
} else {
None
Expand Down
2 changes: 1 addition & 1 deletion crates/kitsune-activitypub/tests/fetcher/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn fetch_note() {
.build();

let note = fetcher
.fetch_post("https://corteximplant.com/@0x0/109501674056556919")
.fetch_post("https://corteximplant.com/@0x0/109501674056556919".into())
.await
.expect("Fetch note")
.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions crates/kitsune-activitypub/tests/fetcher/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn federation_allow() {

assert!(matches!(
*fetcher
.fetch_post("https://example.com/fakeobject")
.fetch_post("https://example.com/fakeobject".into())
.await
.unwrap_err()
.downcast_ref()
Expand All @@ -57,7 +57,7 @@ async fn federation_allow() {

assert!(matches!(
*fetcher
.fetch_post("https://other.badstuff.com/otherfake")
.fetch_post("https://other.badstuff.com/otherfake".into())
.await
.unwrap_err()
.downcast_ref()
Expand All @@ -77,7 +77,7 @@ async fn federation_allow() {

assert!(matches!(
fetcher
.fetch_post("https://corteximplant.com/@0x0/109501674056556919")
.fetch_post("https://corteximplant.com/@0x0/109501674056556919".into())
.await,
Ok(..)
));
Expand Down Expand Up @@ -118,7 +118,7 @@ async fn federation_deny() {

assert!(matches!(
fetcher
.fetch_post("https://example.com/fakeobject")
.fetch_post("https://example.com/fakeobject".into())
.await
.unwrap_err()
.downcast_ref()
Expand All @@ -127,7 +127,7 @@ async fn federation_deny() {
));
assert!(matches!(
*fetcher
.fetch_post("https://other.badstuff.com/otherfake")
.fetch_post("https://other.badstuff.com/otherfake".into())
.await
.unwrap_err()
.downcast_ref()
Expand Down
2 changes: 1 addition & 1 deletion crates/kitsune-activitypub/tests/fetcher/infinite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn fetch_infinitely_long_reply_chain() {
.build();

assert!(fetcher
.fetch_post("https://example.com/notes/0")
.fetch_post("https://example.com/notes/0".into())
.await
.is_ok());
})
Expand Down
4 changes: 2 additions & 2 deletions crates/kitsune-activitypub/tests/fetcher/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn check_ap_id_authority() {
.build();

let _ = fetcher
.fetch_post("https://example.com/@0x0/109501674056556919")
.fetch_post("https://example.com/@0x0/109501674056556919".into())
.await
.unwrap_err();
})
Expand Down Expand Up @@ -109,7 +109,7 @@ async fn check_ap_content_type() {

assert!(matches!(
*fetcher
.fetch_post("https://corteximplant.com/users/0x0")
.fetch_post("https://corteximplant.com/users/0x0".into())
.await
.unwrap_err()
.downcast_ref()
Expand Down
30 changes: 23 additions & 7 deletions crates/kitsune-core/src/traits/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ pub struct AccountFetchOptions<'a> {
#[builder(default, setter(strip_option))]
pub acct: Option<(&'a str, &'a str)>,

/// Refetch the ActivityPub entity
/// Refetch the account
///
/// This is mainly used to refresh possibly stale actors
///
/// Default: false
#[builder(default = false)]
pub refetch: bool,

/// URL of the ActivityPub entity
/// URL of the account
pub url: &'a str,
}

Expand All @@ -30,6 +30,22 @@ impl<'a> From<&'a str> for AccountFetchOptions<'a> {
}
}

#[derive(Clone, Copy, Debug, TypedBuilder)]
pub struct PostFetchOptions<'a> {
/// Call depth of recursive calls of the post fetch logic
#[builder(default)]
pub call_depth: u32,

/// URL of the object
pub url: &'a str,
}

impl<'a> From<&'a str> for PostFetchOptions<'a> {
fn from(value: &'a str) -> Self {
Self::builder().url(value).build()
}
}

#[async_trait]
pub trait Fetcher: Send + Sync + 'static {
fn resolver(&self) -> Arc<dyn Resolver>;
Expand All @@ -41,7 +57,7 @@ pub trait Fetcher: Send + Sync + 'static {

async fn fetch_emoji(&self, url: &str) -> Result<Option<CustomEmoji>, BoxError>;

async fn fetch_post(&self, url: &str) -> Result<Option<Post>, BoxError>;
async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> Result<Option<Post>, BoxError>;
}

#[async_trait]
Expand All @@ -61,8 +77,8 @@ impl Fetcher for Arc<dyn Fetcher> {
(**self).fetch_emoji(url).await
}

async fn fetch_post(&self, url: &str) -> Result<Option<Post>, BoxError> {
(**self).fetch_post(url).await
async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> Result<Option<Post>, BoxError> {
(**self).fetch_post(opts).await
}
}

Expand Down Expand Up @@ -98,9 +114,9 @@ where
Ok(None)
}

async fn fetch_post(&self, url: &str) -> Result<Option<Post>, BoxError> {
async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> Result<Option<Post>, BoxError> {
for fetcher in self {
if let Some(post) = fetcher.fetch_post(url).await? {
if let Some(post) = fetcher.fetch_post(opts).await? {
return Ok(Some(post));
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/kitsune-email/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ askama = "0.12.1"
askama_axum = "0.3.0" # Damn it, cargo. Because "kitsune" uses "askama" with the axum feature, we have to have the crate available here as well..
diesel = "2.1.4"
diesel-async = "0.4.1"
iso8601-timestamp = "0.2.12"
kitsune-db = { path = "../kitsune-db" }
kitsune-url = { path = "../kitsune-url" }
lettre = { version = "0.11.2", default-features = false, features = [
Expand Down
14 changes: 9 additions & 5 deletions crates/kitsune-email/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::{error::Result, mails::confirm_account::ConfirmAccount, MailSender};
use diesel::{ExpressionMethods, QueryDsl};
use diesel::{ExpressionMethods, NullableExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use iso8601_timestamp::Timestamp;
use kitsune_db::{model::user::User, schema::users, PgPool};
use kitsune_db::{function::now, model::user::User, schema::users, PgPool};
use kitsune_url::UrlService;
use lettre::{AsyncSmtpTransport, Tokio1Executor};
use scoped_futures::ScopedFutureExt;
Expand All @@ -22,11 +21,16 @@ impl Mailing {
self.sender.is_some()
}

#[must_use]
pub fn sender(&self) -> Option<MailSender<AsyncSmtpTransport<Tokio1Executor>>> {
self.sender.clone()
}

pub async fn mark_as_confirmed(&self, user_id: Uuid) -> Result<()> {
self.db_pool
.with_connection(|db_conn| {
diesel::update(users::table.find(user_id))
.set(users::confirmed_at.eq(Timestamp::now_utc()))
.set(users::confirmed_at.eq(now().nullable()))
.execute(db_conn)
.scoped()
})
Expand All @@ -43,7 +47,7 @@ impl Mailing {
.filter(users::confirmation_token.eq(confirmation_token))
.filter(users::confirmed_at.is_null()),
)
.set(users::confirmed_at.eq(Timestamp::now_utc()))
.set(users::confirmed_at.eq(now().nullable()))
.execute(db_conn)
.scoped()
})
Expand Down
Loading

0 comments on commit d0da289

Please sign in to comment.