Skip to content

Commit

Permalink
Merge pull request #35 from stakpak/fix/sleep
Browse files Browse the repository at this point in the history
Fix: Sleep
  • Loading branch information
kajogo777 authored Feb 18, 2025
2 parents bae26c8 + cd9281a commit da971c8
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "stakpak"
version = "0.1.35"
version = "0.1.36"
edition = "2021"

[dependencies]
Expand Down
11 changes: 8 additions & 3 deletions src/commands/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,12 +486,18 @@ impl<'a> AgentOutputListener<'a> {
)
.await
{
return Err(format!("Failed to subscribe to session: {}", e));
if retry >= 5 {
return Err(format!("Failed to subscribe to session: {}", e));
}
}

if subscription_complete.load(Ordering::SeqCst) {
break;
}

if retry >= 5 {
return Err("Failed to subscribe to session: Timed out".to_string());
}
}

Ok(())
Expand All @@ -504,7 +510,6 @@ impl<'a> AgentOutputListener<'a> {

async fn listen_for_status_updates(&self) -> Result<(), String> {
let output_clone = Arc::clone(&self.output);

self.listener(
"status".to_string(),
move |msg: Payload, _client: SocketClient| -> BoxFuture<'static, ()> {
Expand All @@ -520,7 +525,7 @@ impl<'a> AgentOutputListener<'a> {
},
)
.await
.map_err(|_| "Failed to listen for events".to_string())
.map_err(|e| format!("Failed to listen for status updates: {}", e))
}

fn parse_agent_output(value: &Value) -> Result<RunAgentOutput, String> {
Expand Down
8 changes: 7 additions & 1 deletion src/commands/flow/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,18 @@ async fn wait_for_subscription(
)
.await
{
return Err(format!("Failed to subscribe to session: {}", e));
if retry >= 5 {
return Err(format!("Failed to subscribe to session: {}", e));
}
}

if subscription_complete.load(Ordering::SeqCst) {
break;
}

if retry >= 5 {
return Err("Failed to subscribe to session: Timed out".to_string());
}
}

Ok(())
Expand Down
78 changes: 38 additions & 40 deletions src/utils/output.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
use std::sync::mpsc;
use std::{future::Future, pin::Pin, sync::Arc};

use rust_socketio::asynchronous::ClientBuilder;
use serde_json::json;
use std::future::Future;
use std::sync::mpsc;
use std::sync::Arc;

use crate::config::AppConfig;

pub struct OutputHandler {
tx: mpsc::Sender<String>,
}

pub type Handler = Box<dyn Fn(String) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;

impl OutputHandler {
pub fn new(handler: Handler) -> Self {
pub fn new<F, Fut>(handler: F) -> Self
where
F: Fn(String) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let (tx, rx) = mpsc::channel::<String>();

tokio::spawn(async move {
while let Ok(msg) = rx.recv() {
let fut = handler(msg);
fut.await;
handler(msg).await;
}
});

Expand All @@ -42,8 +44,7 @@ pub async fn setup_output_handler(
config: &AppConfig,
session_id: String,
) -> Result<impl Fn(&str), String> {
// Attempt to connect to the socket
let socket_client = match ClientBuilder::new(config.api_endpoint.clone())
let socket_client = ClientBuilder::new(config.api_endpoint.clone())
.namespace("/v1/agents/sessions")
.reconnect(true)
.reconnect_delay(1000, 5000)
Expand All @@ -53,42 +54,39 @@ pub async fn setup_output_handler(
)
.connect()
.await
{
Ok(client) => Arc::new(client),
Err(e) => {
return Err(format!("Failed to connect to server: {}", e));
}
};
.map_err(|e| format!("Failed to connect to server: {}", e))?;

let socket_client = Arc::new(socket_client);

// Create output handler with the connected client
let output_handler = OutputHandler::new(Box::new(move |msg: String| {
let output_handler = OutputHandler::new(move |msg: String| {
println!("{}", msg);
let socket_client = socket_client.clone();
let msg_clone = msg.clone();
let session_id = session_id.clone();
Box::pin(async move {
let mut retries = 0;
while let Err(e) = socket_client
.emit(
"publish",
json!({
"text": msg_clone,
"session_id": session_id
}),
)
.await
{
tokio::time::sleep(std::time::Duration::from_millis(100 * (retries + 1))).await;
retries += 1;
if retries >= 5 {
eprintln!("Failed to publish message: {}", e);
break;

async move {
tokio::spawn(async move {
let payload = json!({
"text": msg,
"session_id": session_id
});

for retry in 0..5 {
match socket_client.emit("publish", payload.clone()).await {
Ok(_) => break,
Err(e) => {
if retry == 4 {
eprintln!("Failed to publish message: {}", e);
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100 * (retry + 1)))
.await;
}
}
}
}
})
}));
});
}
});

// Return closure that forwards messages to the output handler
Ok(move |msg: &str| {
output_handler.send(msg.to_string());
})
Expand Down

0 comments on commit da971c8

Please sign in to comment.