From ab5a41d190c2f89c996f4a012705098d0880d4c0 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Thu, 12 Oct 2023 14:11:08 +0300 Subject: [PATCH] Fix issues --- .../schemas/sources.py | 2 +- .../dl_file_uploader_lib/enums.py | 2 +- .../redis_model/models/__init__.py | 4 +- .../redis_model/models/models.py | 9 +- .../redis_model/models/storage_schemas.py | 8 +- .../dl_file_uploader_lib/yadocs_client.py | 80 +++++++++++ .../yadocuments_client.py | 43 ------ .../dl_file_uploader_task_interface/tasks.py | 4 +- .../tasks/__init__.py | 8 +- .../{download.py => download_gsheets.py} | 84 ------------ .../tasks/download_yadocs.py | 126 ++++++++++++++++++ .../ext/test_yado\321\201s.py" | 10 +- 12 files changed, 230 insertions(+), 150 deletions(-) create mode 100644 lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py delete mode 100644 lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py rename lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/{download.py => download_gsheets.py} (81%) create mode 100644 lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py rename lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py => "lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" (68%) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py index 1ce595a29..bd672d960 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py @@ -107,7 +107,7 @@ class SourceInfoSchema(FileTypeOneOfSchema): FileType.csv.name: SourceInfoSchemaBase, FileType.gsheets.name: SourceInfoSchemaGSheets, FileType.xlsx.name: SourceInfoSchemaBase, - FileType.yadocuments.name: SourceInfoSchemaBase, + FileType.yadocs.name: SourceInfoSchemaBase, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py index 432f6987d..88bfa3d54 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py @@ -18,7 +18,7 @@ class FileType(Enum): csv = "csv" gsheets = "gsheets" xlsx = "xlsx" - yadocuments = "yadocuments" + yadocs = "yadocs" class CSVEncoding(Enum): diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py index d07e8d0c7..7922a09f9 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py @@ -19,7 +19,7 @@ RenameTenantStatusModel, SourceNotFoundError, SpreadsheetFileSourceSettings, - YaDocumentsUserSourceProperties, + YaDocsUserSourceProperties, ) from .storage_schemas import ( DataFileSchema, @@ -47,7 +47,7 @@ "EmptySourcesError", "RenameTenantStatusModel", "PreviewSet", - "YaDocumentsUserSourceProperties", + "YaDocsUserSourceProperties", ) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py index 10d729382..6f2711d45 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py @@ -103,13 +103,16 @@ def get_secret_keys(self) -> set[DataKey]: @attr.s(init=True, kw_only=True) -class YaDocumentsUserSourceProperties(UserSourceProperties): - file_type: FileType = attr.ib(default=FileType.yadocuments) +class YaDocsUserSourceProperties(UserSourceProperties): + file_type: FileType = attr.ib(default=FileType.yadocs) private_path: Optional[str] = attr.ib(default=None) public_link: Optional[str] = attr.ib(default=None) - oauth_token: Optional[str] = attr.ib(default=None) + oauth_token: Optional[str] = attr.ib(default=None, repr=False) + + def get_secret_keys(self) -> set[DataKey]: + return {DataKey(parts=("oauth_token",))} @attr.s(init=True, kw_only=True) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py index 1d80265f6..23f2031ed 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py @@ -35,7 +35,7 @@ GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, RenameTenantStatusModel, - YaDocumentsUserSourceProperties, + YaDocsUserSourceProperties, ) @@ -152,9 +152,9 @@ class Meta(BaseSchema.Meta): spreadsheet_id = fields.String() -class YaDocumentsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema): +class YaDocsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema): class Meta(BaseSchema.Meta): - target = YaDocumentsUserSourceProperties + target = YaDocsUserSourceProperties private_path = fields.String(allow_none=True) public_link = fields.String(allow_none=True) @@ -164,7 +164,7 @@ class Meta(BaseSchema.Meta): class UserSourcePropertiesSchema(FileTypeOneOfSchema): type_schemas: dict[str, Type[UserSourcePropertiesBaseSchema]] = { FileType.gsheets.name: GSheetsUserSourcePropertiesSchema, - FileType.yadocuments.name: YaDocumentsUserSourcePropertiesSchema, + FileType.yadocs.name: YaDocsUserSourcePropertiesSchema, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py new file mode 100644 index 000000000..1d1690c72 --- /dev/null +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py @@ -0,0 +1,80 @@ +from typing import Any + +import aiohttp +from aiohttp.client import ( + ClientResponse, + ClientSession, +) + +from dl_file_uploader_lib import exc as file_upl_exc + + +def yadocs_error_to_file_uploader_exception(status_code: int, resp_info: dict): + if status_code == 401: + err_cls = file_upl_exc.PermissionDenied + elif status_code == 404: + err_cls = file_upl_exc.DocumentNotFound + elif status_code == 400: + err_cls = file_upl_exc.UnsupportedDocument + elif status_code >= 500: + err_cls = file_upl_exc.RemoteServerError + else: + err_cls = file_upl_exc.DLFileUploaderBaseError + + return err_cls( + details=resp_info, + ) + + +class YaDocsClient: + headers: dict[str, Any] = { + "Content-Type": "application/json", + "Accept": "application/json", + } + hostname: str = "https://cloud-api.yandex.net/v1/disk" + + def __init__(self, session: ClientSession): + self.session = session + + async def get_spreadsheet_public_ref(self, link: str) -> str: + resp = await self.session.get( + f"{self.hostname}/public/resources/download/?public_key={link}", + headers=self.headers, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return (await resp.json())["href"] + + async def get_spreadsheet_public_meta(self, link: str) -> ClientResponse: + resp = await self.session.get( + f"{self.hostname}/public/resources/?public_key={link}", + headers=self.headers, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return await resp.json() + + async def get_spreadsheet_private_ref(self, path: str, token: str) -> str: + headers_with_token = self._create_headers_with_token(token) + resp = await self.session.get( + f"{self.hostname}/resources/download/?path={path}", + headers=headers_with_token, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return (await resp.json())["href"] + + async def get_spreadsheet_private_meta(self, path: str, token: str) -> ClientResponse: + headers_with_token = self._create_headers_with_token(token) + resp = await self.session.get( + f"{self.hostname}/resources/?path={path}", + headers=headers_with_token, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return await resp.json() + + def _create_headers_with_token(self, token: str) -> dict[str, Any]: + headers_with_token = self.headers.copy() + headers_with_token.update({"Authorization": "OAuth " + token}) + return headers_with_token diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py deleted file mode 100644 index 57e869708..000000000 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py +++ /dev/null @@ -1,43 +0,0 @@ -import requests - - -class YaDocumentsClient: - headers = { - "Content-Type": "application/json", - "Accept": "application/json", - } - - def get_spreadsheet_public(self, link): - resp = requests.get( - f"https://cloud-api.yandex.net/v1/disk/public/resources/download/?public_key={link}", - headers=self.headers, - ) - return resp.json()["href"] - - def get_spreadsheet_public_meta(self, link): - resp = requests.get( - f"https://cloud-api.yandex.net/v1/disk/public/resources/?public_key={link}", - headers=self.headers, - ) - return resp.json() - - def get_spreadsheet_private(self, path, token): - headers_with_token = self._create_headers_with_token(token) - resp = requests.get( - f"https://cloud-api.yandex.net/v1/disk/resources/download/?path={path}", - headers=headers_with_token, - ) - return resp.json()["href"] - - def get_spreadsheet_private_meta(self, path, token): - headers_with_token = self._create_headers_with_token(token) - resp = requests.get( - f"https://cloud-api.yandex.net/v1/disk/resources/?path={path}", - headers=headers_with_token, - ) - return resp.json() - - def _create_headers_with_token(self, token: str): - headers_with_token = self.headers.copy() - headers_with_token.update({"Authorization": "OAuth " + token}) - return headers_with_token diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index 772e19461..682df4904 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -29,8 +29,8 @@ class DownloadGSheetTask(BaseTaskMeta): @attr.s -class DownloadYaDocumentsTask(BaseTaskMeta): - name = TaskName("download_yadocuments") +class DownloadYaDocsTask(BaseTaskMeta): + name = TaskName("download_yadocs") file_id: str = attr.ib() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py index 19da19f71..f10c1d470 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py @@ -7,10 +7,8 @@ RenameTenantFilesTask, ) from .delete import DeleteFileTask -from .download import ( - DownloadGSheetTask, - DownloadYaDocumentsTask, -) +from .download_gsheets import DownloadGSheetTask +from .download_yadocs import DownloadYaDocsTask from .excel import ProcessExcelTask from .parse import ParseFileTask from .save import SaveSourceTask @@ -19,7 +17,7 @@ REGISTRY: TaskRegistry = TaskRegistry.create( [ DownloadGSheetTask, - DownloadYaDocumentsTask, + DownloadYaDocsTask, ParseFileTask, ProcessExcelTask, SaveSourceTask, diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_gsheets.py similarity index 81% rename from lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py rename to lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_gsheets.py index 2c9093fd2..8750015e8 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_gsheets.py @@ -11,7 +11,6 @@ ) import aiogoogle -import aiohttp import attr from dl_constants.enums import ( @@ -27,10 +26,6 @@ from dl_core.us_manager.us_manager_async import AsyncUSManager from dl_file_uploader_lib import exc from dl_file_uploader_lib.data_sink.json_each_row import S3JsonEachRowUntypedFileAsyncDataSink -from dl_file_uploader_lib.data_sink.raw_bytes import ( - RawBytesAsyncDataStream, - S3RawFileAsyncDataSink, -) from dl_file_uploader_lib.gsheets_client import ( GSheetsClient, GSheetsOAuth2, @@ -44,9 +39,7 @@ GSheetsFileSourceSettings, GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, - YaDocumentsUserSourceProperties, ) -from dl_file_uploader_lib.yadocuments_client import YaDocumentsClient from dl_file_uploader_task_interface.context import FileUploaderTaskContext import dl_file_uploader_task_interface.tasks as task_interface from dl_file_uploader_task_interface.tasks import TaskExecutionMode @@ -347,80 +340,3 @@ async def run(self) -> TaskResult: finally: await usm.close() return Success() - - -@attr.s -class DownloadYaDocumentsTask(BaseExecutorTask[task_interface.DownloadYaDocumentsTask, FileUploaderTaskContext]): - cls_meta = task_interface.DownloadYaDocumentsTask - - async def run(self) -> TaskResult: - dfile: Optional[DataFile] = None - redis = self._ctx.redis_service.get_redis() - - try: - rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) - dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id) - assert dfile is not None - - assert isinstance(dfile.user_source_properties, YaDocumentsUserSourceProperties) - - yadocs_client = YaDocumentsClient() - - if dfile.user_source_properties.public_link is not None: - spreadsheet_ref = yadocs_client.get_spreadsheet_public(link=dfile.user_source_properties.public_link) - spreadsheet_meta = yadocs_client.get_spreadsheet_public_meta( - link=dfile.user_source_properties.public_link - ) - - elif ( - dfile.user_source_properties.private_path is not None - and dfile.user_source_properties.oauth_token is not None - ): - spreadsheet_ref = yadocs_client.get_spreadsheet_private( - path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token - ) - spreadsheet_meta = yadocs_client.get_spreadsheet_private_meta( - path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token - ) - else: - raise exc.DLFileUploaderBaseError() - dfile.filename = spreadsheet_meta["name"] - - s3 = self._ctx.s3_service - - async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024): - async with aiohttp.ClientSession() as session: - async with session.get(spreadsheet_ref) as response: - assert response.status == 200 - while True: - chunk = await response.content.read(chunk_size) - if chunk: - LOGGER.debug(f"Received chunk of {len(chunk)} bytes.") - yield chunk - else: - LOGGER.info("Empty chunk received.") - break - - data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter()) - async with S3RawFileAsyncDataSink( - s3=s3.client, - s3_key=dfile.s3_key, - bucket_name=s3.tmp_bucket_name, - ) as data_sink: - await data_sink.dump_data_stream(data_stream) - - await dfile.save() - LOGGER.info(f'Uploaded file "{dfile.filename}".') - - except Exception as ex: - LOGGER.exception(ex) - if dfile is None: - return Retry(attempts=3) - else: - dfile.status = FileProcessingStatus.failed - exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed() - dfile.error = FileProcessingError.from_exception(exc_to_save) - await dfile.save() - - return Fail() - return Success() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py new file mode 100644 index 000000000..2b39ecedd --- /dev/null +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -0,0 +1,126 @@ +from __future__ import annotations + + +import logging +from typing import Optional + +import aiohttp +import attr + +from dl_constants.enums import FileProcessingStatus + +from dl_file_uploader_lib import exc +from dl_file_uploader_lib.data_sink.raw_bytes import ( + RawBytesAsyncDataStream, + S3RawFileAsyncDataSink, +) + +from dl_file_uploader_lib.redis_model.base import RedisModelManager +from dl_file_uploader_lib.redis_model.models import ( + DataFile, + FileProcessingError, + YaDocsUserSourceProperties, +) +from dl_file_uploader_lib.yadocs_client import YaDocsClient, yadocs_error_to_file_uploader_exception +from dl_file_uploader_task_interface.context import FileUploaderTaskContext +import dl_file_uploader_task_interface.tasks as task_interface +from dl_task_processor.task import ( + BaseExecutorTask, + Fail, + Retry, + Success, + TaskResult, +) + + +LOGGER = logging.getLogger(__name__) + + +@attr.s +class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, FileUploaderTaskContext]): + cls_meta = task_interface.DownloadYaDocsTask + + async def run(self) -> TaskResult: + dfile: Optional[DataFile] = None + redis = self._ctx.redis_service.get_redis() + + try: + rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) + dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id) + assert dfile is not None + + assert isinstance(dfile.user_source_properties, YaDocsUserSourceProperties) + + async with aiohttp.ClientSession() as session: + yadocs_client = YaDocsClient(session) + try: + if dfile.user_source_properties.public_link is not None: + spreadsheet_ref = await yadocs_client.get_spreadsheet_public_ref( + link=dfile.user_source_properties.public_link + ) + spreadsheet_meta = await yadocs_client.get_spreadsheet_public_meta( + link=dfile.user_source_properties.public_link + ) + + elif ( + dfile.user_source_properties.private_path is not None + and dfile.user_source_properties.oauth_token is not None + ): + spreadsheet_ref = await yadocs_client.get_spreadsheet_private_ref( + path=dfile.user_source_properties.private_path, + token=dfile.user_source_properties.oauth_token, + ) + spreadsheet_meta = await yadocs_client.get_spreadsheet_private_meta( + path=dfile.user_source_properties.private_path, + token=dfile.user_source_properties.oauth_token, + ) + else: + raise exc.DLFileUploaderBaseError() + except exc.DLFileUploaderBaseError as e: + LOGGER.exception(e) + download_error = FileProcessingError.from_exception(e) + dfile.status = FileProcessingStatus.failed + dfile.error = download_error + await dfile.save() + return Success() + + dfile.filename = spreadsheet_meta["name"] + s3 = self._ctx.s3_service + + async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024): + async with aiohttp.ClientSession() as session: + async with session.get(spreadsheet_ref) as resp: + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + while True: + chunk = await resp.content.read(chunk_size) + if chunk: + LOGGER.debug(f"Received chunk of {len(chunk)} bytes.") + yield chunk + else: + LOGGER.info("Empty chunk received.") + break + + data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter()) + async with S3RawFileAsyncDataSink( + s3=s3.client, + s3_key=dfile.s3_key, + bucket_name=s3.tmp_bucket_name, + ) as data_sink: + await data_sink.dump_data_stream(data_stream) + + await dfile.save() + LOGGER.info(f'Uploaded file "{dfile.filename}".') + + except Exception as ex: + LOGGER.exception(ex) + if dfile is None: + return Retry(attempts=3) + else: + dfile.status = FileProcessingStatus.failed + exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed() + dfile.error = FileProcessingError.from_exception(exc_to_save) + await dfile.save() + + return Fail() + return Success() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" similarity index 68% rename from lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py rename to "lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" index dfd17e417..3a0ff9e18 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py +++ "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" @@ -4,9 +4,9 @@ from dl_file_uploader_lib.enums import FileType from dl_file_uploader_lib.redis_model.models import ( DataFile, - YaDocumentsUserSourceProperties, + YaDocsUserSourceProperties, ) -from dl_file_uploader_task_interface.tasks import DownloadYaDocumentsTask +from dl_file_uploader_task_interface.tasks import DownloadYaDocsTask from dl_task_processor.state import wait_task from dl_testing.s3_utils import s3_file_exists @@ -21,14 +21,14 @@ async def test_download_yadocs_task( ): df = DataFile( filename="", - file_type=FileType.yadocuments, + file_type=FileType.yadocs, manager=redis_model_manager, status=FileProcessingStatus.in_progress, - user_source_properties=YaDocumentsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA"), + user_source_properties=YaDocsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA"), ) await df.save() - task = await task_processor_client.schedule(DownloadYaDocumentsTask(file_id=df.id)) + task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id)) result = await wait_task(task, task_state) assert result[-1] == "success"