diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py index 445b64c20..bd672d960 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py @@ -107,6 +107,7 @@ class SourceInfoSchema(FileTypeOneOfSchema): FileType.csv.name: SourceInfoSchemaBase, FileType.gsheets.name: SourceInfoSchemaGSheets, FileType.xlsx.name: SourceInfoSchemaBase, + FileType.yadocs.name: SourceInfoSchemaBase, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py index 3d3627557..88bfa3d54 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py @@ -18,6 +18,7 @@ class FileType(Enum): csv = "csv" gsheets = "gsheets" xlsx = "xlsx" + yadocs = "yadocs" class CSVEncoding(Enum): diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py index b3bbd3d14..7922a09f9 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py @@ -19,6 +19,7 @@ RenameTenantStatusModel, SourceNotFoundError, SpreadsheetFileSourceSettings, + YaDocsUserSourceProperties, ) from .storage_schemas import ( DataFileSchema, @@ -46,6 +47,7 @@ "EmptySourcesError", "RenameTenantStatusModel", "PreviewSet", + "YaDocsUserSourceProperties", ) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py index 85abda58a..6f2711d45 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py @@ -102,6 +102,19 @@ def get_secret_keys(self) -> set[DataKey]: return {DataKey(parts=("refresh_token",))} +@attr.s(init=True, kw_only=True) +class YaDocsUserSourceProperties(UserSourceProperties): + file_type: FileType = attr.ib(default=FileType.yadocs) + + private_path: Optional[str] = attr.ib(default=None) + public_link: Optional[str] = attr.ib(default=None) + + oauth_token: Optional[str] = attr.ib(default=None, repr=False) + + def get_secret_keys(self) -> set[DataKey]: + return {DataKey(parts=("oauth_token",))} + + @attr.s(init=True, kw_only=True) class UserSourceDataSourceProperties: file_type: FileType = attr.ib() diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py index b138dac9d..23f2031ed 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py @@ -35,6 +35,7 @@ GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, RenameTenantStatusModel, + YaDocsUserSourceProperties, ) @@ -151,9 +152,19 @@ class Meta(BaseSchema.Meta): spreadsheet_id = fields.String() +class YaDocsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema): + class Meta(BaseSchema.Meta): + target = YaDocsUserSourceProperties + + private_path = fields.String(allow_none=True) + public_link = fields.String(allow_none=True) + oauth_token = fields.String(allow_none=True) + + class UserSourcePropertiesSchema(FileTypeOneOfSchema): type_schemas: dict[str, Type[UserSourcePropertiesBaseSchema]] = { FileType.gsheets.name: GSheetsUserSourcePropertiesSchema, + FileType.yadocs.name: YaDocsUserSourcePropertiesSchema, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py new file mode 100644 index 000000000..435e03908 --- /dev/null +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from typing import ( + Any, + Type, +) + +from aiohttp.client import ClientSession + +from dl_file_uploader_lib import exc as file_upl_exc + + +def yadocs_error_to_file_uploader_exception(status_code: int, resp_info: dict) -> file_upl_exc.DLFileUploaderBaseError: + err_cls: Type[file_upl_exc.DLFileUploaderBaseError] + + if status_code == 401: + err_cls = file_upl_exc.PermissionDenied + elif status_code == 404: + err_cls = file_upl_exc.DocumentNotFound + elif status_code == 400: + err_cls = file_upl_exc.UnsupportedDocument + elif status_code >= 500: + err_cls = file_upl_exc.RemoteServerError + else: + err_cls = file_upl_exc.DLFileUploaderBaseError + + return err_cls( + details=resp_info, + ) + + +class YaDocsClient: + headers: dict[str, Any] = { + "Content-Type": "application/json", + "Accept": "application/json", + } + hostname: str = "https://cloud-api.yandex.net/v1/disk" + + def __init__(self, session: ClientSession): + self.session = session + + async def get_spreadsheet_public_ref(self, link: str) -> str: + resp = await self.session.get( + f"{self.hostname}/public/resources/download/?public_key={link}", + headers=self.headers, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return (await resp.json())["href"] + + async def get_spreadsheet_public_meta(self, link: str) -> dict[str, Any]: + resp = await self.session.get( + f"{self.hostname}/public/resources/?public_key={link}", + headers=self.headers, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return await resp.json() + + async def get_spreadsheet_private_ref(self, path: str, token: str) -> str: + headers_with_token = self._create_headers_with_token(token) + resp = await self.session.get( + f"{self.hostname}/resources/download/?path={path}", + headers=headers_with_token, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return (await resp.json())["href"] + + async def get_spreadsheet_private_meta(self, path: str, token: str) -> dict[str, Any]: + headers_with_token = self._create_headers_with_token(token) + resp = await self.session.get( + f"{self.hostname}/resources/?path={path}", + headers=headers_with_token, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return await resp.json() + + def _create_headers_with_token(self, token: str) -> dict[str, Any]: + headers_with_token = self.headers.copy() + headers_with_token.update({"Authorization": "OAuth " + token}) + return headers_with_token diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index ee8ff92be..682df4904 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -28,6 +28,13 @@ class DownloadGSheetTask(BaseTaskMeta): exec_mode: TaskExecutionMode = attr.ib(default=TaskExecutionMode.BASIC) +@attr.s +class DownloadYaDocsTask(BaseTaskMeta): + name = TaskName("download_yadocs") + + file_id: str = attr.ib() + + @attr.s class ParseFileTask(BaseTaskMeta): name = TaskName("parse_file") diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py index c3c7b5b4d..f10c1d470 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py @@ -7,7 +7,8 @@ RenameTenantFilesTask, ) from .delete import DeleteFileTask -from .download import DownloadGSheetTask +from .download_gsheets import DownloadGSheetTask +from .download_yadocs import DownloadYaDocsTask from .excel import ProcessExcelTask from .parse import ParseFileTask from .save import SaveSourceTask @@ -16,6 +17,7 @@ REGISTRY: TaskRegistry = TaskRegistry.create( [ DownloadGSheetTask, + DownloadYaDocsTask, ParseFileTask, ProcessExcelTask, SaveSourceTask, diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_gsheets.py similarity index 100% rename from lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py rename to lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_gsheets.py diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py new file mode 100644 index 000000000..f3fe78b2b --- /dev/null +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +import logging +from typing import ( + AsyncGenerator, + Optional, +) + +import aiohttp +import attr + +from dl_constants.enums import FileProcessingStatus +from dl_file_uploader_lib import exc +from dl_file_uploader_lib.data_sink.raw_bytes import ( + RawBytesAsyncDataStream, + S3RawFileAsyncDataSink, +) +from dl_file_uploader_lib.redis_model.base import RedisModelManager +from dl_file_uploader_lib.redis_model.models import ( + DataFile, + FileProcessingError, + YaDocsUserSourceProperties, +) +from dl_file_uploader_lib.yadocs_client import ( + YaDocsClient, + yadocs_error_to_file_uploader_exception, +) +from dl_file_uploader_task_interface.context import FileUploaderTaskContext +import dl_file_uploader_task_interface.tasks as task_interface +from dl_task_processor.task import ( + BaseExecutorTask, + Fail, + Retry, + Success, + TaskResult, +) + + +LOGGER = logging.getLogger(__name__) + + +@attr.s +class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, FileUploaderTaskContext]): + cls_meta = task_interface.DownloadYaDocsTask + + async def run(self) -> TaskResult: + dfile: Optional[DataFile] = None + redis = self._ctx.redis_service.get_redis() + + try: + rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) + dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id) + assert dfile is not None + + assert isinstance(dfile.user_source_properties, YaDocsUserSourceProperties) + + async with aiohttp.ClientSession() as session: + yadocs_client = YaDocsClient(session) + try: + if dfile.user_source_properties.public_link is not None: + spreadsheet_ref = await yadocs_client.get_spreadsheet_public_ref( + link=dfile.user_source_properties.public_link + ) + spreadsheet_meta = await yadocs_client.get_spreadsheet_public_meta( + link=dfile.user_source_properties.public_link + ) + + elif ( + dfile.user_source_properties.private_path is not None + and dfile.user_source_properties.oauth_token is not None + ): + spreadsheet_ref = await yadocs_client.get_spreadsheet_private_ref( + path=dfile.user_source_properties.private_path, + token=dfile.user_source_properties.oauth_token, + ) + spreadsheet_meta = await yadocs_client.get_spreadsheet_private_meta( + path=dfile.user_source_properties.private_path, + token=dfile.user_source_properties.oauth_token, + ) + else: + raise exc.DLFileUploaderBaseError() + except exc.DLFileUploaderBaseError as e: + LOGGER.exception(e) + download_error = FileProcessingError.from_exception(e) + dfile.status = FileProcessingStatus.failed + dfile.error = download_error + await dfile.save() + return Success() + + dfile.filename = spreadsheet_meta["name"] + s3 = self._ctx.s3_service + + async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[bytes, None]: + async with aiohttp.ClientSession() as session: + async with session.get(spreadsheet_ref) as resp: + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + while True: + chunk = await resp.content.read(chunk_size) + if chunk: + LOGGER.debug(f"Received chunk of {len(chunk)} bytes.") + yield chunk + else: + LOGGER.info("Empty chunk received.") + break + + data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter()) + async with S3RawFileAsyncDataSink( + s3=s3.client, + s3_key=dfile.s3_key, + bucket_name=s3.tmp_bucket_name, + ) as data_sink: + await data_sink.dump_data_stream(data_stream) + + await dfile.save() + LOGGER.info(f'Uploaded file "{dfile.filename}".') + + except Exception as ex: + LOGGER.exception(ex) + if dfile is None: + return Retry(attempts=3) + else: + dfile.status = FileProcessingStatus.failed + exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed() + dfile.error = FileProcessingError.from_exception(exc_to_save) + await dfile.save() + + return Fail() + return Success() diff --git "a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" new file mode 100644 index 000000000..3a0ff9e18 --- /dev/null +++ "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" @@ -0,0 +1,35 @@ +import pytest + +from dl_constants.enums import FileProcessingStatus +from dl_file_uploader_lib.enums import FileType +from dl_file_uploader_lib.redis_model.models import ( + DataFile, + YaDocsUserSourceProperties, +) +from dl_file_uploader_task_interface.tasks import DownloadYaDocsTask +from dl_task_processor.state import wait_task +from dl_testing.s3_utils import s3_file_exists + + +@pytest.mark.asyncio +async def test_download_yadocs_task( + task_processor_client, + task_state, + s3_client, + redis_model_manager, + s3_tmp_bucket, +): + df = DataFile( + filename="", + file_type=FileType.yadocs, + manager=redis_model_manager, + status=FileProcessingStatus.in_progress, + user_source_properties=YaDocsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA"), + ) + await df.save() + + task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id)) + result = await wait_task(task, task_state) + + assert result[-1] == "success" + assert await s3_file_exists(key=df.s3_key, bucket=s3_tmp_bucket, s3_client=s3_client)