-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ya docs handler #28
Ya docs handler #28
Changes from all commits
a5c2566
cb5661d
03f0130
18ea1ba
5f56b57
8a368e7
b994a73
f82bc38
e18b335
73dae6f
a790345
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,10 @@ | |
RequiredResourceCommon, | ||
) | ||
from dl_constants.enums import FileProcessingStatus | ||
from dl_file_uploader_api_lib.data_file_preparer import gsheets_data_file_preparer | ||
from dl_file_uploader_api_lib.data_file_preparer import ( | ||
gsheets_data_file_preparer, | ||
yadocs_data_file_preparer, | ||
) | ||
from dl_file_uploader_api_lib.schemas import files as files_schemas | ||
from dl_file_uploader_api_lib.views.base import FileUploaderBaseView | ||
from dl_file_uploader_lib.common_locks import get_update_connection_source_lock | ||
|
@@ -38,6 +41,7 @@ | |
) | ||
from dl_file_uploader_task_interface.tasks import ( | ||
DownloadGSheetTask, | ||
DownloadYaDocsTask, | ||
ParseFileTask, | ||
ProcessExcelTask, | ||
TaskExecutionMode, | ||
|
@@ -162,6 +166,46 @@ async def post(self) -> web.StreamResponse: | |
) | ||
|
||
|
||
class DocumentsView(FileUploaderBaseView):
    """Accept a Yandex Documents source and schedule its download.

    POST body (FileDocumentsRequestSchema) carries either a ``public_link``
    or a ``private_path`` + ``oauth_token``.  A ``DataFile`` record is
    created via the per-file-type preparer and a ``DownloadYaDocsTask`` is
    scheduled for it.  Responds 201 with the new file id and title.
    """

    REQUIRED_RESOURCES: ClassVar[frozenset[RequiredResource]] = frozenset()  # Don't skip CSRF check

    FILE_TYPE_TO_DATA_FILE_PREPARER_MAP: dict[
        FileType, Callable[[str, RedisModelManager, Optional[str]], Awaitable[DataFile]]
    ] = {
        FileType.yadocs: yadocs_data_file_preparer,
    }

    async def post(self) -> web.StreamResponse:
        req_data = await self._load_post_request_schema_data(files_schemas.FileDocumentsRequestSchema)

        file_type = FileType.yadocs

        rmm = self.dl_request.get_redis_model_manager()

        # Either (private_path, oauth_token) or public_link is expected to be set;
        # the preparer is responsible for validating the combination.
        oauth_token: Optional[str] = req_data["oauth_token"]
        public_link: Optional[str] = req_data["public_link"]
        private_path: Optional[str] = req_data["private_path"]

        df = await self.FILE_TYPE_TO_DATA_FILE_PREPARER_MAP[file_type](
            oauth_token=oauth_token,
            private_path=private_path,
            public_link=public_link,
            redis_model_manager=rmm,
        )

        LOGGER.info(f"Data file id: {df.id}")
        await df.save()

        task_processor = self.dl_request.get_task_processor()
        await task_processor.schedule(DownloadYaDocsTask(file_id=df.id))
        # Bug fix: the message previously said "DownloadGSheetTask" (copy-paste
        # leftover from the GSheets view); the task scheduled here is DownloadYaDocsTask.
        LOGGER.info(f"Scheduled DownloadYaDocsTask for file_id {df.id}")

        return web.json_response(
            files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}),
            status=HTTPStatus.CREATED,
        )
|
||
|
||
class FileStatusView(FileUploaderBaseView): | ||
async def get(self) -> web.StreamResponse: | ||
req_data = files_schemas.FileStatusRequestSchema().load(self.request.match_info) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
params: | ||
GOOGLE_API_KEY: {getter: $osenv, key: GOOGLE_API_KEY} | ||
YA_DOCS_API_KEY: {getter: $osenv, key: YA_DOCS_API_KEY} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import asyncio | ||
import logging | ||
|
||
import pytest | ||
|
||
from dl_constants.enums import FileProcessingStatus | ||
from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder | ||
from dl_file_uploader_lib.redis_model.models import ( | ||
DataFile, | ||
YaDocsUserSourceProperties, | ||
) | ||
|
||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
@pytest.mark.asyncio
async def test_yadocuments_public_file(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """Uploading a public Yandex Docs link produces a ready DataFile with ready sources."""
    public_link = "https://disk.yandex.ru/i/ZgabI6zyoYn8IQ"

    resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=public_link))
    assert resp.status == 201
    file_id = resp.json["file_id"]
    assert file_id

    # Let the download/parse pipeline finish before inspecting the record.
    await asyncio.sleep(5)

    data_file = await DataFile.get(manager=redis_model_manager, obj_id=file_id)
    assert isinstance(data_file, DataFile)
    assert isinstance(data_file.user_source_properties, YaDocsUserSourceProperties)
    assert data_file.status == FileProcessingStatus.ready
    assert data_file.error is None
    assert data_file.user_source_properties.public_link == public_link
    assert data_file.filename == "test public table.xlsx"
    for src in data_file.sources:
        assert src.status == FileProcessingStatus.ready
        assert src.error is None
|
||
|
||
@pytest.mark.asyncio
async def test_yadocuments_private_file(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
    ya_docs_oauth_token,
):
    """Uploading a private Yandex Docs file (path + OAuth token) produces a ready DataFile."""
    private_path = "test private table.xlsx"

    resp = await fu_client.make_request(
        ReqBuilder.upload_documents(private_path=private_path, oauth_token=ya_docs_oauth_token)
    )
    assert resp.status == 201
    file_id = resp.json["file_id"]
    assert file_id

    # Let the download/parse pipeline finish before inspecting the record.
    await asyncio.sleep(5)

    data_file = await DataFile.get(manager=redis_model_manager, obj_id=file_id)
    assert isinstance(data_file, DataFile)
    assert isinstance(data_file.user_source_properties, YaDocsUserSourceProperties)
    assert data_file.status == FileProcessingStatus.ready
    assert data_file.error is None
    assert data_file.user_source_properties.private_path == private_path
    assert data_file.filename == "test private table.xlsx"
    for src in data_file.sources:
        assert src.status == FileProcessingStatus.ready
        assert src.error is None
|
||
|
||
@pytest.mark.asyncio
async def test_yadocuments_invalid_link(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """A link with a non-Yandex host is rejected synchronously with ERR.FILE.INVALID_LINK."""
    bad_link = "https://disk.yandeks.ru/i/ZgabI6zyoYn8IQ"  # deliberately misspelled host

    resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=bad_link, require_ok=False))
    assert resp.status == 400
    assert resp.json["code"] == "ERR.FILE.INVALID_LINK"
|
||
|
||
@pytest.mark.asyncio
async def test_yadocuments_not_found(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """A well-formed link to a missing document is accepted, then fails with ERR.FILE.NOT_FOUND."""
    # Valid host, but the resource id does not exist (local rename fixes the
    # original "nonexisten" typo).
    nonexistent_link = "https://disk.yandex.ru/i/fffffffff"

    resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=nonexistent_link))
    assert resp.status == 201
    file_id = resp.json["file_id"]

    # Give the download task time to fail.
    await asyncio.sleep(2)

    status_resp = await fu_client.make_request(ReqBuilder.file_status(file_id))
    assert status_resp.status == 200
    assert status_resp.json["file_id"] == file_id
    assert status_resp.json["status"] == "failed"
    assert status_resp.json["error"]["code"] == "ERR.FILE.NOT_FOUND"
|
||
|
||
@pytest.mark.asyncio
async def test_documents_unsupported(
    fu_client,
    s3_client,
    s3_tmp_bucket,
    redis_model_manager,
    reader_app,
):
    """A link to a non-xlsx document is accepted, then fails with ERR.FILE.UNSUPPORTED_DOCUMENT."""
    docs_link = "https://disk.yandex.ru/i/ros0GDegLEpyew"

    resp = await fu_client.make_request(ReqBuilder.upload_documents(public_link=docs_link))
    assert resp.status == 201
    file_id = resp.json["file_id"]

    # Give the download task time to reject the document.
    await asyncio.sleep(2)

    status_resp = await fu_client.make_request(ReqBuilder.file_status(file_id))
    assert status_resp.status == 200
    assert status_resp.json["file_id"] == file_id
    assert status_resp.json["status"] == "failed"
    assert status_resp.json["error"]["code"] == "ERR.FILE.UNSUPPORTED_DOCUMENT"
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, Fil | |
async def run(self) -> TaskResult: | ||
dfile: Optional[DataFile] = None | ||
redis = self._ctx.redis_service.get_redis() | ||
task_processor = self._ctx.make_task_processor(self._request_id) | ||
|
||
try: | ||
rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) | ||
|
@@ -88,6 +89,10 @@ async def run(self) -> TaskResult: | |
return Success() | ||
|
||
dfile.filename = spreadsheet_meta["name"] | ||
xlsx_suffix = ".xlsx" | ||
if not dfile.filename.endswith(xlsx_suffix): | ||
raise exc.UnsupportedDocument | ||
Review comment: let's save the exact reason the file was rejected in the error details (in a future PR), e.g. `raise exc.UnsupportedDocument(details={"reason": f"Supported file extensions are: '.xlsx'. Got {dfile.filename}"})`. |
||
|
||
s3 = self._ctx.s3_service | ||
|
||
async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[bytes, None]: | ||
|
@@ -115,6 +120,9 @@ async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[byte | |
await dfile.save() | ||
LOGGER.info(f'Uploaded file "{dfile.filename}".') | ||
|
||
await task_processor.schedule(task_interface.ProcessExcelTask(file_id=dfile.id)) | ||
LOGGER.info(f"Scheduled ProcessExcelTask for file_id {dfile.id}") | ||
|
||
except Exception as ex: | ||
LOGGER.exception(ex) | ||
if dfile is None: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, not exactly.
If we are working with an existing connection, we won't receive the token and will have to use the one saved inside the connection — it is only sent to the backend when the connection is created or the token is updated; we never return secrets in our responses.
(This can be fixed later in a different PR, once the pipeline is working with actual connections.)