Skip to content

Commit

Permalink
Add file uploader api handler for YaDocs
Browse files Browse the repository at this point in the history
  • Loading branch information
vallbull committed Oct 18, 2023
1 parent dd1c7bf commit 9241ada
Show file tree
Hide file tree
Showing 12 changed files with 293 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def create_app(self, app_version: str) -> web.Application:

app.router.add_route("post", "/api/v2/files", files_views.FilesView)
app.router.add_route("post", "/api/v2/links", files_views.LinksView)
app.router.add_route("post", "/api/v2/documents", files_views.DocumentsView)
app.router.add_route("post", "/api/v2/update_connection_data", files_views.UpdateConnectionDataView)
app.router.add_route(
"post", "/api/v2/update_connection_data_internal", files_views.InternalUpdateConnectionDataView
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from dl_file_uploader_lib.exc import InvalidLink
from dl_file_uploader_lib.redis_model.base import RedisModelManager
from dl_file_uploader_lib.redis_model.models import DataFile
from dl_file_uploader_lib.redis_model.models.models import GSheetsUserSourceProperties
from dl_file_uploader_lib.redis_model.models.models import (
GSheetsUserSourceProperties,
YaDocsUserSourceProperties,
)


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,3 +61,39 @@ async def gsheets_data_file_preparer(
)

return df


async def yadocs_data_file_preparer(
    oauth_token: Optional[str],
    private_path: Optional[str],
    public_link: Optional[str],
    redis_model_manager: RedisModelManager,
) -> DataFile:
    """Build an in-progress ``DataFile`` describing a Yandex Documents source.

    When ``public_link`` is given it is validated first: the scheme must be
    ``http`` or ``https``, the host must be ``disk.yandex.ru`` and the path
    must start with ``/i/``. No validation is applied to ``private_path`` or
    ``oauth_token`` here.

    The returned object is only constructed, not saved.

    Raises:
        InvalidLink: if ``public_link`` fails any of the checks above.
    """
    if public_link is not None:
        default_host: str = "disk.yandex.ru"
        allowed_hosts: frozenset[str] = frozenset({default_host})
        example_url: str = "https://disk.yandex.ru/i/OyzdmFI0MUEEgA"
        example_url_message: str = f"example: {example_url!r}"

        parsed_url = urllib.parse.urlparse(public_link)

        scheme = parsed_url.scheme
        if scheme not in ("http", "https"):
            raise InvalidLink(f"Invalid url scheme: {scheme!r}, must be 'http' or 'https'; {example_url_message}")

        # `hostname` is None for links without a network location; that is rejected too.
        hostname = parsed_url.hostname
        if hostname not in allowed_hosts:
            raise InvalidLink(f"Invalid host: {hostname!r}; should be {default_host!r}; {example_url_message}")

        required_prefix = "/i/"
        url_path = parsed_url.path
        if not url_path.startswith(required_prefix):
            raise InvalidLink(
                f"Invalid URL path prefix: {url_path!r}; should be {required_prefix!r}; {example_url_message}"
            )

    source_properties = YaDocsUserSourceProperties(
        private_path=private_path,
        public_link=public_link,
        oauth_token=oauth_token,
    )
    return DataFile(
        manager=redis_model_manager,
        filename="YaDocument",
        status=FileProcessingStatus.in_progress,
        file_type=FileType.yadocs,
        user_source_properties=source_properties,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ def validate_authorized(data: dict) -> None:
)


def validate_docs_data(data: dict) -> None:
    """Check that a documents request names exactly one file location.

    ``data`` must contain the keys ``public_link``, ``private_path`` and
    ``oauth_token``. Exactly one of ``public_link`` / ``private_path`` has to
    be non-None, and a private path additionally requires ``oauth_token``.

    Raises:
        ma.ValidationError: if the combination of fields is not acceptable.
            This function is called from a ``validates_schema`` hook, so a
            validation error (rather than ``ValueError``) is raised to let
            the schema report the problem as a regular validation failure.
    """
    has_public_link = data["public_link"] is not None
    has_private_path = data["private_path"] is not None

    # Both set or both missing: the request is ambiguous or empty.
    if has_public_link == has_private_path:
        raise ma.ValidationError("Expected exactly one of [`private_path`, `public_link`] to be specified")

    # From here on exactly one location is set; a private file also needs a token.
    if has_private_path and data["oauth_token"] is None:
        raise ma.ValidationError("Both path and token must be provided for private files")


class FileLinkRequestSchema(BaseRequestSchema):
type = ma.fields.Enum(FileType, required=True)
url = ma.fields.String(required=True)
Expand All @@ -34,6 +42,17 @@ def validate_object(self, data: dict, **kwargs: Any) -> None:
validate_authorized(data)


class FileDocumentsRequestSchema(BaseRequestSchema):
    """Request schema with three optional string fields describing a document.

    Each of ``private_path``, ``oauth_token`` and ``public_link`` may be
    omitted or sent as null; in both cases it loads as ``None``. The
    cross-field rules are delegated to ``validate_docs_data``.
    """

    # NOTE(review): a `connection_id` field is left commented out here — presumably planned; confirm.
    # connection_id = ma.fields.String(load_default=None, allow_none=True)
    private_path = ma.fields.String(allow_none=True, load_default=None)
    oauth_token = ma.fields.String(allow_none=True, load_default=None)
    public_link = ma.fields.String(allow_none=True, load_default=None)

    @ma.validates_schema(skip_on_field_errors=True)
    def validate_object(self, data: dict, **kwargs: Any) -> None:
        """Run the cross-field checks once every individual field has loaded cleanly."""
        validate_docs_data(data)


class FileUploadResponseSchema(ma.Schema):
file_id = ma.fields.String()
title = ma.fields.String()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
RequiredResourceCommon,
)
from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_api_lib.data_file_preparer import gsheets_data_file_preparer
from dl_file_uploader_api_lib.data_file_preparer import (
gsheets_data_file_preparer,
yadocs_data_file_preparer,
)
from dl_file_uploader_api_lib.schemas import files as files_schemas
from dl_file_uploader_api_lib.views.base import FileUploaderBaseView
from dl_file_uploader_lib.common_locks import get_update_connection_source_lock
Expand All @@ -38,6 +41,7 @@
)
from dl_file_uploader_task_interface.tasks import (
DownloadGSheetTask,
DownloadYaDocsTask,
ParseFileTask,
ProcessExcelTask,
TaskExecutionMode,
Expand Down Expand Up @@ -162,6 +166,56 @@ async def post(self) -> web.StreamResponse:
)


class DocumentsView(FileUploaderBaseView):
    """POST handler that registers a Yandex Documents file and schedules its processing.

    The request body is loaded with ``FileDocumentsRequestSchema``. A
    ``DataFile`` is prepared and saved, a ``DownloadYaDocsTask`` is scheduled,
    and the file is then re-read:

    * if its status is ``failed`` the handler answers 200 with the file id
      and title and schedules nothing else;
    * otherwise a ``ProcessExcelTask`` is scheduled and the handler answers 201.
    """

    REQUIRED_RESOURCES: ClassVar[frozenset[RequiredResource]] = frozenset()  # Don't skip CSRF check

    # Preparers are called with keyword arguments (see `post`), so only the
    # return type is pinned in the annotation.
    FILE_TYPE_TO_DATA_FILE_PREPARER_MAP: dict[FileType, Callable[..., Awaitable[DataFile]]] = {
        FileType.yadocs: yadocs_data_file_preparer,
    }

    async def post(self) -> web.StreamResponse:
        """Create the data file, schedule download and parsing, and return its id and title."""
        req_data = await self._load_post_request_schema_data(files_schemas.FileDocumentsRequestSchema)

        file_type = FileType.yadocs

        rmm = self.dl_request.get_redis_model_manager()

        oauth_token: Optional[str] = req_data["oauth_token"]
        public_link: Optional[str] = req_data["public_link"]
        private_path: Optional[str] = req_data["private_path"]

        df = await self.FILE_TYPE_TO_DATA_FILE_PREPARER_MAP[file_type](
            oauth_token=oauth_token,
            private_path=private_path,
            public_link=public_link,
            redis_model_manager=rmm,
        )

        LOGGER.info(f"Data file id: {df.id}")
        await df.save()

        task_processor = self.dl_request.get_task_processor()
        await task_processor.schedule(DownloadYaDocsTask(file_id=df.id))
        LOGGER.info(f"Scheduled DownloadYaDocsTask for file_id {df.id}")

        # Re-read the file to pick up whatever the download task has written so far.
        # NOTE(review): whether the download has already finished at this point
        # presumably depends on the task processor's execution mode — confirm.
        df = await DataFile.get(manager=rmm, obj_id=df.id)
        if df.status == FileProcessingStatus.failed:
            return web.json_response(
                files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}),
                status=HTTPStatus.OK,
            )

        await task_processor.schedule(ProcessExcelTask(file_id=df.id))
        LOGGER.info(f"Scheduled ProcessExcelTask for file_id {df.id}")

        return web.json_response(
            files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}),
            status=HTTPStatus.CREATED,
        )


class FileStatusView(FileUploaderBaseView):
async def get(self) -> web.StreamResponse:
req_data = files_schemas.FileStatusRequestSchema().load(self.request.match_info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,8 @@ def reader_app(loop, secure_reader):
loop.run_until_complete(runner.setup())
site = aiohttp.web.UnixSite(runner, path=secure_reader.SOCKET)
return loop.run_until_complete(site.start())


@pytest.fixture(scope="session")
def ya_docs_oauth_token(env_param_getter):
    """Session-scoped fixture: the string value of the ``YA_DOCS_API_KEY`` parameter."""
    token = env_param_getter.get_str_value("YA_DOCS_API_KEY")
    return token
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
params:
GOOGLE_API_KEY: {getter: $osenv, key: GOOGLE_API_KEY}
YA_DOCS_API_KEY: {getter: $osenv, key: YA_DOCS_API_KEY}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import asyncio
import logging

import pytest

from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder
from dl_file_uploader_lib.redis_model.models import DataFile, YaDocsUserSourceProperties, GSheetsUserSourceProperties


LOGGER = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_yadocuments_public_file(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """Uploading a public link answers 201 and the stored file ends up ready."""
    link = "https://disk.yandex.ru/i/ZgabI6zyoYn8IQ"
    upload_request = ReqBuilder.upload_documents(public_link=link)
    response = await fu_client.make_request(upload_request)
    assert response.status == 201
    uploaded_id = response.json["file_id"]
    assert uploaded_id

    # Fixed pause before inspecting the stored file — presumably to let processing finish.
    await asyncio.sleep(5)

    data_file = await DataFile.get(manager=redis_model_manager, obj_id=uploaded_id)
    assert isinstance(data_file, DataFile)
    source_properties = data_file.user_source_properties
    assert isinstance(source_properties, YaDocsUserSourceProperties)
    assert data_file.status == FileProcessingStatus.ready
    assert data_file.error is None
    assert source_properties.public_link == link
    assert data_file.filename == "test public table.xlsx"
    for source in data_file.sources:
        assert source.status == FileProcessingStatus.ready
        assert source.error is None


@pytest.mark.asyncio
async def test_yadocuments_private_file(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
    ya_docs_oauth_token,
):
    """Uploading a private path with a token answers 201 and the stored file ends up ready."""
    path_on_disk = "test private table.xlsx"
    upload_request = ReqBuilder.upload_documents(private_path=path_on_disk, oauth_token=ya_docs_oauth_token)
    response = await fu_client.make_request(upload_request)
    assert response.status == 201
    uploaded_id = response.json["file_id"]
    assert uploaded_id

    # Fixed pause before inspecting the stored file — presumably to let processing finish.
    await asyncio.sleep(5)

    data_file = await DataFile.get(manager=redis_model_manager, obj_id=uploaded_id)
    assert isinstance(data_file, DataFile)
    source_properties = data_file.user_source_properties
    assert isinstance(source_properties, YaDocsUserSourceProperties)
    assert data_file.status == FileProcessingStatus.ready
    assert data_file.error is None
    assert source_properties.private_path == path_on_disk
    assert data_file.filename == "test private table.xlsx"
    for source in data_file.sources:
        assert source.status == FileProcessingStatus.ready
        assert source.error is None


@pytest.mark.asyncio
async def test_yadocuments_invalid_link(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """A link with a misspelled host is rejected with 400 and the invalid-link error code."""
    bad_host_link = "https://disk.yandeks.ru/i/ZgabI6zyoYn8IQ"
    upload_request = ReqBuilder.upload_documents(public_link=bad_host_link, require_ok=False)
    response = await fu_client.make_request(upload_request)
    assert response.status == 400
    assert response.json["code"] == "ERR.FILE.INVALID_LINK"


@pytest.mark.asyncio
async def test_yadocuments_not_found(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """A well-formed link to a missing file answers 200 and the file status reports NOT_FOUND."""
    missing_file_link = "https://disk.yandex.ru/i/fffffffff"
    upload_request = ReqBuilder.upload_documents(public_link=missing_file_link)
    upload_response = await fu_client.make_request(upload_request)
    assert upload_response.status == 200
    uploaded_id = upload_response.json["file_id"]

    # Fixed pause before querying the status endpoint.
    await asyncio.sleep(2)

    status_response = await fu_client.make_request(ReqBuilder.file_status(uploaded_id))
    assert status_response.status == 200
    assert status_response.json["file_id"] == uploaded_id
    assert status_response.json["status"] == "failed"
    assert status_response.json["error"]["code"] == "ERR.FILE.NOT_FOUND"


@pytest.mark.asyncio
async def test_documents_unsupported(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """A link the service treats as an unsupported document answers 200 and the status reports it."""
    unsupported_link = "https://disk.yandex.ru/i/ros0GDegLEpyew"
    upload_request = ReqBuilder.upload_documents(public_link=unsupported_link)
    upload_response = await fu_client.make_request(upload_request)
    assert upload_response.status == 200
    uploaded_id = upload_response.json["file_id"]

    # Fixed pause before querying the status endpoint.
    await asyncio.sleep(2)

    status_response = await fu_client.make_request(ReqBuilder.file_status(uploaded_id))
    assert status_response.status == 200
    assert status_response.json["file_id"] == uploaded_id
    assert status_response.json["status"] == "failed"
    assert status_response.json["error"]["code"] == "ERR.FILE.UNSUPPORTED_DOCUMENT"
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,26 @@ def upload_gsheet(
require_ok=require_ok,
)

@classmethod
def upload_documents(
    cls,
    public_link: Optional[str] = None,
    private_path: Optional[str] = None,
    oauth_token: Optional[str] = None,
    *,
    require_ok: bool = True,
) -> Req:
    """Build a POST request to ``/api/v2/documents``.

    All three document fields are always present in the JSON body; the ones
    not supplied are sent as ``None``.
    """
    payload = dict(
        public_link=public_link,
        private_path=private_path,
        oauth_token=oauth_token,
    )
    return Req(method="post", url="/api/v2/documents", data_json=payload, require_ok=require_ok)

@classmethod
def file_status(cls, file_id: str) -> Req:
return Req(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
from __future__ import annotations


import logging
from typing import Optional

import aiohttp
import attr

from dl_constants.enums import FileProcessingStatus

from dl_file_uploader_lib import exc
from dl_file_uploader_lib.data_sink.raw_bytes import (
RawBytesAsyncDataStream,
S3RawFileAsyncDataSink,
)

from dl_file_uploader_lib.redis_model.base import RedisModelManager
from dl_file_uploader_lib.redis_model.models import (
DataFile,
FileProcessingError,
YaDocsUserSourceProperties,
)
from dl_file_uploader_lib.yadocs_client import YaDocsClient, yadocs_error_to_file_uploader_exception
from dl_file_uploader_lib.yadocs_client import (
YaDocsClient,
yadocs_error_to_file_uploader_exception,
)
from dl_file_uploader_task_interface.context import FileUploaderTaskContext
import dl_file_uploader_task_interface.tasks as task_interface
from dl_task_processor.task import (
Expand Down Expand Up @@ -85,6 +85,10 @@ async def run(self) -> TaskResult:
return Success()

dfile.filename = spreadsheet_meta["name"]
xslx_suffix = ".xlsx"
if not dfile.filename.endswith(xslx_suffix):
raise exc.UnsupportedDocument

s3 = self._ctx.s3_service

async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024):
Expand Down
Loading

0 comments on commit 9241ada

Please sign in to comment.