Skip to content

Commit

Permalink
use a lot more impl in traits
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Nov 9, 2023
1 parent 8fba940 commit 5ff08fc
Show file tree
Hide file tree
Showing 16 changed files with 166 additions and 79 deletions.
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ members = [
"crates/kitsune-storage",
"crates/kitsune-test",
"crates/kitsune-type",
"crates/kitsune-util",
"kitsune",
"kitsune-cli",
"kitsune-job-runner",
Expand Down
1 change: 1 addition & 0 deletions crates/kitsune-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ kitsune-messaging = { path = "../kitsune-messaging" }
kitsune-search = { path = "../kitsune-search" }
kitsune-storage = { path = "../kitsune-storage" }
kitsune-type = { path = "../kitsune-type" }
kitsune-util = { path = "../kitsune-util" }
mime = "0.3.17"
mime_guess = { version = "2.0.4", default-features = false }
password-hash = { version = "0.5.0", features = ["std"] }
Expand Down
29 changes: 1 addition & 28 deletions crates/kitsune-core/src/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use kitsune_db::{
schema::job_context,
PgPool,
};
use kitsune_util::impl_from;
use scoped_futures::ScopedFutureExt;
use serde::{Deserialize, Serialize};
use speedy_uuid::Uuid;
Expand All @@ -27,34 +28,6 @@ pub mod mailing;

const MAX_CONCURRENT_REQUESTS: usize = 10;

macro_rules! impl_from {
(
$(#[$top_annotation:meta])*
$vb:vis enum $name:ident {
$(
$(#[$branch_annotation:meta])*
$branch_name:ident ($from_type:ty)
),+
$(,)*
}) => {
$(#[$top_annotation])*
$vb enum $name {
$(
$(#[$branch_annotation])*
$branch_name($from_type),
)*
}

$(
impl From<$from_type> for $name {
fn from(val: $from_type) -> Self {
Self::$branch_name(val)
}
}
)*
};
}

pub struct JobRunnerContext {
pub deliverer: Deliverer,
pub state: State,
Expand Down
7 changes: 4 additions & 3 deletions crates/kitsune-core/src/service/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,23 +130,24 @@ impl AttachmentService {
pub async fn stream_file(
&self,
media_attachment: &MediaAttachment,
) -> Result<impl Stream<Item = Result<Bytes>>> {
) -> Result<impl Stream<Item = Result<Bytes>> + 'static> {
// TODO: Find way to avoid boxing the streams here
if let Some(ref file_path) = media_attachment.file_path {
let stream = self
.storage_backend
.get(file_path.as_str())
.await
.map_err(Error::Storage)?;

Ok(stream.map_err(Error::Storage).left_stream())
Ok(stream.map_err(Error::Storage).boxed())
} else if self.media_proxy_enabled {
Ok(self
.client
.get(media_attachment.remote_url.as_ref().unwrap())
.await?
.stream()
.map_err(Into::into)
.right_stream())
.boxed())
} else {
Err(ApiError::NotFound.into())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/kitsune-messaging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ edition.workspace = true

[dependencies]
ahash = "0.8.6"
enum_dispatch = "0.3.12"
futures-util = "0.3.29"
kitsune-retry-policies = { path = "../kitsune-retry-policies" }
kitsune-util = { path = "../kitsune-util" }
pin-project-lite = "0.2.13"
redis = { version = "0.23.3", features = [
"aio",
Expand Down
62 changes: 46 additions & 16 deletions crates/kitsune-messaging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
#[macro_use]
extern crate tracing;

use enum_dispatch::enum_dispatch;
use futures_util::{stream::BoxStream, Stream};
use futures_util::{stream::BoxStream, Stream, StreamExt};
use kitsune_util::impl_from;
use pin_project_lite::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::{
error::Error,
future::Future,
marker::PhantomData,
pin::Pin,
sync::Arc,
Expand All @@ -33,14 +34,40 @@ mod util;
pub mod redis;
pub mod tokio_broadcast;

/// Enum dispatch over all supported backends
#[enum_dispatch(MessagingBackend)]
pub enum AnyMessagingBackend {
/// Redis backend
Redis(redis::RedisMessagingBackend),
impl_from! {
/// Enum dispatch over all supported backends
pub enum AnyMessagingBackend {
/// Redis backend
Redis(redis::RedisMessagingBackend),

/// Tokio broadcast backend
Tokio(tokio_broadcast::TokioBroadcastMessagingBackend),
/// Tokio broadcast backend
Tokio(tokio_broadcast::TokioBroadcastMessagingBackend),
}
}

impl MessagingBackend for AnyMessagingBackend {
async fn enqueue(&self, channel_name: &str, message: Vec<u8>) -> Result<()> {
match self {
Self::Redis(redis) => redis.enqueue(channel_name, message).await,
Self::Tokio(tokio) => tokio.enqueue(channel_name, message).await,
}
}

async fn message_stream(
&self,
channel_name: String,
) -> Result<impl Stream<Item = Result<Vec<u8>>> + 'static> {
match self {
Self::Redis(redis) => redis
.message_stream(channel_name)
.await
.map(StreamExt::left_stream),
Self::Tokio(tokio) => tokio
.message_stream(channel_name)
.await
.map(StreamExt::right_stream),
}
}
}

/// Messaging backend
Expand All @@ -50,17 +77,15 @@ pub enum AnyMessagingBackend {
///
/// The trait is designed to be object-safe since it's internally stored inside an `Arc`
/// and supposed to be type-erased for ease of testing.
#[enum_dispatch]
#[allow(async_fn_in_trait)] // Because of `enum_dispatch`
pub trait MessagingBackend {
/// Enqueue a new message onto the backend
async fn enqueue(&self, channel_name: &str, message: Vec<u8>) -> Result<()>;
fn enqueue(&self, channel_name: &str, message: Vec<u8>) -> impl Future<Output = Result<()>>;

/// Open a new stream of messages from the backend
async fn message_stream(
fn message_stream(
&self,
channel_name: String,
) -> Result<BoxStream<'static, Result<Vec<u8>>>>;
) -> impl Future<Output = Result<impl Stream<Item = Result<Vec<u8>>> + 'static>>;
}

pin_project! {
Expand Down Expand Up @@ -116,7 +141,8 @@ where
self.inner = self
.backend
.message_stream(self.channel_name.clone())
.await?;
.await?
.boxed();

Ok(())
}
Expand Down Expand Up @@ -211,7 +237,11 @@ impl MessagingHub {
where
M: DeserializeOwned + Serialize,
{
let message_stream = self.backend.message_stream(channel_name.clone()).await?;
let message_stream = self
.backend
.message_stream(channel_name.clone())
.await?
.boxed();

Ok(MessageConsumer {
backend: self.backend.clone(),
Expand Down
8 changes: 3 additions & 5 deletions crates/kitsune-messaging/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{util::TransparentDebug, MessagingBackend, Result};
use ahash::AHashMap;
use futures_util::{future, stream::BoxStream, StreamExt, TryStreamExt};
use futures_util::{future, Stream, StreamExt, TryStreamExt};
use kitsune_retry_policies::{futures_backoff_policy, RetryFutureExt};
use redis::{
aio::{ConnectionManager, PubSub},
Expand Down Expand Up @@ -159,7 +159,7 @@ impl MessagingBackend for RedisMessagingBackend {
async fn message_stream(
&self,
channel_name: String,
) -> Result<BoxStream<'static, Result<Vec<u8>>>> {
) -> Result<impl Stream<Item = Result<Vec<u8>>> + 'static> {
let (sender, receiver) = oneshot::channel();
self.sub_actor
.send(RegistrationMessage {
Expand All @@ -169,8 +169,6 @@ impl MessagingBackend for RedisMessagingBackend {
.await?;
let broadcast_receiver = receiver.await?;

Ok(BroadcastStream::new(broadcast_receiver)
.map_err(Into::into)
.boxed())
Ok(BroadcastStream::new(broadcast_receiver).map_err(Into::into))
}
}
4 changes: 2 additions & 2 deletions crates/kitsune-messaging/src/tokio_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//!
use crate::{MessagingBackend, Result};
use futures_util::{stream::BoxStream, StreamExt, TryStreamExt};
use futures_util::{Stream, StreamExt, TryStreamExt};
use std::{collections::HashMap, sync::RwLock};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
Expand Down Expand Up @@ -36,7 +36,7 @@ impl MessagingBackend for TokioBroadcastMessagingBackend {
async fn message_stream(
&self,
channel_name: String,
) -> Result<BoxStream<'static, Result<Vec<u8>>>> {
) -> Result<impl Stream<Item = Result<Vec<u8>>> + 'static> {
let guard = self.registry.read().unwrap();
let receiver = if let Some(sender) = guard.get(&channel_name) {
sender.subscribe()
Expand Down
2 changes: 1 addition & 1 deletion crates/kitsune-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ edition.workspace = true

[dependencies]
bytes = "1.5.0"
enum_dispatch = "0.3.12"
futures-util = "0.3.29"
http = "0.2.9"
hyper = { version = "0.14.27", features = ["stream"] }
kitsune-http-client = { path = "../kitsune-http-client" }
kitsune-util = { path = "../kitsune-util" }
rusty-s3 = { version = "0.5.0", default-features = false }
tokio = { version = "1.33.0", features = ["fs", "io-util"] }
tokio-util = { version = "0.7.10", features = ["io"] }
Expand Down
6 changes: 3 additions & 3 deletions crates/kitsune-storage/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{Result, StorageBackend};
use bytes::Bytes;
use futures_util::{pin_mut, stream::BoxStream, Stream, StreamExt, TryStreamExt};
use futures_util::{pin_mut, Stream, StreamExt, TryStreamExt};
use std::path::PathBuf;
use tokio::{
fs::{self, File},
Expand Down Expand Up @@ -34,9 +34,9 @@ impl StorageBackend for Storage {
Ok(())
}

async fn get(&self, path: &str) -> Result<BoxStream<'static, Result<Bytes>>> {
async fn get(&self, path: &str) -> Result<impl Stream<Item = Result<Bytes>> + 'static> {
let file = File::open(self.storage_dir.join(path)).await?;
Ok(ReaderStream::new(file).map_err(Into::into).boxed())
Ok(ReaderStream::new(file).map_err(Into::into))
}

async fn put<T>(&self, path: &str, input_stream: T) -> Result<()>
Expand Down
Loading

0 comments on commit 5ff08fc

Please sign in to comment.