From bf2a1f7eae8c44220e977097cb282b3e043a1e3b Mon Sep 17 00:00:00 2001 From: Nikita Melkozerov Date: Tue, 19 Mar 2024 11:33:01 +0100 Subject: [PATCH] Add redis expiration transactionally with data (#339) --- fixbackend/dispatcher/dispatcher_service.py | 31 +++++++++++---------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/fixbackend/dispatcher/dispatcher_service.py b/fixbackend/dispatcher/dispatcher_service.py index e3811978..2f848899 100644 --- a/fixbackend/dispatcher/dispatcher_service.py +++ b/fixbackend/dispatcher/dispatcher_service.py @@ -82,20 +82,23 @@ async def track_account_collection_progress( cloud_account_id=cloud_account_id, account_id=account_id, started_at=now ).to_json_str() # store account_collect_progress - await self.redis.hset( - name=self._collect_progress_hash_key(workspace_id), key=str(cloud_account_id), value=value - ) # type: ignore - # store job_id -> cloud_account_id mapping - await self.redis.hset(name=self._jobs_hash_key(workspace_id), key=str(job_id), value=str(cloud_account_id)) # type: ignore - - # store job_id -> workspace_id mapping - await self.redis.set(name=self._jobs_to_workspace_key(str(job_id)), value=str(workspace_id)) - # cleanup after 4 hours just to be sure - expiration = timedelta(hours=4) - - await self.redis.expire(name=self._collect_progress_hash_key(workspace_id), time=expiration) - await self.redis.expire(name=self._jobs_hash_key(workspace_id), time=expiration) - await self.redis.expire(name=self._jobs_to_workspace_key(str(job_id)), time=expiration) + async with self.redis.pipeline(transaction=True) as pipe: + await pipe.hset( + name=self._collect_progress_hash_key(workspace_id), key=str(cloud_account_id), value=value + ) # type: ignore + # store job_id -> cloud_account_id mapping + await pipe.hset(name=self._jobs_hash_key(workspace_id), key=str(job_id), value=str(cloud_account_id)) # type: ignore + + # store job_id -> workspace_id mapping + await pipe.set(name=self._jobs_to_workspace_key(str(job_id)), value=str(workspace_id)) + # cleanup after 4 hours just to be sure + expiration = timedelta(hours=4) + + await pipe.expire(name=self._collect_progress_hash_key(workspace_id), time=expiration) + await pipe.expire(name=self._jobs_hash_key(workspace_id), time=expiration) + await pipe.expire(name=self._jobs_to_workspace_key(str(job_id)), time=expiration) + + await pipe.execute() async def get_tenant_collect_state( self, workspace_id: WorkspaceId