Skip to content

Commit

Permalink
Add redis expiration transactionally with data
Browse files Browse the repository at this point in the history
  • Loading branch information
meln1k committed Mar 18, 2024
1 parent 8d9c804 commit 761de5d
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions fixbackend/dispatcher/dispatcher_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,23 @@ async def track_account_collection_progress(
cloud_account_id=cloud_account_id, account_id=account_id, started_at=now
).to_json_str()
# store account_collect_progress
await self.redis.hset(
name=self._collect_progress_hash_key(workspace_id), key=str(cloud_account_id), value=value
) # type: ignore
# store job_id -> cloud_account_id mapping
await self.redis.hset(name=self._jobs_hash_key(workspace_id), key=str(job_id), value=str(cloud_account_id)) # type: ignore

# store job_id -> workspace_id mapping
await self.redis.set(name=self._jobs_to_workspace_key(str(job_id)), value=str(workspace_id))
# cleanup after 4 hours just to be sure
expiration = timedelta(hours=4)

await self.redis.expire(name=self._collect_progress_hash_key(workspace_id), time=expiration)
await self.redis.expire(name=self._jobs_hash_key(workspace_id), time=expiration)
await self.redis.expire(name=self._jobs_to_workspace_key(str(job_id)), time=expiration)
async with self.redis.pipeline(transaction=True) as pipe:
await pipe.hset(
name=self._collect_progress_hash_key(workspace_id), key=str(cloud_account_id), value=value
) # type: ignore
# store job_id -> cloud_account_id mapping
await pipe.hset(name=self._jobs_hash_key(workspace_id), key=str(job_id), value=str(cloud_account_id)) # type: ignore

# store job_id -> workspace_id mapping
await pipe.set(name=self._jobs_to_workspace_key(str(job_id)), value=str(workspace_id))
# cleanup after 4 hours just to be sure
expiration = timedelta(hours=4)

await pipe.expire(name=self._collect_progress_hash_key(workspace_id), time=expiration)
await pipe.expire(name=self._jobs_hash_key(workspace_id), time=expiration)
await pipe.expire(name=self._jobs_to_workspace_key(str(job_id)), time=expiration)

await pipe.execute()

async def get_tenant_collect_state(
self, workspace_id: WorkspaceId
Expand Down

0 comments on commit 761de5d

Please sign in to comment.