Skip to content

Commit

Permalink
Greatly simplify dispatching system to avoid downcasting
Browse files Browse the repository at this point in the history
  • Loading branch information
morr0ne committed Feb 20, 2025
1 parent b9eb72b commit 06c9b54
Show file tree
Hide file tree
Showing 14 changed files with 5,434 additions and 7,289 deletions.
21 changes: 8 additions & 13 deletions gen/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ pub fn generate_server_code(current: &[Pair], pairs: &[Pair]) -> TokenStream {

let handler_args = if dispatchers.is_empty() {
quote! {
_object: &crate::server::Object,
_client: &mut crate::server::Client,
_sender_id: crate::wire::ObjectId,
}
} else {
quote! {
object: &crate::server::Object,
client: &mut crate::server::Client,
sender_id: crate::wire::ObjectId,
}
};

Expand All @@ -58,11 +58,6 @@ pub fn generate_server_code(current: &[Pair], pairs: &[Pair]) -> TokenStream {
const INTERFACE: &'static str = #name;
const VERSION: u32 = #version;

fn into_object(self, id: crate::wire::ObjectId) -> crate::server::Object where Self: Sized
{
crate::server::Object::new(id, self)
}

async fn handle_request(
&self,
#handler_args
Expand Down Expand Up @@ -110,7 +105,7 @@ fn write_dispatchers(interface: &Interface) -> Vec<TokenStream> {
let mut tracing_fmt = Vec::new();
let mut tracing_args = Vec::new();

let mut args = vec![quote! { object }, quote! { client }];
let mut args = vec![quote! { client }, quote! { sender_id }];
let mut setters = Vec::new();

for arg in &request.args {
Expand Down Expand Up @@ -173,7 +168,7 @@ fn write_dispatchers(interface: &Interface) -> Vec<TokenStream> {
#opcode => {
#(#setters)*

tracing::debug!(#tracing_inner, object.id, #(#tracing_args),*);
tracing::debug!(#tracing_inner, sender_id, #(#tracing_args),*);
self.#name(#(#args),*).await
}
});
Expand All @@ -189,8 +184,8 @@ fn write_requests(pairs: &[Pair], pair: &Pair, interface: &Interface) -> Vec<Tok
let name = make_ident(request.name.to_snek_case());
let mut args = vec![
quote! {&self },
quote! {object: &crate::server::Object},
quote! {client: &mut crate::server::Client},
quote! {sender_id: crate::wire::ObjectId},
];

for arg in &request.args {
Expand Down Expand Up @@ -225,8 +220,8 @@ fn write_events(pairs: &[Pair], pair: &Pair, interface: &Interface) -> Vec<Token

let mut args = vec![
quote! {&self },
quote! {object: &crate::server::Object},
quote! {client: &mut crate::server::Client},
quote! {sender_id: crate::wire::ObjectId},
];

let mut tracing_fmt = Vec::new();
Expand Down Expand Up @@ -321,14 +316,14 @@ fn write_events(pairs: &[Pair], pair: &Pair, interface: &Interface) -> Vec<Token
events.push(quote! {
#(#docs)*
async fn #name(#(#args),*) -> crate::server::Result<()> {
tracing::debug!(#tracing_inner, object.id, #(#tracing_args),*);
tracing::debug!(#tracing_inner, sender_id, #(#tracing_args),*);

let (payload,fds) = crate::wire::PayloadBuilder::new()
#(#build_args)*
.build();

client
.send_message(crate::wire::Message::new(object.id, #opcode, payload, fds))
.send_message(crate::wire::Message::new(sender_id, #opcode, payload, fds))
.await
.map_err(crate::server::error::Error::IoError)
}
Expand Down
4 changes: 2 additions & 2 deletions macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ pub fn derive_dispatcher(input: TokenStream) -> TokenStream {
impl waynest::server::Dispatcher for #ident {
async fn dispatch(
&self,
object: &waynest::server::Object,
client: &mut waynest::server::Client,
sender_id: waynest::wire::ObjectId,
message: &mut waynest::wire::Message,
) -> Result<()> {
self.handle_request(object, client, message).await
self.handle_request(client, sender_id, message).await
}
}
}
Expand Down
44 changes: 9 additions & 35 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub use waynest_macros::Dispatcher;

use async_trait::async_trait;
use core::fmt;
use downcast_rs::{impl_downcast, DowncastSync};
use futures_util::{SinkExt, TryStreamExt};
use std::{collections::HashMap, io, sync::Arc};
use tokio::net::UnixStream;
Expand Down Expand Up @@ -45,14 +44,14 @@ impl Client {
prev
}

pub fn insert(&mut self, object: Object) {
self.store.insert(object)
pub fn insert<D: Dispatcher + 'static>(&mut self, sender_id: ObjectId, object: D) {
self.store.insert(sender_id, Arc::new(object))
}

pub async fn handle_message(&mut self, message: &mut Message) -> Result<()> {
let object = self.store.get(&message.object_id).ok_or(Error::Internal)?;

object.dispatch(self, message).await
object.dispatch(self, message.object_id, message).await
}

pub async fn next_message(&mut self) -> Result<Option<Message>> {
Expand All @@ -66,31 +65,8 @@ impl Client {
}
}

#[derive(Clone)]
pub struct Object {
pub id: ObjectId,
dispatcher: Arc<dyn Dispatcher>,
}

impl Object {
pub fn new<D: Dispatcher>(id: ObjectId, dispatcher: D) -> Self {
Self {
id,
dispatcher: Arc::new(dispatcher),
}
}

pub fn as_dispatcher<D: Dispatcher>(&self) -> Result<&D> {
self.dispatcher.downcast_ref().ok_or(Error::Internal)
}

async fn dispatch(&self, client: &mut Client, message: &mut Message) -> Result<()> {
self.dispatcher.dispatch(self, client, message).await
}
}

struct Store {
objects: HashMap<ObjectId, Object>,
objects: HashMap<ObjectId, Arc<dyn Dispatcher>>,
}

impl Store {
Expand All @@ -100,23 +76,21 @@ impl Store {
}
}
// FIXME: handle possible error if id already exists
fn insert(&mut self, object: Object) {
self.objects.insert(object.id, object);
fn insert(&mut self, sender_id: ObjectId, object: Arc<dyn Dispatcher>) {
self.objects.insert(sender_id, object);
}

fn get(&self, id: &ObjectId) -> Option<Object> {
fn get(&self, id: &ObjectId) -> Option<Arc<dyn Dispatcher>> {
self.objects.get(id).cloned()
}
}

#[async_trait]
pub trait Dispatcher: DowncastSync {
pub trait Dispatcher: Send + Sync {
async fn dispatch(
&self,
object: &Object,
client: &mut Client,
sender_id: ObjectId,
message: &mut Message,
) -> Result<()>;
}

impl_downcast!(sync Dispatcher);
Loading

0 comments on commit 06c9b54

Please sign in to comment.