From c2ab161a35d19f3601a3d6b790b9233d456a17a3 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Fri, 10 Nov 2023 13:51:56 +0300 Subject: [PATCH 1/2] Get token from saved connection when required --- .../dl_file_uploader_api_lib/schemas/files.py | 8 ++-- .../dl_file_uploader_api_lib/views/files.py | 4 +- .../dl_file_uploader_task_interface/tasks.py | 2 + .../tasks/download_yadocs.py | 38 ++++++++++++++++++- 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py index 6a45ae905..4be3f8a5b 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py @@ -26,8 +26,10 @@ def validate_docs_data(data): if not ((data["public_link"] is None) ^ (data["private_path"] is None)): raise ValueError("Expected exactly one of [`private_path`, `public_link`] to be specified") if data["public_link"] is None: - if data["private_path"] is None or data["oauth_token"] is None: - raise ma.ValidationError("Both path and token must be provided for private files") + if data["private_path"] is None: + raise ma.ValidationError("'private_path' must be provided for private files") + elif data["oauth_token"] is None and data["connection_id"] is None: + raise ma.ValidationError("Expected `oauth_token` or `connection_id` to be specified") class FileLinkRequestSchema(BaseRequestSchema): @@ -43,7 +45,7 @@ def validate_object(self, data: dict, **kwargs: Any) -> None: class FileDocumentsRequestSchema(BaseRequestSchema): - # connection_id = ma.fields.String(load_default=None, allow_none=True) + connection_id = ma.fields.String(load_default=None, allow_none=True) private_path = ma.fields.String(load_default=None, allow_none=True) oauth_token = ma.fields.String(load_default=None, allow_none=True) public_link = ma.fields.String(load_default=None, allow_none=True) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py index da845e1c8..c149e252c 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py @@ -185,6 +185,8 @@ async def post(self) -> web.StreamResponse: oauth_token: Optional[str] = req_data["oauth_token"] public_link: Optional[str] = req_data["public_link"] private_path: Optional[str] = req_data["private_path"] + connection_id: Optional[str] = req_data["connection_id"] + authorized: bool = req_data["authorized"] df = await self.FILE_TYPE_TO_DATA_FILE_PREPARER_MAP[file_type]( oauth_token=oauth_token, @@ -198,7 +200,7 @@ async def post(self) -> web.StreamResponse: task_processor = self.dl_request.get_task_processor() await task_processor.schedule(DownloadYaDocsTask(file_id=df.id)) - LOGGER.info(f"Scheduled DownloadGSheetTask for file_id {df.id}") + LOGGER.info(f"Scheduled DownloadYaDocsTask for file_id {df.id}") return web.json_response( files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}), diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index 00ba5d634..59d3b7207 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -33,6 +33,8 @@ class DownloadYaDocsTask(BaseTaskMeta): name = TaskName("download_yadocs") file_id: str = attr.ib() + authorized: bool = attr.ib() + connection_id: Optional[str] = attr.ib(default=None) @attr.s diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py index fc2d9f8e9..0b1222c21 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -10,6 +10,7 @@ import attr from dl_constants.enums import FileProcessingStatus +from dl_core.us_manager.us_manager_async import AsyncUSManager from dl_file_uploader_lib import exc from dl_file_uploader_lib.data_sink.raw_bytes import ( RawBytesAsyncDataStream, @@ -35,10 +36,34 @@ TaskResult, ) +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection + LOGGER = logging.getLogger(__name__) +class NoToken(Exception): + pass + + +def _get_yadocs_oauth_token( + dfile_token: Optional[str], + conn_id: Optional[str], + usm: AsyncUSManager, +) -> Optional[str]: + if dfile_token is not None: # if there is a token in dfile, then use it + oauth_token = dfile_token + elif conn_id is not None: # otherwise, use the one from the connection + conn: YaDocsFileS3Connection = await usm.get_by_id(conn_id, YaDocsFileS3Connection) + if conn.data.oauth_token is None: + raise NoToken() + oauth_token = conn.data.oauth_token + else: + raise NoToken() + + return oauth_token + + @attr.s class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, FileUploaderTaskContext]): cls_meta = task_interface.DownloadYaDocsTask @@ -47,6 +72,7 @@ async def run(self) -> TaskResult: dfile: Optional[DataFile] = None redis = self._ctx.redis_service.get_redis() task_processor = self._ctx.make_task_processor(self._request_id) + usm = self._ctx.get_async_usm() try: rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) @@ -55,6 +81,14 @@ async def run(self) -> TaskResult: assert isinstance(dfile.user_source_properties, YaDocsUserSourceProperties) + if self.meta.authorized: + try: + dfile_token = dfile.user_source_properties.oauth_token + auth = await _get_yadocs_oauth_token(dfile_token, self.meta.connection_id, usm) + except NoToken: + LOGGER.error("Authorized call but no token found in either DataFile or connection, failing task") + return Fail() + async with aiohttp.ClientSession() as session: yadocs_client = YaDocsClient(session) try: @@ -91,7 +125,9 @@ async def run(self) -> TaskResult: dfile.filename = spreadsheet_meta["name"] xlsx_suffix = ".xlsx" if not dfile.filename.endswith(xlsx_suffix): - raise exc.UnsupportedDocument + raise exc.UnsupportedDocument( + details={"reason": f"Supported file extensions are: '.xlsx'. Got {dfile.filename}"} + ) s3 = self._ctx.s3_service From 17322c1210cc95f45d8a9bf380b96aafdc089d95 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Fri, 10 Nov 2023 17:01:59 +0300 Subject: [PATCH 2/2] Get token from saved connection when required --- .../dl_file_uploader_api_lib/schemas/files.py | 1 + .../dl_file_uploader_api_lib/views/files.py | 8 +++++++- .../ext/test_documents_api.py | 2 +- .../dl_file_uploader_api_lib_tests/req_builder.py | 2 ++ .../dl_file_uploader_task_interface/tasks.py | 2 ++ .../tasks/download_yadocs.py | 14 ++++++-------- .../ext/test_yado\321\201s.py" | 2 +- 7 files changed, 20 insertions(+), 11 deletions(-) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py index 4be3f8a5b..705cc5e28 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py @@ -49,6 +49,7 @@ class FileDocumentsRequestSchema(BaseRequestSchema): private_path = ma.fields.String(load_default=None, allow_none=True) oauth_token = ma.fields.String(load_default=None, allow_none=True) public_link = ma.fields.String(load_default=None, allow_none=True) + authorized = ma.fields.Boolean(required=True) @ma.validates_schema(skip_on_field_errors=True) def validate_object(self, data: dict, **kwargs: Any) -> None: diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py index c149e252c..048de30d6 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py @@ -199,7 +199,13 @@ async def post(self) -> web.StreamResponse: await df.save() task_processor = self.dl_request.get_task_processor() - await task_processor.schedule(DownloadYaDocsTask(file_id=df.id)) + await task_processor.schedule( + DownloadYaDocsTask( + file_id=df.id, + authorized=authorized, + connection_id=connection_id, + ) + ) LOGGER.info(f"Scheduled DownloadYaDocsTask for file_id {df.id}") return web.json_response( diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py index f98b6acec..02eab8fe8 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py @@ -54,7 +54,7 @@ async def test_yadocuments_private_file( ): private_path = "test private table.xlsx" resp = await fu_client.make_request( - ReqBuilder.upload_documents(private_path=private_path, oauth_token=ya_docs_oauth_token) + ReqBuilder.upload_documents(private_path=private_path, oauth_token=ya_docs_oauth_token, authorized=True) ) assert resp.status == 201 file_id = resp.json["file_id"] diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py index 6f300d387..46fa5ee23 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py @@ -81,6 +81,7 @@ def upload_documents( public_link: Optional[str] = None, private_path: Optional[str] = None, oauth_token: Optional[str] = None, + authorized: bool = False, *, require_ok: bool = True, ) -> Req: @@ -91,6 +92,7 @@ def upload_documents( "public_link": public_link, "private_path": private_path, "oauth_token": oauth_token, + "authorized": authorized, }, require_ok=require_ok, ) diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index 59d3b7207..38ac6e640 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -34,6 +34,8 @@ class DownloadYaDocsTask(BaseTaskMeta): file_id: str = attr.ib() authorized: bool = attr.ib() + + tenant_id: Optional[str] = attr.ib(default=None) connection_id: Optional[str] = attr.ib(default=None) diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py index 0b1222c21..bb7f52944 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -46,7 +46,7 @@ class NoToken(Exception): pass -def _get_yadocs_oauth_token( +async def _get_yadocs_oauth_token( dfile_token: Optional[str], conn_id: Optional[str], usm: AsyncUSManager, @@ -81,10 +81,11 @@ async def run(self) -> TaskResult: assert isinstance(dfile.user_source_properties, YaDocsUserSourceProperties) + oauth_token: Optional[str] = None if self.meta.authorized: try: dfile_token = dfile.user_source_properties.oauth_token - auth = await _get_yadocs_oauth_token(dfile_token, self.meta.connection_id, usm) + oauth_token = await _get_yadocs_oauth_token(dfile_token, self.meta.connection_id, usm) except NoToken: LOGGER.error("Authorized call but no token found in either DataFile or connection, failing task") return Fail() @@ -100,17 +101,14 @@ async def run(self) -> TaskResult: link=dfile.user_source_properties.public_link ) - elif ( - dfile.user_source_properties.private_path is not None - and dfile.user_source_properties.oauth_token is not None - ): + elif dfile.user_source_properties.private_path is not None and oauth_token is not None: spreadsheet_ref = await yadocs_client.get_spreadsheet_private_ref( path=dfile.user_source_properties.private_path, - token=dfile.user_source_properties.oauth_token, + token=oauth_token, ) spreadsheet_meta = await yadocs_client.get_spreadsheet_private_meta( path=dfile.user_source_properties.private_path, - token=dfile.user_source_properties.oauth_token, + token=oauth_token, ) else: raise exc.DLFileUploaderBaseError() diff --git "a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" index 3a0ff9e18..d6f45bbad 100644 --- "a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" +++ "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" @@ -28,7 +28,7 @@ async def test_download_yadocs_task( ) await df.save() - task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id)) + task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id, authorized=False)) result = await wait_task(task, task_state) assert result[-1] == "success"