Skip to content

Commit

Permalink
refactor chat server join room method (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
darioalessandro authored Jun 21, 2023
1 parent 5c07def commit 24ecea6
Showing 1 changed file with 67 additions and 27 deletions.
94 changes: 67 additions & 27 deletions actix-api/src/actors/chat_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use types::protos::media_packet::MediaPacket;
use super::chat_session::{RoomId, SessionId};

pub struct ChatServer {
nc: nats::Connection,
nats_connection: nats::Connection,
sessions: HashMap<SessionId, Recipient<Message>>,
active_subs: HashMap<SessionId, nats::Handler>,
}
Expand All @@ -24,7 +24,7 @@ impl ChatServer {
.connect(std::env::var("NATS_URL").expect("NATS_URL env var must be defined"))
.unwrap();
ChatServer {
nc,
nats_connection: nc,
active_subs: HashMap::new(),
sessions: HashMap::new(),
}
Expand All @@ -33,7 +33,7 @@ impl ChatServer {
pub fn send_message(&self, room: &RoomId, message: Arc<MediaPacket>, session_id: SessionId) {
let subject = format!("room.{}.{}", room, session_id);
if let Ok(message) = message.write_to_bytes() {
match self.nc.publish(&subject, message) {
match self.nats_connection.publish(&subject, message) {
Ok(_) => trace!("published message to {}", subject),
Err(e) => error!("error publishing message to {}: {}", subject, e),
}
Expand Down Expand Up @@ -105,46 +105,86 @@ impl Handler<JoinRoom> for ChatServer {
) -> Self::Result {
self.leave_rooms(&session);

let subject = format!("room.{}.*", room);
let queue = format!("{}-{}", session, room);
let session_recipient = self.sessions.get(&session).unwrap().clone();
let room_clone = room.clone();
let session_clone = session.clone();
let sub = match self.nc.queue_subscribe(&subject, &queue) {
Ok(sub) => sub,
Err(e) => {
let err = format!("error subscribing to subject {}: {}", subject, e);
let (subject, queue) = build_subject_and_queue(&room, &session);
let session_recipient = match self.sessions.get(&session) {
Some(recipient) => recipient.clone(),
None => {
let err = format!("session {} is not connected", session);
error!("{}", err);
return MessageResult(Err(err));
}
};
let handler = sub.with_handler(move |msg| {
if msg.subject == format!("room.{}.{}", room_clone, session_clone) {
return Ok(());
}
let msg = MediaPacket::parse_from_bytes(&msg.data).unwrap();
let msg = Message {
nickname: Arc::new(Some(msg.email.clone())),
msg: Arc::new(msg),
};

session_recipient.try_send(msg).map_err(|e| {
error!("error sending message to session {}: {}", session_clone, e);
std::io::Error::new(std::io::ErrorKind::Other, e)
})
});

let sub = match self
.nats_connection
.queue_subscribe(&subject, &queue)
.map_err(|e| handle_subscription_error(e, &subject))
{
Ok(sub) => sub,
Err(e) => return MessageResult(Err(e)),
};

let handler = sub.with_handler(build_handler(
session_recipient,
room.clone(),
session.clone(),
));

debug!("Subscribed to subject {} with queue {}", subject, queue);

let result = self
.active_subs
.insert(session.clone(), handler)
.map(|_| ())
.ok_or("The session is already subscribed".into());

info!(
"someone connected to room {} with session {} result {:?}",
room,
session.trim(),
result
);

MessageResult(result)
}
}

fn build_subject_and_queue(room: &str, session: &str) -> (String, String) {
(format!("room.{}.*", room), format!("{}-{}", session, room))
}

fn handle_subscription_error(e: impl std::fmt::Display, subject: &str) -> String {
let err = format!("error subscribing to subject {}: {}", subject, e);
error!("{}", err);
err
}

fn build_handler(
session_recipient: Recipient<Message>, // Assuming Recipient is a type
room: String,
session: String,
) -> impl Fn(nats::Message) -> Result<(), std::io::Error> {
move |msg| {
if msg.subject == format!("room.{}.{}", room, session) {
return Ok(());
}

let media_packet = match MediaPacket::parse_from_bytes(&msg.data) {
Ok(media_packet) => media_packet,
Err(e) => {
error!("error parsing message: {}", e);
return Err(std::io::Error::new(std::io::ErrorKind::Other, e));
}
};

let message = Message {
nickname: Arc::new(Some(media_packet.email.clone())),
msg: Arc::new(media_packet),
};

session_recipient.try_send(message).map_err(|e| {
error!("error sending message to session {}: {}", session, e);
std::io::Error::new(std::io::ErrorKind::Other, e)
})
}
}

0 comments on commit 24ecea6

Please sign in to comment.