From 5856b8e49c75a221ad054f17eacf54aeee235578 Mon Sep 17 00:00:00 2001 From: Andrey Snytin Date: Mon, 30 Oct 2023 23:36:53 +0100 Subject: [PATCH] BI-4975: added old_tenant_id parameter to rename_tenant_files task --- .../dl_file_uploader_api_lib/schemas/misc.py | 1 + .../dl_file_uploader_api_lib/views/misc.py | 5 ++- .../dl_file_uploader_task_interface/tasks.py | 1 + .../tasks/cleanup.py | 43 +++++++++++++------ 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/misc.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/misc.py index d8da72ec0..4749a0f47 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/misc.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/misc.py @@ -8,4 +8,5 @@ class CleanupRequestSchema(BaseRequestSchema): class RenameFilesRequestSchema(BaseRequestSchema): + old_tenant_id = ma.fields.String(allow_none=True, load_default=None) tenant_id = ma.fields.String(required=True) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/misc.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/misc.py index ace07d9cf..2d89ca842 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/misc.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/misc.py @@ -68,12 +68,13 @@ class RenameTenantFilesView(FileUploaderBaseView): async def post(self) -> web.StreamResponse: req_data = await self._load_post_request_schema_data(misc_schemas.RenameFilesRequestSchema) tenant_id = req_data["tenant_id"] + old_tenant_id = req_data["old_tenant_id"] rmm = self.dl_request.get_redis_model_manager() await RenameTenantStatusModel(manager=rmm, id=tenant_id, status=RenameTenantStatus.scheduled).save() task_processor = self.dl_request.get_task_processor() - await task_processor.schedule(RenameTenantFilesTask(tenant_id=tenant_id)) - LOGGER.info(f"Scheduled RenameTenantFilesTask for tenant_id {tenant_id}") + await task_processor.schedule(RenameTenantFilesTask(tenant_id=tenant_id, old_tenant_id=old_tenant_id)) + LOGGER.info(f"Scheduled RenameTenantFilesTask for tenant_id {tenant_id} (old_tenant_id = {old_tenant_id})") return web.Response() diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index 682df4904..56b5a88bb 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -110,4 +110,5 @@ class CleanupTenantFilePreviewsTask(BaseTaskMeta): class RenameTenantFilesTask(BaseTaskMeta): name = TaskName("rename_tenant_files_task") + old_tenant_id: Optional[str] = attr.ib(default=None) tenant_id: str = attr.ib() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/cleanup.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/cleanup.py index ef24d20ae..b43c83142 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/cleanup.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/cleanup.py @@ -3,7 +3,10 @@ timedelta, ) import logging -from typing import Any +from typing import ( + Any, + Optional, +) import attr from botocore.exceptions import ClientError @@ -211,7 +214,8 @@ class RenameTenantFilesTask(BaseExecutorTask[task_interface.RenameTenantFilesTas async def run(self) -> TaskResult: tenant_id = self.meta.tenant_id - LOGGER.info(f"RenameTenantFilesTask. Moving to {tenant_id}") + old_tenant_id = self.meta.old_tenant_id + LOGGER.info(f"RenameTenantFilesTask. Moving to {tenant_id} (old_tenant_id = {old_tenant_id})") rmm = RedisModelManager( redis=self._ctx.redis_service.get_redis(), @@ -256,23 +260,38 @@ async def run(self) -> TaskResult: assert isinstance(conn, BaseFileS3Connection) conn_changed = False for source in conn.data.sources: - old_s3_filename = source.s3_filename - if old_s3_filename is None: + if source.s3_filename is None and source.s3_filename_suffix is None: LOGGER.info( f"Cannot rename file for source_id {source.id} - s3_filename not set" ) continue - if old_s3_filename.startswith(tenant_id): - LOGGER.info(f"File is already tagged with the new tenant: {old_s3_filename}") - continue - old_fname_parts = old_s3_filename.split("_") - if len(old_fname_parts) >= 2 and all(part for part in old_fname_parts): - # assume that first part is old tenant id - old_tenants.add(old_fname_parts[0]) + old_s3_filename: str + if source.s3_filename: + old_s3_filename = source.s3_filename + else: + assert old_tenant_id + old_s3_filename = "_".join( + (old_tenant_id, conn.uuid, source.s3_filename_suffix) + ) - s3_filename_suffix = make_source_s3_filename_suffix() + if not old_tenant_id: + old_fname_parts = old_s3_filename.split("_") + if len(old_fname_parts) >= 2 and all(part for part in old_fname_parts): + # assume that first part is old tenant id + old_tenant_id = old_fname_parts[0] + old_tenants.add(old_tenant_id) + + s3_filename_suffix = ( + source.s3_filename_suffix + if source.s3_filename_suffix is not None + else make_source_s3_filename_suffix() + ) new_s3_filename = conn.get_full_s3_filename(s3_filename_suffix) + if old_s3_filename == new_s3_filename: + LOGGER.info(f"The file already has the correct name: {new_s3_filename}") + continue + await s3_client.copy_object( CopySource=dict( Bucket=s3_service.persistent_bucket_name,