BI-4975: added old_tenant_id parameter to rename_tenant_files task
asnytin committed Oct 30, 2023
1 parent 9493976 commit 5856b8e
Showing 4 changed files with 36 additions and 14 deletions.
@@ -8,4 +8,5 @@ class CleanupRequestSchema(BaseRequestSchema):


 class RenameFilesRequestSchema(BaseRequestSchema):
+    old_tenant_id = ma.fields.String(allow_none=True, load_default=None)
     tenant_id = ma.fields.String(required=True)
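
As a side note (illustration, not part of the commit): under marshmallow 3 load semantics the new field is optional and falls back to None when the request omits it. A minimal sketch, using a plain marshmallow Schema as a stand-in for BaseRequestSchema and made-up tenant ids:

# Illustration only: simplified stand-in for the request schema.
from marshmallow import Schema, fields

class RenameFilesRequestSchemaSketch(Schema):
    old_tenant_id = fields.String(allow_none=True, load_default=None)
    tenant_id = fields.String(required=True)

# Omitted -> falls back to load_default=None.
print(RenameFilesRequestSchemaSketch().load({"tenant_id": "new_tenant"}))
# {'tenant_id': 'new_tenant', 'old_tenant_id': None}

# Explicit value (or null) is accepted thanks to allow_none=True.
print(RenameFilesRequestSchemaSketch().load({"tenant_id": "new_tenant", "old_tenant_id": "old_tenant"}))
# {'tenant_id': 'new_tenant', 'old_tenant_id': 'old_tenant'}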
@@ -68,12 +68,13 @@ class RenameTenantFilesView(FileUploaderBaseView):
     async def post(self) -> web.StreamResponse:
         req_data = await self._load_post_request_schema_data(misc_schemas.RenameFilesRequestSchema)
         tenant_id = req_data["tenant_id"]
+        old_tenant_id = req_data["old_tenant_id"]

         rmm = self.dl_request.get_redis_model_manager()
         await RenameTenantStatusModel(manager=rmm, id=tenant_id, status=RenameTenantStatus.scheduled).save()

         task_processor = self.dl_request.get_task_processor()
-        await task_processor.schedule(RenameTenantFilesTask(tenant_id=tenant_id))
-        LOGGER.info(f"Scheduled RenameTenantFilesTask for tenant_id {tenant_id}")
+        await task_processor.schedule(RenameTenantFilesTask(tenant_id=tenant_id, old_tenant_id=old_tenant_id))
+        LOGGER.info(f"Scheduled RenameTenantFilesTask for tenant_id {tenant_id} (old_tenant_id = {old_tenant_id})")

         return web.Response()
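
For context (assumption, not shown in this diff): a sketch of how a client could call the updated handler. The URL path is hypothetical, since the route is not visible here, and old_tenant_id may be omitted, in which case the task later tries to infer it from existing file names.

# Hypothetical client call; the real route path is not visible in this diff.
import asyncio
import aiohttp

async def schedule_rename() -> None:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://file-uploader.example/rename_tenant_files",  # assumed URL
            json={"tenant_id": "new_tenant", "old_tenant_id": "old_tenant"},  # old_tenant_id is optional
        ) as resp:
            resp.raise_for_status()

asyncio.run(schedule_rename())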
@@ -110,4 +110,5 @@ class CleanupTenantFilePreviewsTask(BaseTaskMeta):
 class RenameTenantFilesTask(BaseTaskMeta):
     name = TaskName("rename_tenant_files_task")

+    old_tenant_id: Optional[str] = attr.ib(default=None)
     tenant_id: str = attr.ib()
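
A side observation (assumption, not stated in the diff): declaring a defaulted attribute (old_tenant_id) before a mandatory one (tenant_id) only works if BaseTaskMeta makes its attrs fields keyword-only; with positional fields, attrs rejects that ordering at class definition time. A minimal sketch of that shape, with a hypothetical class name:

# Sketch assuming kw_only attrs fields; with positional fields, a default
# before a mandatory attribute raises when the class is defined.
from typing import Optional

import attr

@attr.s(kw_only=True, auto_attribs=True)
class RenameTenantFilesTaskMetaSketch:
    old_tenant_id: Optional[str] = attr.ib(default=None)
    tenant_id: str = attr.ib()

meta = RenameTenantFilesTaskMetaSketch(tenant_id="new_tenant", old_tenant_id="old_tenant")
print(meta.tenant_id, meta.old_tenant_id)  # new_tenant old_tenant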
@@ -3,7 +3,10 @@
     timedelta,
 )
 import logging
-from typing import Any
+from typing import (
+    Any,
+    Optional,
+)

 import attr
 from botocore.exceptions import ClientError
@@ -211,7 +214,8 @@ class RenameTenantFilesTask(BaseExecutorTask[task_interface.RenameTenantFilesTas

     async def run(self) -> TaskResult:
         tenant_id = self.meta.tenant_id
-        LOGGER.info(f"RenameTenantFilesTask. Moving to {tenant_id}")
+        old_tenant_id = self.meta.old_tenant_id
+        LOGGER.info(f"RenameTenantFilesTask. Moving to {tenant_id} (old_tenant_id = {old_tenant_id})")

         rmm = RedisModelManager(
             redis=self._ctx.redis_service.get_redis(),
@@ -256,23 +260,38 @@ async def run(self) -> TaskResult:
             assert isinstance(conn, BaseFileS3Connection)
             conn_changed = False
             for source in conn.data.sources:
-                old_s3_filename = source.s3_filename
-                if old_s3_filename is None:
+                if source.s3_filename is None and source.s3_filename_suffix is None:
                     LOGGER.info(
                         f"Cannot rename file for source_id {source.id} - s3_filename not set"
                     )
                     continue
-                if old_s3_filename.startswith(tenant_id):
-                    LOGGER.info(f"File is already tagged with the new tenant: {old_s3_filename}")
-                    continue

-                old_fname_parts = old_s3_filename.split("_")
-                if len(old_fname_parts) >= 2 and all(part for part in old_fname_parts):
-                    # assume that first part is old tenant id
-                    old_tenants.add(old_fname_parts[0])
+                old_s3_filename: str
+                if source.s3_filename:
+                    old_s3_filename = source.s3_filename
+                else:
+                    assert old_tenant_id
+                    old_s3_filename = "_".join(
+                        (old_tenant_id, conn.uuid, source.s3_filename_suffix)
+                    )

-                s3_filename_suffix = make_source_s3_filename_suffix()
+                if not old_tenant_id:
+                    old_fname_parts = old_s3_filename.split("_")
+                    if len(old_fname_parts) >= 2 and all(part for part in old_fname_parts):
+                        # assume that first part is old tenant id
+                        old_tenant_id = old_fname_parts[0]
+                old_tenants.add(old_tenant_id)
+
+                s3_filename_suffix = (
+                    source.s3_filename_suffix
+                    if source.s3_filename_suffix is not None
+                    else make_source_s3_filename_suffix()
+                )
                 new_s3_filename = conn.get_full_s3_filename(s3_filename_suffix)
+                if old_s3_filename == new_s3_filename:
+                    LOGGER.info(f"The file already has the correct name: {new_s3_filename}")
+                    continue
+
                 await s3_client.copy_object(
                     CopySource=dict(
                         Bucket=s3_service.persistent_bucket_name,
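To summarize the behaviour introduced in the hunk above, here is a standalone sketch (simplified, hypothetical names; not the task's actual code) of how the task now resolves the source's current S3 key and the old tenant id: it prefers the explicit old_tenant_id from the request, rebuilds the key from the stored suffix when no full file name is present, and otherwise falls back to treating the first underscore-separated part of the existing file name as the old tenant id.

# Simplified sketch of the key-resolution logic; names and values are illustrative.
from typing import Optional, Tuple

def resolve_old_key(
    s3_filename: Optional[str],
    s3_filename_suffix: Optional[str],
    conn_uuid: str,
    old_tenant_id: Optional[str],
) -> Tuple[Optional[str], Optional[str]]:
    """Return (old_s3_filename, old_tenant_id) roughly the way the updated task derives them."""
    if s3_filename is None and s3_filename_suffix is None:
        return None, old_tenant_id  # nothing to rename for this source

    if s3_filename:
        old_s3_filename = s3_filename
    else:
        # Only a suffix is stored: the explicit old_tenant_id is required
        # to rebuild the key as "<tenant>_<connection uuid>_<suffix>".
        assert old_tenant_id
        old_s3_filename = "_".join((old_tenant_id, conn_uuid, s3_filename_suffix))

    if not old_tenant_id:
        parts = old_s3_filename.split("_")
        if len(parts) >= 2 and all(parts):
            # assume that the first part is the old tenant id
            old_tenant_id = parts[0]

    return old_s3_filename, old_tenant_id

print(resolve_old_key("oldtenant_conn1_abc", None, "conn1", None))  # ('oldtenant_conn1_abc', 'oldtenant')
print(resolve_old_key(None, "abc", "conn1", "oldtenant"))           # ('oldtenant_conn1_abc', 'oldtenant')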
