Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BI-4976: Download yadocs task #10

Merged
merged 7 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class SourceInfoSchema(FileTypeOneOfSchema):
FileType.csv.name: SourceInfoSchemaBase,
FileType.gsheets.name: SourceInfoSchemaGSheets,
FileType.xlsx.name: SourceInfoSchemaBase,
FileType.yadocs.name: SourceInfoSchemaBase,
}


Expand Down
1 change: 1 addition & 0 deletions lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class FileType(Enum):
csv = "csv"
gsheets = "gsheets"
xlsx = "xlsx"
yadocs = "yadocs"


class CSVEncoding(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
RenameTenantStatusModel,
SourceNotFoundError,
SpreadsheetFileSourceSettings,
YaDocsUserSourceProperties,
)
from .storage_schemas import (
DataFileSchema,
Expand Down Expand Up @@ -46,6 +47,7 @@
"EmptySourcesError",
"RenameTenantStatusModel",
"PreviewSet",
"YaDocsUserSourceProperties",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ def get_secret_keys(self) -> set[DataKey]:
return {DataKey(parts=("refresh_token",))}


@attr.s(kw_only=True, init=True)
class YaDocsUserSourceProperties(UserSourceProperties):
    """User-supplied description of a Yandex Documents source.

    A document is referenced either by ``public_link`` or by ``private_path``
    together with ``oauth_token``; all three fields default to ``None``.
    """

    file_type: FileType = attr.ib(default=FileType.yadocs)

    # Location of the document: a path for private access, or a public link.
    private_path: Optional[str] = attr.ib(default=None)
    public_link: Optional[str] = attr.ib(default=None)

    # Excluded from repr so the token does not leak into logs.
    oauth_token: Optional[str] = attr.ib(repr=False, default=None)

    def get_secret_keys(self) -> set[DataKey]:
        """Return the data keys of this model that hold secrets (the OAuth token)."""
        token_key = DataKey(parts=("oauth_token",))
        return {token_key}


@attr.s(init=True, kw_only=True)
class UserSourceDataSourceProperties:
file_type: FileType = attr.ib()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
GSheetsUserSourceDataSourceProperties,
GSheetsUserSourceProperties,
RenameTenantStatusModel,
YaDocsUserSourceProperties,
)


Expand Down Expand Up @@ -151,9 +152,19 @@ class Meta(BaseSchema.Meta):
spreadsheet_id = fields.String()


class YaDocsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema):
    """Storage schema for ``YaDocsUserSourceProperties``; every field may be null."""

    class Meta(BaseSchema.Meta):
        # Model class this schema targets.
        target = YaDocsUserSourceProperties

    private_path = fields.String(allow_none=True)
    public_link = fields.String(allow_none=True)
    oauth_token = fields.String(allow_none=True)


class UserSourcePropertiesSchema(FileTypeOneOfSchema):
    """One-of schema mapping a file type name to its user-source-properties schema."""

    # Keyed by ``FileType`` member name; only the file types listed here are mapped.
    type_schemas: dict[str, Type[UserSourcePropertiesBaseSchema]] = {
        FileType.gsheets.name: GSheetsUserSourcePropertiesSchema,
        FileType.yadocs.name: YaDocsUserSourcePropertiesSchema,
    }


Expand Down
83 changes: 83 additions & 0 deletions lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from __future__ import annotations

from typing import (
Any,
Type,
)

from aiohttp.client import ClientSession

from dl_file_uploader_lib import exc as file_upl_exc


def yadocs_error_to_file_uploader_exception(status_code: int, resp_info: dict) -> file_upl_exc.DLFileUploaderBaseError:
    """Build the file-uploader exception that corresponds to a failed API response.

    :param status_code: HTTP status returned by the API.
    :param resp_info: decoded response body, attached to the exception as ``details``.
    :return: an exception instance (not raised here): ``PermissionDenied`` for 401,
        ``DocumentNotFound`` for 404, ``UnsupportedDocument`` for 400,
        ``RemoteServerError`` for any status >= 500, and the base
        ``DLFileUploaderBaseError`` otherwise.
    """
    exact_matches: dict[int, Type[file_upl_exc.DLFileUploaderBaseError]] = {
        401: file_upl_exc.PermissionDenied,
        404: file_upl_exc.DocumentNotFound,
        400: file_upl_exc.UnsupportedDocument,
    }

    error_type = exact_matches.get(status_code)
    if error_type is None:
        # No exact match: distinguish server-side failures from everything else.
        if status_code >= 500:
            error_type = file_upl_exc.RemoteServerError
        else:
            error_type = file_upl_exc.DLFileUploaderBaseError

    return error_type(details=resp_info)


class YaDocsClient:
    """Minimal async client for the disk REST API endpoints used to fetch spreadsheets.

    The client does not own the HTTP session: the caller creates and closes it.
    Every method raises the exception produced by
    ``yadocs_error_to_file_uploader_exception`` when the API answers with a
    status other than 200.
    """

    headers: dict[str, Any] = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    hostname: str = "https://cloud-api.yandex.net/v1/disk"

    def __init__(self, session: ClientSession):
        self.session = session

    async def get_spreadsheet_public_ref(self, link: str) -> str:
        """Return the download URL (``href``) for a publicly shared document."""
        payload = await self._get_json("/public/resources/download/", {"public_key": link}, self.headers)
        return payload["href"]

    async def get_spreadsheet_public_meta(self, link: str) -> dict[str, Any]:
        """Return the resource metadata of a publicly shared document."""
        return await self._get_json("/public/resources/", {"public_key": link}, self.headers)

    async def get_spreadsheet_private_ref(self, path: str, token: str) -> str:
        """Return the download URL (``href``) for a private document at ``path``."""
        payload = await self._get_json(
            "/resources/download/",
            {"path": path},
            self._create_headers_with_token(token),
        )
        return payload["href"]

    async def get_spreadsheet_private_meta(self, path: str, token: str) -> dict[str, Any]:
        """Return the resource metadata of a private document at ``path``."""
        return await self._get_json(
            "/resources/",
            {"path": path},
            self._create_headers_with_token(token),
        )

    async def _get_json(self, endpoint: str, params: dict[str, str], headers: dict[str, Any]) -> dict[str, Any]:
        """GET ``endpoint`` and return the decoded JSON body, raising on a non-200 status.

        Query values are passed via ``params`` so that the HTTP library escapes
        them; links and paths may contain ``&``, ``#``, ``+`` or spaces, which
        would corrupt a hand-built query string.
        """
        # The context manager releases the connection once the body has been read.
        async with self.session.get(
            f"{self.hostname}{endpoint}",
            params=params,
            headers=headers,
        ) as resp:
            payload = await resp.json()
            if resp.status != 200:
                raise yadocs_error_to_file_uploader_exception(resp.status, payload)
            return payload

    def _create_headers_with_token(self, token: str) -> dict[str, Any]:
        """Return a copy of the default headers with the OAuth ``Authorization`` header added."""
        # Build a new dict so the shared class-level ``headers`` is never mutated.
        return {**self.headers, "Authorization": "OAuth " + token}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ class DownloadGSheetTask(BaseTaskMeta):
exec_mode: TaskExecutionMode = attr.ib(default=TaskExecutionMode.BASIC)


@attr.s
class DownloadYaDocsTask(BaseTaskMeta):
    """Task metadata for the ``download_yadocs`` task."""

    name = TaskName("download_yadocs")

    # Identifier of the file the task operates on.
    file_id: str = attr.ib()


@attr.s
class ParseFileTask(BaseTaskMeta):
name = TaskName("parse_file")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
RenameTenantFilesTask,
)
from .delete import DeleteFileTask
from .download import DownloadGSheetTask
from .download_gsheets import DownloadGSheetTask
from .download_yadocs import DownloadYaDocsTask
from .excel import ProcessExcelTask
from .parse import ParseFileTask
from .save import SaveSourceTask
Expand All @@ -16,6 +17,7 @@
REGISTRY: TaskRegistry = TaskRegistry.create(
[
DownloadGSheetTask,
DownloadYaDocsTask,
ParseFileTask,
ProcessExcelTask,
SaveSourceTask,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from __future__ import annotations

import logging
from typing import (
AsyncGenerator,
Optional,
)

import aiohttp
import attr

from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_lib import exc
from dl_file_uploader_lib.data_sink.raw_bytes import (
RawBytesAsyncDataStream,
S3RawFileAsyncDataSink,
)
from dl_file_uploader_lib.redis_model.base import RedisModelManager
from dl_file_uploader_lib.redis_model.models import (
DataFile,
FileProcessingError,
YaDocsUserSourceProperties,
)
from dl_file_uploader_lib.yadocs_client import (
YaDocsClient,
yadocs_error_to_file_uploader_exception,
)
from dl_file_uploader_task_interface.context import FileUploaderTaskContext
import dl_file_uploader_task_interface.tasks as task_interface
from dl_task_processor.task import (
BaseExecutorTask,
Fail,
Retry,
Success,
TaskResult,
)


LOGGER = logging.getLogger(__name__)


@attr.s
class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, FileUploaderTaskContext]):
    """Executor that streams a Yandex Documents spreadsheet into the temporary S3 bucket."""

    cls_meta = task_interface.DownloadYaDocsTask

    async def run(self) -> TaskResult:
        """Resolve the document's download link and copy its bytes to S3.

        Outcomes:

        * ``Success`` when the file was uploaded, and also when resolving the
          link or metadata failed with a ``DLFileUploaderBaseError`` (the error
          is stored on the data file, which is marked failed).
        * ``Retry(attempts=3)`` when an exception occurs before the data file
          has been loaded.
        * ``Fail`` for any other exception, after marking the data file failed.
        """
        dfile: Optional[DataFile] = None
        redis = self._ctx.redis_service.get_redis()

        try:
            rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config)
            dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id)
            assert dfile is not None

            source_props = dfile.user_source_properties
            assert isinstance(source_props, YaDocsUserSourceProperties)

            async with aiohttp.ClientSession() as session:
                yadocs_client = YaDocsClient(session)
                try:
                    if source_props.public_link is not None:
                        # A public link takes precedence over private credentials.
                        spreadsheet_ref = await yadocs_client.get_spreadsheet_public_ref(
                            link=source_props.public_link,
                        )
                        spreadsheet_meta = await yadocs_client.get_spreadsheet_public_meta(
                            link=source_props.public_link,
                        )
                    elif source_props.private_path is not None and source_props.oauth_token is not None:
                        spreadsheet_ref = await yadocs_client.get_spreadsheet_private_ref(
                            path=source_props.private_path,
                            token=source_props.oauth_token,
                        )
                        spreadsheet_meta = await yadocs_client.get_spreadsheet_private_meta(
                            path=source_props.private_path,
                            token=source_props.oauth_token,
                        )
                    else:
                        # Neither a public link nor a complete path/token pair was given.
                        raise exc.DLFileUploaderBaseError()
                except exc.DLFileUploaderBaseError as lookup_error:
                    LOGGER.exception(lookup_error)
                    download_error = FileProcessingError.from_exception(lookup_error)
                    dfile.status = FileProcessingStatus.failed
                    dfile.error = download_error
                    await dfile.save()
                    # The failure is recorded on the file; the task itself still reports success.
                    return Success()

            dfile.filename = spreadsheet_meta["name"]
            s3 = self._ctx.s3_service

            async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[bytes, None]:
                # Uses a separate session: the one used for the API calls above is already closed.
                async with aiohttp.ClientSession() as download_session:
                    async with download_session.get(spreadsheet_ref) as resp:
                        if resp.status != 200:
                            raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json())
                        while True:
                            chunk = await resp.content.read(chunk_size)
                            if not chunk:
                                LOGGER.info("Empty chunk received.")
                                break
                            LOGGER.debug(f"Received chunk of {len(chunk)} bytes.")
                            yield chunk

            data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter())
            async with S3RawFileAsyncDataSink(
                s3=s3.client,
                s3_key=dfile.s3_key,
                bucket_name=s3.tmp_bucket_name,
            ) as data_sink:
                await data_sink.dump_data_stream(data_stream)

            await dfile.save()
            LOGGER.info(f'Uploaded file "{dfile.filename}".')

        except Exception as err:
            LOGGER.exception(err)
            if dfile is None:
                # The file could not even be loaded, so there is nothing to mark as failed.
                return Retry(attempts=3)

            dfile.status = FileProcessingStatus.failed
            if isinstance(err, exc.DLFileUploaderBaseError):
                exc_to_save = err
            else:
                # Errors from outside the uploader hierarchy are stored as a generic download failure.
                exc_to_save = exc.DownloadFailed()
            dfile.error = FileProcessingError.from_exception(exc_to_save)
            await dfile.save()

            return Fail()
        return Success()
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_lib.enums import FileType
from dl_file_uploader_lib.redis_model.models import (
DataFile,
YaDocsUserSourceProperties,
)
from dl_file_uploader_task_interface.tasks import DownloadYaDocsTask
from dl_task_processor.state import wait_task
from dl_testing.s3_utils import s3_file_exists


@pytest.mark.asyncio
async def test_download_yadocs_task(
    task_processor_client,
    task_state,
    s3_client,
    redis_model_manager,
    s3_tmp_bucket,
):
    """Schedule a download for a public link and expect the file to appear in the tmp bucket."""
    source_properties = YaDocsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA")
    df = DataFile(
        manager=redis_model_manager,
        filename="",
        file_type=FileType.yadocs,
        status=FileProcessingStatus.in_progress,
        user_source_properties=source_properties,
    )
    await df.save()

    download_task = DownloadYaDocsTask(file_id=df.id)
    task = await task_processor_client.schedule(download_task)
    result = await wait_task(task, task_state)

    # The last reported state must be a successful finish.
    assert result[-1] == "success"

    uploaded = await s3_file_exists(key=df.s3_key, bucket=s3_tmp_bucket, s3_client=s3_client)
    assert uploaded
Loading