From ce92e415d8c74d70253f8d4170691d8e6cedce42 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:15:23 +0000 Subject: [PATCH 1/3] fix: prevent deadlocks in node manager operations - Add mutex for node lifecycle operations - Implement proper cleanup for event listeners - Add synchronization for concurrent access - Fix race conditions in start/stop operations Co-Authored-By: Nicolas Arqueros --- .../src/utils/node-manager.ts | 104 ++++++++++++------ 1 file changed, 71 insertions(+), 33 deletions(-) diff --git a/apps/shinkai-visor-e2e/src/utils/node-manager.ts b/apps/shinkai-visor-e2e/src/utils/node-manager.ts index 383b379ac..a2c45c06c 100644 --- a/apps/shinkai-visor-e2e/src/utils/node-manager.ts +++ b/apps/shinkai-visor-e2e/src/utils/node-manager.ts @@ -16,6 +16,7 @@ export class NodeManager { private node: ChildProcess | undefined; private nodeExecPath: string; + private operationLock: Promise = Promise.resolve(); constructor(nodeExecPath?: string) { this.nodeExecPath = @@ -35,6 +36,17 @@ export class NodeManager { ); } + private async acquireLock(): Promise<() => void> { + let release: () => void; + const newLock = new Promise((resolve) => { + release = resolve; + }); + const oldLock = this.operationLock; + this.operationLock = newLock; + await oldLock; + return release!; + } + private async spawnNode( command: string, options: { @@ -53,19 +65,31 @@ export class NodeManager { stdio: 'pipe', shell: true, }); + + const cleanup = () => { + childProcess.removeAllListeners(); + childProcess.stdout?.removeAllListeners(); + childProcess.stderr?.removeAllListeners(); + }; + childProcess.on('close', (err) => { logger(`close with code ${String(err)}`); + cleanup(); reject(err); }); + childProcess.on('error', (err) => { logger(`error ${String(err)}`); }); + childProcess.stderr.on('error', (data) => { logger(String(data)); }); + childProcess.stdout.on('error', (data) => { logger(String(data)); }); + if (options.pipeLogs) { childProcess.stderr.on('data', (data) => { logger(data.toString()); @@ -74,17 +98,21 @@ export class NodeManager { logger(data.toString()); }); } + if (options.readyMatcher) { const timeoutRef = setTimeout(() => { childProcess.kill(); + cleanup(); reject( `ready matcher timeout after ${options.readyMatcherTimeoutMs}`, ); }, options.readyMatcherTimeoutMs ?? 15000); + childProcess.stdout?.on('data', (chunk: Buffer) => { if (options.readyMatcher?.test(chunk.toString())) { logger(`process ready, with readyMatcher:${chunk.toString()}`); clearTimeout(timeoutRef); + cleanup(); resolve(childProcess); } }); @@ -95,45 +123,55 @@ export class NodeManager { } async startNode(pristine: boolean, nodeOptions?: object): Promise { + const release = await this.acquireLock(); console.log('starting node'); - const mergedOptions = { - ...this.defaultNodeOptions, - ...(nodeOptions || {}), - }; - if (pristine) { - this.resetToPristine(mergedOptions.NODE_STORAGE_PATH); + try { + const mergedOptions = { + ...this.defaultNodeOptions, + ...(nodeOptions || {}), + }; + if (pristine) { + this.resetToPristine(mergedOptions.NODE_STORAGE_PATH); + } + const nodeEnv = Object.entries(mergedOptions) + .map(([key, value]) => { + return `${key}="${value}"`; + }) + .join(' '); + + this.node = await this.spawnNode(`${nodeEnv} ${this.nodeExecPath}`, { + pipeLogs: true, + logsId: 'shinkai-node', + readyMatcher: /Server::run/, + }); + console.log('node started'); + } finally { + release(); } - const nodeEnv = Object.entries(mergedOptions) - .map(([key, value]) => { - return `${key}="${value}"`; - }) - .join(' '); - - this.node = await this.spawnNode(`${nodeEnv} ${this.nodeExecPath}`, { - pipeLogs: true, - logsId: 'shinkai-node', - readyMatcher: /Server::run/, - }); - console.log('node started'); } async stopNode(): Promise { + const release = await this.acquireLock(); console.log('stopping node'); - if (!this.node) { - return Promise.resolve(); - } - this.node.kill(); - await new Promise((resolve) => { - const timeout = setTimeout(() => { - console.warn('stopping node timeout'); - resolve(); - }, 5000); - this.node.once('exit', () => { - console.log('stopping node success'); - clearTimeout(timeout); - resolve(); + try { + if (!this.node) { + return Promise.resolve(); + } + this.node.kill(); + await new Promise((resolve) => { + const timeout = setTimeout(() => { + console.warn('stopping node timeout'); + resolve(); + }, 5000); + this.node.once('exit', () => { + console.log('stopping node success'); + clearTimeout(timeout); + resolve(); + }); }); - }); - this.node = undefined; + this.node = undefined; + } finally { + release(); + } } } From 3a95caec270c68ec9a4c8bbccc76fae1a4e6cbbc Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:38:52 +0000 Subject: [PATCH 2/3] fix: prevent deadlocks in rust node manager implementation Co-Authored-By: Nicolas Arqueros --- .../process_handlers/process_handler.rs | 113 +++++++----------- .../shinkai_node_process_handler.rs | 31 ++++- .../shinkai_node_manager.rs | 46 +++++-- 3 files changed, 103 insertions(+), 87 deletions(-) diff --git a/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/process_handlers/process_handler.rs b/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/process_handlers/process_handler.rs index 1698f461f..00987881a 100644 --- a/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/process_handlers/process_handler.rs +++ b/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/process_handlers/process_handler.rs @@ -35,7 +35,6 @@ impl ProcessHandler { const MAX_LOGS_LENGTH: usize = 500; const MIN_MS_ALIVE: u64 = 5000; - /// Initializes a new ShinkaiNodeManager with default or provided options pub(crate) fn new( app: AppHandle, process_name: String, @@ -93,52 +92,13 @@ impl ProcessHandler { process.is_some() } - pub async fn spawn( - &self, - env: HashMap, - args: Vec<&str>, - current_dir: Option, - ) -> Result<(), String> { - { - let process = self.process.lock().await; - if process.is_some() { - log::warn!("process {} is already running", self.process_name); - return Ok(()); - } - } - - let mut logger = self.logger.write().await; - let shell = self.app.shell(); - let (mut rx, child) = shell - .sidecar(self.process_name.clone()) - .map_err(|error| { - let message = format!("failed to spawn, error: {}", error); - logger.add_log(message.clone()); - message - })? - .envs(env.clone()) - .current_dir(current_dir.unwrap_or_else(|| std::path::PathBuf::from("./"))) - .args(args) - .spawn() - .map_err(|error| { - let message = format!("failed to spawn error: {}", error); - logger.add_log(message.clone()); - message - })?; - drop(logger); - - { - let mut process = self.process.lock().await; - *process = Some(child); - } - + async fn handle_process_events(&self, mut rx: tokio::sync::mpsc::Receiver) { let process_mutex = Arc::clone(&self.process); let logger_mutex = Arc::clone(&self.logger); let event_sender_mutex = Arc::clone(&self.event_sender); let is_ready_mutex = Arc::new(Mutex::new(false)); - let is_ready_mutex_clone = is_ready_mutex.clone(); - let ready_matcher = self.ready_matcher.clone(); + tauri::async_runtime::spawn(async move { while let Some(event) = rx.recv().await { let message = Self::command_event_to_message(event.clone()); @@ -157,37 +117,52 @@ impl ProcessHandler { } } }); + } - let start_time = std::time::Instant::now(); - let logger_mutex = self.logger.clone(); - let process_mutex = self.process.clone(); - let event_sender_mutex = Arc::clone(&self.event_sender); - tauri::async_runtime::spawn(async move { - while std::time::Instant::now().duration_since(start_time) - < std::time::Duration::from_millis(Self::MIN_MS_ALIVE) - { - let process = process_mutex.lock().await; - let is_ready = is_ready_mutex_clone.lock().await; - if process.is_none() { - let event_sender = event_sender_mutex.lock().await; - let mut logger = logger_mutex.write().await; - let message = "failed to spawn shinkai-node, it crashed before min time alive" - .to_string(); - let log_entry = logger.add_log(message.clone()); - let _ = event_sender.send(ProcessHandlerEvent::Log(log_entry)).await; - return Err(message.to_string()); - } else if *is_ready { - break; - } - std::thread::sleep(std::time::Duration::from_millis(500)); + pub async fn spawn( + &self, + env: HashMap, + args: Vec<&str>, + current_dir: Option, + ) -> Result<(), String> { + { + let process = self.process.lock().await; + if process.is_some() { + log::warn!("process {} is already running", self.process_name); + return Ok(()); } - Ok(()) - }) - .await - .unwrap()?; + } - self.emit_event(ProcessHandlerEvent::Started).await; + let child = { + let mut logger = self.logger.write().await; + let shell = self.app.shell(); + let (rx, child) = shell + .sidecar(self.process_name.clone()) + .map_err(|error| { + let message = format!("failed to spawn, error: {}", error); + logger.add_log(message.clone()); + message + })? + .envs(env.clone()) + .current_dir(current_dir.unwrap_or_else(|| std::path::PathBuf::from("./"))) + .args(args) + .spawn() + .map_err(|error| { + let message = format!("failed to spawn error: {}", error); + logger.add_log(message.clone()); + message + })?; + + self.handle_process_events(rx); + child + }; + { + let mut process = self.process.lock().await; + *process = Some(child); + } + + self.emit_event(ProcessHandlerEvent::Started).await; Ok(()) } diff --git a/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/process_handlers/shinkai_node_process_handler.rs b/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/process_handlers/shinkai_node_process_handler.rs index 1d0e30a81..9db430cc1 100644 --- a/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/process_handlers/shinkai_node_process_handler.rs +++ b/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/process_handlers/shinkai_node_process_handler.rs @@ -141,12 +141,33 @@ impl ShinkaiNodeProcessHandler { let _ = self.kill().await; let env = options_to_env(&self.options.clone()); - self.process_handler.spawn(env, [].to_vec(), None).await?; - if let Err(e) = self.wait_shinkai_node_server().await { - self.process_handler.kill().await; - return Err(e); + + // Add timeout for spawn operation + let spawn_result = tokio::time::timeout( + Duration::from_secs(30), + self.process_handler.spawn(env, [].to_vec(), None) + ).await; + + match spawn_result { + Ok(Ok(_)) => { + match tokio::time::timeout( + Duration::from_millis(Self::HEALTH_TIMEOUT_MS), + self.wait_shinkai_node_server() + ).await { + Ok(Ok(_)) => Ok(()), + Ok(Err(e)) => { + self.process_handler.kill().await; + Err(e) + }, + Err(_) => { + self.process_handler.kill().await; + Err("Health check timeout".to_string()) + } + } + }, + Ok(Err(e)) => Err(e), + Err(_) => Err("Spawn timeout".to_string()) } - Ok(()) } pub async fn get_last_n_logs(&self, n: usize) -> Vec { diff --git a/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/shinkai_node_manager.rs b/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/shinkai_node_manager.rs index be311ac2c..faa8ae693 100644 --- a/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/shinkai_node_manager.rs +++ b/apps/shinkai-desktop/src-tauri/src/local_shinkai_node/shinkai_node_manager.rs @@ -85,15 +85,28 @@ impl ShinkaiNodeManager { } pub async fn spawn(&mut self) -> Result<(), String> { + // Add cleanup in case of previous failed state + self.kill().await; + + // Start Ollama with timeout self.emit_event(ShinkaiNodeManagerEvent::StartingOllama); - match self.ollama_process.spawn(None).await { - Ok(_) => { + match tokio::time::timeout( + Duration::from_secs(30), + self.ollama_process.spawn(None) + ).await { + Ok(Ok(_)) => { self.emit_event(ShinkaiNodeManagerEvent::OllamaStarted); - } - Err(e) => { + }, + Ok(Err(e)) => { self.kill().await; self.emit_event(ShinkaiNodeManagerEvent::OllamaStartError { error: e.clone() }); return Err(e); + }, + Err(_) => { + self.kill().await; + let error = "Ollama start timeout".to_string(); + self.emit_event(ShinkaiNodeManagerEvent::OllamaStartError { error: error.clone() }); + return Err(error); } } @@ -223,19 +236,26 @@ impl ShinkaiNodeManager { } self.emit_event(ShinkaiNodeManagerEvent::StartingShinkaiNode); - match self.shinkai_node_process.spawn().await { - Ok(_) => { + match tokio::time::timeout( + Duration::from_secs(30), + self.shinkai_node_process.spawn() + ).await { + Ok(Ok(_)) => { self.emit_event(ShinkaiNodeManagerEvent::ShinkaiNodeStarted); - } - Err(e) => { + Ok(()) + }, + Ok(Err(e)) => { self.kill().await; - self.emit_event(ShinkaiNodeManagerEvent::ShinkaiNodeStartError { - error: e.clone(), - }); - return Err(e); + self.emit_event(ShinkaiNodeManagerEvent::ShinkaiNodeStartError { error: e.clone() }); + Err(e) + }, + Err(_) => { + self.kill().await; + let error = "Shinkai node start timeout".to_string(); + self.emit_event(ShinkaiNodeManagerEvent::ShinkaiNodeStartError { error: error.clone() }); + Err(error) } } - Ok(()) } pub async fn kill(&mut self) { From 62df044fc1abb7d53f39fe21cf1d50d80ead86cb Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:46:41 +0000 Subject: [PATCH 3/3] Revert "fix: prevent deadlocks in node manager operations" This reverts commit ce92e415d8c74d70253f8d4170691d8e6cedce42. --- .../src/utils/node-manager.ts | 104 ++++++------------ 1 file changed, 33 insertions(+), 71 deletions(-) diff --git a/apps/shinkai-visor-e2e/src/utils/node-manager.ts b/apps/shinkai-visor-e2e/src/utils/node-manager.ts index a2c45c06c..383b379ac 100644 --- a/apps/shinkai-visor-e2e/src/utils/node-manager.ts +++ b/apps/shinkai-visor-e2e/src/utils/node-manager.ts @@ -16,7 +16,6 @@ export class NodeManager { private node: ChildProcess | undefined; private nodeExecPath: string; - private operationLock: Promise = Promise.resolve(); constructor(nodeExecPath?: string) { this.nodeExecPath = @@ -36,17 +35,6 @@ export class NodeManager { ); } - private async acquireLock(): Promise<() => void> { - let release: () => void; - const newLock = new Promise((resolve) => { - release = resolve; - }); - const oldLock = this.operationLock; - this.operationLock = newLock; - await oldLock; - return release!; - } - private async spawnNode( command: string, options: { @@ -65,31 +53,19 @@ export class NodeManager { stdio: 'pipe', shell: true, }); - - const cleanup = () => { - childProcess.removeAllListeners(); - childProcess.stdout?.removeAllListeners(); - childProcess.stderr?.removeAllListeners(); - }; - childProcess.on('close', (err) => { logger(`close with code ${String(err)}`); - cleanup(); reject(err); }); - childProcess.on('error', (err) => { logger(`error ${String(err)}`); }); - childProcess.stderr.on('error', (data) => { logger(String(data)); }); - childProcess.stdout.on('error', (data) => { logger(String(data)); }); - if (options.pipeLogs) { childProcess.stderr.on('data', (data) => { logger(data.toString()); @@ -98,21 +74,17 @@ export class NodeManager { logger(data.toString()); }); } - if (options.readyMatcher) { const timeoutRef = setTimeout(() => { childProcess.kill(); - cleanup(); reject( `ready matcher timeout after ${options.readyMatcherTimeoutMs}`, ); }, options.readyMatcherTimeoutMs ?? 15000); - childProcess.stdout?.on('data', (chunk: Buffer) => { if (options.readyMatcher?.test(chunk.toString())) { logger(`process ready, with readyMatcher:${chunk.toString()}`); clearTimeout(timeoutRef); - cleanup(); resolve(childProcess); } }); @@ -123,55 +95,45 @@ export class NodeManager { } async startNode(pristine: boolean, nodeOptions?: object): Promise { - const release = await this.acquireLock(); console.log('starting node'); - try { - const mergedOptions = { - ...this.defaultNodeOptions, - ...(nodeOptions || {}), - }; - if (pristine) { - this.resetToPristine(mergedOptions.NODE_STORAGE_PATH); - } - const nodeEnv = Object.entries(mergedOptions) - .map(([key, value]) => { - return `${key}="${value}"`; - }) - .join(' '); - - this.node = await this.spawnNode(`${nodeEnv} ${this.nodeExecPath}`, { - pipeLogs: true, - logsId: 'shinkai-node', - readyMatcher: /Server::run/, - }); - console.log('node started'); - } finally { - release(); + const mergedOptions = { + ...this.defaultNodeOptions, + ...(nodeOptions || {}), + }; + if (pristine) { + this.resetToPristine(mergedOptions.NODE_STORAGE_PATH); } + const nodeEnv = Object.entries(mergedOptions) + .map(([key, value]) => { + return `${key}="${value}"`; + }) + .join(' '); + + this.node = await this.spawnNode(`${nodeEnv} ${this.nodeExecPath}`, { + pipeLogs: true, + logsId: 'shinkai-node', + readyMatcher: /Server::run/, + }); + console.log('node started'); } async stopNode(): Promise { - const release = await this.acquireLock(); console.log('stopping node'); - try { - if (!this.node) { - return Promise.resolve(); - } - this.node.kill(); - await new Promise((resolve) => { - const timeout = setTimeout(() => { - console.warn('stopping node timeout'); - resolve(); - }, 5000); - this.node.once('exit', () => { - console.log('stopping node success'); - clearTimeout(timeout); - resolve(); - }); - }); - this.node = undefined; - } finally { - release(); + if (!this.node) { + return Promise.resolve(); } + this.node.kill(); + await new Promise((resolve) => { + const timeout = setTimeout(() => { + console.warn('stopping node timeout'); + resolve(); + }, 5000); + this.node.once('exit', () => { + console.log('stopping node success'); + clearTimeout(timeout); + resolve(); + }); + }); + this.node = undefined; } }