Skip to content

Commit

Permalink
feat(gateway): impl Stream for Shard
Browse files Browse the repository at this point in the history
  • Loading branch information
vilgotf committed Jan 15, 2024
1 parent f875e7f commit d269aad
Show file tree
Hide file tree
Showing 10 changed files with 495 additions and 1,532 deletions.
91 changes: 16 additions & 75 deletions twilight-gateway/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@
//!
//! [`Shard::send`]: crate::Shard::send
use crate::{
command::{self, Command},
error::{SendError, SendErrorType},
CloseFrame,
};
use tokio::sync::mpsc::{self, error::TrySendError};
use crate::{command::Command, json, CloseFrame};
use tokio::sync::mpsc;

/// Channel between a user and shard for sending outgoing gateway messages.
#[derive(Debug)]
Expand Down Expand Up @@ -72,30 +68,20 @@ impl MessageSender {

/// Send a command to the associated shard.
///
/// # Errors
/// # Panics
///
/// Returns a [`SendErrorType::Sending`] error type if the channel is
/// closed.
///
/// Returns a [`SendErrorType::Serializing`] error type if the provided
/// command failed to serialize.
pub fn command(&self, command: &impl Command) -> Result<(), SendError> {
let json = command::prepare(command)?;

self.send(json)
/// Panics if the channel is closed.
pub fn command(&self, command: &impl Command) {
self.send(json::to_string(command).unwrap());
}

/// Send a JSON encoded gateway event to the associated shard.
///
/// # Errors
/// # Panics
///
/// Returns a [`SendErrorType::Sending`] error type if the channel is
/// closed.
pub fn send(&self, json: String) -> Result<(), SendError> {
self.command.send(json).map_err(|_| SendError {
kind: SendErrorType::Sending,
source: None,
})
/// Panics if the channel is closed.
pub fn send(&self, json: String) {
self.command.send(json).expect("channel should be open");
}

/// Send a Websocket close frame to the associated shard.
Expand All @@ -106,69 +92,24 @@ impl MessageSender {
///
/// See the [`Shard::close`] docs for further information.
///
/// # Errors
/// # Panics
///
/// Returns a [`SendErrorType::Sending`] error type if the channel is
/// closed.
/// Panics if the channel is closed.
///
/// [`Shard::close`]: crate::Shard::close
pub fn close(&self, close_frame: CloseFrame<'static>) -> Result<(), SendError> {
match self.close.try_send(close_frame) {
Ok(()) | Err(TrySendError::Full(_)) => Ok(()),
_ => Err(SendError {
kind: SendErrorType::Sending,
source: None,
}),
pub fn close(&self, close_frame: CloseFrame<'static>) {
if let Err(e @ mpsc::error::TrySendError::Closed(_)) = self.close.try_send(close_frame) {
panic!("channel should be open: {e:?}")
}
}
}

#[cfg(test)]
mod tests {
use super::{MessageChannel, MessageSender};
use crate::json;
use static_assertions::assert_impl_all;
use std::{error::Error, fmt::Debug};
use twilight_model::{
gateway::{
payload::outgoing::{Heartbeat, RequestGuildMembers},
CloseFrame,
},
id::Id,
};
use std::fmt::Debug;

assert_impl_all!(MessageChannel: Debug, Send, Sync);
assert_impl_all!(MessageSender: Clone, Debug, Send, Sync);

#[test]
fn channel_sending() -> Result<(), Box<dyn Error>> {
let mut channel = MessageChannel::new();
let sender = channel.sender();
assert!(channel.command_rx.try_recv().is_err());
assert!(channel.close_rx.try_recv().is_err());

let frame = CloseFrame::NORMAL;
let request = RequestGuildMembers::builder(Id::new(1)).query("", None);
let heartbeat = Heartbeat::new(Some(30_000));
let heartbeat_string = json::to_string(&heartbeat)?;
assert!(sender.command(&request).is_ok());
assert!(sender.send(heartbeat_string.clone()).is_ok());
assert!(sender.close(frame.clone()).is_ok());
assert!(sender.close(frame.clone()).is_ok());

assert_eq!(request, json::from_str(&channel.command_rx.try_recv()?)?);
assert_eq!(heartbeat_string, channel.command_rx.try_recv()?);
assert_eq!(frame, channel.close_rx.try_recv()?);
assert!(channel.close_rx.try_recv().is_err());

assert!(!sender.is_closed());
drop(channel);
assert!(sender.is_closed());

assert!(sender.command(&request).is_err());
assert!(sender.send(heartbeat_string).is_err());
assert!(sender.close(frame).is_err());

Ok(())
}
}
30 changes: 1 addition & 29 deletions twilight-gateway/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@
//!
//! [`Shard::command`]: crate::Shard::command
use crate::{
error::{SendError, SendErrorType},
json,
};
use serde::Serialize;
use twilight_model::gateway::payload::outgoing::{
RequestGuildMembers, UpdatePresence, UpdateVoiceState,
};
Expand Down Expand Up @@ -52,38 +47,15 @@ impl Command for RequestGuildMembers {}
impl Command for UpdatePresence {}
impl Command for UpdateVoiceState {}

/// Prepare a command for sending by serializing it.
///
/// # Errors
///
/// Returns a [`SendErrorType::Serializing`] error type if the provided value
/// failed to serialize into JSON.
pub fn prepare(command: &impl Serialize) -> Result<String, SendError> {
json::to_string(command).map_err(|source| SendError {
source: Some(Box::new(source)),
kind: SendErrorType::Serializing,
})
}

#[cfg(test)]
mod tests {
use super::Command;
use crate::json;
use static_assertions::assert_impl_all;
use twilight_model::gateway::payload::outgoing::{
Heartbeat, RequestGuildMembers, UpdatePresence, UpdateVoiceState,
RequestGuildMembers, UpdatePresence, UpdateVoiceState,
};

assert_impl_all!(RequestGuildMembers: Command);
assert_impl_all!(UpdatePresence: Command);
assert_impl_all!(UpdateVoiceState: Command);

#[test]
fn prepare() {
let heartbeat = Heartbeat::new(Some(30_000));
let string = json::to_string(&heartbeat).unwrap();
let message = super::prepare(&heartbeat).unwrap();

assert_eq!(message, string);
}
}
7 changes: 1 addition & 6 deletions twilight-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct Config<Q = InMemoryQueue> {
/// TLS connector for Websocket connections.
// We need this to be public so [`stream`] can reuse TLS on multiple shards
// if unconfigured.
tls: Arc<Connector>,
pub(crate) tls: Arc<Connector>,
/// Token used to authenticate when identifying with the gateway.
///
/// The token is prefixed with "Bot ", which is required by Discord for
Expand Down Expand Up @@ -136,11 +136,6 @@ impl<Q> Config<Q> {
self.ratelimit_messages
}

/// Immutable reference to the TLS connector in use by the shard.
pub(crate) fn tls(&self) -> &Connector {
&self.tls
}

/// Immutable reference to the token used to authenticate when identifying
/// with the gateway.
pub const fn token(&self) -> &str {
Expand Down
137 changes: 0 additions & 137 deletions twilight-gateway/src/connection.rs

This file was deleted.

Loading

0 comments on commit d269aad

Please sign in to comment.