
Commit

Tests for Mapping Manager
bencap committed Oct 7, 2024
1 parent f69dfe4 commit 4acef84
Showing 1 changed file with 294 additions and 20 deletions.
314 changes: 294 additions & 20 deletions tests/worker/test_jobs.py
@@ -965,39 +965,313 @@ async def test_create_mapped_variants_for_scoreset_no_mapping_output(
    assert score_set.mapping_state == MappingState.failed


-@pytest.mark.skip
@pytest.mark.asyncio
-async def test_mapping_manager_empty_queue(setup_worker_db, standalone_worker_context, session):
-    queued_job = await variant_mapper_manager(standalone_worker_context)
+async def test_mapping_manager_empty_queue(setup_worker_db, standalone_worker_context):
+    result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)

-    # No new jobs should have been created if nothing is in the queue.
-    assert queued_job is None
-    session.commit()
+    # No new jobs should have been created if nothing is in the queue, and the queue should remain empty.
+    assert result["enqueued_job"] is None
+    assert result["success"]
+    assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0
+    assert (await standalone_worker_context["redis"].get(MAPPING_CURRENT_ID_NAME)).decode("utf-8") == ""


-@pytest.mark.skip
@pytest.mark.asyncio
-async def test_mapping_manager_occupied_queue_mapping_in_progress(setup_worker_db, standalone_worker_context, session):
-    await standalone_worker_context["redis"].lpush(MAPPING_QUEUE_NAME, "mavedb:test-urn")
+async def test_mapping_manager_empty_queue_error_during_setup(setup_worker_db, standalone_worker_context):
+    await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "")
+    with patch.object(ArqRedis, "rpop", Exception()):
+        result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)

+    # No new jobs should have been created if nothing is in the queue, and the queue should remain empty.
+    assert result["enqueued_job"] is None
+    assert not result["success"]
+    assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0
+    assert (await standalone_worker_context["redis"].get(MAPPING_CURRENT_ID_NAME)).decode("utf-8") == ""


+@pytest.mark.asyncio
+async def test_mapping_manager_occupied_queue_mapping_in_progress(
+    setup_worker_db, standalone_worker_context, session, async_client, data_files
+):
+    score_set = await setup_records_files_and_variants(
+        session,
+        async_client,
+        data_files,
+        TEST_MINIMAL_SEQ_SCORESET,
+        standalone_worker_context,
+    )

+    await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "5")
    with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.in_progress):
-        queued_job = await variant_mapper_manager(standalone_worker_context)
+        result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)

-    # Execution should be deferred if a job is in progress.
-    assert await queued_job.status() is arq.jobs.JobStatus.deferred
-    session.commit()
+    # Execution should be deferred if a mapping job is in progress, and the queue should still contain one entry: the ID of the score set whose mapping was deferred.
+    assert result["enqueued_job"] is not None
+    assert (
+        await arq.jobs.Job(result["enqueued_job"], standalone_worker_context["redis"]).status()
+    ) == arq.jobs.JobStatus.deferred
+    assert result["success"]
+    assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 1
+    assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set.id)
+    assert (await standalone_worker_context["redis"].get(MAPPING_CURRENT_ID_NAME)).decode("utf-8") == "5"
+    assert score_set.mapping_state == MappingState.queued
+    assert score_set.mapping_errors is None


-@pytest.mark.skip
@pytest.mark.asyncio
async def test_mapping_manager_occupied_queue_mapping_not_in_progress(
-    setup_worker_db, standalone_worker_context, session
+    setup_worker_db, standalone_worker_context, session, async_client, data_files
):
-    await standalone_worker_context["redis"].lpush(MAPPING_QUEUE_NAME, "mavedb:test-urn")
+    score_set = await setup_records_files_and_variants(
+        session,
+        async_client,
+        data_files,
+        TEST_MINIMAL_SEQ_SCORESET,
+        standalone_worker_context,
+    )

+    await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "")
    with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.not_found):
-        queued_job = await variant_mapper_manager(standalone_worker_context)
+        result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)

-    # VRS Mapping jobs have the same ID.
-    assert queued_job.job_id == "vrs_map"
-    session.commit()
+    # Mapping job should be queued if none is currently running, and the queue should now be empty.
+    assert result["enqueued_job"] is not None
+    assert (
+        await arq.jobs.Job(result["enqueued_job"], standalone_worker_context["redis"]).status()
+    ) == arq.jobs.JobStatus.queued
+    assert result["success"]
+    assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0
+    # We don't actually start processing these score sets.
+    assert score_set.mapping_state == MappingState.queued
+    assert score_set.mapping_errors is None


+@pytest.mark.asyncio
+async def test_mapping_manager_occupied_queue_mapping_in_progress_error_during_enqueue(
+    setup_worker_db, standalone_worker_context, session, async_client, data_files
+):
+    score_set = await setup_records_files_and_variants(
+        session,
+        async_client,
+        data_files,
+        TEST_MINIMAL_SEQ_SCORESET,
+        standalone_worker_context,
+    )

+    await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "5")
+    with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.in_progress), patch.object(
+        ArqRedis, "enqueue_job", return_value=awaitable_exception()
+    ):
+        result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)

+    # The enqueue of the deferred manager job fails, so nothing is enqueued, the run is unsuccessful, the queued item is removed, and the score set is marked as failed.
+    assert result["enqueued_job"] is None
+    assert not result["success"]
+    assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0
+    assert (await standalone_worker_context["redis"].get(MAPPING_CURRENT_ID_NAME)).decode("utf-8") == "5"
+    assert score_set.mapping_state == MappingState.failed
+    assert score_set.mapping_errors is not None


+@pytest.mark.asyncio
+async def test_mapping_manager_occupied_queue_mapping_not_in_progress_error_during_enqueue(
+    setup_worker_db, standalone_worker_context, session, async_client, data_files
+):
+    score_set = await setup_records_files_and_variants(
+        session,
+        async_client,
+        data_files,
+        TEST_MINIMAL_SEQ_SCORESET,
+        standalone_worker_context,
+    )

+    await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "")
+    with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.not_found), patch.object(
+        ArqRedis, "enqueue_job", return_value=awaitable_exception()
+    ):
+        result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)

+    # The enqueue fails, so the run is unsuccessful, the queued item is removed, and the score set is marked as failed.
+    assert result["enqueued_job"] is None
+    assert not result["success"]
+    assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0
+    assert score_set.mapping_state == MappingState.failed
+    assert score_set.mapping_errors is not None


+@pytest.mark.asyncio
+async def test_mapping_manager_multiple_score_sets_occupy_queue_mapping_in_progress(
+    setup_worker_db, standalone_worker_context, session, async_client, data_files
+):
+    score_set_id_1 = (
+        await setup_records_files_and_variants(
+            session,
+            async_client,
+            data_files,
+            TEST_MINIMAL_SEQ_SCORESET,
+            standalone_worker_context,
+        )
+    ).id
+    score_set_id_2 = (
+        await setup_records_files_and_variants(
+            session,
+            async_client,
+            data_files,
+            TEST_MINIMAL_SEQ_SCORESET,
+            standalone_worker_context,
+        )
+    ).id
+    score_set_id_3 = (
+        await setup_records_files_and_variants(
+            session,
+            async_client,
+            data_files,
+            TEST_MINIMAL_SEQ_SCORESET,
+            standalone_worker_context,
+        )
+    ).id

+    await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "5")
+    with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.in_progress):
+        result1 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)
+        result2 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)
+        result3 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)

+    # All three jobs should complete successfully...
+    assert result1["success"]
+    assert result2["success"]
+    assert result3["success"]

+    # ...with a new job enqueued...
+    assert result1["enqueued_job"] is not None
+    assert result2["enqueued_job"] is not None
+    assert result3["enqueued_job"] is not None

+    # ...of which all should be deferred jobs of the "variant_mapper_manager" variety...
+    assert (
+        await arq.jobs.Job(result1["enqueued_job"], standalone_worker_context["redis"]).status()
+    ) == arq.jobs.JobStatus.deferred
+    assert (
+        await arq.jobs.Job(result2["enqueued_job"], standalone_worker_context["redis"]).status()
+    ) == arq.jobs.JobStatus.deferred
+    assert (
+        await arq.jobs.Job(result3["enqueued_job"], standalone_worker_context["redis"]).status()
+    ) == arq.jobs.JobStatus.deferred

+    assert (
+        await arq.jobs.Job(result1["enqueued_job"], standalone_worker_context["redis"]).info()
+    ).function == "variant_mapper_manager"
+    assert (
+        await arq.jobs.Job(result2["enqueued_job"], standalone_worker_context["redis"]).info()
+    ).function == "variant_mapper_manager"
+    assert (
+        await arq.jobs.Job(result3["enqueued_job"], standalone_worker_context["redis"]).info()
+    ).function == "variant_mapper_manager"

+    # ...and the queue should contain three entries, one for each of the three created score sets.
+    assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 3
+    assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_1)
+    assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_2)
+    assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_3)

+    score_set1 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_1)).one()
+    score_set2 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_2)).one()
+    score_set3 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_3)).one()
+    # Each score set should remain queued with no mapping errors.
+    assert score_set1.mapping_state == MappingState.queued
+    assert score_set2.mapping_state == MappingState.queued
+    assert score_set3.mapping_state == MappingState.queued
+    assert score_set1.mapping_errors is None
+    assert score_set2.mapping_errors is None
+    assert score_set3.mapping_errors is None


+@pytest.mark.asyncio
+async def test_mapping_manager_multiple_score_sets_occupy_queue_mapping_not_in_progress(
+    setup_worker_db, standalone_worker_context, session, async_client, data_files
+):
+    score_set_id_1 = (
+        await setup_records_files_and_variants(
+            session,
+            async_client,
+            data_files,
+            TEST_MINIMAL_SEQ_SCORESET,
+            standalone_worker_context,
+        )
+    ).id
+    score_set_id_2 = (
+        await setup_records_files_and_variants(
+            session,
+            async_client,
+            data_files,
+            TEST_MINIMAL_SEQ_SCORESET,
+            standalone_worker_context,
+        )
+    ).id
+    score_set_id_3 = (
+        await setup_records_files_and_variants(
+            session,
+            async_client,
+            data_files,
+            TEST_MINIMAL_SEQ_SCORESET,
+            standalone_worker_context,
+        )
+    ).id

+    await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "")
+    with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.not_found):
+        result1 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)

+    # Mock the first job being in-progress
+    await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, str(score_set_id_1))
+    with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.in_progress):
+        result2 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)
+        result3 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1)

+    # All three jobs should complete successfully...
+    assert result1["success"]
+    assert result2["success"]
+    assert result3["success"]

+    # ...with a new job enqueued...
+    assert result1["enqueued_job"] is not None
+    assert result2["enqueued_job"] is not None
+    assert result3["enqueued_job"] is not None

+    # ...of which the first should be a queued job of the "map_variants_for_score_set" variety and the other two should be
+    # deferred jobs of the "variant_mapper_manager" variety...
+    assert (
+        await arq.jobs.Job(result1["enqueued_job"], standalone_worker_context["redis"]).status()
+    ) == arq.jobs.JobStatus.queued
+    assert (
+        await arq.jobs.Job(result2["enqueued_job"], standalone_worker_context["redis"]).status()
+    ) == arq.jobs.JobStatus.deferred
+    assert (
+        await arq.jobs.Job(result3["enqueued_job"], standalone_worker_context["redis"]).status()
+    ) == arq.jobs.JobStatus.deferred

+    assert (
+        await arq.jobs.Job(result1["enqueued_job"], standalone_worker_context["redis"]).info()
+    ).function == "map_variants_for_score_set"
+    assert (
+        await arq.jobs.Job(result2["enqueued_job"], standalone_worker_context["redis"]).info()
+    ).function == "variant_mapper_manager"
+    assert (
+        await arq.jobs.Job(result3["enqueued_job"], standalone_worker_context["redis"]).info()
+    ).function == "variant_mapper_manager"

+    # ...and the queue should contain two entries, neither of which is the first score set.
+    assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 2
+    assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_2)
+    assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_3)

+    score_set1 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_1)).one()
+    score_set2 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_2)).one()
+    score_set3 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_3)).one()
+    # We don't actually process any score sets in the manager job, and each should have no mapping errors.
+    assert score_set1.mapping_state == MappingState.queued
+    assert score_set2.mapping_state == MappingState.queued
+    assert score_set3.mapping_state == MappingState.queued
+    assert score_set1.mapping_errors is None
+    assert score_set2.mapping_errors is None
+    assert score_set3.mapping_errors is None
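
Note on the helper used above: the error-injection tests patch ArqRedis.enqueue_job to return awaitable_exception(), a helper defined elsewhere in tests/worker/test_jobs.py and not shown in this hunk. A minimal sketch of what such a helper could look like, assuming the intent is that awaiting the patched enqueue_job call raises inside variant_mapper_manager, is:

# Hypothetical sketch only -- the real helper lives outside this hunk.
async def awaitable_exception():
    # Awaiting this coroutine raises, simulating a failed enqueue_job call.
    raise Exception("mocked enqueue failure")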
