From eefe057a47d5b0f3aef86481f28eb3047e9c6ecf Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 15 Oct 2024 15:13:46 -0700 Subject: [PATCH 1/2] fix: Avoid errors from executor modifying tasks dict during exit routine - This could happen if a task happened to finish while the exit routine is running --- libs/langgraph/langgraph/pregel/executor.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/libs/langgraph/langgraph/pregel/executor.py b/libs/langgraph/langgraph/pregel/executor.py index 691098b7a..af8cdf3a5 100644 --- a/libs/langgraph/langgraph/pregel/executor.py +++ b/libs/langgraph/langgraph/pregel/executor.py @@ -86,19 +86,21 @@ def __exit__( exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> Optional[bool]: + # copy the tasks as done() callback may modify the dict + tasks = self.tasks.copy() # cancel all tasks that should be cancelled - for task, (cancel, _) in self.tasks.items(): + for task, (cancel, _) in tasks.items(): if cancel: task.cancel() # wait for all tasks to finish - if tasks := {t for t in self.tasks if not t.done()}: + if tasks := {t for t in tasks if not t.done()}: concurrent.futures.wait(tasks) # shutdown the executor self.stack.__exit__(exc_type, exc_value, traceback) # re-raise the first exception that occurred in a task if exc_type is None: # if there's already an exception being raised, don't raise another one - for task, (_, reraise) in self.tasks.items(): + for task, (_, reraise) in tasks.items(): if not reraise: continue try: @@ -161,17 +163,19 @@ async def __aexit__( exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: + # copy the tasks as done() callback may modify the dict + tasks = self.tasks.copy() # cancel all tasks that should be cancelled - for task, (cancel, _) in self.tasks.items(): + for task, (cancel, _) in tasks.items(): if cancel: task.cancel(self.sentinel) # wait for all tasks to finish - if self.tasks: - await asyncio.wait(self.tasks) + if tasks: + await asyncio.wait(tasks) # if there's already an exception being raised, don't raise another one if exc_type is None: # re-raise the first exception that occurred in a task - for task, (_, reraise) in self.tasks.items(): + for task, (_, reraise) in tasks.items(): if not reraise: continue try: From 515c4ffebe167b0b4ebb5a1fe02d1d8450d3caa6 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 15 Oct 2024 16:11:08 -0700 Subject: [PATCH 2/2] Fix --- libs/langgraph/langgraph/pregel/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/langgraph/langgraph/pregel/executor.py b/libs/langgraph/langgraph/pregel/executor.py index af8cdf3a5..488f12661 100644 --- a/libs/langgraph/langgraph/pregel/executor.py +++ b/libs/langgraph/langgraph/pregel/executor.py @@ -93,8 +93,8 @@ def __exit__( if cancel: task.cancel() # wait for all tasks to finish - if tasks := {t for t in tasks if not t.done()}: - concurrent.futures.wait(tasks) + if pending := {t for t in tasks if not t.done()}: + concurrent.futures.wait(pending) # shutdown the executor self.stack.__exit__(exc_type, exc_value, traceback) # re-raise the first exception that occurred in a task