From 3774068a8023f773d7c0cb581b1d6e296cd85b31 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Tue, 10 Oct 2023 18:31:51 +0300 Subject: [PATCH 1/7] Download yadocs task --- Taskfile.dist.yml | 2 +- .../schemas/sources.py | 1 + .../dl_file_uploader_lib/enums.py | 1 + .../redis_model/models/__init__.py | 2 + .../redis_model/models/models.py | 10 +++ .../redis_model/models/storage_schemas.py | 11 +++ .../yadocuments_client.py | 43 ++++++++++ .../dl_file_uploader_task_interface/tasks.py | 7 ++ .../tasks/__init__.py | 6 +- .../tasks/download.py | 84 +++++++++++++++++++ .../ext/test_yadocuments.py | 35 ++++++++ 11 files changed, 200 insertions(+), 2 deletions(-) create mode 100644 lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py create mode 100644 lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py diff --git a/Taskfile.dist.yml b/Taskfile.dist.yml index 5b8b2482b..eae80e6c0 100644 --- a/Taskfile.dist.yml +++ b/Taskfile.dist.yml @@ -2,7 +2,7 @@ version: '3' vars: USER_REL_PATH: - sh: python -c "import os,sys;print(os.path.relpath('{{.USER_WORKING_DIR}}'))" + sh: python3 -c "import os,sys;print(os.path.relpath('{{.USER_WORKING_DIR}}'))" ROOT_DIR: sh: pwd diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py index 445b64c20..1ce595a29 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py @@ -107,6 +107,7 @@ class SourceInfoSchema(FileTypeOneOfSchema): FileType.csv.name: SourceInfoSchemaBase, FileType.gsheets.name: SourceInfoSchemaGSheets, FileType.xlsx.name: SourceInfoSchemaBase, + FileType.yadocuments.name: SourceInfoSchemaBase, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py index 3d3627557..432f6987d 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py @@ -18,6 +18,7 @@ class FileType(Enum): csv = "csv" gsheets = "gsheets" xlsx = "xlsx" + yadocuments = "yadocuments" class CSVEncoding(Enum): diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py index b3bbd3d14..d07e8d0c7 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py @@ -19,6 +19,7 @@ RenameTenantStatusModel, SourceNotFoundError, SpreadsheetFileSourceSettings, + YaDocumentsUserSourceProperties, ) from .storage_schemas import ( DataFileSchema, @@ -46,6 +47,7 @@ "EmptySourcesError", "RenameTenantStatusModel", "PreviewSet", + "YaDocumentsUserSourceProperties", ) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py index 85abda58a..10d729382 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py @@ -102,6 +102,16 @@ def get_secret_keys(self) -> set[DataKey]: return {DataKey(parts=("refresh_token",))} +@attr.s(init=True, kw_only=True) +class YaDocumentsUserSourceProperties(UserSourceProperties): + file_type: FileType = attr.ib(default=FileType.yadocuments) + + private_path: Optional[str] = attr.ib(default=None) + public_link: Optional[str] = attr.ib(default=None) + + oauth_token: Optional[str] = attr.ib(default=None) + + @attr.s(init=True, kw_only=True) class UserSourceDataSourceProperties: file_type: FileType = attr.ib() diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py index b138dac9d..1d80265f6 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py @@ -35,6 +35,7 @@ GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, RenameTenantStatusModel, + YaDocumentsUserSourceProperties, ) @@ -151,9 +152,19 @@ class Meta(BaseSchema.Meta): spreadsheet_id = fields.String() +class YaDocumentsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema): + class Meta(BaseSchema.Meta): + target = YaDocumentsUserSourceProperties + + private_path = fields.String(allow_none=True) + public_link = fields.String(allow_none=True) + oauth_token = fields.String(allow_none=True) + + class UserSourcePropertiesSchema(FileTypeOneOfSchema): type_schemas: dict[str, Type[UserSourcePropertiesBaseSchema]] = { FileType.gsheets.name: GSheetsUserSourcePropertiesSchema, + FileType.yadocuments.name: YaDocumentsUserSourcePropertiesSchema, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py new file mode 100644 index 000000000..57e869708 --- /dev/null +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py @@ -0,0 +1,43 @@ +import requests + + +class YaDocumentsClient: + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + + def get_spreadsheet_public(self, link): + resp = requests.get( + f"https://cloud-api.yandex.net/v1/disk/public/resources/download/?public_key={link}", + headers=self.headers, + ) + return resp.json()["href"] + + def get_spreadsheet_public_meta(self, link): + resp = requests.get( + f"https://cloud-api.yandex.net/v1/disk/public/resources/?public_key={link}", + headers=self.headers, + ) + return resp.json() + + def get_spreadsheet_private(self, path, token): + headers_with_token = self._create_headers_with_token(token) + resp = requests.get( + f"https://cloud-api.yandex.net/v1/disk/resources/download/?path={path}", + headers=headers_with_token, + ) + return resp.json()["href"] + + def get_spreadsheet_private_meta(self, path, token): + headers_with_token = self._create_headers_with_token(token) + resp = requests.get( + f"https://cloud-api.yandex.net/v1/disk/resources/?path={path}", + headers=headers_with_token, + ) + return resp.json() + + def _create_headers_with_token(self, token: str): + headers_with_token = self.headers.copy() + headers_with_token.update({"Authorization": "OAuth " + token}) + return headers_with_token diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index ee8ff92be..772e19461 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -28,6 +28,13 @@ class DownloadGSheetTask(BaseTaskMeta): exec_mode: TaskExecutionMode = attr.ib(default=TaskExecutionMode.BASIC) +@attr.s +class DownloadYaDocumentsTask(BaseTaskMeta): + name = TaskName("download_yadocuments") + + file_id: str = attr.ib() + + @attr.s class ParseFileTask(BaseTaskMeta): name = TaskName("parse_file") diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py index c3c7b5b4d..19da19f71 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py @@ -7,7 +7,10 @@ RenameTenantFilesTask, ) from .delete import DeleteFileTask -from .download import DownloadGSheetTask +from .download import ( + DownloadGSheetTask, + DownloadYaDocumentsTask, +) from .excel import ProcessExcelTask from .parse import ParseFileTask from .save import SaveSourceTask @@ -16,6 +19,7 @@ REGISTRY: TaskRegistry = TaskRegistry.create( [ DownloadGSheetTask, + DownloadYaDocumentsTask, ParseFileTask, ProcessExcelTask, SaveSourceTask, diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py index 8750015e8..2c9093fd2 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py @@ -11,6 +11,7 @@ ) import aiogoogle +import aiohttp import attr from dl_constants.enums import ( @@ -26,6 +27,10 @@ from dl_core.us_manager.us_manager_async import AsyncUSManager from dl_file_uploader_lib import exc from dl_file_uploader_lib.data_sink.json_each_row import S3JsonEachRowUntypedFileAsyncDataSink +from dl_file_uploader_lib.data_sink.raw_bytes import ( + RawBytesAsyncDataStream, + S3RawFileAsyncDataSink, +) from dl_file_uploader_lib.gsheets_client import ( GSheetsClient, GSheetsOAuth2, @@ -39,7 +44,9 @@ GSheetsFileSourceSettings, GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, + YaDocumentsUserSourceProperties, ) +from dl_file_uploader_lib.yadocuments_client import YaDocumentsClient from dl_file_uploader_task_interface.context import FileUploaderTaskContext import dl_file_uploader_task_interface.tasks as task_interface from dl_file_uploader_task_interface.tasks import TaskExecutionMode @@ -340,3 +347,80 @@ async def run(self) -> TaskResult: finally: await usm.close() return Success() + + +@attr.s +class DownloadYaDocumentsTask(BaseExecutorTask[task_interface.DownloadYaDocumentsTask, FileUploaderTaskContext]): + cls_meta = task_interface.DownloadYaDocumentsTask + + async def run(self) -> TaskResult: + dfile: Optional[DataFile] = None + redis = self._ctx.redis_service.get_redis() + + try: + rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) + dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id) + assert dfile is not None + + assert isinstance(dfile.user_source_properties, YaDocumentsUserSourceProperties) + + yadocs_client = YaDocumentsClient() + + if dfile.user_source_properties.public_link is not None: + spreadsheet_ref = yadocs_client.get_spreadsheet_public(link=dfile.user_source_properties.public_link) + spreadsheet_meta = yadocs_client.get_spreadsheet_public_meta( + link=dfile.user_source_properties.public_link + ) + + elif ( + dfile.user_source_properties.private_path is not None + and dfile.user_source_properties.oauth_token is not None + ): + spreadsheet_ref = yadocs_client.get_spreadsheet_private( + path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token + ) + spreadsheet_meta = yadocs_client.get_spreadsheet_private_meta( + path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token + ) + else: + raise exc.DLFileUploaderBaseError() + dfile.filename = spreadsheet_meta["name"] + + s3 = self._ctx.s3_service + + async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024): + async with aiohttp.ClientSession() as session: + async with session.get(spreadsheet_ref) as response: + assert response.status == 200 + while True: + chunk = await response.content.read(chunk_size) + if chunk: + LOGGER.debug(f"Received chunk of {len(chunk)} bytes.") + yield chunk + else: + LOGGER.info("Empty chunk received.") + break + + data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter()) + async with S3RawFileAsyncDataSink( + s3=s3.client, + s3_key=dfile.s3_key, + bucket_name=s3.tmp_bucket_name, + ) as data_sink: + await data_sink.dump_data_stream(data_stream) + + await dfile.save() + LOGGER.info(f'Uploaded file "{dfile.filename}".') + + except Exception as ex: + LOGGER.exception(ex) + if dfile is None: + return Retry(attempts=3) + else: + dfile.status = FileProcessingStatus.failed + exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed() + dfile.error = FileProcessingError.from_exception(exc_to_save) + await dfile.save() + + return Fail() + return Success() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py new file mode 100644 index 000000000..dfd17e417 --- /dev/null +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py @@ -0,0 +1,35 @@ +import pytest + +from dl_constants.enums import FileProcessingStatus +from dl_file_uploader_lib.enums import FileType +from dl_file_uploader_lib.redis_model.models import ( + DataFile, + YaDocumentsUserSourceProperties, +) +from dl_file_uploader_task_interface.tasks import DownloadYaDocumentsTask +from dl_task_processor.state import wait_task +from dl_testing.s3_utils import s3_file_exists + + +@pytest.mark.asyncio +async def test_download_yadocs_task( + task_processor_client, + task_state, + s3_client, + redis_model_manager, + s3_tmp_bucket, +): + df = DataFile( + filename="", + file_type=FileType.yadocuments, + manager=redis_model_manager, + status=FileProcessingStatus.in_progress, + user_source_properties=YaDocumentsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA"), + ) + await df.save() + + task = await task_processor_client.schedule(DownloadYaDocumentsTask(file_id=df.id)) + result = await wait_task(task, task_state) + + assert result[-1] == "success" + assert await s3_file_exists(key=df.s3_key, bucket=s3_tmp_bucket, s3_client=s3_client) From 165cf3a0ee4926b6b977693bf323f6ed8415dd1b Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Tue, 10 Oct 2023 18:33:31 +0300 Subject: [PATCH 2/7] Download yadocs task --- Taskfile.dist.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Taskfile.dist.yml b/Taskfile.dist.yml index eae80e6c0..5b8b2482b 100644 --- a/Taskfile.dist.yml +++ b/Taskfile.dist.yml @@ -2,7 +2,7 @@ version: '3' vars: USER_REL_PATH: - sh: python3 -c "import os,sys;print(os.path.relpath('{{.USER_WORKING_DIR}}'))" + sh: python -c "import os,sys;print(os.path.relpath('{{.USER_WORKING_DIR}}'))" ROOT_DIR: sh: pwd From ab5a41d190c2f89c996f4a012705098d0880d4c0 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Thu, 12 Oct 2023 14:11:08 +0300 Subject: [PATCH 3/7] Fix issues --- .../schemas/sources.py | 2 +- .../dl_file_uploader_lib/enums.py | 2 +- .../redis_model/models/__init__.py | 4 +- .../redis_model/models/models.py | 9 +- .../redis_model/models/storage_schemas.py | 8 +- .../dl_file_uploader_lib/yadocs_client.py | 80 +++++++++++ .../yadocuments_client.py | 43 ------ .../dl_file_uploader_task_interface/tasks.py | 4 +- .../tasks/__init__.py | 8 +- .../{download.py => download_gsheets.py} | 84 ------------ .../tasks/download_yadocs.py | 126 ++++++++++++++++++ .../ext/test_yado\321\201s.py" | 10 +- 12 files changed, 230 insertions(+), 150 deletions(-) create mode 100644 lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py delete mode 100644 lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py rename lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/{download.py => download_gsheets.py} (81%) create mode 100644 lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py rename lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py => "lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" (68%) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py index 1ce595a29..bd672d960 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py @@ -107,7 +107,7 @@ class SourceInfoSchema(FileTypeOneOfSchema): FileType.csv.name: SourceInfoSchemaBase, FileType.gsheets.name: SourceInfoSchemaGSheets, FileType.xlsx.name: SourceInfoSchemaBase, - FileType.yadocuments.name: SourceInfoSchemaBase, + FileType.yadocs.name: SourceInfoSchemaBase, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py index 432f6987d..88bfa3d54 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py @@ -18,7 +18,7 @@ class FileType(Enum): csv = "csv" gsheets = "gsheets" xlsx = "xlsx" - yadocuments = "yadocuments" + yadocs = "yadocs" class CSVEncoding(Enum): diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py index d07e8d0c7..7922a09f9 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py @@ -19,7 +19,7 @@ RenameTenantStatusModel, SourceNotFoundError, SpreadsheetFileSourceSettings, - YaDocumentsUserSourceProperties, + YaDocsUserSourceProperties, ) from .storage_schemas import ( DataFileSchema, @@ -47,7 +47,7 @@ "EmptySourcesError", "RenameTenantStatusModel", "PreviewSet", - "YaDocumentsUserSourceProperties", + "YaDocsUserSourceProperties", ) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py index 10d729382..6f2711d45 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py @@ -103,13 +103,16 @@ def get_secret_keys(self) -> set[DataKey]: @attr.s(init=True, kw_only=True) -class YaDocumentsUserSourceProperties(UserSourceProperties): - file_type: FileType = attr.ib(default=FileType.yadocuments) +class YaDocsUserSourceProperties(UserSourceProperties): + file_type: FileType = attr.ib(default=FileType.yadocs) private_path: Optional[str] = attr.ib(default=None) public_link: Optional[str] = attr.ib(default=None) - oauth_token: Optional[str] = attr.ib(default=None) + oauth_token: Optional[str] = attr.ib(default=None, repr=False) + + def get_secret_keys(self) -> set[DataKey]: + return {DataKey(parts=("oauth_token",))} @attr.s(init=True, kw_only=True) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py index 1d80265f6..23f2031ed 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py @@ -35,7 +35,7 @@ GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, RenameTenantStatusModel, - YaDocumentsUserSourceProperties, + YaDocsUserSourceProperties, ) @@ -152,9 +152,9 @@ class Meta(BaseSchema.Meta): spreadsheet_id = fields.String() -class YaDocumentsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema): +class YaDocsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema): class Meta(BaseSchema.Meta): - target = YaDocumentsUserSourceProperties + target = YaDocsUserSourceProperties private_path = fields.String(allow_none=True) public_link = fields.String(allow_none=True) @@ -164,7 +164,7 @@ class Meta(BaseSchema.Meta): class UserSourcePropertiesSchema(FileTypeOneOfSchema): type_schemas: dict[str, Type[UserSourcePropertiesBaseSchema]] = { FileType.gsheets.name: GSheetsUserSourcePropertiesSchema, - FileType.yadocuments.name: YaDocumentsUserSourcePropertiesSchema, + FileType.yadocs.name: YaDocsUserSourcePropertiesSchema, } diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py new file mode 100644 index 000000000..1d1690c72 --- /dev/null +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py @@ -0,0 +1,80 @@ +from typing import Any + +import aiohttp +from aiohttp.client import ( + ClientResponse, + ClientSession, +) + +from dl_file_uploader_lib import exc as file_upl_exc + + +def yadocs_error_to_file_uploader_exception(status_code: int, resp_info: dict): + if status_code == 401: + err_cls = file_upl_exc.PermissionDenied + elif status_code == 404: + err_cls = file_upl_exc.DocumentNotFound + elif status_code == 400: + err_cls = file_upl_exc.UnsupportedDocument + elif status_code >= 500: + err_cls = file_upl_exc.RemoteServerError + else: + err_cls = file_upl_exc.DLFileUploaderBaseError + + return err_cls( + details=resp_info, + ) + + +class YaDocsClient: + headers: dict[str, Any] = { + "Content-Type": "application/json", + "Accept": "application/json", + } + hostname: str = "https://cloud-api.yandex.net/v1/disk" + + def __init__(self, session: ClientSession): + self.session = session + + async def get_spreadsheet_public_ref(self, link: str) -> str: + resp = await self.session.get( + f"{self.hostname}/public/resources/download/?public_key={link}", + headers=self.headers, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return (await resp.json())["href"] + + async def get_spreadsheet_public_meta(self, link: str) -> ClientResponse: + resp = await self.session.get( + f"{self.hostname}/public/resources/?public_key={link}", + headers=self.headers, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return await resp.json() + + async def get_spreadsheet_private_ref(self, path: str, token: str) -> str: + headers_with_token = self._create_headers_with_token(token) + resp = await self.session.get( + f"{self.hostname}/resources/download/?path={path}", + headers=headers_with_token, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return (await resp.json())["href"] + + async def get_spreadsheet_private_meta(self, path: str, token: str) -> ClientResponse: + headers_with_token = self._create_headers_with_token(token) + resp = await self.session.get( + f"{self.hostname}/resources/?path={path}", + headers=headers_with_token, + ) + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + return await resp.json() + + def _create_headers_with_token(self, token: str) -> dict[str, Any]: + headers_with_token = self.headers.copy() + headers_with_token.update({"Authorization": "OAuth " + token}) + return headers_with_token diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py deleted file mode 100644 index 57e869708..000000000 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocuments_client.py +++ /dev/null @@ -1,43 +0,0 @@ -import requests - - -class YaDocumentsClient: - headers = { - "Content-Type": "application/json", - "Accept": "application/json", - } - - def get_spreadsheet_public(self, link): - resp = requests.get( - f"https://cloud-api.yandex.net/v1/disk/public/resources/download/?public_key={link}", - headers=self.headers, - ) - return resp.json()["href"] - - def get_spreadsheet_public_meta(self, link): - resp = requests.get( - f"https://cloud-api.yandex.net/v1/disk/public/resources/?public_key={link}", - headers=self.headers, - ) - return resp.json() - - def get_spreadsheet_private(self, path, token): - headers_with_token = self._create_headers_with_token(token) - resp = requests.get( - f"https://cloud-api.yandex.net/v1/disk/resources/download/?path={path}", - headers=headers_with_token, - ) - return resp.json()["href"] - - def get_spreadsheet_private_meta(self, path, token): - headers_with_token = self._create_headers_with_token(token) - resp = requests.get( - f"https://cloud-api.yandex.net/v1/disk/resources/?path={path}", - headers=headers_with_token, - ) - return resp.json() - - def _create_headers_with_token(self, token: str): - headers_with_token = self.headers.copy() - headers_with_token.update({"Authorization": "OAuth " + token}) - return headers_with_token diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index 772e19461..682df4904 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -29,8 +29,8 @@ class DownloadGSheetTask(BaseTaskMeta): @attr.s -class DownloadYaDocumentsTask(BaseTaskMeta): - name = TaskName("download_yadocuments") +class DownloadYaDocsTask(BaseTaskMeta): + name = TaskName("download_yadocs") file_id: str = attr.ib() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py index 19da19f71..f10c1d470 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/__init__.py @@ -7,10 +7,8 @@ RenameTenantFilesTask, ) from .delete import DeleteFileTask -from .download import ( - DownloadGSheetTask, - DownloadYaDocumentsTask, -) +from .download_gsheets import DownloadGSheetTask +from .download_yadocs import DownloadYaDocsTask from .excel import ProcessExcelTask from .parse import ParseFileTask from .save import SaveSourceTask @@ -19,7 +17,7 @@ REGISTRY: TaskRegistry = TaskRegistry.create( [ DownloadGSheetTask, - DownloadYaDocumentsTask, + DownloadYaDocsTask, ParseFileTask, ProcessExcelTask, SaveSourceTask, diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_gsheets.py similarity index 81% rename from lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py rename to lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_gsheets.py index 2c9093fd2..8750015e8 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_gsheets.py @@ -11,7 +11,6 @@ ) import aiogoogle -import aiohttp import attr from dl_constants.enums import ( @@ -27,10 +26,6 @@ from dl_core.us_manager.us_manager_async import AsyncUSManager from dl_file_uploader_lib import exc from dl_file_uploader_lib.data_sink.json_each_row import S3JsonEachRowUntypedFileAsyncDataSink -from dl_file_uploader_lib.data_sink.raw_bytes import ( - RawBytesAsyncDataStream, - S3RawFileAsyncDataSink, -) from dl_file_uploader_lib.gsheets_client import ( GSheetsClient, GSheetsOAuth2, @@ -44,9 +39,7 @@ GSheetsFileSourceSettings, GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, - YaDocumentsUserSourceProperties, ) -from dl_file_uploader_lib.yadocuments_client import YaDocumentsClient from dl_file_uploader_task_interface.context import FileUploaderTaskContext import dl_file_uploader_task_interface.tasks as task_interface from dl_file_uploader_task_interface.tasks import TaskExecutionMode @@ -347,80 +340,3 @@ async def run(self) -> TaskResult: finally: await usm.close() return Success() - - -@attr.s -class DownloadYaDocumentsTask(BaseExecutorTask[task_interface.DownloadYaDocumentsTask, FileUploaderTaskContext]): - cls_meta = task_interface.DownloadYaDocumentsTask - - async def run(self) -> TaskResult: - dfile: Optional[DataFile] = None - redis = self._ctx.redis_service.get_redis() - - try: - rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) - dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id) - assert dfile is not None - - assert isinstance(dfile.user_source_properties, YaDocumentsUserSourceProperties) - - yadocs_client = YaDocumentsClient() - - if dfile.user_source_properties.public_link is not None: - spreadsheet_ref = yadocs_client.get_spreadsheet_public(link=dfile.user_source_properties.public_link) - spreadsheet_meta = yadocs_client.get_spreadsheet_public_meta( - link=dfile.user_source_properties.public_link - ) - - elif ( - dfile.user_source_properties.private_path is not None - and dfile.user_source_properties.oauth_token is not None - ): - spreadsheet_ref = yadocs_client.get_spreadsheet_private( - path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token - ) - spreadsheet_meta = yadocs_client.get_spreadsheet_private_meta( - path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token - ) - else: - raise exc.DLFileUploaderBaseError() - dfile.filename = spreadsheet_meta["name"] - - s3 = self._ctx.s3_service - - async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024): - async with aiohttp.ClientSession() as session: - async with session.get(spreadsheet_ref) as response: - assert response.status == 200 - while True: - chunk = await response.content.read(chunk_size) - if chunk: - LOGGER.debug(f"Received chunk of {len(chunk)} bytes.") - yield chunk - else: - LOGGER.info("Empty chunk received.") - break - - data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter()) - async with S3RawFileAsyncDataSink( - s3=s3.client, - s3_key=dfile.s3_key, - bucket_name=s3.tmp_bucket_name, - ) as data_sink: - await data_sink.dump_data_stream(data_stream) - - await dfile.save() - LOGGER.info(f'Uploaded file "{dfile.filename}".') - - except Exception as ex: - LOGGER.exception(ex) - if dfile is None: - return Retry(attempts=3) - else: - dfile.status = FileProcessingStatus.failed - exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed() - dfile.error = FileProcessingError.from_exception(exc_to_save) - await dfile.save() - - return Fail() - return Success() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py new file mode 100644 index 000000000..2b39ecedd --- /dev/null +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -0,0 +1,126 @@ +from __future__ import annotations + + +import logging +from typing import Optional + +import aiohttp +import attr + +from dl_constants.enums import FileProcessingStatus + +from dl_file_uploader_lib import exc +from dl_file_uploader_lib.data_sink.raw_bytes import ( + RawBytesAsyncDataStream, + S3RawFileAsyncDataSink, +) + +from dl_file_uploader_lib.redis_model.base import RedisModelManager +from dl_file_uploader_lib.redis_model.models import ( + DataFile, + FileProcessingError, + YaDocsUserSourceProperties, +) +from dl_file_uploader_lib.yadocs_client import YaDocsClient, yadocs_error_to_file_uploader_exception +from dl_file_uploader_task_interface.context import FileUploaderTaskContext +import dl_file_uploader_task_interface.tasks as task_interface +from dl_task_processor.task import ( + BaseExecutorTask, + Fail, + Retry, + Success, + TaskResult, +) + + +LOGGER = logging.getLogger(__name__) + + +@attr.s +class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, FileUploaderTaskContext]): + cls_meta = task_interface.DownloadYaDocsTask + + async def run(self) -> TaskResult: + dfile: Optional[DataFile] = None + redis = self._ctx.redis_service.get_redis() + + try: + rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) + dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id) + assert dfile is not None + + assert isinstance(dfile.user_source_properties, YaDocsUserSourceProperties) + + async with aiohttp.ClientSession() as session: + yadocs_client = YaDocsClient(session) + try: + if dfile.user_source_properties.public_link is not None: + spreadsheet_ref = await yadocs_client.get_spreadsheet_public_ref( + link=dfile.user_source_properties.public_link + ) + spreadsheet_meta = await yadocs_client.get_spreadsheet_public_meta( + link=dfile.user_source_properties.public_link + ) + + elif ( + dfile.user_source_properties.private_path is not None + and dfile.user_source_properties.oauth_token is not None + ): + spreadsheet_ref = await yadocs_client.get_spreadsheet_private_ref( + path=dfile.user_source_properties.private_path, + token=dfile.user_source_properties.oauth_token, + ) + spreadsheet_meta = await yadocs_client.get_spreadsheet_private_meta( + path=dfile.user_source_properties.private_path, + token=dfile.user_source_properties.oauth_token, + ) + else: + raise exc.DLFileUploaderBaseError() + except exc.DLFileUploaderBaseError as e: + LOGGER.exception(e) + download_error = FileProcessingError.from_exception(e) + dfile.status = FileProcessingStatus.failed + dfile.error = download_error + await dfile.save() + return Success() + + dfile.filename = spreadsheet_meta["name"] + s3 = self._ctx.s3_service + + async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024): + async with aiohttp.ClientSession() as session: + async with session.get(spreadsheet_ref) as resp: + if resp.status != 200: + raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) + while True: + chunk = await resp.content.read(chunk_size) + if chunk: + LOGGER.debug(f"Received chunk of {len(chunk)} bytes.") + yield chunk + else: + LOGGER.info("Empty chunk received.") + break + + data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter()) + async with S3RawFileAsyncDataSink( + s3=s3.client, + s3_key=dfile.s3_key, + bucket_name=s3.tmp_bucket_name, + ) as data_sink: + await data_sink.dump_data_stream(data_stream) + + await dfile.save() + LOGGER.info(f'Uploaded file "{dfile.filename}".') + + except Exception as ex: + LOGGER.exception(ex) + if dfile is None: + return Retry(attempts=3) + else: + dfile.status = FileProcessingStatus.failed + exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed() + dfile.error = FileProcessingError.from_exception(exc_to_save) + await dfile.save() + + return Fail() + return Success() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" similarity index 68% rename from lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py rename to "lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" index dfd17e417..3a0ff9e18 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocuments.py +++ "b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" @@ -4,9 +4,9 @@ from dl_file_uploader_lib.enums import FileType from dl_file_uploader_lib.redis_model.models import ( DataFile, - YaDocumentsUserSourceProperties, + YaDocsUserSourceProperties, ) -from dl_file_uploader_task_interface.tasks import DownloadYaDocumentsTask +from dl_file_uploader_task_interface.tasks import DownloadYaDocsTask from dl_task_processor.state import wait_task from dl_testing.s3_utils import s3_file_exists @@ -21,14 +21,14 @@ async def test_download_yadocs_task( ): df = DataFile( filename="", - file_type=FileType.yadocuments, + file_type=FileType.yadocs, manager=redis_model_manager, status=FileProcessingStatus.in_progress, - user_source_properties=YaDocumentsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA"), + user_source_properties=YaDocsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA"), ) await df.save() - task = await task_processor_client.schedule(DownloadYaDocumentsTask(file_id=df.id)) + task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id)) result = await wait_task(task, task_state) assert result[-1] == "success" From 2c9f0c4f6bfd34105184373390a8fefa05399091 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Thu, 12 Oct 2023 14:19:29 +0300 Subject: [PATCH 4/7] Small type annotation fix --- lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py index 1d1690c72..4db8d798b 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py @@ -9,7 +9,7 @@ from dl_file_uploader_lib import exc as file_upl_exc -def yadocs_error_to_file_uploader_exception(status_code: int, resp_info: dict): +def yadocs_error_to_file_uploader_exception(status_code: int, resp_info: dict) -> file_upl_exc.DLFileUploaderBaseError: if status_code == 401: err_cls = file_upl_exc.PermissionDenied elif status_code == 404: From 4b97c2596495a01f030e46f1040429cded30d382 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Thu, 12 Oct 2023 15:18:20 +0300 Subject: [PATCH 5/7] Small unused import fix --- lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py index 4db8d798b..c0aab4648 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py @@ -1,6 +1,5 @@ from typing import Any -import aiohttp from aiohttp.client import ( ClientResponse, ClientSession, From 748b2294698319c3795e2020a17b9c57aea56727 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Thu, 12 Oct 2023 18:06:45 +0300 Subject: [PATCH 6/7] Fix mypy --- .../dl_file_uploader_lib/yadocs_client.py | 6 ++++-- .../tasks/download_yadocs.py | 10 +++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py index c0aab4648..a310aacf9 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import Any from aiohttp.client import ( @@ -44,7 +46,7 @@ async def get_spreadsheet_public_ref(self, link: str) -> str: raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) return (await resp.json())["href"] - async def get_spreadsheet_public_meta(self, link: str) -> ClientResponse: + async def get_spreadsheet_public_meta(self, link: str) -> dict[str, Any]: resp = await self.session.get( f"{self.hostname}/public/resources/?public_key={link}", headers=self.headers, @@ -63,7 +65,7 @@ async def get_spreadsheet_private_ref(self, path: str, token: str) -> str: raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json()) return (await resp.json())["href"] - async def get_spreadsheet_private_meta(self, path: str, token: str) -> ClientResponse: + async def get_spreadsheet_private_meta(self, path: str, token: str) -> dict[str, Any]: headers_with_token = self._create_headers_with_token(token) resp = await self.session.get( f"{self.hostname}/resources/?path={path}", diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py index 2b39ecedd..818dca1e6 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -1,6 +1,5 @@ from __future__ import annotations - import logging from typing import Optional @@ -8,20 +7,21 @@ import attr from dl_constants.enums import FileProcessingStatus - from dl_file_uploader_lib import exc from dl_file_uploader_lib.data_sink.raw_bytes import ( RawBytesAsyncDataStream, S3RawFileAsyncDataSink, ) - from dl_file_uploader_lib.redis_model.base import RedisModelManager from dl_file_uploader_lib.redis_model.models import ( DataFile, FileProcessingError, YaDocsUserSourceProperties, ) -from dl_file_uploader_lib.yadocs_client import YaDocsClient, yadocs_error_to_file_uploader_exception +from dl_file_uploader_lib.yadocs_client import ( + YaDocsClient, + yadocs_error_to_file_uploader_exception, +) from dl_file_uploader_task_interface.context import FileUploaderTaskContext import dl_file_uploader_task_interface.tasks as task_interface from dl_task_processor.task import ( @@ -87,7 +87,7 @@ async def run(self) -> TaskResult: dfile.filename = spreadsheet_meta["name"] s3 = self._ctx.s3_service - async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024): + async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> None: async with aiohttp.ClientSession() as session: async with session.get(spreadsheet_ref) as resp: if resp.status != 200: From 6f632e371cd29d402d57ced246391293322babb6 Mon Sep 17 00:00:00 2001 From: Valeria Bulanova Date: Thu, 12 Oct 2023 18:18:02 +0300 Subject: [PATCH 7/7] One more mypy fix --- .../dl_file_uploader_lib/yadocs_client.py | 12 +++++++----- .../tasks/download_yadocs.py | 7 +++++-- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py index a310aacf9..435e03908 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py @@ -1,16 +1,18 @@ from __future__ import annotations -from typing import Any - -from aiohttp.client import ( - ClientResponse, - ClientSession, +from typing import ( + Any, + Type, ) +from aiohttp.client import ClientSession + from dl_file_uploader_lib import exc as file_upl_exc def yadocs_error_to_file_uploader_exception(status_code: int, resp_info: dict) -> file_upl_exc.DLFileUploaderBaseError: + err_cls: Type[file_upl_exc.DLFileUploaderBaseError] + if status_code == 401: err_cls = file_upl_exc.PermissionDenied elif status_code == 404: diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py index 818dca1e6..f3fe78b2b 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -1,7 +1,10 @@ from __future__ import annotations import logging -from typing import Optional +from typing import ( + AsyncGenerator, + Optional, +) import aiohttp import attr @@ -87,7 +90,7 @@ async def run(self) -> TaskResult: dfile.filename = spreadsheet_meta["name"] s3 = self._ctx.s3_service - async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> None: + async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[bytes, None]: async with aiohttp.ClientSession() as session: async with session.get(spreadsheet_ref) as resp: if resp.status != 200: