From ff56f2e9a7cf52c9f20237673c43fba5ce68adee Mon Sep 17 00:00:00 2001 From: shahargl Date: Thu, 26 Dec 2024 11:24:21 +0200 Subject: [PATCH] feat: wip --- keep/workflowmanager/workflowscheduler.py | 29 +++++++++++++++++------ tests/test_workflow_execution.py | 22 ++++++++++------- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/keep/workflowmanager/workflowscheduler.py b/keep/workflowmanager/workflowscheduler.py index 65ab89021..dbea8d5af 100644 --- a/keep/workflowmanager/workflowscheduler.py +++ b/keep/workflowmanager/workflowscheduler.py @@ -610,21 +610,36 @@ def stop(self): self.logger.info("Stopping scheduled workflows") self._stop = True + # Wait for scheduler to stop first if self.scheduler_future: try: - self.scheduler_future.result() # Wait for scheduler to stop + self.scheduler_future.result( + timeout=5 + ) # Add timeout to prevent hanging except Exception: self.logger.exception("Error waiting for scheduler to stop") - # Cancel all running workflows - for future in self.futures: - future.cancel() + # Cancel all running workflows with timeout + for future in list(self.futures): # Create a copy of futures set + try: + self.logger.info("Cancelling future") + future.cancel() + future.result(timeout=1) # Add timeout + self.logger.info("Future cancelled") + except Exception: + self.logger.exception("Error cancelling future") - # Shutdown the executor if it exists + # Shutdown the executor with timeout if self.executor: - self.executor.shutdown(wait=True) - self.executor = None # Clear the executor reference + try: + self.logger.info("Shutting down executor") + self.executor.shutdown(wait=True, cancel_futures=True) + self.executor = None + self.logger.info("Executor shut down") + except Exception: + self.logger.exception("Error shutting down executor") + self.futures.clear() self.logger.info("Scheduled workflows stopped") def _finish_workflow_execution( diff --git a/tests/test_workflow_execution.py b/tests/test_workflow_execution.py index 4c9315fb9..2a1bd08a4 100644 --- a/tests/test_workflow_execution.py +++ b/tests/test_workflow_execution.py @@ -79,15 +79,21 @@ @pytest.fixture(scope="module") def workflow_manager(): """ - Fixture to create and manage a WorkflowManager instance for the duration of the module. + Fixture to create and manage a WorkflowManager instance. """ - manager = WorkflowManager.get_instance() - asyncio.run(manager.start()) # Start synchronously - - yield manager - - # Cleanup - manager.stop() + manager = None + try: + manager = WorkflowManager.get_instance() + asyncio.run(manager.start()) + yield manager + finally: + if manager: + try: + manager.stop() + # Give some time for threads to clean up + time.sleep(1) + except Exception as e: + print(f"Error stopping workflow manager: {e}") @pytest.fixture