diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py index 6a45ae905..705cc5e28 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py @@ -26,8 +26,10 @@ def validate_docs_data(data): if not ((data["public_link"] is None) ^ (data["private_path"] is None)): raise ValueError("Expected exactly one of [`private_path`, `public_link`] to be specified") if data["public_link"] is None: - if data["private_path"] is None or data["oauth_token"] is None: - raise ma.ValidationError("Both path and token must be provided for private files") + if data["private_path"] is None: + raise ma.ValidationError("'private_path' must be provided for private files") + elif data["oauth_token"] is None and data["connection_id"] is None: + raise ma.ValidationError("Expected `oauth_token` or `connection_id` to be specified") class FileLinkRequestSchema(BaseRequestSchema): @@ -43,10 +45,11 @@ def validate_object(self, data: dict, **kwargs: Any) -> None: class FileDocumentsRequestSchema(BaseRequestSchema): - # connection_id = ma.fields.String(load_default=None, allow_none=True) + connection_id = ma.fields.String(load_default=None, allow_none=True) private_path = ma.fields.String(load_default=None, allow_none=True) oauth_token = ma.fields.String(load_default=None, allow_none=True) public_link = ma.fields.String(load_default=None, allow_none=True) + authorized = ma.fields.Boolean(required=True) @ma.validates_schema(skip_on_field_errors=True) def validate_object(self, data: dict, **kwargs: Any) -> None: diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py index da845e1c8..048de30d6 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py @@ -185,6 +185,8 @@ async def post(self) -> web.StreamResponse: oauth_token: Optional[str] = req_data["oauth_token"] public_link: Optional[str] = req_data["public_link"] private_path: Optional[str] = req_data["private_path"] + connection_id: Optional[str] = req_data["connection_id"] + authorized: bool = req_data["authorized"] df = await self.FILE_TYPE_TO_DATA_FILE_PREPARER_MAP[file_type]( oauth_token=oauth_token, @@ -197,8 +199,14 @@ async def post(self) -> web.StreamResponse: await df.save() task_processor = self.dl_request.get_task_processor() - await task_processor.schedule(DownloadYaDocsTask(file_id=df.id)) - LOGGER.info(f"Scheduled DownloadGSheetTask for file_id {df.id}") + await task_processor.schedule( + DownloadYaDocsTask( + file_id=df.id, + authorized=authorized, + connection_id=connection_id, + ) + ) + LOGGER.info(f"Scheduled DownloadYaDocsTask for file_id {df.id}") return web.json_response( files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}), diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py index f98b6acec..02eab8fe8 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_documents_api.py @@ -54,7 +54,7 @@ async def test_yadocuments_private_file( ): private_path = "test private table.xlsx" resp = await fu_client.make_request( - ReqBuilder.upload_documents(private_path=private_path, oauth_token=ya_docs_oauth_token) + ReqBuilder.upload_documents(private_path=private_path, oauth_token=ya_docs_oauth_token, authorized=True) ) assert resp.status == 201 file_id = resp.json["file_id"] diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py index 6f300d387..46fa5ee23 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py @@ -81,6 +81,7 @@ def upload_documents( public_link: Optional[str] = None, private_path: Optional[str] = None, oauth_token: Optional[str] = None, + authorized: bool = False, *, require_ok: bool = True, ) -> Req: @@ -91,6 +92,7 @@ def upload_documents( "public_link": public_link, "private_path": private_path, "oauth_token": oauth_token, + "authorized": authorized, }, require_ok=require_ok, ) diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index 00ba5d634..38ac6e640 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -33,6 +33,10 @@ class DownloadYaDocsTask(BaseTaskMeta): name = TaskName("download_yadocs") file_id: str = attr.ib() + authorized: bool = attr.ib() + + tenant_id: Optional[str] = attr.ib(default=None) + connection_id: Optional[str] = attr.ib(default=None) @attr.s diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py index fc2d9f8e9..bb7f52944 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -10,6 +10,7 @@ import attr from dl_constants.enums import FileProcessingStatus +from dl_core.us_manager.us_manager_async import AsyncUSManager from dl_file_uploader_lib import exc from dl_file_uploader_lib.data_sink.raw_bytes import ( RawBytesAsyncDataStream, @@ -35,10 +36,34 @@ TaskResult, ) +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection + LOGGER = logging.getLogger(__name__) +class NoToken(Exception): + pass + + +async def _get_yadocs_oauth_token( + dfile_token: Optional[str], + conn_id: Optional[str], + usm: AsyncUSManager, +) -> Optional[str]: + if dfile_token is not None: # if there is a token in dfile, then use it + oauth_token = dfile_token + elif conn_id is not None: # otherwise, use the one from the connection + conn: YaDocsFileS3Connection = await usm.get_by_id(conn_id, YaDocsFileS3Connection) + if conn.data.oauth_token is None: + raise NoToken() + oauth_token = conn.data.oauth_token + else: + raise NoToken() + + return oauth_token + + @attr.s class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, FileUploaderTaskContext]): cls_meta = task_interface.DownloadYaDocsTask @@ -47,6 +72,7 @@ async def run(self) -> TaskResult: dfile: Optional[DataFile] = None redis = self._ctx.redis_service.get_redis() task_processor = self._ctx.make_task_processor(self._request_id) + usm = self._ctx.get_async_usm() try: rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) @@ -55,6 +81,15 @@ async def run(self) -> TaskResult: assert isinstance(dfile.user_source_properties, YaDocsUserSourceProperties) + oauth_token: Optional[str] = None + if self.meta.authorized: + try: + dfile_token = dfile.user_source_properties.oauth_token + oauth_token = await _get_yadocs_oauth_token(dfile_token, self.meta.connection_id, usm) + except NoToken: + LOGGER.error("Authorized call but no token found in either DataFile or connection, failing task") + return Fail() + async with aiohttp.ClientSession() as session: yadocs_client = YaDocsClient(session) try: @@ -66,17 +101,14 @@ async def run(self) -> TaskResult: link=dfile.user_source_properties.public_link ) - elif ( - dfile.user_source_properties.private_path is not None - and dfile.user_source_properties.oauth_token is not None - ): + elif dfile.user_source_properties.private_path is not None and oauth_token is not None: spreadsheet_ref = await yadocs_client.get_spreadsheet_private_ref( path=dfile.user_source_properties.private_path, - token=dfile.user_source_properties.oauth_token, + token=oauth_token, ) spreadsheet_meta = await yadocs_client.get_spreadsheet_private_meta( path=dfile.user_source_properties.private_path, - token=dfile.user_source_properties.oauth_token, + token=oauth_token, ) else: raise exc.DLFileUploaderBaseError() @@ -91,7 +123,9 @@ async def run(self) -> TaskResult: dfile.filename = spreadsheet_meta["name"] xlsx_suffix = ".xlsx" if not dfile.filename.endswith(xlsx_suffix): - raise exc.UnsupportedDocument + raise exc.UnsupportedDocument( + details={"reason": f"Supported file extensions are: '.xlsx'. Got {dfile.filename}"} + ) s3 = self._ctx.s3_service diff --git "a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" index 3a0ff9e18..d6f45bbad 100644 --- "a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" +++ "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" @@ -28,7 +28,7 @@ async def test_download_yadocs_task( ) await df.save() - task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id)) + task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id, authorized=False)) result = await wait_task(task, task_state) assert result[-1] == "success"