Skip to content

Commit

Permalink
Do not publish an event if zero accounts were collected
Browse files Browse the repository at this point in the history
  • Loading branch information
meln1k committed Apr 12, 2024
1 parent 819311f commit 4fb5dcd
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 6 deletions.
5 changes: 5 additions & 0 deletions fixbackend/dispatcher/dispatcher_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ async def send_domain_event(collect_state: Dict[FixCloudAccountId, AccountCollec
for k, v in collect_state.items()
if isinstance(v.collection_done, CollectionSuccess)
}

if len(collected_accounts) == 0:
log.info(f"No accounts were collected for workspace {workspace_id}. Not sending domain event.")
return

next_run = await self.next_run_repo.get(workspace_id)
event = TenantAccountsCollected(workspace_id, collected_accounts, next_run)
await self.domain_event_sender.publish(event)
Expand Down
55 changes: 49 additions & 6 deletions tests/fixbackend/dispatcher/dispatcher_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,53 @@ async def jobs_mapping_hash_len() -> int:
)


@pytest.mark.asyncio
async def test_receive_collect_done_message_zero_collected_accounts(
dispatcher: DispatcherService,
metering_repository: MeteringRepository,
workspace: Workspace,
domain_event_sender: InMemoryDomainEventPublisher,
arq_redis: Redis,
redis: Redis,
) -> None:
async def in_progress_hash_len() -> int:
in_progress_hash: Dict[bytes, bytes] = await redis.hgetall(
dispatcher.collect_progress._collect_progress_hash_key(workspace.id)
) # type: ignore # noqa
return len(in_progress_hash)

async def jobs_mapping_hash_len() -> int:
in_progress_hash: Dict[bytes, bytes] = await redis.hgetall(
dispatcher.collect_progress._jobs_hash_key(workspace.id)
) # type: ignore # noqa
return len(in_progress_hash)

current_events_length = len(domain_event_sender.events)
job_id = uuid.uuid4()
cloud_account_id = FixCloudAccountId(uuid.uuid4())
message = {
"job_id": str(job_id),
"task_id": "t1",
"tenant_id": str(workspace.id),
"account_info": {},
"messages": ["m1", "m2"],
"started_at": "2023-09-29T09:00:00Z",
"duration": 18,
}
context = MessageContext("test", "collect-done", "test", utc(), utc())

await dispatcher.process_collect_done_message(message, context)
assert await in_progress_hash_len() == 0
assert await jobs_mapping_hash_len() == 0
assert await redis.exists(dispatcher.collect_progress._jobs_to_workspace_key(str(job_id))) == 0
assert await dispatcher.collect_progress.account_collection_ongoing(workspace.id, cloud_account_id) is False

# no event is emitted in case of no collected accounts
for evt in domain_event_sender.events:
print(evt)
assert len(domain_event_sender.events) == current_events_length


@pytest.mark.asyncio
async def test_receive_collect_error_message(
dispatcher: DispatcherService,
Expand Down Expand Up @@ -354,12 +401,8 @@ async def jobs_mapping_hash_len() -> int:
result = [n async for n in metering_repository.list(workspace.id)]
assert len(result) == 0

assert len(domain_event_sender.events) == current_events_length + 1
assert domain_event_sender.events[-1] == TenantAccountsCollected(
workspace.id,
{},
None,
)
# no event is emitted in case of failure
assert len(domain_event_sender.events) == current_events_length


@pytest.mark.asyncio
Expand Down

0 comments on commit 4fb5dcd

Please sign in to comment.