diff --git a/tests/worker/test_jobs.py b/tests/worker/test_jobs.py index eeaa3e2d..f746c036 100644 --- a/tests/worker/test_jobs.py +++ b/tests/worker/test_jobs.py @@ -965,39 +965,313 @@ async def test_create_mapped_variants_for_scoreset_no_mapping_output( assert score_set.mapping_state == MappingState.failed -@pytest.mark.skip @pytest.mark.asyncio -async def test_mapping_manager_empty_queue(setup_worker_db, standalone_worker_context, session): - queued_job = await variant_mapper_manager(standalone_worker_context) +async def test_mapping_manager_empty_queue(setup_worker_db, standalone_worker_context): + result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) - # No new jobs should have been created if nothing is in the queue. - assert queued_job is None - session.commit() + # No new jobs should have been created if nothing is in the queue, and the queue should remain empty. + assert result["enqueued_job"] is None + assert result["success"] + assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0 + assert (await standalone_worker_context["redis"].get(MAPPING_CURRENT_ID_NAME)).decode("utf-8") == "" -@pytest.mark.skip @pytest.mark.asyncio -async def test_mapping_manager_occupied_queue_mapping_in_progress(setup_worker_db, standalone_worker_context, session): - await standalone_worker_context["redis"].lpush(MAPPING_QUEUE_NAME, "mavedb:test-urn") +async def test_mapping_manager_empty_queue_error_during_setup(setup_worker_db, standalone_worker_context): + await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "") + with patch.object(ArqRedis, "rpop", Exception()): + result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) + # No new jobs should have been created if nothing is in the queue, and the queue should remain empty. 
+ assert result["enqueued_job"] is None + assert not result["success"] + assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0 + assert (await standalone_worker_context["redis"].get(MAPPING_CURRENT_ID_NAME)).decode("utf-8") == "" + + +@pytest.mark.asyncio +async def test_mapping_manager_occupied_queue_mapping_in_progress( + setup_worker_db, standalone_worker_context, session, async_client, data_files +): + score_set = await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + + await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "5") with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.in_progress): - queued_job = await variant_mapper_manager(standalone_worker_context) + result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) - # Execution should be deferred if a job is in progress. - assert await queued_job.status() is arq.jobs.JobStatus.deferred - session.commit() + # Execution should be deferred if a job is in progress, and the queue should contain one entry which is the deferred ID. 
+ assert result["enqueued_job"] is not None + assert ( + await arq.jobs.Job(result["enqueued_job"], standalone_worker_context["redis"]).status() + ) == arq.jobs.JobStatus.deferred + assert result["success"] + assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 1 + assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set.id) + assert (await standalone_worker_context["redis"].get(MAPPING_CURRENT_ID_NAME)).decode("utf-8") == "5" + assert score_set.mapping_state == MappingState.queued + assert score_set.mapping_errors is None -@pytest.mark.skip @pytest.mark.asyncio async def test_mapping_manager_occupied_queue_mapping_not_in_progress( - setup_worker_db, standalone_worker_context, session + setup_worker_db, standalone_worker_context, session, async_client, data_files ): - await standalone_worker_context["redis"].lpush(MAPPING_QUEUE_NAME, "mavedb:test-urn") + score_set = await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "") with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.not_found): - queued_job = await variant_mapper_manager(standalone_worker_context) + result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) - # VRS Mapping jobs have the same ID. - assert queued_job.job_id == "vrs_map" - session.commit() + # Mapping job should be queued if none is currently running, and the queue should now be empty. + assert result["enqueued_job"] is not None + assert ( + await arq.jobs.Job(result["enqueued_job"], standalone_worker_context["redis"]).status() + ) == arq.jobs.JobStatus.queued + assert result["success"] + assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0 + # We don't actually start processing these score sets. 
+ assert score_set.mapping_state == MappingState.queued + assert score_set.mapping_errors is None + + +@pytest.mark.asyncio +async def test_mapping_manager_occupied_queue_mapping_in_progress_error_during_enqueue( + setup_worker_db, standalone_worker_context, session, async_client, data_files +): + score_set = await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + + await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "5") + with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.in_progress), patch.object( + ArqRedis, "enqueue_job", return_value=awaitable_exception() + ): + result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) + + # Enqueueing fails while a mapping job is in progress: no job is enqueued, the manager is unsuccessful, the queue is left empty, the current job ID is unchanged, and the score set is marked failed. + assert result["enqueued_job"] is None + assert not result["success"] + assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0 + assert (await standalone_worker_context["redis"].get(MAPPING_CURRENT_ID_NAME)).decode("utf-8") == "5" + assert score_set.mapping_state == MappingState.failed + assert score_set.mapping_errors is not None + + +@pytest.mark.asyncio +async def test_mapping_manager_occupied_queue_mapping_not_in_progress_error_during_enqueue( + setup_worker_db, standalone_worker_context, session, async_client, data_files +): + score_set = await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + + await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "") + with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.not_found), patch.object( + ArqRedis, "enqueue_job", return_value=awaitable_exception() + ): + result = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) + + # Enqueue would have failed, the 
job is unsuccessful, and we remove the queued item. + assert result["enqueued_job"] is None + assert not result["success"] + assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 0 + assert score_set.mapping_state == MappingState.failed + assert score_set.mapping_errors is not None + + +@pytest.mark.asyncio +async def test_mapping_manager_multiple_score_sets_occupy_queue_mapping_in_progress( + setup_worker_db, standalone_worker_context, session, async_client, data_files +): + score_set_id_1 = ( + await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + ).id + score_set_id_2 = ( + await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + ).id + score_set_id_3 = ( + await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + ).id + + await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "5") + with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.in_progress): + result1 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) + result2 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) + result3 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) + + # All three jobs should complete successfully... + assert result1["success"] + assert result2["success"] + assert result3["success"] + + # ...with a new job enqueued... + assert result1["enqueued_job"] is not None + assert result2["enqueued_job"] is not None + assert result3["enqueued_job"] is not None + + # ...of which all should be deferred jobs of the "variant_mapper_manager" variety... 
+ assert ( + await arq.jobs.Job(result1["enqueued_job"], standalone_worker_context["redis"]).status() + ) == arq.jobs.JobStatus.deferred + assert ( + await arq.jobs.Job(result2["enqueued_job"], standalone_worker_context["redis"]).status() + ) == arq.jobs.JobStatus.deferred + assert ( + await arq.jobs.Job(result3["enqueued_job"], standalone_worker_context["redis"]).status() + ) == arq.jobs.JobStatus.deferred + + assert ( + await arq.jobs.Job(result1["enqueued_job"], standalone_worker_context["redis"]).info() + ).function == "variant_mapper_manager" + assert ( + await arq.jobs.Job(result2["enqueued_job"], standalone_worker_context["redis"]).info() + ).function == "variant_mapper_manager" + assert ( + await arq.jobs.Job(result3["enqueued_job"], standalone_worker_context["redis"]).info() + ).function == "variant_mapper_manager" + + # ...and the queue state should have three jobs, each of our three created score sets. + assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 3 + assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_1) + assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_2) + assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_3) + + score_set1 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_1)).one() + score_set2 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_2)).one() + score_set3 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_3)).one() + # Each score set should remain queued with no mapping errors. 
+ assert score_set1.mapping_state == MappingState.queued + assert score_set2.mapping_state == MappingState.queued + assert score_set3.mapping_state == MappingState.queued + assert score_set1.mapping_errors is None + assert score_set2.mapping_errors is None + assert score_set3.mapping_errors is None + + +@pytest.mark.asyncio +async def test_mapping_manager_multiple_score_sets_occupy_queue_mapping_not_in_progress( + setup_worker_db, standalone_worker_context, session, async_client, data_files +): + score_set_id_1 = ( + await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + ).id + score_set_id_2 = ( + await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + ).id + score_set_id_3 = ( + await setup_records_files_and_variants( + session, + async_client, + data_files, + TEST_MINIMAL_SEQ_SCORESET, + standalone_worker_context, + ) + ).id + + await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, "") + with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.not_found): + result1 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) + + # Mock the first job being in-progress + await standalone_worker_context["redis"].set(MAPPING_CURRENT_ID_NAME, str(score_set_id_1)) + with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.in_progress): + result2 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) + result3 = await variant_mapper_manager(standalone_worker_context, uuid4().hex, 1) + + # All three jobs should complete successfully... + assert result1["success"] + assert result2["success"] + assert result3["success"] + + # ...with a new job enqueued... 
+ assert result1["enqueued_job"] is not None + assert result2["enqueued_job"] is not None + assert result3["enqueued_job"] is not None + + # ...of which the first should be a queued job of the "map_variants_for_score_set" variety and the other two should be + # deferred jobs of the "variant_mapper_manager" variety... + assert ( + await arq.jobs.Job(result1["enqueued_job"], standalone_worker_context["redis"]).status() + ) == arq.jobs.JobStatus.queued + assert ( + await arq.jobs.Job(result2["enqueued_job"], standalone_worker_context["redis"]).status() + ) == arq.jobs.JobStatus.deferred + assert ( + await arq.jobs.Job(result3["enqueued_job"], standalone_worker_context["redis"]).status() + ) == arq.jobs.JobStatus.deferred + + assert ( + await arq.jobs.Job(result1["enqueued_job"], standalone_worker_context["redis"]).info() + ).function == "map_variants_for_score_set" + assert ( + await arq.jobs.Job(result2["enqueued_job"], standalone_worker_context["redis"]).info() + ).function == "variant_mapper_manager" + assert ( + await arq.jobs.Job(result3["enqueued_job"], standalone_worker_context["redis"]).info() + ).function == "variant_mapper_manager" + + # ...and the queue state should have two jobs, neither of which should be the first score set. + assert (await standalone_worker_context["redis"].llen(MAPPING_QUEUE_NAME)) == 2 + assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_2) + assert (await standalone_worker_context["redis"].rpop(MAPPING_QUEUE_NAME)).decode("utf-8") == str(score_set_id_3) + + score_set1 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_1)).one() + score_set2 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_2)).one() + score_set3 = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.id == score_set_id_3)).one() + # We don't actually process any score sets in the manager job, and each should have no mapping errors. 
+ assert score_set1.mapping_state == MappingState.queued + assert score_set2.mapping_state == MappingState.queued + assert score_set3.mapping_state == MappingState.queued + assert score_set1.mapping_errors is None + assert score_set2.mapping_errors is None + assert score_set3.mapping_errors is None