Skip to content

Commit

Permalink
BI-4976: Download yadocs task (#10)
Browse files Browse the repository at this point in the history
* Download yadocs task

* Download yadocs task

* Fix issues

* Small type annotation fix

* Small unused import fix

* Fix mypy

* One more mypy fix

---------

Co-authored-by: Valeria Bulanova <[email protected]>
  • Loading branch information
vallbull and vallbull authored Oct 13, 2023
1 parent 79cb89c commit 610a0ef
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class SourceInfoSchema(FileTypeOneOfSchema):
FileType.csv.name: SourceInfoSchemaBase,
FileType.gsheets.name: SourceInfoSchemaGSheets,
FileType.xlsx.name: SourceInfoSchemaBase,
FileType.yadocs.name: SourceInfoSchemaBase,
}


Expand Down
1 change: 1 addition & 0 deletions lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class FileType(Enum):
csv = "csv"
gsheets = "gsheets"
xlsx = "xlsx"
yadocs = "yadocs"


class CSVEncoding(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
RenameTenantStatusModel,
SourceNotFoundError,
SpreadsheetFileSourceSettings,
YaDocsUserSourceProperties,
)
from .storage_schemas import (
DataFileSchema,
Expand Down Expand Up @@ -46,6 +47,7 @@
"EmptySourcesError",
"RenameTenantStatusModel",
"PreviewSet",
"YaDocsUserSourceProperties",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ def get_secret_keys(self) -> set[DataKey]:
return {DataKey(parts=("refresh_token",))}


@attr.s(init=True, kw_only=True)
class YaDocsUserSourceProperties(UserSourceProperties):
file_type: FileType = attr.ib(default=FileType.yadocs)

private_path: Optional[str] = attr.ib(default=None)
public_link: Optional[str] = attr.ib(default=None)

oauth_token: Optional[str] = attr.ib(default=None, repr=False)

def get_secret_keys(self) -> set[DataKey]:
return {DataKey(parts=("oauth_token",))}


@attr.s(init=True, kw_only=True)
class UserSourceDataSourceProperties:
file_type: FileType = attr.ib()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
GSheetsUserSourceDataSourceProperties,
GSheetsUserSourceProperties,
RenameTenantStatusModel,
YaDocsUserSourceProperties,
)


Expand Down Expand Up @@ -151,9 +152,19 @@ class Meta(BaseSchema.Meta):
spreadsheet_id = fields.String()


class YaDocsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema):
class Meta(BaseSchema.Meta):
target = YaDocsUserSourceProperties

private_path = fields.String(allow_none=True)
public_link = fields.String(allow_none=True)
oauth_token = fields.String(allow_none=True)


class UserSourcePropertiesSchema(FileTypeOneOfSchema):
type_schemas: dict[str, Type[UserSourcePropertiesBaseSchema]] = {
FileType.gsheets.name: GSheetsUserSourcePropertiesSchema,
FileType.yadocs.name: YaDocsUserSourcePropertiesSchema,
}


Expand Down
83 changes: 83 additions & 0 deletions lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from __future__ import annotations

from typing import (
Any,
Type,
)

from aiohttp.client import ClientSession

from dl_file_uploader_lib import exc as file_upl_exc


def yadocs_error_to_file_uploader_exception(status_code: int, resp_info: dict) -> file_upl_exc.DLFileUploaderBaseError:
err_cls: Type[file_upl_exc.DLFileUploaderBaseError]

if status_code == 401:
err_cls = file_upl_exc.PermissionDenied
elif status_code == 404:
err_cls = file_upl_exc.DocumentNotFound
elif status_code == 400:
err_cls = file_upl_exc.UnsupportedDocument
elif status_code >= 500:
err_cls = file_upl_exc.RemoteServerError
else:
err_cls = file_upl_exc.DLFileUploaderBaseError

return err_cls(
details=resp_info,
)


class YaDocsClient:
headers: dict[str, Any] = {
"Content-Type": "application/json",
"Accept": "application/json",
}
hostname: str = "https://cloud-api.yandex.net/v1/disk"

def __init__(self, session: ClientSession):
self.session = session

async def get_spreadsheet_public_ref(self, link: str) -> str:
resp = await self.session.get(
f"{self.hostname}/public/resources/download/?public_key={link}",
headers=self.headers,
)
if resp.status != 200:
raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json())
return (await resp.json())["href"]

async def get_spreadsheet_public_meta(self, link: str) -> dict[str, Any]:
resp = await self.session.get(
f"{self.hostname}/public/resources/?public_key={link}",
headers=self.headers,
)
if resp.status != 200:
raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json())
return await resp.json()

async def get_spreadsheet_private_ref(self, path: str, token: str) -> str:
headers_with_token = self._create_headers_with_token(token)
resp = await self.session.get(
f"{self.hostname}/resources/download/?path={path}",
headers=headers_with_token,
)
if resp.status != 200:
raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json())
return (await resp.json())["href"]

async def get_spreadsheet_private_meta(self, path: str, token: str) -> dict[str, Any]:
headers_with_token = self._create_headers_with_token(token)
resp = await self.session.get(
f"{self.hostname}/resources/?path={path}",
headers=headers_with_token,
)
if resp.status != 200:
raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json())
return await resp.json()

def _create_headers_with_token(self, token: str) -> dict[str, Any]:
headers_with_token = self.headers.copy()
headers_with_token.update({"Authorization": "OAuth " + token})
return headers_with_token
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ class DownloadGSheetTask(BaseTaskMeta):
exec_mode: TaskExecutionMode = attr.ib(default=TaskExecutionMode.BASIC)


@attr.s
class DownloadYaDocsTask(BaseTaskMeta):
name = TaskName("download_yadocs")

file_id: str = attr.ib()


@attr.s
class ParseFileTask(BaseTaskMeta):
name = TaskName("parse_file")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
RenameTenantFilesTask,
)
from .delete import DeleteFileTask
from .download import DownloadGSheetTask
from .download_gsheets import DownloadGSheetTask
from .download_yadocs import DownloadYaDocsTask
from .excel import ProcessExcelTask
from .parse import ParseFileTask
from .save import SaveSourceTask
Expand All @@ -16,6 +17,7 @@
REGISTRY: TaskRegistry = TaskRegistry.create(
[
DownloadGSheetTask,
DownloadYaDocsTask,
ParseFileTask,
ProcessExcelTask,
SaveSourceTask,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from __future__ import annotations

import logging
from typing import (
AsyncGenerator,
Optional,
)

import aiohttp
import attr

from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_lib import exc
from dl_file_uploader_lib.data_sink.raw_bytes import (
RawBytesAsyncDataStream,
S3RawFileAsyncDataSink,
)
from dl_file_uploader_lib.redis_model.base import RedisModelManager
from dl_file_uploader_lib.redis_model.models import (
DataFile,
FileProcessingError,
YaDocsUserSourceProperties,
)
from dl_file_uploader_lib.yadocs_client import (
YaDocsClient,
yadocs_error_to_file_uploader_exception,
)
from dl_file_uploader_task_interface.context import FileUploaderTaskContext
import dl_file_uploader_task_interface.tasks as task_interface
from dl_task_processor.task import (
BaseExecutorTask,
Fail,
Retry,
Success,
TaskResult,
)


LOGGER = logging.getLogger(__name__)


@attr.s
class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, FileUploaderTaskContext]):
cls_meta = task_interface.DownloadYaDocsTask

async def run(self) -> TaskResult:
dfile: Optional[DataFile] = None
redis = self._ctx.redis_service.get_redis()

try:
rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config)
dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id)
assert dfile is not None

assert isinstance(dfile.user_source_properties, YaDocsUserSourceProperties)

async with aiohttp.ClientSession() as session:
yadocs_client = YaDocsClient(session)
try:
if dfile.user_source_properties.public_link is not None:
spreadsheet_ref = await yadocs_client.get_spreadsheet_public_ref(
link=dfile.user_source_properties.public_link
)
spreadsheet_meta = await yadocs_client.get_spreadsheet_public_meta(
link=dfile.user_source_properties.public_link
)

elif (
dfile.user_source_properties.private_path is not None
and dfile.user_source_properties.oauth_token is not None
):
spreadsheet_ref = await yadocs_client.get_spreadsheet_private_ref(
path=dfile.user_source_properties.private_path,
token=dfile.user_source_properties.oauth_token,
)
spreadsheet_meta = await yadocs_client.get_spreadsheet_private_meta(
path=dfile.user_source_properties.private_path,
token=dfile.user_source_properties.oauth_token,
)
else:
raise exc.DLFileUploaderBaseError()
except exc.DLFileUploaderBaseError as e:
LOGGER.exception(e)
download_error = FileProcessingError.from_exception(e)
dfile.status = FileProcessingStatus.failed
dfile.error = download_error
await dfile.save()
return Success()

dfile.filename = spreadsheet_meta["name"]
s3 = self._ctx.s3_service

async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[bytes, None]:
async with aiohttp.ClientSession() as session:
async with session.get(spreadsheet_ref) as resp:
if resp.status != 200:
raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json())
while True:
chunk = await resp.content.read(chunk_size)
if chunk:
LOGGER.debug(f"Received chunk of {len(chunk)} bytes.")
yield chunk
else:
LOGGER.info("Empty chunk received.")
break

data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter())
async with S3RawFileAsyncDataSink(
s3=s3.client,
s3_key=dfile.s3_key,
bucket_name=s3.tmp_bucket_name,
) as data_sink:
await data_sink.dump_data_stream(data_stream)

await dfile.save()
LOGGER.info(f'Uploaded file "{dfile.filename}".')

except Exception as ex:
LOGGER.exception(ex)
if dfile is None:
return Retry(attempts=3)
else:
dfile.status = FileProcessingStatus.failed
exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed()
dfile.error = FileProcessingError.from_exception(exc_to_save)
await dfile.save()

return Fail()
return Success()
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_lib.enums import FileType
from dl_file_uploader_lib.redis_model.models import (
DataFile,
YaDocsUserSourceProperties,
)
from dl_file_uploader_task_interface.tasks import DownloadYaDocsTask
from dl_task_processor.state import wait_task
from dl_testing.s3_utils import s3_file_exists


@pytest.mark.asyncio
async def test_download_yadocs_task(
task_processor_client,
task_state,
s3_client,
redis_model_manager,
s3_tmp_bucket,
):
df = DataFile(
filename="",
file_type=FileType.yadocs,
manager=redis_model_manager,
status=FileProcessingStatus.in_progress,
user_source_properties=YaDocsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA"),
)
await df.save()

task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id))
result = await wait_task(task, task_state)

assert result[-1] == "success"
assert await s3_file_exists(key=df.s3_key, bucket=s3_tmp_bucket, s3_client=s3_client)

0 comments on commit 610a0ef

Please sign in to comment.