Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ya docs handler #28

Merged
merged 11 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def create_app(self, app_version: str) -> web.Application:

app.router.add_route("post", "/api/v2/files", files_views.FilesView)
app.router.add_route("post", "/api/v2/links", files_views.LinksView)
app.router.add_route("post", "/api/v2/documents", files_views.DocumentsView)
app.router.add_route("post", "/api/v2/update_connection_data", files_views.UpdateConnectionDataView)
app.router.add_route(
"post", "/api/v2/update_connection_data_internal", files_views.InternalUpdateConnectionDataView
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from dl_file_uploader_lib.exc import InvalidLink
from dl_file_uploader_lib.redis_model.base import RedisModelManager
from dl_file_uploader_lib.redis_model.models import DataFile
from dl_file_uploader_lib.redis_model.models.models import GSheetsUserSourceProperties
from dl_file_uploader_lib.redis_model.models.models import (
GSheetsUserSourceProperties,
YaDocsUserSourceProperties,
)


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,3 +61,39 @@ async def gsheets_data_file_preparer(
)

return df


async def yadocs_data_file_preparer(
    oauth_token: Optional[str],
    private_path: Optional[str],
    public_link: Optional[str],
    redis_model_manager: RedisModelManager,
) -> DataFile:
    """Register a Yandex Documents source as a new ``DataFile``.

    Only ``public_link`` is validated here (scheme, host, ``/i/`` path prefix);
    ``private_path`` and ``oauth_token`` are stored as-is — the schema layer is
    expected to have checked their presence already.

    :param oauth_token: user OAuth token for private documents (may be None)
    :param private_path: path of a private document on the user's disk (may be None)
    :param public_link: public share link, e.g. https://disk.yandex.ru/i/...
    :param redis_model_manager: manager the new DataFile is bound to
    :raises InvalidLink: if the public link fails any validation step
    :return: the prepared (not yet saved) DataFile
    """
    default_host = "disk.yandex.ru"
    allowed_hosts = frozenset({default_host})
    example_url = "https://disk.yandex.ru/i/OyzdmFI0MUEEgA"
    example_url_message = f"example: {example_url!r}"

    if public_link is not None:
        parsed = urllib.parse.urlparse(public_link)

        if parsed.scheme not in ("http", "https"):
            raise InvalidLink(f"Invalid url scheme: {parsed.scheme!r}, must be 'http' or 'https'; {example_url_message}")

        host = parsed.hostname
        if host not in allowed_hosts:
            raise InvalidLink(f"Invalid host: {host!r}; should be {default_host!r}; {example_url_message}")

        prefix = "/i/"
        path = parsed.path
        if not path.startswith(prefix):
            raise InvalidLink(f"Invalid URL path prefix: {path!r}; should be {prefix!r}; {example_url_message}")

    source_props = YaDocsUserSourceProperties(
        private_path=private_path, public_link=public_link, oauth_token=oauth_token
    )
    return DataFile(
        manager=redis_model_manager,
        filename="YaDocument",
        status=FileProcessingStatus.in_progress,
        file_type=FileType.yadocs,
        user_source_properties=source_props,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ def validate_authorized(data: dict) -> None:
)


def validate_docs_data(data: dict) -> None:
    """Validate a Yandex Documents upload request payload.

    Exactly one of ``public_link`` / ``private_path`` must be provided; a
    private file (no public link) additionally requires ``oauth_token``.

    Raises ``ma.ValidationError`` in both failure cases: this function runs
    inside a marshmallow ``@validates_schema`` hook, and only ValidationError
    is turned into a proper schema validation error — the previous plain
    ``ValueError`` on the first branch would have escaped schema validation
    as an unhandled server-side error.

    NOTE(review): per PR discussion, an existing connection will not resend
    its token (secrets are never returned to clients), so the token check
    will need to be relaxed once saved tokens are reused.
    """
    if not ((data["public_link"] is None) ^ (data["private_path"] is None)):
        raise ma.ValidationError("Expected exactly one of [`private_path`, `public_link`] to be specified")
    # At this point public_link is None implies private_path is not None,
    # so only the token can still be missing for the private-file case.
    if data["public_link"] is None and data["oauth_token"] is None:
        raise ma.ValidationError("Both path and token must be provided for private files")
Comment on lines +29 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, not exactly
If we are working with an existing connection, we won't receive the token and we'll have to use the one saved inside the connection – it is only sent to the backend when the connection is created or the token is updated, we never give secrets back in our responses
(This can be fixed later as a different PR, when we have the pipeline going with actual connections)



class FileLinkRequestSchema(BaseRequestSchema):
type = ma.fields.Enum(FileType, required=True)
url = ma.fields.String(required=True)
Expand All @@ -34,6 +42,17 @@ def validate_object(self, data: dict, **kwargs: Any) -> None:
validate_authorized(data)


class FileDocumentsRequestSchema(BaseRequestSchema):
    """Request body for POST /api/v2/documents (Yandex Documents upload).

    Exactly one of ``public_link`` / ``private_path`` must be set; a private
    path additionally requires ``oauth_token`` (enforced by the schema-level
    validator below).
    """

    # TODO: connection_id field may be introduced once updating existing
    # connections is supported.
    private_path = ma.fields.String(load_default=None, allow_none=True)
    oauth_token = ma.fields.String(load_default=None, allow_none=True)
    public_link = ma.fields.String(load_default=None, allow_none=True)

    @ma.validates_schema(skip_on_field_errors=True)
    def validate_object(self, data: dict, **kwargs: Any) -> None:
        # Cross-field constraints live in a shared helper.
        validate_docs_data(data)


class FileUploadResponseSchema(ma.Schema):
file_id = ma.fields.String()
title = ma.fields.String()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
RequiredResourceCommon,
)
from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_api_lib.data_file_preparer import gsheets_data_file_preparer
from dl_file_uploader_api_lib.data_file_preparer import (
gsheets_data_file_preparer,
yadocs_data_file_preparer,
)
from dl_file_uploader_api_lib.schemas import files as files_schemas
from dl_file_uploader_api_lib.views.base import FileUploaderBaseView
from dl_file_uploader_lib.common_locks import get_update_connection_source_lock
Expand All @@ -38,6 +41,7 @@
)
from dl_file_uploader_task_interface.tasks import (
DownloadGSheetTask,
DownloadYaDocsTask,
ParseFileTask,
ProcessExcelTask,
TaskExecutionMode,
Expand Down Expand Up @@ -162,6 +166,56 @@ async def post(self) -> web.StreamResponse:
)


class DocumentsView(FileUploaderBaseView):
    """Accepts a Yandex Documents source (public link, or private path plus
    OAuth token), registers a DataFile and schedules the download and parse
    tasks for it.
    """

    REQUIRED_RESOURCES: ClassVar[frozenset[RequiredResource]] = frozenset()  # Don't skip CSRF check

    # Preparers are called with keyword arguments
    # (oauth_token, private_path, public_link, redis_model_manager);
    # the previous 3-positional-arg Callable annotation did not match that call.
    FILE_TYPE_TO_DATA_FILE_PREPARER_MAP: dict[FileType, Callable[..., Awaitable[DataFile]]] = {
        FileType.yadocs: yadocs_data_file_preparer,
    }

    async def post(self) -> web.StreamResponse:
        req_data = await self._load_post_request_schema_data(files_schemas.FileDocumentsRequestSchema)

        file_type = FileType.yadocs

        rmm = self.dl_request.get_redis_model_manager()

        oauth_token: Optional[str] = req_data["oauth_token"]
        public_link: Optional[str] = req_data["public_link"]
        private_path: Optional[str] = req_data["private_path"]

        df = await self.FILE_TYPE_TO_DATA_FILE_PREPARER_MAP[file_type](
            oauth_token=oauth_token,
            private_path=private_path,
            public_link=public_link,
            redis_model_manager=rmm,
        )

        LOGGER.info(f"Data file id: {df.id}")
        await df.save()

        task_processor = self.dl_request.get_task_processor()
        await task_processor.schedule(DownloadYaDocsTask(file_id=df.id))
        # Fixed: previously logged "Scheduled DownloadGSheetTask" while
        # actually scheduling DownloadYaDocsTask (flagged in review).
        LOGGER.info(f"Scheduled DownloadYaDocsTask for file_id {df.id}")

        # Re-read the record: if the download already failed (presumably a
        # fast synchronous failure path — confirm against task processor
        # semantics), report it with 200 instead of scheduling parsing.
        df = await DataFile.get(manager=rmm, obj_id=df.id)
        if df.status == FileProcessingStatus.failed:
            return web.json_response(
                files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}),
                status=HTTPStatus.OK,
            )

        await task_processor.schedule(ProcessExcelTask(file_id=df.id))
        LOGGER.info(f"Scheduled ProcessExcelTask for file_id {df.id}")

        return web.json_response(
            files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}),
            status=HTTPStatus.CREATED,
        )


class FileStatusView(FileUploaderBaseView):
async def get(self) -> web.StreamResponse:
req_data = files_schemas.FileStatusRequestSchema().load(self.request.match_info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,8 @@ def reader_app(loop, secure_reader):
loop.run_until_complete(runner.setup())
site = aiohttp.web.UnixSite(runner, path=secure_reader.SOCKET)
return loop.run_until_complete(site.start())


@pytest.fixture(scope="session")
def ya_docs_oauth_token(env_param_getter):
    # OAuth token used by the private Yandex Documents tests; supplied via
    # the YA_DOCS_API_KEY environment variable (wired up in the params config).
    return env_param_getter.get_str_value("YA_DOCS_API_KEY")
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
params:
GOOGLE_API_KEY: {getter: $osenv, key: GOOGLE_API_KEY}
YA_DOCS_API_KEY: {getter: $osenv, key: YA_DOCS_API_KEY}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import asyncio
import logging

import pytest

from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder
from dl_file_uploader_lib.redis_model.models import DataFile, YaDocsUserSourceProperties, GSheetsUserSourceProperties


LOGGER = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_yadocuments_public_file(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    # Uploading a public Yandex Documents link produces a fully processed DataFile.
    public_link = "https://disk.yandex.ru/i/ZgabI6zyoYn8IQ"
    resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=public_link))
    assert resp.status == 201
    file_id = resp.json["file_id"]
    assert file_id

    # Fixed wait for the background download/parse pipeline.
    # NOTE(review): assumes 5s is always enough — consider polling the status instead.
    await asyncio.sleep(5)

    rmm = redis_model_manager
    df = await DataFile.get(manager=rmm, obj_id=file_id)
    assert isinstance(df, DataFile)
    assert isinstance(df.user_source_properties, YaDocsUserSourceProperties)
    assert df.status == FileProcessingStatus.ready
    assert df.error is None
    assert df.user_source_properties.public_link == public_link
    assert df.filename == "test public table.xlsx"
    # Every sheet source of the document must have been parsed successfully.
    for sheet_src in df.sources:
        assert sheet_src.status == FileProcessingStatus.ready
        assert sheet_src.error is None


@pytest.mark.asyncio
async def test_yadocuments_private_file(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
    ya_docs_oauth_token,
):
    # A private document (path + OAuth token) goes through the same pipeline
    # as a public link and ends up ready.
    private_path = "test private table.xlsx"
    resp = await fu_client.make_request(
        ReqBuilder.upload_documents(private_path=private_path, oauth_token=ya_docs_oauth_token)
    )
    assert resp.status == 201
    file_id = resp.json["file_id"]
    assert file_id

    # Fixed wait for the background download/parse pipeline.
    # NOTE(review): assumes 5s is always enough — consider polling the status instead.
    await asyncio.sleep(5)

    rmm = redis_model_manager
    df = await DataFile.get(manager=rmm, obj_id=file_id)
    assert isinstance(df, DataFile)
    assert isinstance(df.user_source_properties, YaDocsUserSourceProperties)
    assert df.status == FileProcessingStatus.ready
    assert df.error is None
    assert df.user_source_properties.private_path == private_path
    assert df.filename == "test private table.xlsx"
    # Every sheet source of the document must have been parsed successfully.
    for sheet_src in df.sources:
        assert sheet_src.status == FileProcessingStatus.ready
        assert sheet_src.error is None


@pytest.mark.asyncio
async def test_yadocuments_invalid_link(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    # A host other than disk.yandex.ru ("yandeks" here) is rejected
    # synchronously with a 400 and the INVALID_LINK error code.
    public_link_invalid = "https://disk.yandeks.ru/i/ZgabI6zyoYn8IQ"
    resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=public_link_invalid, require_ok=False))
    assert resp.status == 400
    assert resp.json["code"] == "ERR.FILE.INVALID_LINK"


@pytest.mark.asyncio
async def test_yadocuments_not_found(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    # A syntactically valid link to a nonexistent document is accepted
    # (200, not 201), but the file ends up failed with NOT_FOUND.
    public_link_nonexisten = "https://disk.yandex.ru/i/fffffffff"
    resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=public_link_nonexisten))
    assert resp.status == 200
    file_id = resp.json["file_id"]

    # Short fixed wait for the download task to record the failure.
    await asyncio.sleep(2)

    resp = await fu_client.make_request(ReqBuilder.file_status(file_id))
    assert resp.status == 200
    assert resp.json["file_id"] == file_id
    assert resp.json["status"] == "failed"
    assert resp.json["error"]["code"] == "ERR.FILE.NOT_FOUND"


@pytest.mark.asyncio
async def test_documents_unsupported(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    # A link to a non-xlsx document is accepted (200), but processing fails
    # with UNSUPPORTED_DOCUMENT (only .xlsx is supported by the download task).
    public_link_to_docs = "https://disk.yandex.ru/i/ros0GDegLEpyew"
    resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=public_link_to_docs))
    assert resp.status == 200
    file_id = resp.json["file_id"]

    # Short fixed wait for the download task to record the failure.
    await asyncio.sleep(2)

    resp = await fu_client.make_request(ReqBuilder.file_status(file_id))
    assert resp.status == 200
    assert resp.json["file_id"] == file_id
    assert resp.json["status"] == "failed"
    assert resp.json["error"]["code"] == "ERR.FILE.UNSUPPORTED_DOCUMENT"
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,26 @@ def upload_gsheet(
require_ok=require_ok,
)

@classmethod
def upload_documents(
    cls,
    public_link: Optional[str] = None,
    private_path: Optional[str] = None,
    oauth_token: Optional[str] = None,
    *,
    require_ok: bool = True,
) -> Req:
    """Build a POST /api/v2/documents request for a Yandex Documents source.

    All three source fields are always sent; unset ones are null, and the
    server-side schema enforces which combinations are valid.
    """
    payload = {
        "public_link": public_link,
        "private_path": private_path,
        "oauth_token": oauth_token,
    }
    return Req(
        method="post",
        url="/api/v2/documents",
        data_json=payload,
        require_ok=require_ok,
    )

@classmethod
def file_status(cls, file_id: str) -> Req:
return Req(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
GSheetsFileSourceSettings,
GSheetsUserSourceDataSourceProperties,
GSheetsUserSourceProperties,
YaDocsUserSourceProperties,
)
from dl_file_uploader_task_interface.context import FileUploaderTaskContext
import dl_file_uploader_task_interface.tasks as task_interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ async def run(self) -> TaskResult:
return Success()

dfile.filename = spreadsheet_meta["name"]
xslx_suffix = ".xlsx"
if not dfile.filename.endswith(xslx_suffix):
vallbull marked this conversation as resolved.
Show resolved Hide resolved
raise exc.UnsupportedDocument
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's save the exact reason the file was rejected in the error details (in future PRs)
Something like this

raise exc.UnsupportedDocument(details={"reason": f"Supported file extensions are: '.xlsx'. Got {dfile.filename}"})


s3 = self._ctx.s3_service

async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[bytes, None]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,14 @@ async def run(self) -> TaskResult:
sheet_data_source.error = FileProcessingError.from_exception(exc.EmptyDocument())
sheet_data_source.status = FileProcessingStatus.failed
else:
has_header, raw_schema, raw_schema_header, raw_schema_body = guess_header_and_schema_excel(
sheetdata
)
try:
has_header, raw_schema, raw_schema_header, raw_schema_body = guess_header_and_schema_excel(
sheetdata
)
except Exception as ex:
sheet_data_source.status = FileProcessingStatus.failed
exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.ParseFailed()
sheet_data_source.error = FileProcessingError.from_exception(exc_to_save)
KonstantAnxiety marked this conversation as resolved.
Show resolved Hide resolved
sheet_settings = None

if sheet_data_source.is_applicable:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ParseFileTask(BaseExecutorTask[task_interface.ParseFileTask, FileUploaderT
FileType.csv: CSVFileParser,
FileType.gsheets: SpreadsheetFileParser,
FileType.xlsx: SpreadsheetFileParser,
FileType.yadocs: SpreadsheetFileParser,
}

async def run(self) -> TaskResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ async def test_parse_excel_with_one_row_task(
result = await wait_task(task, task_state)
await sleep(60)

assert result[-1] == "failed"
assert result[-1] == "success"

df = await DataFile.get(manager=rmm, obj_id=uploaded_excel_id)
assert df.status == FileProcessingStatus.failed
assert df.status == FileProcessingStatus.ready
assert df.id == uploaded_excel_id
for src in df.sources:
assert src.status == FileProcessingStatus.failed
Loading