From 9241adac44558959dc4228ec810195731bb9ce15 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Wed, 18 Oct 2023 16:36:48 +0300 Subject: [PATCH] Add file uploader api handler for YaDocs --- .../dl_file_uploader_api_lib/app.py | 1 + .../data_file_preparer.py | 41 +++++- .../dl_file_uploader_api_lib/schemas/files.py | 19 +++ .../dl_file_uploader_api_lib/views/files.py | 56 +++++++- .../conftest.py | 5 + .../ext/params.yml | 1 + .../ext/test_documents_api.py | 130 ++++++++++++++++++ .../req_builder.py | 20 +++ .../tasks/download_yadocs.py | 12 +- .../tasks/excel.py | 11 +- .../tasks/parse.py | 1 + .../db/test_excel.py | 7 +- 12 files changed, 293 insertions(+), 11 deletions(-) create mode 100644 lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/app.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/app.py index c7210d823..21cf0ef84 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/app.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/app.py @@ -133,6 +133,7 @@ def create_app(self, app_version: str) -> web.Application: app.router.add_route("post", "/api/v2/files", files_views.FilesView) app.router.add_route("post", "/api/v2/links", files_views.LinksView) + app.router.add_route("post", "/api/v2/documents", files_views.DocumentsView) app.router.add_route("post", "/api/v2/update_connection_data", files_views.UpdateConnectionDataView) app.router.add_route( "post", "/api/v2/update_connection_data_internal", files_views.InternalUpdateConnectionDataView diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/data_file_preparer.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/data_file_preparer.py index aadd9fc52..bf708a765 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/data_file_preparer.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/data_file_preparer.py @@ -7,7 +7,10 @@ from dl_file_uploader_lib.exc import InvalidLink from dl_file_uploader_lib.redis_model.base import RedisModelManager from dl_file_uploader_lib.redis_model.models import DataFile -from dl_file_uploader_lib.redis_model.models.models import GSheetsUserSourceProperties +from dl_file_uploader_lib.redis_model.models.models import ( + GSheetsUserSourceProperties, + YaDocsUserSourceProperties, +) LOGGER = logging.getLogger(__name__) @@ -58,3 +61,39 @@ async def gsheets_data_file_preparer( ) return df + + +async def yadocs_data_file_preparer( + oauth_token: Optional[str], + private_path: Optional[str], + public_link: Optional[str], + redis_model_manager: RedisModelManager, +) -> DataFile: + default_host: str = "disk.yandex.ru" + allowed_hosts: frozenset[str] = frozenset({default_host}) + example_url: str = "https://disk.yandex.ru/i/OyzdmFI0MUEEgA" + example_url_message: str = f"example: {example_url!r}" + if public_link is not None: + parts = urllib.parse.urlparse(public_link) + if parts.scheme not in ("http", "https"): + raise InvalidLink(f"Invalid url scheme: {parts.scheme!r}, must be 'http' or 'https'; {example_url_message}") + host = parts.hostname + if host not in allowed_hosts: + raise InvalidLink(f"Invalid host: {host!r}; should be {default_host!r}; {example_url_message}") + + path = parts.path + prefix = "/i/" + if not path.startswith(prefix): + raise InvalidLink(f"Invalid URL path prefix: {path!r}; should be {prefix!r}; {example_url_message}") + + df = DataFile( + manager=redis_model_manager, + filename="YaDocument", + status=FileProcessingStatus.in_progress, + file_type=FileType.yadocs, + user_source_properties=YaDocsUserSourceProperties( + private_path=private_path, public_link=public_link, oauth_token=oauth_token + ), + ) + + return df diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py index bc4a9a927..6a45ae905 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py @@ -22,6 +22,14 @@ def validate_authorized(data: dict) -> None: ) +def validate_docs_data(data): + if not ((data["public_link"] is None) ^ (data["private_path"] is None)): + raise ValueError("Expected exactly one of [`private_path`, `public_link`] to be specified") + if data["public_link"] is None: + if data["private_path"] is None or data["oauth_token"] is None: + raise ma.ValidationError("Both path and token must be provided for private files") + + class FileLinkRequestSchema(BaseRequestSchema): type = ma.fields.Enum(FileType, required=True) url = ma.fields.String(required=True) @@ -34,6 +42,17 @@ def validate_object(self, data: dict, **kwargs: Any) -> None: validate_authorized(data) +class FileDocumentsRequestSchema(BaseRequestSchema): + # connection_id = ma.fields.String(load_default=None, allow_none=True) + private_path = ma.fields.String(load_default=None, allow_none=True) + oauth_token = ma.fields.String(load_default=None, allow_none=True) + public_link = ma.fields.String(load_default=None, allow_none=True) + + @ma.validates_schema(skip_on_field_errors=True) + def validate_object(self, data: dict, **kwargs: Any) -> None: + validate_docs_data(data) + + class FileUploadResponseSchema(ma.Schema): file_id = ma.fields.String() title = ma.fields.String() diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py index f26dae1ab..785b2495c 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py @@ -19,7 +19,10 @@ RequiredResourceCommon, ) from dl_constants.enums import FileProcessingStatus -from dl_file_uploader_api_lib.data_file_preparer import gsheets_data_file_preparer +from dl_file_uploader_api_lib.data_file_preparer import ( + gsheets_data_file_preparer, + yadocs_data_file_preparer, +) from dl_file_uploader_api_lib.schemas import files as files_schemas from dl_file_uploader_api_lib.views.base import FileUploaderBaseView from dl_file_uploader_lib.common_locks import get_update_connection_source_lock @@ -38,6 +41,7 @@ ) from dl_file_uploader_task_interface.tasks import ( DownloadGSheetTask, + DownloadYaDocsTask, ParseFileTask, ProcessExcelTask, TaskExecutionMode, @@ -162,6 +166,56 @@ async def post(self) -> web.StreamResponse: ) +class DocumentsView(FileUploaderBaseView): + REQUIRED_RESOURCES: ClassVar[frozenset[RequiredResource]] = frozenset() # Don't skip CSRF check + + FILE_TYPE_TO_DATA_FILE_PREPARER_MAP: dict[ + FileType, Callable[[str, RedisModelManager, Optional[str]], Awaitable[DataFile]] + ] = { + FileType.yadocs: yadocs_data_file_preparer, + } + + async def post(self) -> web.StreamResponse: + req_data = await self._load_post_request_schema_data(files_schemas.FileDocumentsRequestSchema) + + file_type = FileType.yadocs + + rmm = self.dl_request.get_redis_model_manager() + + oauth_token: Optional[str] = req_data["oauth_token"] + public_link: Optional[str] = req_data["public_link"] + private_path: Optional[str] = req_data["private_path"] + + df = await self.FILE_TYPE_TO_DATA_FILE_PREPARER_MAP[file_type]( + oauth_token=oauth_token, + private_path=private_path, + public_link=public_link, + redis_model_manager=rmm, + ) + + LOGGER.info(f"Data file id: {df.id}") + await df.save() + + task_processor = self.dl_request.get_task_processor() + await task_processor.schedule(DownloadYaDocsTask(file_id=df.id)) + LOGGER.info(f"Scheduled DownloadGSheetTask for file_id {df.id}") + + df = await DataFile.get(manager=rmm, obj_id=df.id) + if df.status == FileProcessingStatus.failed: + return web.json_response( + files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}), + status=HTTPStatus.OK, + ) + + await task_processor.schedule(ProcessExcelTask(file_id=df.id)) + LOGGER.info(f"Scheduled ProcessExcelTask for file_id {df.id}") + + return web.json_response( + files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}), + status=HTTPStatus.CREATED, + ) + + class FileStatusView(FileUploaderBaseView): async def get(self) -> web.StreamResponse: req_data = files_schemas.FileStatusRequestSchema().load(self.request.match_info) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/conftest.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/conftest.py index 961e1a024..24c1afa2c 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/conftest.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/conftest.py @@ -397,3 +397,8 @@ def reader_app(loop, secure_reader): loop.run_until_complete(runner.setup()) site = aiohttp.web.UnixSite(runner, path=secure_reader.SOCKET) return loop.run_until_complete(site.start()) + + +@pytest.fixture(scope="session") +def ya_docs_oauth_token(env_param_getter): + return env_param_getter.get_str_value("YA_DOCS_API_KEY") diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/params.yml b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/params.yml index 4c9de51d8..ebbd3fda7 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/params.yml +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/params.yml @@ -1,2 +1,3 @@ params: GOOGLE_API_KEY: {getter: $osenv, key: GOOGLE_API_KEY} + YA_DOCS_API_KEY: {getter: $osenv, key: YA_DOCS_API_KEY} diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py new file mode 100644 index 000000000..ff87edcdc --- /dev/null +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py @@ -0,0 +1,130 @@ +import asyncio +import logging + +import pytest + +from dl_constants.enums import FileProcessingStatus +from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder +from dl_file_uploader_lib.redis_model.models import DataFile, YaDocsUserSourceProperties, GSheetsUserSourceProperties + + +LOGGER = logging.getLogger(__name__) + + +@pytest.mark.asyncio +async def test_yadocuments_public_file( + fu_client, + s3_client, + s3_tmp_bucket, + redis_model_manager, + reader_app, +): + public_link = "https://disk.yandex.ru/i/ZgabI6zyoYn8IQ" + resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=public_link)) + assert resp.status == 201 + file_id = resp.json["file_id"] + assert file_id + + await asyncio.sleep(5) + + rmm = redis_model_manager + df = await DataFile.get(manager=rmm, obj_id=file_id) + assert isinstance(df, DataFile) + assert isinstance(df.user_source_properties, YaDocsUserSourceProperties) + assert df.status == FileProcessingStatus.ready + assert df.error is None + assert df.user_source_properties.public_link == public_link + assert df.filename == "test public table.xlsx" + for sheet_src in df.sources: + assert sheet_src.status == FileProcessingStatus.ready + assert sheet_src.error is None + + +@pytest.mark.asyncio +async def test_yadocuments_private_file( + fu_client, + s3_client, + s3_tmp_bucket, + redis_model_manager, + reader_app, + ya_docs_oauth_token, +): + private_path = "test private table.xlsx" + resp = await fu_client.make_request( + ReqBuilder.upload_documents(private_path=private_path, oauth_token=ya_docs_oauth_token) + ) + assert resp.status == 201 + file_id = resp.json["file_id"] + assert file_id + + await asyncio.sleep(5) + + rmm = redis_model_manager + df = await DataFile.get(manager=rmm, obj_id=file_id) + assert isinstance(df, DataFile) + assert isinstance(df.user_source_properties, YaDocsUserSourceProperties) + assert df.status == FileProcessingStatus.ready + assert df.error is None + assert df.user_source_properties.private_path == private_path + assert df.filename == "test private table.xlsx" + for sheet_src in df.sources: + assert sheet_src.status == FileProcessingStatus.ready + assert sheet_src.error is None + + +@pytest.mark.asyncio +async def test_yadocuments_invalid_link( + fu_client, + s3_client, + s3_tmp_bucket, + redis_model_manager, + reader_app, +): + public_link_invalid = "https://disk.yandeks.ru/i/ZgabI6zyoYn8IQ" + resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=public_link_invalid, require_ok=False)) + assert resp.status == 400 + assert resp.json["code"] == "ERR.FILE.INVALID_LINK" + + +@pytest.mark.asyncio +async def test_yadocuments_not_found( + fu_client, + s3_client, + s3_tmp_bucket, + redis_model_manager, + reader_app, +): + public_link_nonexisten = "https://disk.yandex.ru/i/fffffffff" + resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=public_link_nonexisten)) + assert resp.status == 200 + file_id = resp.json["file_id"] + + await asyncio.sleep(2) + + resp = await fu_client.make_request(ReqBuilder.file_status(file_id)) + assert resp.status == 200 + assert resp.json["file_id"] == file_id + assert resp.json["status"] == "failed" + assert resp.json["error"]["code"] == "ERR.FILE.NOT_FOUND" + + +@pytest.mark.asyncio +async def test_documents_unsupported( + fu_client, + s3_client, + s3_tmp_bucket, + redis_model_manager, + reader_app, +): + public_link_to_docs = "https://disk.yandex.ru/i/ros0GDegLEpyew" + resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=public_link_to_docs)) + assert resp.status == 200 + file_id = resp.json["file_id"] + + await asyncio.sleep(2) + + resp = await fu_client.make_request(ReqBuilder.file_status(file_id)) + assert resp.status == 200 + assert resp.json["file_id"] == file_id + assert resp.json["status"] == "failed" + assert resp.json["error"]["code"] == "ERR.FILE.UNSUPPORTED_DOCUMENT" diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py index e7df337ba..6f300d387 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py @@ -75,6 +75,26 @@ def upload_gsheet( require_ok=require_ok, ) + @classmethod + def upload_documents( + cls, + public_link: Optional[str] = None, + private_path: Optional[str] = None, + oauth_token: Optional[str] = None, + *, + require_ok: bool = True, + ) -> Req: + return Req( + method="post", + url="/api/v2/documents", + data_json={ + "public_link": public_link, + "private_path": private_path, + "oauth_token": oauth_token, + }, + require_ok=require_ok, + ) + @classmethod def file_status(cls, file_id: str) -> Req: return Req( diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py index 2b39ecedd..487fba1d4 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -1,6 +1,5 @@ from __future__ import annotations - import logging from typing import Optional @@ -8,20 +7,21 @@ import attr from dl_constants.enums import FileProcessingStatus - from dl_file_uploader_lib import exc from dl_file_uploader_lib.data_sink.raw_bytes import ( RawBytesAsyncDataStream, S3RawFileAsyncDataSink, ) - from dl_file_uploader_lib.redis_model.base import RedisModelManager from dl_file_uploader_lib.redis_model.models import ( DataFile, FileProcessingError, YaDocsUserSourceProperties, ) -from dl_file_uploader_lib.yadocs_client import YaDocsClient, yadocs_error_to_file_uploader_exception +from dl_file_uploader_lib.yadocs_client import ( + YaDocsClient, + yadocs_error_to_file_uploader_exception, +) from dl_file_uploader_task_interface.context import FileUploaderTaskContext import dl_file_uploader_task_interface.tasks as task_interface from dl_task_processor.task import ( @@ -85,6 +85,10 @@ async def run(self) -> TaskResult: return Success() dfile.filename = spreadsheet_meta["name"] + xslx_suffix = ".xlsx" + if not dfile.filename.endswith(xslx_suffix): + raise exc.UnsupportedDocument + s3 = self._ctx.s3_service async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024): diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py index 826751273..d37482e22 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py @@ -108,9 +108,14 @@ async def run(self) -> TaskResult: sheet_data_source.error = FileProcessingError.from_exception(exc.EmptyDocument()) sheet_data_source.status = FileProcessingStatus.failed else: - has_header, raw_schema, raw_schema_header, raw_schema_body = guess_header_and_schema_excel( - sheetdata - ) + try: + has_header, raw_schema, raw_schema_header, raw_schema_body = guess_header_and_schema_excel( + sheetdata + ) + except Exception as ex: + sheet_data_source.status = FileProcessingStatus.failed + exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.ParseFailed() + sheet_data_source.error = FileProcessingError.from_exception(exc_to_save) sheet_settings = None if sheet_data_source.is_applicable: diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/parse.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/parse.py index 75a33c1f1..b638ef8c4 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/parse.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/parse.py @@ -44,6 +44,7 @@ class ParseFileTask(BaseExecutorTask[task_interface.ParseFileTask, FileUploaderT FileType.csv: CSVFileParser, FileType.gsheets: SpreadsheetFileParser, FileType.xlsx: SpreadsheetFileParser, + FileType.yadocs: SpreadsheetFileParser, } async def run(self) -> TaskResult: diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/test_excel.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/test_excel.py index 77e90701c..9ff055dfb 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/test_excel.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/test_excel.py @@ -176,7 +176,10 @@ async def test_parse_excel_with_one_row_task( result = await wait_task(task, task_state) await sleep(60) - assert result[-1] == "failed" + assert result[-1] == "success" df = await DataFile.get(manager=rmm, obj_id=uploaded_excel_id) - assert df.status == FileProcessingStatus.failed + assert df.status == FileProcessingStatus.ready + assert df.id == uploaded_excel_id + for src in df.sources: + assert src.status == FileProcessingStatus.failed