Skip to content

Commit

Permalink
Merge pull request #26 from glrs/feature/core
Browse files Browse the repository at this point in the history
Feature/core - Enhance async module exception handling to prevent core from crashing in module failure
  • Loading branch information
glrs authored Nov 29, 2024
2 parents 928f173 + dff9d41 commit 1033855
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
9 changes: 8 additions & 1 deletion ygg-mule.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@
logging = custom_logger("Ygg-Mule")


async def launch_realm(realm):
try:
await realm.launch()
except Exception as e:
logging.error(f"Error in realm.launch(): {e}", exc_info=True)


def process_document(doc_id):
"""Process a document by its ID.
Expand Down Expand Up @@ -46,7 +53,7 @@ def process_document(doc_id):
if RealmClass:
realm = RealmClass(document, ydm)
if realm.proceed:
asyncio.run(realm.launch())
asyncio.run(launch_realm(realm))
logging.info("Processing complete.")
else:
logging.info(
Expand Down
31 changes: 24 additions & 7 deletions ygg_trunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,14 @@ async def process_couchdb_changes():
tasks_limit = 2
if len(tasks) >= tasks_limit:
# Wait for all tasks to complete
await asyncio.gather(*tasks)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Check for exceptions
for result in results:
if isinstance(result, Exception):
logging.error(
f"Task raised an exception: {result}",
exc_info=True,
)
tasks = []

# Sleep to avoid excessive polling
Expand All @@ -90,14 +97,24 @@ async def process_couchdb_changes():

# If an HPC job is required, submit it asynchronously
# await submit_hpc_job(data)
except Exception as e:
logging.error(f"An error occurred: {e}", exc_info=True)
# except Exception as e:
# logging.error(f"An error occurred: {e}", exc_info=True)
# logging.error(f"Data causing the error: {data}")

# After the loop, wait for any remaining tasks to complete
if tasks:
await asyncio.gather(*tasks)
tasks = []
# After the loop, wait for any remaining tasks to complete
# |<-- if goes one indentation back if except block above is uncommented NOTE
if tasks:
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
logging.error(
f"Task raised an exception: {result}",
exc_info=True,
)
tasks = []

except Exception as e:
logging.error(f"An error occurred: {e}", exc_info=True)

# Sleep to avoid excessive polling
print("Sleeping in while loop...")
Expand Down

0 comments on commit 1033855

Please sign in to comment.