Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ya docs handler #28

Merged
merged 11 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def create_app(self, app_version: str) -> web.Application:

app.router.add_route("post", "/api/v2/files", files_views.FilesView)
app.router.add_route("post", "/api/v2/links", files_views.LinksView)
app.router.add_route("post", "/api/v2/documents", files_views.DocumentsView)
app.router.add_route("post", "/api/v2/update_connection_data", files_views.UpdateConnectionDataView)
app.router.add_route(
"post", "/api/v2/update_connection_data_internal", files_views.InternalUpdateConnectionDataView
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from dl_file_uploader_lib.exc import InvalidLink
from dl_file_uploader_lib.redis_model.base import RedisModelManager
from dl_file_uploader_lib.redis_model.models import DataFile
from dl_file_uploader_lib.redis_model.models.models import GSheetsUserSourceProperties
from dl_file_uploader_lib.redis_model.models.models import (
GSheetsUserSourceProperties,
YaDocsUserSourceProperties,
)


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,3 +61,39 @@ async def gsheets_data_file_preparer(
)

return df


async def yadocs_data_file_preparer(
    oauth_token: Optional[str],
    private_path: Optional[str],
    public_link: Optional[str],
    redis_model_manager: RedisModelManager,
) -> DataFile:
    """Create an in-progress ``DataFile`` describing a Yandex Documents source.

    If ``public_link`` is provided it is checked before anything is built:
    the scheme must be ``http`` or ``https``, the host must be
    ``disk.yandex.ru`` and the path must start with ``/i/``.
    ``private_path`` and ``oauth_token`` are stored without any validation.
    The returned object is only constructed here; it is not saved.

    Raises:
        InvalidLink: ``public_link`` failed one of the checks above.
    """
    expected_host = "disk.yandex.ru"
    permitted_hosts = frozenset((expected_host,))
    # Appended to every error message so the user sees a well-formed link.
    hint = "example: {!r}".format("https://disk.yandex.ru/i/OyzdmFI0MUEEgA")

    if public_link is not None:
        parsed = urllib.parse.urlparse(public_link)

        scheme = parsed.scheme
        if scheme != "http" and scheme != "https":
            raise InvalidLink(f"Invalid url scheme: {scheme!r}, must be 'http' or 'https'; {hint}")

        hostname = parsed.hostname
        if hostname not in permitted_hosts:
            raise InvalidLink(f"Invalid host: {hostname!r}; should be {expected_host!r}; {hint}")

        required_prefix = "/i/"
        url_path = parsed.path
        if not url_path.startswith(required_prefix):
            raise InvalidLink(f"Invalid URL path prefix: {url_path!r}; should be {required_prefix!r}; {hint}")

    source_properties = YaDocsUserSourceProperties(
        private_path=private_path,
        public_link=public_link,
        oauth_token=oauth_token,
    )
    return DataFile(
        manager=redis_model_manager,
        filename="YaDocument",
        status=FileProcessingStatus.in_progress,
        file_type=FileType.yadocs,
        user_source_properties=source_properties,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ def validate_authorized(data: dict) -> None:
)


def validate_docs_data(data: dict) -> None:
    """Check the source-selection fields of a documents upload request.

    Exactly one of ``public_link`` / ``private_path`` must be set, and a
    private file additionally requires ``oauth_token``.

    Args:
        data: Loaded request data containing the keys ``public_link``,
            ``private_path`` and ``oauth_token`` (each may be ``None``).

    Raises:
        ma.ValidationError: the combination of fields is not acceptable.
            A marshmallow error (rather than ``ValueError``) is raised so
            that, when called from a ``validates_schema`` hook, the failure
            is reported as a validation error instead of escaping schema
            loading as an unexpected exception.
    """
    public_link = data["public_link"]
    private_path = data["private_path"]

    # Both set or both missing: the source is ambiguous / absent.
    if (public_link is None) == (private_path is None):
        raise ma.ValidationError("Expected exactly one of [`private_path`, `public_link`] to be specified")

    # From here on a missing public link implies private_path is set,
    # so only the token still needs checking.
    if public_link is None and data["oauth_token"] is None:
        raise ma.ValidationError("Both path and token must be provided for private files")
Comment on lines +29 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, not exactly
If we are working with an existing connection, we won't receive the token and will have to use the one saved inside the connection. The token is only sent to the backend when the connection is created or the token is updated; we never return secrets in our responses.
(This can be fixed later as a different PR, when we have the pipeline going with actual connections)



class FileLinkRequestSchema(BaseRequestSchema):
type = ma.fields.Enum(FileType, required=True)
url = ma.fields.String(required=True)
Expand All @@ -34,6 +42,17 @@ def validate_object(self, data: dict, **kwargs: Any) -> None:
validate_authorized(data)


class FileDocumentsRequestSchema(BaseRequestSchema):
    """Request body schema for a documents upload.

    All three fields are optional strings that load as ``None`` when
    omitted or explicitly null. Which combinations are acceptable is
    decided by the schema-level validator below, not by the fields.
    """

    # Disabled field, kept as a placeholder.
    # connection_id = ma.fields.String(load_default=None, allow_none=True)
    private_path = ma.fields.String(load_default=None, allow_none=True)
    oauth_token = ma.fields.String(load_default=None, allow_none=True)
    public_link = ma.fields.String(load_default=None, allow_none=True)

    @ma.validates_schema(skip_on_field_errors=True)
    def validate_object(self, data: dict, **kwargs: Any) -> None:
        # Cross-field check; skipped when any single field already failed.
        validate_docs_data(data)


class FileUploadResponseSchema(ma.Schema):
file_id = ma.fields.String()
title = ma.fields.String()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
RequiredResourceCommon,
)
from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_api_lib.data_file_preparer import gsheets_data_file_preparer
from dl_file_uploader_api_lib.data_file_preparer import (
gsheets_data_file_preparer,
yadocs_data_file_preparer,
)
from dl_file_uploader_api_lib.schemas import files as files_schemas
from dl_file_uploader_api_lib.views.base import FileUploaderBaseView
from dl_file_uploader_lib.common_locks import get_update_connection_source_lock
Expand All @@ -38,6 +41,7 @@
)
from dl_file_uploader_task_interface.tasks import (
DownloadGSheetTask,
DownloadYaDocsTask,
ParseFileTask,
ProcessExcelTask,
TaskExecutionMode,
Expand Down Expand Up @@ -162,6 +166,46 @@ async def post(self) -> web.StreamResponse:
)


class DocumentsView(FileUploaderBaseView):
    """Handle POST requests that register a Yandex Documents source.

    The request body is loaded with ``FileDocumentsRequestSchema``, a
    ``DataFile`` is prepared and saved, a ``DownloadYaDocsTask`` is scheduled
    for it, and the new file's id and title are returned with ``201 Created``.
    """

    REQUIRED_RESOURCES: ClassVar[frozenset[RequiredResource]] = frozenset()  # Don't skip CSRF check

    # Preparers are invoked with keyword arguments only (see ``post``), which
    # the list form of ``Callable`` cannot express, hence the ``...`` form.
    FILE_TYPE_TO_DATA_FILE_PREPARER_MAP: dict[FileType, Callable[..., Awaitable[DataFile]]] = {
        FileType.yadocs: yadocs_data_file_preparer,
    }

    async def post(self) -> web.StreamResponse:
        """Create the data file, schedule its download and return its id.

        Returns:
            A JSON response (status 201) serialized with
            ``FileUploadResponseSchema`` and containing ``file_id`` and ``title``.
        """
        req_data = await self._load_post_request_schema_data(files_schemas.FileDocumentsRequestSchema)

        file_type = FileType.yadocs

        rmm = self.dl_request.get_redis_model_manager()

        oauth_token: Optional[str] = req_data["oauth_token"]
        public_link: Optional[str] = req_data["public_link"]
        private_path: Optional[str] = req_data["private_path"]

        df = await self.FILE_TYPE_TO_DATA_FILE_PREPARER_MAP[file_type](
            oauth_token=oauth_token,
            private_path=private_path,
            public_link=public_link,
            redis_model_manager=rmm,
        )

        LOGGER.info(f"Data file id: {df.id}")
        await df.save()

        task_processor = self.dl_request.get_task_processor()
        await task_processor.schedule(DownloadYaDocsTask(file_id=df.id))
        # The message names the task that was actually scheduled.
        LOGGER.info(f"Scheduled DownloadYaDocsTask for file_id {df.id}")

        return web.json_response(
            files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}),
            status=HTTPStatus.CREATED,
        )


class FileStatusView(FileUploaderBaseView):
async def get(self) -> web.StreamResponse:
req_data = files_schemas.FileStatusRequestSchema().load(self.request.match_info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,8 @@ def reader_app(loop, secure_reader):
loop.run_until_complete(runner.setup())
site = aiohttp.web.UnixSite(runner, path=secure_reader.SOCKET)
return loop.run_until_complete(site.start())


@pytest.fixture(scope="session")
def ya_docs_oauth_token(env_param_getter):
    """Session-scoped fixture returning the ``YA_DOCS_API_KEY`` parameter as a string."""
    token = env_param_getter.get_str_value("YA_DOCS_API_KEY")
    return token
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
params:
GOOGLE_API_KEY: {getter: $osenv, key: GOOGLE_API_KEY}
YA_DOCS_API_KEY: {getter: $osenv, key: YA_DOCS_API_KEY}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import asyncio
import logging

import pytest

from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder
from dl_file_uploader_lib.redis_model.models import (
DataFile,
YaDocsUserSourceProperties,
)


LOGGER = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_yadocuments_public_file(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """Upload a public spreadsheet by link and check the stored file and its sheets are ready."""
    link = "https://disk.yandex.ru/i/ZgabI6zyoYn8IQ"

    upload_resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=link))
    assert upload_resp.status == 201
    uploaded_id = upload_resp.json["file_id"]
    assert uploaded_id

    # Fixed wait before inspecting the stored state; presumably gives the
    # scheduled background tasks time to finish.
    await asyncio.sleep(5)

    data_file = await DataFile.get(manager=redis_model_manager, obj_id=uploaded_id)
    assert isinstance(data_file, DataFile)
    assert isinstance(data_file.user_source_properties, YaDocsUserSourceProperties)
    assert data_file.status == FileProcessingStatus.ready
    assert data_file.error is None
    assert data_file.user_source_properties.public_link == link
    assert data_file.filename == "test public table.xlsx"

    # Every sheet of the document must have been processed without errors.
    for sheet in data_file.sources:
        assert sheet.status == FileProcessingStatus.ready
        assert sheet.error is None


@pytest.mark.asyncio
async def test_yadocuments_private_file(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
    ya_docs_oauth_token,
):
    """Upload a private spreadsheet by path + OAuth token and check it ends up ready."""
    path_on_disk = "test private table.xlsx"

    upload_request = ReqBuilder.upload_documents(private_path=path_on_disk, oauth_token=ya_docs_oauth_token)
    upload_resp = await fu_client.make_request(upload_request)
    assert upload_resp.status == 201
    uploaded_id = upload_resp.json["file_id"]
    assert uploaded_id

    # Fixed wait before inspecting the stored state; presumably gives the
    # scheduled background tasks time to finish.
    await asyncio.sleep(5)

    data_file = await DataFile.get(manager=redis_model_manager, obj_id=uploaded_id)
    assert isinstance(data_file, DataFile)
    assert isinstance(data_file.user_source_properties, YaDocsUserSourceProperties)
    assert data_file.status == FileProcessingStatus.ready
    assert data_file.error is None
    assert data_file.user_source_properties.private_path == path_on_disk
    assert data_file.filename == "test private table.xlsx"

    # Every sheet of the document must have been processed without errors.
    for sheet in data_file.sources:
        assert sheet.status == FileProcessingStatus.ready
        assert sheet.error is None


@pytest.mark.asyncio
async def test_yadocuments_invalid_link(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """A link with a misspelled host is rejected with 400 / ``ERR.FILE.INVALID_LINK``."""
    # Note the host: "yandeks", not "yandex".
    bad_host_link = "https://disk.yandeks.ru/i/ZgabI6zyoYn8IQ"

    upload_request = ReqBuilder.upload_documents(public_link=bad_host_link, require_ok=False)
    error_resp = await fu_client.make_request(upload_request)

    assert error_resp.status == 400
    assert error_resp.json["code"] == "ERR.FILE.INVALID_LINK"


@pytest.mark.asyncio
async def test_yadocuments_not_found(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """A well-formed link to a missing file is accepted, then fails with ``ERR.FILE.NOT_FOUND``."""
    missing_file_link = "https://disk.yandex.ru/i/fffffffff"

    upload_resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=missing_file_link))
    assert upload_resp.status == 201
    uploaded_id = upload_resp.json["file_id"]

    # Fixed wait before polling the status endpoint; presumably gives the
    # scheduled download task time to fail.
    await asyncio.sleep(2)

    status_resp = await fu_client.make_request(ReqBuilder.file_status(uploaded_id))
    assert status_resp.status == 200
    assert status_resp.json["file_id"] == uploaded_id
    assert status_resp.json["status"] == "failed"
    assert status_resp.json["error"]["code"] == "ERR.FILE.NOT_FOUND"


@pytest.mark.asyncio
async def test_documents_unsupported(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """An upload of this link is accepted, then fails with ``ERR.FILE.UNSUPPORTED_DOCUMENT``."""
    docs_link = "https://disk.yandex.ru/i/ros0GDegLEpyew"

    upload_resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=docs_link))
    assert upload_resp.status == 201
    uploaded_id = upload_resp.json["file_id"]

    # Fixed wait before polling the status endpoint; presumably gives the
    # scheduled download task time to reject the file.
    await asyncio.sleep(2)

    status_resp = await fu_client.make_request(ReqBuilder.file_status(uploaded_id))
    assert status_resp.status == 200
    assert status_resp.json["file_id"] == uploaded_id
    assert status_resp.json["status"] == "failed"
    assert status_resp.json["error"]["code"] == "ERR.FILE.UNSUPPORTED_DOCUMENT"
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,26 @@ def upload_gsheet(
require_ok=require_ok,
)

@classmethod
def upload_documents(
    cls,
    public_link: Optional[str] = None,
    private_path: Optional[str] = None,
    oauth_token: Optional[str] = None,
    *,
    require_ok: bool = True,
) -> Req:
    """Build a POST request to ``/api/v2/documents``.

    All three source fields are always sent in the JSON body, with ``None``
    for the ones that were not supplied.
    """
    payload = {
        "public_link": public_link,
        "private_path": private_path,
        "oauth_token": oauth_token,
    }
    return Req(method="post", url="/api/v2/documents", data_json=payload, require_ok=require_ok)

@classmethod
def file_status(cls, file_id: str) -> Req:
return Req(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, Fil
async def run(self) -> TaskResult:
dfile: Optional[DataFile] = None
redis = self._ctx.redis_service.get_redis()
task_processor = self._ctx.make_task_processor(self._request_id)

try:
rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config)
Expand Down Expand Up @@ -88,6 +89,10 @@ async def run(self) -> TaskResult:
return Success()

dfile.filename = spreadsheet_meta["name"]
xlsx_suffix = ".xlsx"
if not dfile.filename.endswith(xlsx_suffix):
raise exc.UnsupportedDocument
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's save the exact reason the file was rejected in the error details (in future PRs)
Something like this

raise exc.UnsupportedDocument(details={"reason": f"Supported file extensions are: '.xlsx'. Got {dfile.filename}"})


s3 = self._ctx.s3_service

async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[bytes, None]:
Expand Down Expand Up @@ -115,6 +120,9 @@ async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[byte
await dfile.save()
LOGGER.info(f'Uploaded file "{dfile.filename}".')

await task_processor.schedule(task_interface.ProcessExcelTask(file_id=dfile.id))
LOGGER.info(f"Scheduled ProcessExcelTask for file_id {dfile.id}")

except Exception as ex:
LOGGER.exception(ex)
if dfile is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,14 @@ async def run(self) -> TaskResult:
sheet_data_source.error = FileProcessingError.from_exception(exc.EmptyDocument())
sheet_data_source.status = FileProcessingStatus.failed
else:
has_header, raw_schema, raw_schema_header, raw_schema_body = guess_header_and_schema_excel(
sheetdata
)
try:
has_header, raw_schema, raw_schema_header, raw_schema_body = guess_header_and_schema_excel(
sheetdata
)
except Exception as ex:
sheet_data_source.status = FileProcessingStatus.failed
exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.ParseFailed()
sheet_data_source.error = FileProcessingError.from_exception(exc_to_save)
KonstantAnxiety marked this conversation as resolved.
Show resolved Hide resolved
sheet_settings = None

if sheet_data_source.is_applicable:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ParseFileTask(BaseExecutorTask[task_interface.ParseFileTask, FileUploaderT
FileType.csv: CSVFileParser,
FileType.gsheets: SpreadsheetFileParser,
FileType.xlsx: SpreadsheetFileParser,
FileType.yadocs: SpreadsheetFileParser,
}

async def run(self) -> TaskResult:
Expand Down
Loading
Loading