Skip to content

Commit

Permalink
Fix issues
Browse files Browse the repository at this point in the history
  • Loading branch information
vallbull committed Oct 12, 2023
1 parent 165cf3a commit ab5a41d
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class SourceInfoSchema(FileTypeOneOfSchema):
FileType.csv.name: SourceInfoSchemaBase,
FileType.gsheets.name: SourceInfoSchemaGSheets,
FileType.xlsx.name: SourceInfoSchemaBase,
FileType.yadocuments.name: SourceInfoSchemaBase,
FileType.yadocs.name: SourceInfoSchemaBase,
}


Expand Down
2 changes: 1 addition & 1 deletion lib/dl_file_uploader_lib/dl_file_uploader_lib/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class FileType(Enum):
csv = "csv"
gsheets = "gsheets"
xlsx = "xlsx"
yadocuments = "yadocuments"
yadocs = "yadocs"


class CSVEncoding(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
RenameTenantStatusModel,
SourceNotFoundError,
SpreadsheetFileSourceSettings,
YaDocumentsUserSourceProperties,
YaDocsUserSourceProperties,
)
from .storage_schemas import (
DataFileSchema,
Expand Down Expand Up @@ -47,7 +47,7 @@
"EmptySourcesError",
"RenameTenantStatusModel",
"PreviewSet",
"YaDocumentsUserSourceProperties",
"YaDocsUserSourceProperties",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,16 @@ def get_secret_keys(self) -> set[DataKey]:


@attr.s(init=True, kw_only=True)
class YaDocumentsUserSourceProperties(UserSourceProperties):
file_type: FileType = attr.ib(default=FileType.yadocuments)
class YaDocsUserSourceProperties(UserSourceProperties):
file_type: FileType = attr.ib(default=FileType.yadocs)

private_path: Optional[str] = attr.ib(default=None)
public_link: Optional[str] = attr.ib(default=None)

oauth_token: Optional[str] = attr.ib(default=None)
oauth_token: Optional[str] = attr.ib(default=None, repr=False)

def get_secret_keys(self) -> set[DataKey]:
return {DataKey(parts=("oauth_token",))}


@attr.s(init=True, kw_only=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
GSheetsUserSourceDataSourceProperties,
GSheetsUserSourceProperties,
RenameTenantStatusModel,
YaDocumentsUserSourceProperties,
YaDocsUserSourceProperties,
)


Expand Down Expand Up @@ -152,9 +152,9 @@ class Meta(BaseSchema.Meta):
spreadsheet_id = fields.String()


class YaDocumentsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema):
class YaDocsUserSourcePropertiesSchema(UserSourcePropertiesBaseSchema):
class Meta(BaseSchema.Meta):
target = YaDocumentsUserSourceProperties
target = YaDocsUserSourceProperties

private_path = fields.String(allow_none=True)
public_link = fields.String(allow_none=True)
Expand All @@ -164,7 +164,7 @@ class Meta(BaseSchema.Meta):
class UserSourcePropertiesSchema(FileTypeOneOfSchema):
type_schemas: dict[str, Type[UserSourcePropertiesBaseSchema]] = {
FileType.gsheets.name: GSheetsUserSourcePropertiesSchema,
FileType.yadocuments.name: YaDocumentsUserSourcePropertiesSchema,
FileType.yadocs.name: YaDocsUserSourcePropertiesSchema,
}


Expand Down
80 changes: 80 additions & 0 deletions lib/dl_file_uploader_lib/dl_file_uploader_lib/yadocs_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from typing import Any

import aiohttp
from aiohttp.client import (
ClientResponse,
ClientSession,
)

from dl_file_uploader_lib import exc as file_upl_exc


def yadocs_error_to_file_uploader_exception(status_code: int, resp_info: dict):
    """Build (without raising) the file-uploader exception for a failed YaDocs API call.

    Mapping of HTTP status to exception class:
      * 400 -> ``UnsupportedDocument``
      * 401 -> ``PermissionDenied``
      * 404 -> ``DocumentNotFound``
      * any status >= 500 -> ``RemoteServerError``
      * everything else -> ``DLFileUploaderBaseError``

    :param status_code: HTTP status code of the response.
    :param resp_info: decoded response body; passed to the exception as ``details``.
    :return: an exception instance; the caller is expected to ``raise`` it.
    """
    # Statuses that map one-to-one onto a specific exception class.
    exact_matches = {
        400: file_upl_exc.UnsupportedDocument,
        401: file_upl_exc.PermissionDenied,
        404: file_upl_exc.DocumentNotFound,
    }

    if status_code in exact_matches:
        exception_type = exact_matches[status_code]
    elif status_code >= 500:
        exception_type = file_upl_exc.RemoteServerError
    else:
        # Unrecognised status (e.g. other 4xx): fall back to the generic base error.
        exception_type = file_upl_exc.DLFileUploaderBaseError

    return exception_type(details=resp_info)


class YaDocsClient:
    """Small async client for the disk REST API at ``cloud-api.yandex.net/v1/disk``.

    It exposes four lookups: the download reference (``href``) and the resource
    metadata, each for either a public link or a private path accessed with an
    OAuth token. Every method raises the exception produced by
    ``yadocs_error_to_file_uploader_exception`` when the API answers with a
    status other than 200.

    The client does not own the session: the caller creates and closes it.
    """

    # Default headers sent with every request; copied (never mutated) when a token is added.
    headers: dict[str, Any] = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    hostname: str = "https://cloud-api.yandex.net/v1/disk"

    def __init__(self, session: ClientSession):
        """Store the aiohttp session used for all requests."""
        self.session = session

    async def get_spreadsheet_public_ref(self, link: str) -> str:
        """Return the ``href`` value of the download response for a public link."""
        payload = await self._get_json(
            "/public/resources/download/",
            params={"public_key": link},
            headers=self.headers,
        )
        return payload["href"]

    async def get_spreadsheet_public_meta(self, link: str) -> dict[str, Any]:
        """Return the decoded JSON metadata of the resource behind a public link."""
        return await self._get_json(
            "/public/resources/",
            params={"public_key": link},
            headers=self.headers,
        )

    async def get_spreadsheet_private_ref(self, path: str, token: str) -> str:
        """Return the ``href`` value of the download response for a private path."""
        payload = await self._get_json(
            "/resources/download/",
            params={"path": path},
            headers=self._create_headers_with_token(token),
        )
        return payload["href"]

    async def get_spreadsheet_private_meta(self, path: str, token: str) -> dict[str, Any]:
        """Return the decoded JSON metadata of the resource at a private path."""
        return await self._get_json(
            "/resources/",
            params={"path": path},
            headers=self._create_headers_with_token(token),
        )

    async def _get_json(
        self,
        endpoint: str,
        params: dict[str, str],
        headers: dict[str, Any],
    ) -> dict[str, Any]:
        """GET ``hostname + endpoint`` and return the decoded JSON body.

        Query values are passed via ``params`` so that aiohttp percent-encodes
        them; user-supplied links and paths may contain ``&``, ``#``, ``+``,
        ``=`` or spaces, which would corrupt a hand-built query string.

        :raises: the exception built by ``yadocs_error_to_file_uploader_exception``
            when the response status is not 200.
        """
        resp = await self.session.get(
            f"{self.hostname}{endpoint}",
            params=params,
            headers=headers,
        )
        if resp.status != 200:
            raise yadocs_error_to_file_uploader_exception(resp.status, await resp.json())
        return await resp.json()

    def _create_headers_with_token(self, token: str) -> dict[str, Any]:
        """Return a copy of the default headers with an ``Authorization: OAuth <token>`` entry."""
        headers_with_token = self.headers.copy()
        headers_with_token.update({"Authorization": "OAuth " + token})
        return headers_with_token

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class DownloadGSheetTask(BaseTaskMeta):


@attr.s
class DownloadYaDocumentsTask(BaseTaskMeta):
name = TaskName("download_yadocuments")
class DownloadYaDocsTask(BaseTaskMeta):
name = TaskName("download_yadocs")

file_id: str = attr.ib()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
RenameTenantFilesTask,
)
from .delete import DeleteFileTask
from .download import (
DownloadGSheetTask,
DownloadYaDocumentsTask,
)
from .download_gsheets import DownloadGSheetTask
from .download_yadocs import DownloadYaDocsTask
from .excel import ProcessExcelTask
from .parse import ParseFileTask
from .save import SaveSourceTask
Expand All @@ -19,7 +17,7 @@
REGISTRY: TaskRegistry = TaskRegistry.create(
[
DownloadGSheetTask,
DownloadYaDocumentsTask,
DownloadYaDocsTask,
ParseFileTask,
ProcessExcelTask,
SaveSourceTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
)

import aiogoogle
import aiohttp
import attr

from dl_constants.enums import (
Expand All @@ -27,10 +26,6 @@
from dl_core.us_manager.us_manager_async import AsyncUSManager
from dl_file_uploader_lib import exc
from dl_file_uploader_lib.data_sink.json_each_row import S3JsonEachRowUntypedFileAsyncDataSink
from dl_file_uploader_lib.data_sink.raw_bytes import (
RawBytesAsyncDataStream,
S3RawFileAsyncDataSink,
)
from dl_file_uploader_lib.gsheets_client import (
GSheetsClient,
GSheetsOAuth2,
Expand All @@ -44,9 +39,7 @@
GSheetsFileSourceSettings,
GSheetsUserSourceDataSourceProperties,
GSheetsUserSourceProperties,
YaDocumentsUserSourceProperties,
)
from dl_file_uploader_lib.yadocuments_client import YaDocumentsClient
from dl_file_uploader_task_interface.context import FileUploaderTaskContext
import dl_file_uploader_task_interface.tasks as task_interface
from dl_file_uploader_task_interface.tasks import TaskExecutionMode
Expand Down Expand Up @@ -347,80 +340,3 @@ async def run(self) -> TaskResult:
finally:
await usm.close()
return Success()


# Executor task that loads a DataFile by id, resolves a download reference and
# metadata for its spreadsheet through YaDocumentsClient, streams the file bytes
# into the temporary S3 bucket and saves the DataFile.
# NOTE(review): this block is shown as removed in the diff and its leading
# indentation is not preserved in this view.
@attr.s
class DownloadYaDocumentsTask(BaseExecutorTask[task_interface.DownloadYaDocumentsTask, FileUploaderTaskContext]):
cls_meta = task_interface.DownloadYaDocumentsTask

# Returns Success when the upload completes; Retry(attempts=3) when the failure
# happened before a DataFile was loaded; otherwise marks the DataFile as failed,
# stores the error on it and returns Fail.
async def run(self) -> TaskResult:
dfile: Optional[DataFile] = None
redis = self._ctx.redis_service.get_redis()

try:
rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config)
dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id)
assert dfile is not None

assert isinstance(dfile.user_source_properties, YaDocumentsUserSourceProperties)

yadocs_client = YaDocumentsClient()

# A public link takes precedence; the private path is used only when both
# the path and an OAuth token are present.
# NOTE(review): none of the client calls below are awaited. If the
# YaDocumentsClient methods are coroutines, these names would hold coroutine
# objects rather than results — confirm against the (deleted) client module.
if dfile.user_source_properties.public_link is not None:
spreadsheet_ref = yadocs_client.get_spreadsheet_public(link=dfile.user_source_properties.public_link)
spreadsheet_meta = yadocs_client.get_spreadsheet_public_meta(
link=dfile.user_source_properties.public_link
)

elif (
dfile.user_source_properties.private_path is not None
and dfile.user_source_properties.oauth_token is not None
):
spreadsheet_ref = yadocs_client.get_spreadsheet_private(
path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token
)
spreadsheet_meta = yadocs_client.get_spreadsheet_private_meta(
path=dfile.user_source_properties.private_path, token=dfile.user_source_properties.oauth_token
)
else:
# Neither a public link nor a (path, token) pair was provided.
raise exc.DLFileUploaderBaseError()
dfile.filename = spreadsheet_meta["name"]

s3 = self._ctx.s3_service

# Async generator yielding the downloaded body in chunks (default 10 MiB)
# until an empty read signals the end of the response.
# NOTE(review): a non-200 download is reported via `assert`, which is
# stripped under `python -O`.
async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024):
async with aiohttp.ClientSession() as session:
async with session.get(spreadsheet_ref) as response:
assert response.status == 200
while True:
chunk = await response.content.read(chunk_size)
if chunk:
LOGGER.debug(f"Received chunk of {len(chunk)} bytes.")
yield chunk
else:
LOGGER.info("Empty chunk received.")
break

# Pipe the chunk stream into the temporary bucket under the DataFile's S3 key.
data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter())
async with S3RawFileAsyncDataSink(
s3=s3.client,
s3_key=dfile.s3_key,
bucket_name=s3.tmp_bucket_name,
) as data_sink:
await data_sink.dump_data_stream(data_stream)

await dfile.save()
LOGGER.info(f'Uploaded file "{dfile.filename}".')

except Exception as ex:
LOGGER.exception(ex)
if dfile is None:
# The DataFile itself could not be loaded, so there is nothing to mark as failed.
return Retry(attempts=3)
else:
dfile.status = FileProcessingStatus.failed
# Only file-uploader errors are stored as-is; anything else is recorded as DownloadFailed.
exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.DownloadFailed()
dfile.error = FileProcessingError.from_exception(exc_to_save)
await dfile.save()

return Fail()
return Success()
Loading

0 comments on commit ab5a41d

Please sign in to comment.