Skip to content

Commit

Permalink
feat: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl committed Dec 26, 2024
1 parent 604e382 commit ff56f2e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
29 changes: 22 additions & 7 deletions keep/workflowmanager/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,21 +610,36 @@ def stop(self):
self.logger.info("Stopping scheduled workflows")
self._stop = True

# Wait for scheduler to stop first
if self.scheduler_future:
try:
self.scheduler_future.result() # Wait for scheduler to stop
self.scheduler_future.result(
timeout=5
) # Add timeout to prevent hanging
except Exception:
self.logger.exception("Error waiting for scheduler to stop")

# Cancel all running workflows
for future in self.futures:
future.cancel()
# Cancel all running workflows with timeout
for future in list(self.futures): # Create a copy of futures set
try:
self.logger.info("Cancelling future")
future.cancel()
future.result(timeout=1) # Add timeout
self.logger.info("Future cancelled")
except Exception:
self.logger.exception("Error cancelling future")

# Shutdown the executor if it exists
# Shutdown the executor with timeout
if self.executor:
self.executor.shutdown(wait=True)
self.executor = None # Clear the executor reference
try:
self.logger.info("Shutting down executor")
self.executor.shutdown(wait=True, cancel_futures=True)
self.executor = None
self.logger.info("Executor shut down")
except Exception:
self.logger.exception("Error shutting down executor")

self.futures.clear()
self.logger.info("Scheduled workflows stopped")

def _finish_workflow_execution(
Expand Down
22 changes: 14 additions & 8 deletions tests/test_workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,21 @@
@pytest.fixture(scope="module")
def workflow_manager():
"""
Fixture to create and manage a WorkflowManager instance for the duration of the module.
Fixture to create and manage a WorkflowManager instance.
"""
manager = WorkflowManager.get_instance()
asyncio.run(manager.start()) # Start synchronously

yield manager

# Cleanup
manager.stop()
manager = None
try:
manager = WorkflowManager.get_instance()
asyncio.run(manager.start())
yield manager
finally:
if manager:
try:
manager.stop()
# Give some time for threads to clean up
time.sleep(1)
except Exception as e:
print(f"Error stopping workflow manager: {e}")


@pytest.fixture
Expand Down

0 comments on commit ff56f2e

Please sign in to comment.