Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yadocs fixes #88

Merged
merged 2 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ def validate_docs_data(data):
if not ((data["public_link"] is None) ^ (data["private_path"] is None)):
raise ValueError("Expected exactly one of [`private_path`, `public_link`] to be specified")
if data["public_link"] is None:
if data["private_path"] is None or data["oauth_token"] is None:
raise ma.ValidationError("Both path and token must be provided for private files")
if data["private_path"] is None:
raise ma.ValidationError("'private_path' must be provided for private files")
Comment on lines +29 to +30
Copy link
Contributor

@KonstantAnxiety KonstantAnxiety Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is unreachable
This case is covered by the very first condition (not (A ^ B))

elif data["oauth_token"] is None and data["connection_id"] is None:
raise ma.ValidationError("Expected `oauth_token` or `connection_id` to be specified")


class FileLinkRequestSchema(BaseRequestSchema):
Expand All @@ -43,10 +45,11 @@ def validate_object(self, data: dict, **kwargs: Any) -> None:


class FileDocumentsRequestSchema(BaseRequestSchema):
# connection_id = ma.fields.String(load_default=None, allow_none=True)
connection_id = ma.fields.String(load_default=None, allow_none=True)
private_path = ma.fields.String(load_default=None, allow_none=True)
oauth_token = ma.fields.String(load_default=None, allow_none=True)
public_link = ma.fields.String(load_default=None, allow_none=True)
authorized = ma.fields.Boolean(required=True)

@ma.validates_schema(skip_on_field_errors=True)
def validate_object(self, data: dict, **kwargs: Any) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ async def post(self) -> web.StreamResponse:
oauth_token: Optional[str] = req_data["oauth_token"]
public_link: Optional[str] = req_data["public_link"]
private_path: Optional[str] = req_data["private_path"]
connection_id: Optional[str] = req_data["connection_id"]
authorized: bool = req_data["authorized"]

df = await self.FILE_TYPE_TO_DATA_FILE_PREPARER_MAP[file_type](
oauth_token=oauth_token,
Expand All @@ -197,8 +199,14 @@ async def post(self) -> web.StreamResponse:
await df.save()

task_processor = self.dl_request.get_task_processor()
await task_processor.schedule(DownloadYaDocsTask(file_id=df.id))
LOGGER.info(f"Scheduled DownloadGSheetTask for file_id {df.id}")
await task_processor.schedule(
DownloadYaDocsTask(
file_id=df.id,
authorized=authorized,
connection_id=connection_id,
)
)
LOGGER.info(f"Scheduled DownloadYaDocsTask for file_id {df.id}")

return web.json_response(
files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async def test_yadocuments_private_file(
):
private_path = "test private table.xlsx"
resp = await fu_client.make_request(
ReqBuilder.upload_documents(private_path=private_path, oauth_token=ya_docs_oauth_token)
ReqBuilder.upload_documents(private_path=private_path, oauth_token=ya_docs_oauth_token, authorized=True)
)
assert resp.status == 201
file_id = resp.json["file_id"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def upload_documents(
public_link: Optional[str] = None,
private_path: Optional[str] = None,
oauth_token: Optional[str] = None,
authorized: bool = False,
*,
require_ok: bool = True,
) -> Req:
Expand All @@ -91,6 +92,7 @@ def upload_documents(
"public_link": public_link,
"private_path": private_path,
"oauth_token": oauth_token,
"authorized": authorized,
},
require_ok=require_ok,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class DownloadYaDocsTask(BaseTaskMeta):
name = TaskName("download_yadocs")

file_id: str = attr.ib()
authorized: bool = attr.ib()

tenant_id: Optional[str] = attr.ib(default=None)
connection_id: Optional[str] = attr.ib(default=None)


@attr.s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import attr

from dl_constants.enums import FileProcessingStatus
from dl_core.us_manager.us_manager_async import AsyncUSManager
from dl_file_uploader_lib import exc
from dl_file_uploader_lib.data_sink.raw_bytes import (
RawBytesAsyncDataStream,
Expand All @@ -35,10 +36,34 @@
TaskResult,
)

from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection


LOGGER = logging.getLogger(__name__)


class NoToken(Exception):
pass


async def _get_yadocs_oauth_token(
dfile_token: Optional[str],
conn_id: Optional[str],
usm: AsyncUSManager,
) -> Optional[str]:
if dfile_token is not None: # if there is a token in dfile, then use it
oauth_token = dfile_token
elif conn_id is not None: # otherwise, use the one from the connection
conn: YaDocsFileS3Connection = await usm.get_by_id(conn_id, YaDocsFileS3Connection)
if conn.data.oauth_token is None:
raise NoToken()
oauth_token = conn.data.oauth_token
else:
raise NoToken()

return oauth_token


@attr.s
class DownloadYaDocsTask(BaseExecutorTask[task_interface.DownloadYaDocsTask, FileUploaderTaskContext]):
cls_meta = task_interface.DownloadYaDocsTask
Expand All @@ -47,6 +72,7 @@ async def run(self) -> TaskResult:
dfile: Optional[DataFile] = None
redis = self._ctx.redis_service.get_redis()
task_processor = self._ctx.make_task_processor(self._request_id)
usm = self._ctx.get_async_usm()

try:
rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config)
Expand All @@ -55,6 +81,15 @@ async def run(self) -> TaskResult:

assert isinstance(dfile.user_source_properties, YaDocsUserSourceProperties)

oauth_token: Optional[str] = None
if self.meta.authorized:
try:
dfile_token = dfile.user_source_properties.oauth_token
oauth_token = await _get_yadocs_oauth_token(dfile_token, self.meta.connection_id, usm)
except NoToken:
LOGGER.error("Authorized call but no token found in either DataFile or connection, failing task")
return Fail()

async with aiohttp.ClientSession() as session:
yadocs_client = YaDocsClient(session)
try:
Expand All @@ -66,17 +101,14 @@ async def run(self) -> TaskResult:
link=dfile.user_source_properties.public_link
)

elif (
dfile.user_source_properties.private_path is not None
and dfile.user_source_properties.oauth_token is not None
):
elif dfile.user_source_properties.private_path is not None and oauth_token is not None:
spreadsheet_ref = await yadocs_client.get_spreadsheet_private_ref(
path=dfile.user_source_properties.private_path,
token=dfile.user_source_properties.oauth_token,
token=oauth_token,
)
spreadsheet_meta = await yadocs_client.get_spreadsheet_private_meta(
path=dfile.user_source_properties.private_path,
token=dfile.user_source_properties.oauth_token,
token=oauth_token,
)
else:
raise exc.DLFileUploaderBaseError()
Expand All @@ -91,7 +123,9 @@ async def run(self) -> TaskResult:
dfile.filename = spreadsheet_meta["name"]
xlsx_suffix = ".xlsx"
if not dfile.filename.endswith(xlsx_suffix):
raise exc.UnsupportedDocument
raise exc.UnsupportedDocument(
details={"reason": f"Supported file extensions are: '.xlsx'. Got {dfile.filename}"}
)

s3 = self._ctx.s3_service

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def test_download_yadocs_task(
)
await df.save()

task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id))
task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id, authorized=False))
result = await wait_task(task, task_state)

assert result[-1] == "success"
Expand Down
Loading