From 3774068a8023f773d7c0cb581b1d6e296cd85b31 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Tue, 10 Oct 2023 18:31:51 +0300 Subject: [PATCH] Download yadocs task --- Taskfile.dist.yml | 2 +- .../schemas/sources.py | 1 + .../dl_file_uploader_lib/enums.py | 1 + .../redis_model/models/__init__.py | 2 + .../redis_model/models/models.py | 10 +++ .../redis_model/models/storage_schemas.py | 11 +++ .../yadocuments_client.py | 43 ++++++++++ .../dl_file_uploader_task_interface/tasks.py | 7 ++ .../tasks/__init__.py | 6 +- .../tasks/download.py | 84 +++++++++++++++++++ .../ext/test_yadocuments.py | 35 ++++++++ 11 files changed, 200 insertions(+), 2 deletions(-) create mode 100644 lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py create mode 100644 lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py diff --git a/Taskfile.dist.yml b/Taskfile.dist.yml index 5b8b2482b..eae80e6c0 100644 --- a/Taskfile.dist.yml +++ b/Taskfile.dist.yml @@ -2,7 +2,7 @@ version: '3' vars: USER_REL_PATH: - sh: python -c "import os,sys;print(os.path.relpath('{{.USER_WORKING_DIR}}'))" + sh: python3 -c "import os,sys;print(os.path.relpath('{{.USER_WORKING_DIR}}'))" ROOT_DIR: sh: pwd diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py index 445b64c20..1ce595a29 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py @@ -107,6 +107,7 @@ class SourceInfoSchema(FileTypeOneOfSchema): FileType.csv.name: SourceInfoSchemaBase, FileType.gsheets.name: SourceInfoSchemaGSheets, FileType.xlsx.name: SourceInfoSchemaBase, + FileType.yadocuments.name: SourceInfoSchemaBase, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py index 3d3627557..432f6987d 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py @@ -18,6 +18,7 @@ class FileType(Enum): csv = "csv" gsheets = "gsheets" xlsx = "xlsx" + yadocuments = "yadocuments" class CSVEncoding(Enum): diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py index b3bbd3d14..d07e8d0c7 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py @@ -19,6 +19,7 @@ RenameTenantStatusModel, SourceNotFoundError, SpreadsheetFileSourceSettings, + YaDocumentsUserSourceProperties, ) from .storage_schemas import ( DataFileSchema, @@ -46,6 +47,7 @@ "EmptySourcesError", "RenameTenantStatusModel", "PreviewSet", + "YaDocumentsUserSourceProperties", ) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py index 85abda58a..10d729382 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py @@ -102,6 +102,16 @@ def get_secret_keys(self) -> set[DataKey]: return {DataKey(parts=("refresh_token",))} +@attr.s(init=True, kw_only=True) +class YaDocumentsUserSourceProperties(UserSourceProperties): + file_type: FileType = attr.ib(default=FileType.yadocuments) + + private_path: Optional[str] = attr.ib(default=None) + public_link: Optional[str] = attr.ib(default=None) + + oauth_token: Optional[str] = attr.ib(default=None) + + @attr.s(init=True, kw_only=True) class UserSourceDataSourceProperties: file_type: FileType = attr.ib() diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py index b138dac9d..1d80265f6 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py @@ -35,6 +35,7 @@ GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, RenameTenantStatusModel, + YaDocumentsUserSourceProperties, ) @@ -151,9 +152,19 @@ class Meta(BaseSchema.Meta): spreadsheet_id = fields.String() +class YaDocumentsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema): + class Meta(BaseSchema.Meta): + target = YaDocumentsUserSourceProperties + + private_path = fields.String(allow_none=True) + public_link = fields.String(allow_none=True) + oauth_token = fields.String(allow_none=True) + + class UserSourcePropertiesSchema(FileTypeOneOfSchema): type_schemas: dict[str, Type[UserSourcePropertiesBaseSchema]] = { FileType.gsheets.name: GSheetsUserSourcePropertiesSchema, + FileType.yadocuments.name: YaDocumentsUserSourcePropertiesSchema, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py new file mode 100644 index 000000000..57e869708 --- /dev/null +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py @@ -0,0 +1,43 @@ +import requests + + +class YaDocumentsClient: + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + + def get_spreadsheet_public(self, link): + resp = requests.get( + f"https://cloud-api.yandex.net/v1/disk/public/resources/download/?public_key={link}", + headers=self.headers, + ) + return resp.json()["href"] + + def get_spreadsheet_public_meta(self, link): + resp = requests.get( + f"https://cloud-api.yandex.net/v1/disk/public/resources/?public_key={link}", + headers=self.headers, + ) + return resp.json() + + def get_spreadsheet_private(self, path, token): + headers_with_token = self._create_headers_with_token(token) + resp = requests.get( + f"https://cloud-api.yandex.net/v1/disk/resources/download/?path={path}", + headers=headers_with_token, + ) + return resp.json()["href"] + + def get_spreadsheet_private_meta(self, path, token): + headers_with_token = self._create_headers_with_token(token) + resp = requests.get( + f"https://cloud-api.yandex.net/v1/disk/resources/?path={path}", + headers=headers_with_token, + ) + return resp.json() + + def _create_headers_with_token(self, token: str): + headers_with_token = self.headers.copy() + headers_with_token.update({"Authorization": "OAuth " + token}) + return headers_with_token diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index ee8ff92be..772e19461 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -28,6 +28,13 @@ class DownloadGSheetTask(BaseTaskMeta): exec_mode: TaskExecutionMode = attr.ib(default=TaskExecutionMode.BASIC) +@attr.s +class DownloadYaDocumentsTask(BaseTaskMeta): + name = TaskName("download_yadocuments") + + file_id: str = attr.ib() + + @attr.s class ParseFileTask(BaseTaskMeta): name = TaskName("parse_file") diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py index c3c7b5b4d..19da19f71 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py @@ -7,7 +7,10 @@ RenameTenantFilesTask, ) from .delete import DeleteFileTask -from .download import DownloadGSheetTask +from .download import ( + DownloadGSheetTask, + DownloadYaDocumentsTask, +) from .excel import ProcessExcelTask from .parse import ParseFileTask from .save import SaveSourceTask @@ -16,6 +19,7 @@ REGISTRY: TaskRegistry = TaskRegistry.create( [ DownloadGSheetTask, + DownloadYaDocumentsTask, ParseFileTask, ProcessExcelTask, SaveSourceTask, diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py index 8750015e8..2c9093fd2 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py @@ -11,6 +11,7 @@ ) import aiogoogle +import aiohttp import attr from dl_constants.enums import ( @@ -26,6 +27,10 @@ from dl_core.us_manager.us_manager_async import AsyncUSManager from dl_file_uploader_lib import exc from dl_file_uploader_lib.data_sink.json_each_row import S3JsonEachRowUntypedFileAsyncDataSink +from dl_file_uploader_lib.data_sink.raw_bytes import ( + RawBytesAsyncDataStream, + S3RawFileAsyncDataSink, +) from dl_file_uploader_lib.gsheets_client import ( GSheetsClient, GSheetsOAuth2, @@ -39,7 +44,9 @@ GSheetsFileSourceSettings, GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, + YaDocumentsUserSourceProperties, ) +from dl_file_uploader_lib.yadocuments_client import YaDocumentsClient from dl_file_uploader_task_interface.context import FileUploaderTaskContext import dl_file_uploader_task_interface.tasks as task_interface from dl_file_uploader_task_interface.tasks import TaskExecutionMode @@ -340,3 +347,80 @@ async def run(self) -> TaskResult: finally: await usm.close() return Success() + + +@attr.s +class DownloadYaDocumentsTask(BaseExecutorTask[task_interface.DownloadYaDocumentsTask, FileUploaderTaskContext]): + cls_meta = task_interface.DownloadYaDocumentsTask + + async def run(self) -> TaskResult: + dfile: Optional[DataFile] = None + redis = self._ctx.redis_service.get_redis() + + try: + rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) + dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id) + assert dfile is not None + + assert isinstance(dfile.user_source_properties, YaDocumentsUserSourceProperties) + + yadocs_client = YaDocumentsClient() + + if dfile.user_source_properties.public_link is not None: + spreadsheet_ref = yadocs_client.get_spreadsheet_public(link=dfile.user_source_properties.public_link) + spreadsheet_meta = yadocs_client.get_spreadsheet_public_meta( + link=dfile.user_source_properties.public_link + ) + + elif ( + dfile.user_source_properties.private_path is not None + and dfile.user_source_properties.oauth_token is not None + ): + spreadsheet_ref = yadocs_client.get_spreadsheet_private( + path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token + ) + spreadsheet_meta = yadocs_client.get_spreadsheet_private_meta( + path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token + ) + else: + raise exc.DLFileUploaderBaseError() + dfile.filename = spreadsheet_meta["name"] + + s3 = self._ctx.s3_service + + async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024): + async with aiohttp.ClientSession() as session: + async with session.get(spreadsheet_ref) as response: + assert response.status == 200 + while True: + chunk = await response.content.read(chunk_size) + if chunk: + LOGGER.debug(f"Received chunk of {len(chunk)} bytes.") + yield chunk + else: + LOGGER.info("Empty chunk received.") + break + + data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter()) + async with S3RawFileAsyncDataSink( + s3=s3.client, + s3_key=dfile.s3_key, + bucket_name=s3.tmp_bucket_name, + ) as data_sink: + await data_sink.dump_data_stream(data_stream) + + await dfile.save() + LOGGER.info(f'Uploaded file "{dfile.filename}".') + + except Exception as ex: + LOGGER.exception(ex) + if dfile is None: + return Retry(attempts=3) + else: + dfile.status = FileProcessingStatus.failed + exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed() + dfile.error = FileProcessingError.from_exception(exc_to_save) + await dfile.save() + + return Fail() + return Success() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py new file mode 100644 index 000000000..dfd17e417 --- /dev/null +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py @@ -0,0 +1,35 @@ +import pytest + +from dl_constants.enums import FileProcessingStatus +from dl_file_uploader_lib.enums import FileType +from dl_file_uploader_lib.redis_model.models import ( + DataFile, + YaDocumentsUserSourceProperties, +) +from dl_file_uploader_task_interface.tasks import DownloadYaDocumentsTask +from dl_task_processor.state import wait_task +from dl_testing.s3_utils import s3_file_exists + + +@pytest.mark.asyncio +async def test_download_yadocs_task( + task_processor_client, + task_state, + s3_client, + redis_model_manager, + s3_tmp_bucket, +): + df = DataFile( + filename="", + file_type=FileType.yadocuments, + manager=redis_model_manager, + status=FileProcessingStatus.in_progress, + user_source_properties=YaDocumentsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA"), + ) + await df.save() + + task = await task_processor_client.schedule(DownloadYaDocumentsTask(file_id=df.id)) + result = await wait_task(task, task_state) + + assert result[-1] == "success" + assert await s3_file_exists(key=df.s3_key, bucket=s3_tmp_bucket, s3_client=s3_client)