From 566feae391a813a3c8f65a7da45a33a327455919 Mon Sep 17 00:00:00 2001 From: glrs <5999366+glrs@users.noreply.github.com> Date: Fri, 29 Nov 2024 11:39:13 +0100 Subject: [PATCH] Changes to propagate and catch exceptions caused inside async tasks (modules). Tries to prevent crashing due to module errors --- ygg-mule.py | 9 ++++++++- ygg_trunk.py | 31 ++++++++++++++++++++++++------- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/ygg-mule.py b/ygg-mule.py index de4d63c..5a8c4e1 100644 --- a/ygg-mule.py +++ b/ygg-mule.py @@ -13,6 +13,13 @@ logging = custom_logger("Ygg-Mule") +async def launch_realm(realm): + try: + await realm.launch() + except Exception as e: + logging.error(f"Error in realm.launch(): {e}", exc_info=True) + + def process_document(doc_id): """Process a document by its ID. @@ -46,7 +53,7 @@ def process_document(doc_id): if RealmClass: realm = RealmClass(document, ydm) if realm.proceed: - asyncio.run(realm.launch()) + asyncio.run(launch_realm(realm)) logging.info("Processing complete.") else: logging.info( diff --git a/ygg_trunk.py b/ygg_trunk.py index d47de6d..860e0db 100644 --- a/ygg_trunk.py +++ b/ygg_trunk.py @@ -81,7 +81,14 @@ async def process_couchdb_changes(): tasks_limit = 2 if len(tasks) >= tasks_limit: # Wait for all tasks to complete - await asyncio.gather(*tasks) + results = await asyncio.gather(*tasks, return_exceptions=True) + # Check for exceptions + for result in results: + if isinstance(result, Exception): + logging.error( + f"Task raised an exception: {result}", + exc_info=True, + ) tasks = [] # Sleep to avoid excessive polling @@ -90,14 +97,24 @@ async def process_couchdb_changes(): # If an HPC job is required, submit it asynchronously # await submit_hpc_job(data) - except Exception as e: - logging.error(f"An error occurred: {e}", exc_info=True) + # except Exception as e: + # logging.error(f"An error occurred: {e}", exc_info=True) # logging.error(f"Data causing the error: {data}") - # After the loop, wait for any remaining tasks to complete - if tasks: - await asyncio.gather(*tasks) - tasks = [] + # After the loop, wait for any remaining tasks to complete + # |<-- if goes one indentation back if except block above is uncommented NOTE + if tasks: + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + logging.error( + f"Task raised an exception: {result}", + exc_info=True, + ) + tasks = [] + + except Exception as e: + logging.error(f"An error occurred: {e}", exc_info=True) # Sleep to avoid excessive polling print("Sleeping in while loop...")