Skip to content

Commit

Permalink
Merge pull request #2120 from langchain-ai/nc/15oct/executor-dict
Browse files Browse the repository at this point in the history
fix: Avoid errors from executor modifying tasks dict during exit routine
  • Loading branch information
nfcampos authored Oct 15, 2024
2 parents 18f34c3 + 515c4ff commit 0e2c2eb
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions libs/langgraph/langgraph/pregel/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,21 @@ def __exit__(
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
# copy the tasks as done() callback may modify the dict
tasks = self.tasks.copy()
# cancel all tasks that should be cancelled
for task, (cancel, _) in self.tasks.items():
for task, (cancel, _) in tasks.items():
if cancel:
task.cancel()
# wait for all tasks to finish
if tasks := {t for t in self.tasks if not t.done()}:
concurrent.futures.wait(tasks)
if pending := {t for t in tasks if not t.done()}:
concurrent.futures.wait(pending)
# shutdown the executor
self.stack.__exit__(exc_type, exc_value, traceback)
# re-raise the first exception that occurred in a task
if exc_type is None:
# if there's already an exception being raised, don't raise another one
for task, (_, reraise) in self.tasks.items():
for task, (_, reraise) in tasks.items():
if not reraise:
continue
try:
Expand Down Expand Up @@ -161,17 +163,19 @@ async def __aexit__(
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
# copy the tasks as done() callback may modify the dict
tasks = self.tasks.copy()
# cancel all tasks that should be cancelled
for task, (cancel, _) in self.tasks.items():
for task, (cancel, _) in tasks.items():
if cancel:
task.cancel(self.sentinel)
# wait for all tasks to finish
if self.tasks:
await asyncio.wait(self.tasks)
if tasks:
await asyncio.wait(tasks)
# if there's already an exception being raised, don't raise another one
if exc_type is None:
# re-raise the first exception that occurred in a task
for task, (_, reraise) in self.tasks.items():
for task, (_, reraise) in tasks.items():
if not reraise:
continue
try:
Expand Down

0 comments on commit 0e2c2eb

Please sign in to comment.