Skip to content

Commit

Permalink
Download yadocs task
Browse files Browse the repository at this point in the history
  • Loading branch information
vallbull committed Oct 10, 2023
1 parent 483d11c commit 3774068
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Taskfile.dist.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'

vars:
USER_REL_PATH:
sh: python -c "import os,sys;print(os.path.relpath('{{.USER_WORKING_DIR}}'))"
sh: python3 -c "import os,sys;print(os.path.relpath('{{.USER_WORKING_DIR}}'))"

ROOT_DIR:
sh: pwd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class SourceInfoSchema(FileTypeOneOfSchema):
FileType.csv.name: SourceInfoSchemaBase,
FileType.gsheets.name: SourceInfoSchemaGSheets,
FileType.xlsx.name: SourceInfoSchemaBase,
FileType.yadocuments.name: SourceInfoSchemaBase,
}


Expand Down
1 change: 1 addition & 0 deletions lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class FileType(Enum):
csv = "csv"
gsheets = "gsheets"
xlsx = "xlsx"
yadocuments = "yadocuments"


class CSVEncoding(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
RenameTenantStatusModel,
SourceNotFoundError,
SpreadsheetFileSourceSettings,
YaDocumentsUserSourceProperties,
)
from .storage_schemas import (
DataFileSchema,
Expand Down Expand Up @@ -46,6 +47,7 @@
"EmptySourcesError",
"RenameTenantStatusModel",
"PreviewSet",
"YaDocumentsUserSourceProperties",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ def get_secret_keys(self) -> set[DataKey]:
return {DataKey(parts=("refresh_token",))}


@attr.s(init=True, kw_only=True)
class YaDocumentsUserSourceProperties(UserSourceProperties):
    """User-supplied properties describing a Yandex Documents source.

    A document is addressed either by ``public_link`` or by ``private_path``
    together with ``oauth_token``; all three default to ``None``.
    """

    file_type: FileType = attr.ib(default=FileType.yadocuments)

    # Location of the document: a path on the user's disk or a public share link.
    private_path: Optional[str] = attr.ib(default=None)
    public_link: Optional[str] = attr.ib(default=None)

    # Credential for private documents; excluded from repr so it does not leak into logs.
    oauth_token: Optional[str] = attr.ib(default=None, repr=False)

    def get_secret_keys(self) -> set[DataKey]:
        """Return the keys of fields that hold secrets.

        Follows the same convention as the sibling properties class that
        declares ``refresh_token`` this way.
        NOTE(review): presumably the storage layer treats these keys as
        sensitive (e.g. encrypts them) -- confirm against the model manager.
        """
        return {DataKey(parts=("oauth_token",))}


@attr.s(init=True, kw_only=True)
class UserSourceDataSourceProperties:
file_type: FileType = attr.ib()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
GSheetsUserSourceDataSourceProperties,
GSheetsUserSourceProperties,
RenameTenantStatusModel,
YaDocumentsUserSourceProperties,
)


Expand Down Expand Up @@ -151,9 +152,19 @@ class Meta(BaseSchema.Meta):
spreadsheet_id = fields.String()


class YaDocumentsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema):
    """Schema for ``YaDocumentsUserSourceProperties``."""

    class Meta(BaseSchema.Meta):
        # Model class this schema is bound to.
        target = YaDocumentsUserSourceProperties

    # Every field is an optional string: explicit ``None`` values are accepted.
    private_path = fields.String(allow_none=True)
    public_link = fields.String(allow_none=True)
    oauth_token = fields.String(allow_none=True)


class UserSourcePropertiesSchema(FileTypeOneOfSchema):
    """Maps a file type name to the schema of its user source properties."""

    # Only file types that carry user source properties are listed here.
    type_schemas: dict[str, Type[UserSourcePropertiesBaseSchema]] = {
        FileType.gsheets.name: GSheetsUserSourcePropertiesSchema,
        FileType.yadocuments.name: YaDocumentsUserSourcePropertiesSchema,
    }


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import requests


class YaDocumentsClient:
    """Synchronous client for the Yandex Disk REST endpoints used to fetch documents.

    Every call is a blocking ``requests.get``. Non-2xx responses raise
    ``requests.HTTPError`` instead of failing later on a missing JSON key.
    """

    base_url = "https://cloud-api.yandex.net/v1/disk"

    # Seconds to wait for the API; ``requests`` applies no timeout unless one is given.
    timeout = 30

    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    def get_spreadsheet_public(self, link: str) -> str:
        """Return the ``href`` download URL for a publicly shared document."""
        url = self._build_url("public/resources/download/", public_key=link)
        return self._get_json(url, self.headers)["href"]

    def get_spreadsheet_public_meta(self, link: str) -> dict:
        """Return the decoded JSON metadata of a publicly shared document."""
        url = self._build_url("public/resources/", public_key=link)
        return self._get_json(url, self.headers)

    def get_spreadsheet_private(self, path: str, token: str) -> str:
        """Return the ``href`` download URL for a document at ``path``, authorised by ``token``."""
        url = self._build_url("resources/download/", path=path)
        return self._get_json(url, self._create_headers_with_token(token))["href"]

    def get_spreadsheet_private_meta(self, path: str, token: str) -> dict:
        """Return the decoded JSON metadata of a document at ``path``, authorised by ``token``."""
        url = self._build_url("resources/", path=path)
        return self._get_json(url, self._create_headers_with_token(token))

    def _create_headers_with_token(self, token: str) -> dict:
        """Return a copy of the default headers with an ``Authorization: OAuth`` entry added."""
        # Copy first so the class-level ``headers`` dict is never mutated.
        headers_with_token = self.headers.copy()
        headers_with_token["Authorization"] = "OAuth " + token
        return headers_with_token

    @classmethod
    def _build_url(cls, resource: str, **params: str) -> str:
        """Build an API URL for ``resource`` with percent-encoded query parameters.

        Values such as share links and disk paths contain ``:``, ``/`` and may
        contain ``&``, ``#`` or spaces, which would corrupt the query string if
        interpolated verbatim.
        """
        from urllib.parse import quote, urlencode

        # quote_via=quote encodes spaces as %20 rather than "+".
        return f"{cls.base_url}/{resource}?{urlencode(params, quote_via=quote)}"

    def _get_json(self, url: str, headers: dict):
        """GET ``url`` and return the decoded JSON body, raising on HTTP errors."""
        resp = requests.get(url, headers=headers, timeout=self.timeout)
        resp.raise_for_status()
        return resp.json()
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ class DownloadGSheetTask(BaseTaskMeta):
exec_mode: TaskExecutionMode = attr.ib(default=TaskExecutionMode.BASIC)


@attr.s
class DownloadYaDocumentsTask(BaseTaskMeta):
    """Task metadata for the ``download_yadocuments`` task."""

    name = TaskName("download_yadocuments")

    # Identifier of the file record the task operates on.
    file_id: str = attr.ib()


@attr.s
class ParseFileTask(BaseTaskMeta):
name = TaskName("parse_file")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
RenameTenantFilesTask,
)
from .delete import DeleteFileTask
from .download import DownloadGSheetTask
from .download import (
DownloadGSheetTask,
DownloadYaDocumentsTask,
)
from .excel import ProcessExcelTask
from .parse import ParseFileTask
from .save import SaveSourceTask
Expand All @@ -16,6 +19,7 @@
REGISTRY: TaskRegistry = TaskRegistry.create(
[
DownloadGSheetTask,
DownloadYaDocumentsTask,
ParseFileTask,
ProcessExcelTask,
SaveSourceTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)

import aiogoogle
import aiohttp
import attr

from dl_constants.enums import (
Expand All @@ -26,6 +27,10 @@
from dl_core.us_manager.us_manager_async import AsyncUSManager
from dl_file_uploader_lib import exc
from dl_file_uploader_lib.data_sink.json_each_row import S3JsonEachRowUntypedFileAsyncDataSink
from dl_file_uploader_lib.data_sink.raw_bytes import (
RawBytesAsyncDataStream,
S3RawFileAsyncDataSink,
)
from dl_file_uploader_lib.gsheets_client import (
GSheetsClient,
GSheetsOAuth2,
Expand All @@ -39,7 +44,9 @@
GSheetsFileSourceSettings,
GSheetsUserSourceDataSourceProperties,
GSheetsUserSourceProperties,
YaDocumentsUserSourceProperties,
)
from dl_file_uploader_lib.yadocuments_client import YaDocumentsClient
from dl_file_uploader_task_interface.context import FileUploaderTaskContext
import dl_file_uploader_task_interface.tasks as task_interface
from dl_file_uploader_task_interface.tasks import TaskExecutionMode
Expand Down Expand Up @@ -340,3 +347,80 @@ async def run(self) -> TaskResult:
finally:
await usm.close()
return Success()


@attr.s
class DownloadYaDocumentsTask(BaseExecutorTask[task_interface.DownloadYaDocumentsTask, FileUploaderTaskContext]):
    """Download a Yandex Documents file referenced by a ``DataFile`` into the temporary S3 bucket."""

    cls_meta = task_interface.DownloadYaDocumentsTask

    async def run(self) -> TaskResult:
        """Resolve the download link, stream the file to S3 and save the ``DataFile``.

        Returns:
            ``Success`` when the file was uploaded and the record saved;
            ``Retry`` when the ``DataFile`` could not be loaded at all;
            ``Fail`` otherwise, after the record is marked failed and the error is stored on it.
        """
        dfile: Optional[DataFile] = None
        redis = self._ctx.redis_service.get_redis()

        try:
            rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config)
            dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id)
            # Explicit checks instead of ``assert``: asserts are stripped under ``python -O``.
            if dfile is None:
                raise RuntimeError(f"DataFile {self.meta.file_id} was not found")

            source_props = dfile.user_source_properties
            if not isinstance(source_props, YaDocumentsUserSourceProperties):
                raise TypeError(
                    f"Expected YaDocumentsUserSourceProperties, got {type(source_props).__name__}"
                )

            spreadsheet_ref, spreadsheet_meta = await self._resolve_spreadsheet(source_props)
            dfile.filename = spreadsheet_meta["name"]

            s3 = self._ctx.s3_service

            async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024):
                async with aiohttp.ClientSession() as session:
                    async with session.get(spreadsheet_ref) as response:
                        if response.status != 200:
                            raise exc.DownloadFailed()
                        while True:
                            chunk = await response.content.read(chunk_size)
                            if not chunk:
                                LOGGER.info("Empty chunk received.")
                                break
                            LOGGER.debug(f"Received chunk of {len(chunk)} bytes.")
                            yield chunk

            data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter())
            async with S3RawFileAsyncDataSink(
                s3=s3.client,
                s3_key=dfile.s3_key,
                bucket_name=s3.tmp_bucket_name,
            ) as data_sink:
                await data_sink.dump_data_stream(data_stream)

            await dfile.save()
            LOGGER.info(f'Uploaded file "{dfile.filename}".')

        except Exception as ex:
            LOGGER.exception(ex)
            if dfile is None:
                # Nothing to record the failure on, so ask for another attempt.
                return Retry(attempts=3)
            else:
                dfile.status = FileProcessingStatus.failed
                # Only uploader errors are stored as-is; anything else is reported as a download failure.
                exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed()
                dfile.error = FileProcessingError.from_exception(exc_to_save)
                await dfile.save()

                return Fail()
        return Success()

    @staticmethod
    async def _resolve_spreadsheet(source_props: YaDocumentsUserSourceProperties) -> tuple:
        """Return ``(download_ref, metadata)`` for the document described by ``source_props``.

        A public link takes precedence; otherwise both a private path and an
        OAuth token are required.

        Raises:
            exc.DLFileUploaderBaseError: when neither addressing mode is fully specified.
        """
        import asyncio

        client = YaDocumentsClient()
        loop = asyncio.get_running_loop()

        async def _call(func, **kwargs):
            # The client is synchronous; run it in the default executor so the event loop is not blocked.
            return await loop.run_in_executor(None, lambda: func(**kwargs))

        if source_props.public_link is not None:
            link = source_props.public_link
            ref = await _call(client.get_spreadsheet_public, link=link)
            meta = await _call(client.get_spreadsheet_public_meta, link=link)
            return ref, meta

        if source_props.private_path is not None and source_props.oauth_token is not None:
            path, token = source_props.private_path, source_props.oauth_token
            ref = await _call(client.get_spreadsheet_private, path=path, token=token)
            meta = await _call(client.get_spreadsheet_private_meta, path=path, token=token)
            return ref, meta

        raise exc.DLFileUploaderBaseError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_lib.enums import FileType
from dl_file_uploader_lib.redis_model.models import (
DataFile,
YaDocumentsUserSourceProperties,
)
from dl_file_uploader_task_interface.tasks import DownloadYaDocumentsTask
from dl_task_processor.state import wait_task
from dl_testing.s3_utils import s3_file_exists


@pytest.mark.asyncio
async def test_download_yadocs_task(
    task_processor_client,
    task_state,
    s3_client,
    redis_model_manager,
    s3_tmp_bucket,
):
    """Schedule the download task for a public link and check the file appears in the tmp bucket."""
    source_props = YaDocumentsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA")
    data_file = DataFile(
        manager=redis_model_manager,
        filename="",
        file_type=FileType.yadocuments,
        status=FileProcessingStatus.in_progress,
        user_source_properties=source_props,
    )
    await data_file.save()

    scheduled = await task_processor_client.schedule(DownloadYaDocumentsTask(file_id=data_file.id))
    states = await wait_task(scheduled, task_state)

    # The last recorded state is the task's final outcome.
    assert states[-1] == "success"
    uploaded = await s3_file_exists(key=data_file.s3_key, bucket=s3_tmp_bucket, s3_client=s3_client)
    assert uploaded

0 comments on commit 3774068

Please sign in to comment.