diff --git a/fixbackend/dispatcher/dispatcher_service.py b/fixbackend/dispatcher/dispatcher_service.py index 2f848899..2ddade0e 100644 --- a/fixbackend/dispatcher/dispatcher_service.py +++ b/fixbackend/dispatcher/dispatcher_service.py @@ -261,6 +261,11 @@ async def send_domain_event(collect_state: Dict[FixCloudAccountId, AccountCollec for k, v in collect_state.items() if isinstance(v.collection_done, CollectionSuccess) } + + if len(collected_accounts) == 0: + log.info(f"No accounts were collected for workspace {workspace_id}. Not sending domain event.") + return + next_run = await self.next_run_repo.get(workspace_id) event = TenantAccountsCollected(workspace_id, collected_accounts, next_run) await self.domain_event_sender.publish(event) diff --git a/tests/fixbackend/dispatcher/dispatcher_service_test.py b/tests/fixbackend/dispatcher/dispatcher_service_test.py index 92ab4ade..0e72f5e8 100644 --- a/tests/fixbackend/dispatcher/dispatcher_service_test.py +++ b/tests/fixbackend/dispatcher/dispatcher_service_test.py @@ -296,6 +296,53 @@ async def jobs_mapping_hash_len() -> int: ) +@pytest.mark.asyncio +async def test_receive_collect_done_message_zero_collected_accounts( + dispatcher: DispatcherService, + metering_repository: MeteringRepository, + workspace: Workspace, + domain_event_sender: InMemoryDomainEventPublisher, + arq_redis: Redis, + redis: Redis, +) -> None: + async def in_progress_hash_len() -> int: + in_progress_hash: Dict[bytes, bytes] = await redis.hgetall( + dispatcher.collect_progress._collect_progress_hash_key(workspace.id) + ) # type: ignore # noqa + return len(in_progress_hash) + + async def jobs_mapping_hash_len() -> int: + in_progress_hash: Dict[bytes, bytes] = await redis.hgetall( + dispatcher.collect_progress._jobs_hash_key(workspace.id) + ) # type: ignore # noqa + return len(in_progress_hash) + + current_events_length = len(domain_event_sender.events) + job_id = uuid.uuid4() + cloud_account_id = FixCloudAccountId(uuid.uuid4()) + message = { + "job_id": str(job_id), + "task_id": "t1", + "tenant_id": str(workspace.id), + "account_info": {}, + "messages": ["m1", "m2"], + "started_at": "2023-09-29T09:00:00Z", + "duration": 18, + } + context = MessageContext("test", "collect-done", "test", utc(), utc()) + + await dispatcher.process_collect_done_message(message, context) + assert await in_progress_hash_len() == 0 + assert await jobs_mapping_hash_len() == 0 + assert await redis.exists(dispatcher.collect_progress._jobs_to_workspace_key(str(job_id))) == 0 + assert await dispatcher.collect_progress.account_collection_ongoing(workspace.id, cloud_account_id) is False + + # no event is emitted in case of no collected accounts + for evt in domain_event_sender.events: + print(evt) + assert len(domain_event_sender.events) == current_events_length + + @pytest.mark.asyncio async def test_receive_collect_error_message( dispatcher: DispatcherService, @@ -354,12 +401,8 @@ async def jobs_mapping_hash_len() -> int: result = [n async for n in metering_repository.list(workspace.id)] assert len(result) == 0 - assert len(domain_event_sender.events) == current_events_length + 1 - assert domain_event_sender.events[-1] == TenantAccountsCollected( - workspace.id, - {}, - None, - ) + # no event is emitted in case of failure + assert len(domain_event_sender.events) == current_events_length @pytest.mark.asyncio