Skip to content

Commit

Permalink
fixed websocket duplication bug
Browse files Browse the repository at this point in the history
made button swap play/pause
  • Loading branch information
Klaven committed May 22, 2024
1 parent 69f0ead commit 31992c5
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 76 deletions.
97 changes: 44 additions & 53 deletions frontend_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use axum::{routing::get, Router};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::stream::SelectNextSome;
use futures_channel::mpsc::unbounded;
use futures_util::{SinkExt, StreamExt};
use maud::{html, Markup};
use maud::html;
use messages::DisplayMessage;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::{
collections::HashMap,
Expand All @@ -21,26 +19,11 @@ use tokio_tungstenite::{
tungstenite::{Error, Message, Result},
};

type Tx = UnboundedSender<Message>;
pub type ConnectionMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
type EventQueues = Arc<Mutex<Queues>>;
mod routes;
mod types;
use routes::{admin, index};

static EVENT_QUEUE_ACTIVE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(true);
static TTS_QUEUE_ACTIVE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);

pub struct Queues {
pub events: VecDeque<DisplayMessage>,
pub tts: VecDeque<DisplayMessage>,
}

impl Queues {
pub fn new() -> Queues {
Queues {
events: VecDeque::new(),
tts: VecDeque::new(),
}
}
}
use crate::types::{ConnectionMap, EventQueues, Queues};

pub struct FrontendApi {
ws_address: String,
Expand Down Expand Up @@ -82,19 +65,26 @@ impl FrontendApi {
});

// Process the Queues on a new thread

let queue_connection_state = connection_state.clone();
let event_queue = message_queue_arc.clone();
tokio::spawn(async move {
loop {
let active = types::EVENT_QUEUE_ACTIVE.load(std::sync::atomic::Ordering::SeqCst);
if !active {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
continue;
}

let message = {
let mut queues = message_queue_arc.lock().unwrap();
queues.events.pop_front()
let mut queues = event_queue.lock().unwrap();
queues.unpublished_events.pop_front()
};

let Some(message) = message else {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
continue;
};

//Make html message to send to frontend
//<div id="alerts" hx-swap-oob="true">
let html_message = html! {
Expand All @@ -103,6 +93,10 @@ impl FrontendApi {
img src=(message.image_url) {}
}
};

let mut bad_websockets = vec![];

//Send message to all connected websockets
{
let mut websocket_state = queue_connection_state.lock().unwrap();
for (&addr, tx) in websocket_state.iter_mut() {
Expand All @@ -111,6 +105,7 @@ impl FrontendApi {
.is_err()
{
println!("closing websocket message to: {} ==========", addr);
bad_websockets.push(addr);
}
}
}
Expand All @@ -130,6 +125,7 @@ impl FrontendApi {
.is_err()
{
println!("closing websocket message to: {} ==========", addr);
bad_websockets.push(addr);
}
}
}
Expand All @@ -140,6 +136,7 @@ impl FrontendApi {
});

let https_address = self.http_address.clone();
let event_queues = message_queue_arc.clone();
tokio::spawn(async move {
let listener = TcpListener::bind(&https_address)
.await
Expand All @@ -148,9 +145,16 @@ impl FrontendApi {
let app = Router::new()
.route("/", get(index))
.route("/admin", get(admin))
.route("/events/latest", get(routes::get_latest_unpublished_events))
.route("/tts", get(routes::get_all_events_in_queue))
.route("/events", get(routes::get_all_events_in_queue))
.route("/events/latest/all", get(routes::get_latest_events))
.route("/events/pause", get(routes::pause_events))
.route("/events/start", get(routes::resume_events))
//TODO: understand where to put our assets
// Remember that these need served by nginx in production
.nest_service("/assets", ServeDir::new("assets"));
.nest_service("/assets", ServeDir::new("assets"))
.with_state(event_queues.clone());

// run it
axum::serve(listener, app).await.unwrap();
Expand All @@ -174,40 +178,22 @@ impl FrontendApi {
}
}

#[derive(askama::Template)]
#[template(path = "index.html")]
struct IndexTemplate {}

#[derive(askama::Template)]
#[template(path = "admin.html")]
struct AdminTemplate {}

async fn index() -> IndexTemplate {
IndexTemplate {}
}

async fn admin() -> AdminTemplate {
AdminTemplate {}
}

async fn handle_message(
connection_state: ConnectionMap,
event_queues: EventQueues,
message: Option<DisplayMessage>,
) {
match message {
Some(message) => {
let mut state2 = connection_state.lock().unwrap();
for (&addr, tx) in state2.iter_mut() {
println!("Sending message to: {}", addr);
let mut queues = event_queues.lock().unwrap();

//Enqueue message
{
let mut queues = event_queues.lock().unwrap();
//TODO: need to handle different types of messages
//TODO: Store different types of messages in different queues
queues.unpublished_events.push_back(message.clone());

queues.events.push_back(message.clone());
}
//add to latest events, remove oldest if over 10
queues.latest_events.push_back(message.clone());
if queues.latest_events.len() > 10 {
queues.latest_events.pop_front();
}
}
None => panic!("Error receiving message"),
Expand Down Expand Up @@ -247,6 +233,7 @@ async fn handle_connection(
println!("Received a message from {}: {}", peer, msg.to_text()?);
ws_sender.send(msg).await?;
} else if msg.is_close() {
println!("Issue with connection: {}", peer);
break;
}
}
Expand All @@ -258,7 +245,11 @@ async fn handle_connection(
msg = rx.next() => {
let msg = msg.unwrap();
println!("Sending message to {}: {}", peer, msg.to_text()?);
ws_sender.send(msg).await?;
let res = ws_sender.send(msg).await;
if res.is_err() {
println!("Error sending message to {}", peer);
break;
}
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions frontend_api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ async fn main() {

//TODO: write some test something to send a message to the receiver

let mut count = 0;
loop {
println!("Sending message");
count += 1;
let display_message = messages::DisplayMessage {
message: "hello from htmx".to_string(),
message: format!("hello from htmx {}", count),
image_url: "".to_string(),
sound_url: "".to_string(),
display_time: 5000,
Expand All @@ -35,6 +36,6 @@ async fn main() {

let _ = tx.send(display_message).unwrap();

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
}
85 changes: 85 additions & 0 deletions frontend_api/src/routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use crate::types::EventQueues;
use axum::{extract::State, http::StatusCode};
use maud::{html, Markup};

#[derive(askama::Template)]
#[template(path = "index.html")]
pub struct IndexTemplate {}

#[derive(askama::Template)]
#[template(path = "admin.html")]
pub struct AdminTemplate {
pub enabled: bool,
}

pub async fn index() -> IndexTemplate {
IndexTemplate {}
}

pub async fn admin() -> AdminTemplate {
AdminTemplate {
enabled: crate::types::EVENT_QUEUE_ACTIVE.load(std::sync::atomic::Ordering::SeqCst),
}
}

pub async fn get_latest_unpublished_events(
State(queues): State<EventQueues>,
) -> Result<Markup, (StatusCode, String)> {
let queues = queues.lock().unwrap();
let range = if queues.unpublished_events.len() < 10 {
0..queues.unpublished_events.len()
} else {
0..10
};
let events = queues.unpublished_events.range(range);

Ok(html! {
ul class="waiting" {
@for event in events {
li { (event.message) }
}
}
})
}

pub async fn get_latest_events(
State(queues): State<EventQueues>,
) -> Result<Markup, (StatusCode, String)> {
let queues = queues.lock().unwrap();
let events = queues.latest_events.clone();
Ok(html! {
ul {
@for event in events {
li { (event.message) }
}
}
})
}

pub async fn pause_events() -> Result<Markup, (StatusCode, String)> {
crate::types::EVENT_QUEUE_ACTIVE.store(false, std::sync::atomic::Ordering::SeqCst);
Ok(html! {
button id="event-queue-toggle" hx-get="/events/start" hx-swap="outerHTML" hx-target="#event-queue-toggle" { "Start" }
})
}

pub async fn resume_events() -> Result<Markup, (StatusCode, String)> {
crate::types::EVENT_QUEUE_ACTIVE.store(true, std::sync::atomic::Ordering::SeqCst);
Ok(html! {
button id="event-queue-toggle" hx-get="/events/pause" hx-swap="outerHTML" hx-target="#event-queue-toggle" { "Pause" }
})
}

pub async fn get_all_events_in_queue(
State(queues): State<EventQueues>,
) -> Result<Markup, (StatusCode, String)> {
let queues = queues.lock().unwrap();
let events = queues.unpublished_events.clone();
Ok(html! {
ul {
@for event in events {
li { (event.message) }
}
}
})
}
32 changes: 32 additions & 0 deletions frontend_api/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use futures_channel::mpsc::UnboundedSender;
use messages::DisplayMessage;
use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
sync::{Arc, Mutex},
};
use tokio_tungstenite::tungstenite::Message;
pub type Tx = UnboundedSender<Message>;
pub type ConnectionMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
pub type EventQueues = Arc<Mutex<Queues>>;

pub struct Queues {
pub unpublished_events: VecDeque<DisplayMessage>,
pub tts: VecDeque<DisplayMessage>,
pub latest_events: VecDeque<DisplayMessage>,
pub last_sub: Option<DisplayMessage>,
}

pub static EVENT_QUEUE_ACTIVE: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(true);

impl Queues {
pub fn new() -> Queues {
Queues {
unpublished_events: VecDeque::new(),
tts: VecDeque::new(),
latest_events: VecDeque::new(),
last_sub: None,
}
}
}
42 changes: 22 additions & 20 deletions frontend_api/templates/admin.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,34 @@
<main class="flex flex-row justify-center w-full">
<!-- Should display 2 queues with buttons at the bottom -->
<div>
<div class="queue">
<h1>Events</h1>
<div class="">
<h1>TODO</h1>
<div id="todo-queue"></div>
<div class="queue" hx-get="/events/latest/all" hx-target="#latest" hx-swap="innerHTML"
hx-trigger="every 1s">
<h1>Last Events</h1>
<li id="latest"></li>
<div class="button-holder">
</div>
<div class="">
<h1>DOING</h1>
<div id="doing-queue"></div>
</div>
<div class="queue" hx-get="/events/latest" hx-target="#waiting" hx-swap="innerHTML"
hx-trigger="every 2s">
<h1>In Queue</h1>
<li id="waiting"></li>
<div class="button-holder">
{%- if enabled %}
<button id="event-queue-toggle" hx-get="/events/pause" hx-swap="outerHTML"
hx-target="#event-queue-toggle">Stop</button>
{% else %}
<button id="event-queue-toggle" hx-get="/events/start" hx-swap="outerHTML"
hx-target="#event-queue-toggle">Start</button>
{% endif %}
</div>
</div>
<div class="queue">
<div class="queue" hx-get="/tts" hx-swap="innerHTML" hx-target="tts" hx-trigger="every 2s">
<h1>TTS</h1>
<div>
<h1>DONE</h1>
<div id=""></div>
</div>
<div class="">
<h1>ARCHIVED</h1>
<div id="archived-queue"></div>
<li id="tts"></li>
<div class="button-holder">
<button id="play-next" hx-swap="none">Play Next</button>
</div>
</div>
<div class="">
<button id="event-queue-start-stop">Stop</button>
<button id="tts">Play Next TTS</button>
</div>
</div>
<!-- Should display notifications and alerts -->
<div hx-ext="ws" ws-connect="ws://localhost:9000/">
Expand Down

0 comments on commit 31992c5

Please sign in to comment.