Skip to content

Commit

Permalink
add more detailed logging
Browse files: browse the repository at this point in the history
  • Loading branch information
dwhswenson committed Aug 25, 2023
1 parent 9ee865a commit 5e33b03
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
32 changes: 26 additions & 6 deletions exorcist/taskdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,12 +461,14 @@ def check_out_task(self):
# TODO: separate selection so subclasses can easily override;
# something like `_select_task(conn: Connection) -> Row` (allow us
# to do something smarter than "take the first available")
_logger.info("Checking out a new task")
sel_stmt = (
sqla.select(self.tasks_table)
.where(self.tasks_table.c.status == TaskStatus.AVAILABLE.value)
)
with self.engine.begin() as conn:
task_row = conn.execute(sel_stmt).first()
_logger.debug(f"Before claiming task: {task_row=}")

if task_row is None:
# no tasks are available
Expand All @@ -481,9 +483,27 @@ def check_out_task(self):

self._validate_update_result(result)

# log the changed row if we're doing DEBUG logging
if _logger.isEnabledFor(logging.DEBUG):
reselect = (
sqla.select(self.tasks_table)
.where(self.tasks_table.c.taskid == task_row.taskid)
)
# read-only; use connect() (no autocommit)
with self.engine.connect() as conn:
reloaded = list(conn.execute(reselect).all())

assert len(reloaded) == 1, \
f"Got {len(reloaded)} rows for '{task_row.taskid}'"

claimed = reloaded[0]
_logger.debug(f"After claiming task: {claimed=}")

_logger.info(f"Selected task '{task_row.taskid}'")
return task_row.taskid

def _mark_task_completed_failure(self, taskid: str):
_logger.info(f"Marking try of {taskid} as failed.")
status_statement = sqla.case(
(
self.tasks_table.c.tries >= self.tasks_table.c.max_tries,
Expand All @@ -502,7 +522,7 @@ def _mark_task_completed_failure(self, taskid: str):


def _mark_task_completed_success(self, taskid: str):
_logger.debug(f"Marking task '{taskid}' as successfully completed")
_logger.info(f"Marking task '{taskid}' as successfully completed")
# TODO: there may be ways to make this faster; this is likely to be
# the most important point for performance considerations

Expand Down Expand Up @@ -549,24 +569,24 @@ def _mark_task_completed_success(self, taskid: str):

# now we actually DO those steps
with self.engine.begin() as conn:
_logger.debug("Setting task status to COMPLETED")
_logger.debug("* Setting task status to COMPLETED")
completed_task = conn.execute(update_task_completed)
_logger.debug("Identifying candidates to unblock")
_logger.debug("* Identifying candidates to unblock")
candidates = conn.execute(update_deps).fetchall()
candidates = {c[0] for c in candidates}
_logger.debug("Identifying which candidates should unblocked")
_logger.debug("* Identifying which candidates should unblocked")
blocked = conn.execute(
still_blocked, {"candidates": candidates}
).fetchall()
blocked = {c[0] for c in blocked}
to_unblock = candidates - blocked
if to_unblock:
_logger.debug("Moving unblocked tasks to AVAILABLE")
_logger.debug("* Moving unblocked tasks to AVAILABLE")
unblocked = conn.execute(update_task_unblocked, [
{'unblock': unblock} for unblock in to_unblock
])
else:
_logger.debug("No tasks to unblock")
_logger.debug("* No tasks to unblock")

def mark_task_completed(self, taskid: str, success: bool):
if success:
Expand Down
9 changes: 8 additions & 1 deletion exorcist/tests/test_taskstatusdb.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import pytest
from unittest import mock
from exorcist import TaskStatusDB, NoStatusChange, TaskStatus
from exorcist.taskdb import _logger as taskdb_logger

import sqlalchemy as sqla
import networkx as nx
from datetime import datetime
import logging

def create_database(metadata, engine, extra_table=False,
missing_table=False, extra_column=False,
Expand Down Expand Up @@ -272,7 +274,11 @@ def test_task_row_update_statement(self, loaded_db):
# TODO: I'm going to do this in a future PR
...

def test_check_out_task(self, loaded_db):
# be sure to cover extra code paths when doing DEBUG logging
@pytest.mark.parametrize('loglevel', [logging.DEBUG, logging.WARNING])
def test_check_out_task(self, loaded_db, loglevel):
taskdb_logger.setLevel(loglevel)

taskid = loaded_db.check_out_task()
assert taskid == "foo"

Expand All @@ -286,6 +292,7 @@ def test_check_out_task(self, loaded_db):
assert foo.max_tries == 3

assert bar == ("bar", TaskStatus.BLOCKED.value, None, 0, 3)
taskdb_logger.setLevel(logging.NOTSET)

def test_check_out_task_double_checkout(self, loaded_db):
taskid = loaded_db.check_out_task()
Expand Down

0 comments on commit 5e33b03

Please sign in to comment.