From 5ff08fcaed9e182066a9460f654796afd828cc1c Mon Sep 17 00:00:00 2001 From: aumetra Date: Thu, 9 Nov 2023 21:45:39 +0100 Subject: [PATCH] use a lot more impl in traits --- Cargo.lock | 9 ++- Cargo.toml | 1 + crates/kitsune-core/Cargo.toml | 1 + crates/kitsune-core/src/job/mod.rs | 29 +-------- crates/kitsune-core/src/service/attachment.rs | 7 ++- crates/kitsune-messaging/Cargo.toml | 2 +- crates/kitsune-messaging/src/lib.rs | 62 ++++++++++++++----- crates/kitsune-messaging/src/redis.rs | 8 +-- .../kitsune-messaging/src/tokio_broadcast.rs | 4 +- crates/kitsune-storage/Cargo.toml | 2 +- crates/kitsune-storage/src/fs.rs | 6 +- crates/kitsune-storage/src/lib.rs | 60 +++++++++++++----- crates/kitsune-storage/src/s3.rs | 4 +- crates/kitsune-util/Cargo.toml | 6 ++ crates/kitsune-util/src/lib.rs | 1 + crates/kitsune-util/src/macros.rs | 43 +++++++++++++ 16 files changed, 166 insertions(+), 79 deletions(-) create mode 100644 crates/kitsune-util/Cargo.toml create mode 100644 crates/kitsune-util/src/lib.rs create mode 100644 crates/kitsune-util/src/macros.rs diff --git a/Cargo.lock b/Cargo.lock index b4b39cbb3..e789c24c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2929,6 +2929,7 @@ dependencies = [ "kitsune-storage", "kitsune-test", "kitsune-type", + "kitsune-util", "mime", "mime_guess", "password-hash", @@ -3084,9 +3085,9 @@ name = "kitsune-messaging" version = "0.0.1-pre.4" dependencies = [ "ahash 0.8.6", - "enum_dispatch", "futures-util", "kitsune-retry-policies", + "kitsune-util", "pin-project-lite", "redis", "serde", @@ -3173,11 +3174,11 @@ name = "kitsune-storage" version = "0.0.1-pre.4" dependencies = [ "bytes", - "enum_dispatch", "futures-util", "http", "hyper", "kitsune-http-client", + "kitsune-util", "rusty-s3", "tempfile", "tokio", @@ -3213,6 +3214,10 @@ dependencies = [ "utoipa", ] +[[package]] +name = "kitsune-util" +version = "0.0.1-pre.4" + [[package]] name = "lazy_static" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index 1d81145da..298675a03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "crates/kitsune-storage", "crates/kitsune-test", "crates/kitsune-type", + "crates/kitsune-util", "kitsune", "kitsune-cli", "kitsune-job-runner", diff --git a/crates/kitsune-core/Cargo.toml b/crates/kitsune-core/Cargo.toml index 613b9c198..cd0d9d5b0 100644 --- a/crates/kitsune-core/Cargo.toml +++ b/crates/kitsune-core/Cargo.toml @@ -49,6 +49,7 @@ kitsune-messaging = { path = "../kitsune-messaging" } kitsune-search = { path = "../kitsune-search" } kitsune-storage = { path = "../kitsune-storage" } kitsune-type = { path = "../kitsune-type" } +kitsune-util = { path = "../kitsune-util" } mime = "0.3.17" mime_guess = { version = "2.0.4", default-features = false } password-hash = { version = "0.5.0", features = ["std"] } diff --git a/crates/kitsune-core/src/job/mod.rs b/crates/kitsune-core/src/job/mod.rs index 539f2a152..113511d5a 100644 --- a/crates/kitsune-core/src/job/mod.rs +++ b/crates/kitsune-core/src/job/mod.rs @@ -17,6 +17,7 @@ use kitsune_db::{ schema::job_context, PgPool, }; +use kitsune_util::impl_from; use scoped_futures::ScopedFutureExt; use serde::{Deserialize, Serialize}; use speedy_uuid::Uuid; @@ -27,34 +28,6 @@ pub mod mailing; const MAX_CONCURRENT_REQUESTS: usize = 10; -macro_rules! impl_from { - ( - $(#[$top_annotation:meta])* - $vb:vis enum $name:ident { - $( - $(#[$branch_annotation:meta])* - $branch_name:ident ($from_type:ty) - ),+ - $(,)* - }) => { - $(#[$top_annotation])* - $vb enum $name { - $( - $(#[$branch_annotation])* - $branch_name($from_type), - )* - } - - $( - impl From<$from_type> for $name { - fn from(val: $from_type) -> Self { - Self::$branch_name(val) - } - } - )* - }; -} - pub struct JobRunnerContext { pub deliverer: Deliverer, pub state: State, diff --git a/crates/kitsune-core/src/service/attachment.rs b/crates/kitsune-core/src/service/attachment.rs index cfab83570..668242326 100644 --- a/crates/kitsune-core/src/service/attachment.rs +++ b/crates/kitsune-core/src/service/attachment.rs @@ -130,7 +130,8 @@ impl AttachmentService { pub async fn stream_file( &self, media_attachment: &MediaAttachment, - ) -> Result>> { + ) -> Result> + 'static> { + // TODO: Find way to avoid boxing the streams here if let Some(ref file_path) = media_attachment.file_path { let stream = self .storage_backend @@ -138,7 +139,7 @@ impl AttachmentService { .await .map_err(Error::Storage)?; - Ok(stream.map_err(Error::Storage).left_stream()) + Ok(stream.map_err(Error::Storage).boxed()) } else if self.media_proxy_enabled { Ok(self .client @@ -146,7 +147,7 @@ impl AttachmentService { .await? .stream() .map_err(Into::into) - .right_stream()) + .boxed()) } else { Err(ApiError::NotFound.into()) } diff --git a/crates/kitsune-messaging/Cargo.toml b/crates/kitsune-messaging/Cargo.toml index fb47fe455..914a546dc 100644 --- a/crates/kitsune-messaging/Cargo.toml +++ b/crates/kitsune-messaging/Cargo.toml @@ -5,9 +5,9 @@ edition.workspace = true [dependencies] ahash = "0.8.6" -enum_dispatch = "0.3.12" futures-util = "0.3.29" kitsune-retry-policies = { path = "../kitsune-retry-policies" } +kitsune-util = { path = "../kitsune-util" } pin-project-lite = "0.2.13" redis = { version = "0.23.3", features = [ "aio", diff --git a/crates/kitsune-messaging/src/lib.rs b/crates/kitsune-messaging/src/lib.rs index 3cdf5ab74..ea8c0da9b 100644 --- a/crates/kitsune-messaging/src/lib.rs +++ b/crates/kitsune-messaging/src/lib.rs @@ -10,12 +10,13 @@ #[macro_use] extern crate tracing; -use enum_dispatch::enum_dispatch; -use futures_util::{stream::BoxStream, Stream}; +use futures_util::{stream::BoxStream, Stream, StreamExt}; +use kitsune_util::impl_from; use pin_project_lite::pin_project; use serde::{de::DeserializeOwned, Serialize}; use std::{ error::Error, + future::Future, marker::PhantomData, pin::Pin, sync::Arc, @@ -33,14 +34,40 @@ mod util; pub mod redis; pub mod tokio_broadcast; -/// Enum dispatch over all supported backends -#[enum_dispatch(MessagingBackend)] -pub enum AnyMessagingBackend { - /// Redis backend - Redis(redis::RedisMessagingBackend), +impl_from! { + /// Enum dispatch over all supported backends + pub enum AnyMessagingBackend { + /// Redis backend + Redis(redis::RedisMessagingBackend), - /// Tokio broadcast backend - Tokio(tokio_broadcast::TokioBroadcastMessagingBackend), + /// Tokio broadcast backend + Tokio(tokio_broadcast::TokioBroadcastMessagingBackend), + } +} + +impl MessagingBackend for AnyMessagingBackend { + async fn enqueue(&self, channel_name: &str, message: Vec) -> Result<()> { + match self { + Self::Redis(redis) => redis.enqueue(channel_name, message).await, + Self::Tokio(tokio) => tokio.enqueue(channel_name, message).await, + } + } + + async fn message_stream( + &self, + channel_name: String, + ) -> Result>> + 'static> { + match self { + Self::Redis(redis) => redis + .message_stream(channel_name) + .await + .map(StreamExt::left_stream), + Self::Tokio(tokio) => tokio + .message_stream(channel_name) + .await + .map(StreamExt::right_stream), + } + } } /// Messaging backend @@ -50,17 +77,15 @@ pub enum AnyMessagingBackend { /// /// The trait is designed to be object-safe since it's internally stored inside an `Arc` /// and supposed to be type-erased for ease of testing. -#[enum_dispatch] -#[allow(async_fn_in_trait)] // Because of `enum_dispatch` pub trait MessagingBackend { /// Enqueue a new message onto the backend - async fn enqueue(&self, channel_name: &str, message: Vec) -> Result<()>; + fn enqueue(&self, channel_name: &str, message: Vec) -> impl Future>; /// Open a new stream of messages from the backend - async fn message_stream( + fn message_stream( &self, channel_name: String, - ) -> Result>>>; + ) -> impl Future>> + 'static>>; } pin_project! { @@ -116,7 +141,8 @@ where self.inner = self .backend .message_stream(self.channel_name.clone()) - .await?; + .await? + .boxed(); Ok(()) } @@ -211,7 +237,11 @@ impl MessagingHub { where M: DeserializeOwned + Serialize, { - let message_stream = self.backend.message_stream(channel_name.clone()).await?; + let message_stream = self + .backend + .message_stream(channel_name.clone()) + .await? + .boxed(); Ok(MessageConsumer { backend: self.backend.clone(), diff --git a/crates/kitsune-messaging/src/redis.rs b/crates/kitsune-messaging/src/redis.rs index 1d07e9100..0cdb626df 100644 --- a/crates/kitsune-messaging/src/redis.rs +++ b/crates/kitsune-messaging/src/redis.rs @@ -4,7 +4,7 @@ use crate::{util::TransparentDebug, MessagingBackend, Result}; use ahash::AHashMap; -use futures_util::{future, stream::BoxStream, StreamExt, TryStreamExt}; +use futures_util::{future, Stream, StreamExt, TryStreamExt}; use kitsune_retry_policies::{futures_backoff_policy, RetryFutureExt}; use redis::{ aio::{ConnectionManager, PubSub}, @@ -159,7 +159,7 @@ impl MessagingBackend for RedisMessagingBackend { async fn message_stream( &self, channel_name: String, - ) -> Result>>> { + ) -> Result>> + 'static> { let (sender, receiver) = oneshot::channel(); self.sub_actor .send(RegistrationMessage { @@ -169,8 +169,6 @@ impl MessagingBackend for RedisMessagingBackend { .await?; let broadcast_receiver = receiver.await?; - Ok(BroadcastStream::new(broadcast_receiver) - .map_err(Into::into) - .boxed()) + Ok(BroadcastStream::new(broadcast_receiver).map_err(Into::into)) } } diff --git a/crates/kitsune-messaging/src/tokio_broadcast.rs b/crates/kitsune-messaging/src/tokio_broadcast.rs index a3d0c9b9a..3919e309d 100644 --- a/crates/kitsune-messaging/src/tokio_broadcast.rs +++ b/crates/kitsune-messaging/src/tokio_broadcast.rs @@ -3,7 +3,7 @@ //! use crate::{MessagingBackend, Result}; -use futures_util::{stream::BoxStream, StreamExt, TryStreamExt}; +use futures_util::{Stream, StreamExt, TryStreamExt}; use std::{collections::HashMap, sync::RwLock}; use tokio::sync::broadcast; use tokio_stream::wrappers::BroadcastStream; @@ -36,7 +36,7 @@ impl MessagingBackend for TokioBroadcastMessagingBackend { async fn message_stream( &self, channel_name: String, - ) -> Result>>> { + ) -> Result>> + 'static> { let guard = self.registry.read().unwrap(); let receiver = if let Some(sender) = guard.get(&channel_name) { sender.subscribe() diff --git a/crates/kitsune-storage/Cargo.toml b/crates/kitsune-storage/Cargo.toml index ca4a4c8f7..da06ffad6 100644 --- a/crates/kitsune-storage/Cargo.toml +++ b/crates/kitsune-storage/Cargo.toml @@ -5,11 +5,11 @@ edition.workspace = true [dependencies] bytes = "1.5.0" -enum_dispatch = "0.3.12" futures-util = "0.3.29" http = "0.2.9" hyper = { version = "0.14.27", features = ["stream"] } kitsune-http-client = { path = "../kitsune-http-client" } +kitsune-util = { path = "../kitsune-util" } rusty-s3 = { version = "0.5.0", default-features = false } tokio = { version = "1.33.0", features = ["fs", "io-util"] } tokio-util = { version = "0.7.10", features = ["io"] } diff --git a/crates/kitsune-storage/src/fs.rs b/crates/kitsune-storage/src/fs.rs index a150aac4f..71fdbe168 100644 --- a/crates/kitsune-storage/src/fs.rs +++ b/crates/kitsune-storage/src/fs.rs @@ -4,7 +4,7 @@ use crate::{Result, StorageBackend}; use bytes::Bytes; -use futures_util::{pin_mut, stream::BoxStream, Stream, StreamExt, TryStreamExt}; +use futures_util::{pin_mut, Stream, StreamExt, TryStreamExt}; use std::path::PathBuf; use tokio::{ fs::{self, File}, @@ -34,9 +34,9 @@ impl StorageBackend for Storage { Ok(()) } - async fn get(&self, path: &str) -> Result>> { + async fn get(&self, path: &str) -> Result> + 'static> { let file = File::open(self.storage_dir.join(path)).await?; - Ok(ReaderStream::new(file).map_err(Into::into).boxed()) + Ok(ReaderStream::new(file).map_err(Into::into)) } async fn put(&self, path: &str, input_stream: T) -> Result<()> diff --git a/crates/kitsune-storage/src/lib.rs b/crates/kitsune-storage/src/lib.rs index 82b9c8ff5..86f56cb40 100644 --- a/crates/kitsune-storage/src/lib.rs +++ b/crates/kitsune-storage/src/lib.rs @@ -4,9 +4,9 @@ #![allow(forbidden_lint_groups)] use bytes::Bytes; -use enum_dispatch::enum_dispatch; -use futures_util::{stream::BoxStream, Stream}; -use std::error::Error; +use futures_util::{Stream, StreamExt}; +use kitsune_util::impl_from; +use std::{error::Error, future::Future}; pub mod fs; pub mod s3; @@ -18,28 +18,56 @@ pub type BoxError = Box; pub type Result = std::result::Result; /// Trait abstraction over storage backends -#[enum_dispatch] -#[allow(async_fn_in_trait)] // Because of `enum_dispatch` pub trait StorageBackend: Clone + Send + Sync { /// Delete something from the object storage - async fn delete(&self, path: &str) -> Result<()>; + fn delete(&self, path: &str) -> impl Future>; /// Stream something from the object storage - async fn get(&self, path: &str) -> Result>>; + fn get( + &self, + path: &str, + ) -> impl Future> + 'static>>; /// Stream something onto the object storage - async fn put(&self, path: &str, input_stream: T) -> Result<()> + fn put(&self, path: &str, input_stream: T) -> impl Future> where T: Stream> + Send + 'static; } -#[derive(Clone)] -#[enum_dispatch(StorageBackend)] -/// Combined storage enum for enum dispatch -pub enum AnyStorageBackend { - /// File system-backed storage - Fs(fs::Storage), +impl_from! { + #[derive(Clone)] + /// Combined storage enum for enum dispatch + pub enum AnyStorageBackend { + /// File system-backed storage + Fs(fs::Storage), + + /// S3-backed storage + S3(s3::Storage), + } +} + +impl StorageBackend for AnyStorageBackend { + async fn delete(&self, path: &str) -> Result<()> { + match self { + Self::Fs(fs) => fs.delete(path).await, + Self::S3(s3) => s3.delete(path).await, + } + } - /// S3-backed storage - S3(s3::Storage), + async fn get(&self, path: &str) -> Result> + 'static> { + match self { + Self::Fs(fs) => fs.get(path).await.map(StreamExt::left_stream), + Self::S3(s3) => s3.get(path).await.map(StreamExt::right_stream), + } + } + + async fn put(&self, path: &str, input_stream: T) -> Result<()> + where + T: Stream> + Send + 'static, + { + match self { + Self::Fs(fs) => fs.put(path, input_stream).await, + Self::S3(s3) => s3.put(path, input_stream).await, + } + } } diff --git a/crates/kitsune-storage/src/s3.rs b/crates/kitsune-storage/src/s3.rs index 06ffc933a..bb028b4c3 100644 --- a/crates/kitsune-storage/src/s3.rs +++ b/crates/kitsune-storage/src/s3.rs @@ -4,7 +4,7 @@ use crate::{Result, StorageBackend}; use bytes::Bytes; -use futures_util::{stream::BoxStream, Stream, StreamExt, TryStreamExt}; +use futures_util::{Stream, StreamExt, TryStreamExt}; use http::Request; use hyper::Body; use kitsune_http_client::Client as HttpClient; @@ -103,7 +103,7 @@ impl StorageBackend for Storage { self.client.delete_object(path).await } - async fn get(&self, path: &str) -> Result>> { + async fn get(&self, path: &str) -> Result> + 'static> { let stream = self.client.get_object(path).await?.boxed(); Ok(stream) } diff --git a/crates/kitsune-util/Cargo.toml b/crates/kitsune-util/Cargo.toml new file mode 100644 index 000000000..81ddafb59 --- /dev/null +++ b/crates/kitsune-util/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "kitsune-util" +edition.workspace = true +version.workspace = true + +[dependencies] diff --git a/crates/kitsune-util/src/lib.rs b/crates/kitsune-util/src/lib.rs new file mode 100644 index 000000000..992f6dd8c --- /dev/null +++ b/crates/kitsune-util/src/lib.rs @@ -0,0 +1 @@ +mod macros; diff --git a/crates/kitsune-util/src/macros.rs b/crates/kitsune-util/src/macros.rs new file mode 100644 index 000000000..0527c2fbd --- /dev/null +++ b/crates/kitsune-util/src/macros.rs @@ -0,0 +1,43 @@ +/// Implement the `From` trait for each branch of an enum +/// +/// ``` +/// # use kitsune_util::impl_from; +/// impl_from! { +/// #[derive(Debug, PartialEq)] +/// enum Test { +/// A(i32), +/// B(u32), +/// } +/// } +/// +/// assert_eq!(Test::from(1_i32), Test::A(1)); +/// assert_eq!(Test::from(2_u32), Test::B(2)); +/// ``` +#[macro_export] +macro_rules! impl_from { + ( + $(#[$top_annotation:meta])* + $vb:vis enum $name:ident { + $( + $(#[$branch_annotation:meta])* + $branch_name:ident ($from_type:ty) + ),+ + $(,)* + }) => { + $(#[$top_annotation])* + $vb enum $name { + $( + $(#[$branch_annotation])* + $branch_name($from_type), + )* + } + + $( + impl From<$from_type> for $name { + fn from(val: $from_type) -> Self { + Self::$branch_name(val) + } + } + )* + }; +}