Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: BI-5952 file-uploader-api handler for processing presigned file from bucket #762

Merged
merged 9 commits into from
Dec 25, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def create_app(self, app_version: str) -> web.Application:

app.router.add_route("post", "/api/v2/files", files_views.FilesView)
app.router.add_route("post", "/api/v2/make_presigned_url", files_views.MakePresignedUrlView)
app.router.add_route("post", "/api/v2/download_presigned_url", files_views.DownloadPresignedUrlView)
app.router.add_route("post", "/api/v2/links", files_views.LinksView)
app.router.add_route("post", "/api/v2/documents", files_views.DocumentsView)
app.router.add_route("post", "/api/v2/update_connection_data", files_views.UpdateConnectionDataView)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ class MakePresignedUrlRequestSchema(ma.Schema):
content_md5 = ma.fields.String(required=True)


class DownloadPresignedUrlRequestSchema(ma.Schema):
    # Request body for POST /api/v2/download_presigned_url.
    # `key`: S3 object key of the already-uploaded file (as issued by make_presigned_url).
    # `filename`: user-facing file name, used to detect the file type downstream.
    filename = ma.fields.String(required=True)
    key = ma.fields.String(required=True)


class PresignedUrlSchema(ma.Schema):
class PresignedUrlFields(ma.Schema):
class Meta:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@
)
from dl_s3.data_sink import S3RawFileAsyncDataSink
from dl_s3.stream import RawBytesAsyncDataStream
from dl_s3.utils import s3_file_exists


LOGGER = logging.getLogger(__name__)

S3_KEY_PARTS_SEPARATOR = "/" # used to separate author user_id from the rest of the s3 object key to sign it


def get_file_type_from_name(
filename: Optional[str],
Expand Down Expand Up @@ -131,19 +134,28 @@ async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[byte


class MakePresignedUrlView(FileUploaderBaseView):
PRESIGNED_URL_EXPIRATION_SECONDS: ClassVar[int] = 60 * 60 # 1 hour
PRESIGNED_URL_MIN_BYTES: ClassVar[int] = 1
PRESIGNED_URL_MAX_BYTES: ClassVar[int] = 200 * 1024**2 # 200 MB

async def post(self) -> web.StreamResponse:
req_data = await self._load_post_request_schema_data(files_schemas.MakePresignedUrlRequestSchema)
content_md5: str = req_data["content_md5"]

s3 = self.dl_request.get_s3_service()
s3_key = "{}_{}".format(self.dl_request.rci.user_id or "unknown", str(uuid.uuid4()))
s3_key = S3_KEY_PARTS_SEPARATOR.join(
(
self.dl_request.rci.user_id or "unknown",
str(uuid.uuid4()),
)
)

url = await s3.client.generate_presigned_post(
Bucket=s3.tmp_bucket_name,
Key=s3_key,
ExpiresIn=60 * 60, # 1 hour
ExpiresIn=self.PRESIGNED_URL_EXPIRATION_SECONDS,
Conditions=[
["content-length-range", 1, 200 * 1024 * 1024], # 1B .. 200MB # TODO use constant from DataSink
["content-length-range", self.PRESIGNED_URL_MIN_BYTES, self.PRESIGNED_URL_MAX_BYTES],
{"Content-MD5": content_md5},
],
)
Expand All @@ -154,6 +166,49 @@ async def post(self) -> web.StreamResponse:
)


class DownloadPresignedUrlView(FileUploaderBaseView):
    """Registers a file already uploaded to the tmp S3 bucket via a presigned URL.

    Validates that the requesting user authored the S3 key, verifies the object
    exists, persists a ``DataFile`` record, and schedules the appropriate
    processing task (Excel or generic parse). Responds 201 with the file id.
    """

    async def post(self) -> web.StreamResponse:
        req_data = await self._load_post_request_schema_data(files_schemas.DownloadPresignedUrlRequestSchema)
        filename: str = req_data["filename"]
        s3_key: str = req_data["key"]

        file_type = get_file_type_from_name(filename=filename, allow_xlsx=self.request.app["ALLOW_XLSX"])

        # Keys are "<author_user_id><sep><uuid>" (see MakePresignedUrlView);
        # only the author of the upload may register it.
        s3_key_parts = s3_key.split(S3_KEY_PARTS_SEPARATOR)
        if len(s3_key_parts) != 2 or s3_key_parts[0] != self.dl_request.rci.user_id:
            raise exc.PermissionDenied()

        s3 = self.dl_request.get_s3_service()
        if not await s3_file_exists(s3.client, s3.tmp_bucket_name, s3_key):
            raise exc.DocumentNotFound()

        rmm = self.dl_request.get_redis_model_manager()
        dfile = DataFile(
            s3_key=s3_key,
            manager=rmm,
            filename=filename,
            file_type=file_type,
            status=FileProcessingStatus.in_progress,
        )
        # Lazy %-style args so formatting is skipped when INFO is disabled.
        LOGGER.info("Data file id: %s", dfile.id)

        await dfile.save()

        task_processor = self.dl_request.get_task_processor()
        if file_type == FileType.xlsx:
            await task_processor.schedule(ProcessExcelTask(file_id=dfile.id))
            LOGGER.info("Scheduled ProcessExcelTask for file_id %s", dfile.id)
        else:
            await task_processor.schedule(ParseFileTask(file_id=dfile.id))
            LOGGER.info("Scheduled ParseFileTask for file_id %s", dfile.id)

        return web.json_response(
            files_schemas.FileUploadResponseSchema().dump({"file_id": dfile.id, "title": dfile.filename}),
            status=HTTPStatus.CREATED,
        )


class LinksView(FileUploaderBaseView):
REQUIRED_RESOURCES: ClassVar[frozenset[RequiredResource]] = frozenset() # Don't skip CSRF check

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@
pass


# Fake identity injected by the trust-auth middleware for all test requests;
# shared so tests can compare against the user id embedded in S3 keys.
_TESTS_USER_ID = "_the_tests_asyncapp_user_id_"
_TESTS_USER_NAME = "_the_tests_asyncapp_user_name_"


def pytest_configure(config: Any) -> None:  # noqa
    """Set the tracing service name for this test package."""
    common_pytest_configure(tracing_service_name="tests_bi_file_uploader")

Expand Down Expand Up @@ -215,8 +219,8 @@ class TestingFileUploaderApiAppFactory(FileUploaderApiAppFactory[FileUploaderAPI
def get_auth_middlewares(self) -> list[Middleware]:
    """Return auth middlewares for tests: trust a fixed fake user, no real auth."""
    # NOTE: diff residue removed — the old hard-coded fake-user strings were
    # interleaved with the new shared constants; only the constants remain.
    return [
        auth_trust_middleware(
            fake_user_id=_TESTS_USER_ID,
            fake_user_name=_TESTS_USER_NAME,
        )
    ]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import base64
import hashlib
import logging
import os

Expand All @@ -7,6 +9,7 @@
from dl_api_commons.client.common import Req
from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder
from dl_file_uploader_lib.testing.data_gen import generate_sample_csv_data_str
from dl_s3.utils import upload_to_s3_by_presigned


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,21 +67,43 @@ def upload_file_req_12mb() -> Req:


@pytest.fixture(scope="function")
async def uploaded_file_id(s3_tmp_bucket, fu_client, csv_data) -> str:
    """Upload sample CSV through the presigned-URL flow and return its file_id."""
    # Content-MD5 must be the base64-encoded binary digest (S3 convention).
    content_md5 = base64.b64encode(hashlib.md5(csv_data.encode("utf-8")).digest()).decode("utf-8")
    presigned_url_resp = await fu_client.make_request(ReqBuilder.presigned_url(content_md5))
    assert presigned_url_resp.status == 200, presigned_url_resp.json

    upload_resp = await upload_to_s3_by_presigned(presigned_url_resp.json, content_md5, csv_data)
    assert upload_resp.status == 204

    download_resp = await fu_client.make_request(
        ReqBuilder.presigned_url_download(presigned_url_resp.json["fields"]["key"], "csv_data.csv")
    )
    assert download_resp.status == 201, download_resp.json

    # Give the task processor time to pick up the scheduled parse task.
    await asyncio.sleep(3)
    return download_resp.json["file_id"]


@pytest.fixture(scope="function")
async def uploaded_excel_id(
    s3_tmp_bucket,
    fu_client,
    excel_data,
    reader_app,
) -> str:
    """Upload sample XLSX through the presigned-URL flow and return its file_id."""
    # excel_data is already bytes, so no encode step before hashing.
    content_md5 = base64.b64encode(hashlib.md5(excel_data).digest()).decode("utf-8")
    presigned_url_resp = await fu_client.make_request(ReqBuilder.presigned_url(content_md5))
    assert presigned_url_resp.status == 200, presigned_url_resp.json

    upload_resp = await upload_to_s3_by_presigned(presigned_url_resp.json, content_md5, excel_data)
    assert upload_resp.status == 204

    download_resp = await fu_client.make_request(
        ReqBuilder.presigned_url_download(presigned_url_resp.json["fields"]["key"], "data.xlsx")
    )
    assert download_resp.status == 201, download_resp.json

    # Give the task processor time to pick up the scheduled excel task.
    await asyncio.sleep(3)
    return download_resp.json["file_id"]
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
import asyncio
import base64
import hashlib
import http
import json
import uuid

import aiohttp
import attr
import pytest

from dl_api_commons.base_models import RequestContextInfo
from dl_configs.crypto_keys import get_dummy_crypto_keys_config
from dl_constants.enums import FileProcessingStatus
from dl_file_uploader_api_lib.views.files import (
S3_KEY_PARTS_SEPARATOR,
MakePresignedUrlView,
)
from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder
from dl_file_uploader_lib.redis_model.base import RedisModelManager
from dl_file_uploader_lib.redis_model.models import DataFile
from dl_s3.data_sink import S3RawFileAsyncDataSink
from dl_s3.utils import upload_to_s3_by_presigned


@pytest.mark.asyncio
Expand All @@ -28,14 +36,78 @@ async def test_file_upload_cors(fu_client, s3_tmp_bucket, upload_file_req):


@pytest.mark.asyncio
async def test_make_presigned_url(fu_client, s3_tmp_bucket, rci):
    """Presigned POST response carries all signing fields and a user-scoped key."""
    expected_url_fields = ("key", "x-amz-algorithm", "x-amz-credential", "x-amz-date", "policy", "x-amz-signature")

    resp = await fu_client.make_request(ReqBuilder.presigned_url("mymd5"))
    assert resp.status == 200
    assert "url" in resp.json, resp.json
    assert "fields" in resp.json, resp.json
    assert all(field in resp.json["fields"] for field in expected_url_fields), resp.json
    # Key format is "<user_id><sep><uuid>" — the author prefix gates downloads.
    key = resp.json["fields"]["key"]
    key_parts = key.split(S3_KEY_PARTS_SEPARATOR)
    assert len(key_parts) == 2, key_parts
    assert key_parts[0] == rci.user_id


@pytest.mark.asyncio
async def test_download_presigned_url(fu_client, s3_tmp_bucket, csv_data):
    """Happy path: presign, upload to S3, then register the file (201)."""
    digest = hashlib.md5(csv_data.encode("utf-8")).digest()
    content_md5 = base64.b64encode(digest).decode("utf-8")

    presigned = await fu_client.make_request(ReqBuilder.presigned_url(content_md5))
    assert presigned.status == 200, presigned.json

    uploaded = await upload_to_s3_by_presigned(presigned.json, content_md5, csv_data)
    uploaded_body = await uploaded.read()
    assert uploaded.status == 204, uploaded_body

    object_key = presigned.json["fields"]["key"]
    registered = await fu_client.make_request(ReqBuilder.presigned_url_download(object_key, "csv_data.csv"))
    assert registered.status == 201, registered.json


@pytest.mark.asyncio
async def test_download_presigned_url_bad_user(fu_client, s3_tmp_bucket, csv_data):
    """Registering a key whose author prefix belongs to another user is denied."""
    content_md5 = base64.b64encode(hashlib.md5(csv_data.encode("utf-8")).digest()).decode("utf-8")

    presigned = await fu_client.make_request(ReqBuilder.presigned_url(content_md5))
    assert presigned.status == 200, presigned.json

    # Swap the author prefix for a foreign user id, keeping the uuid part.
    original_key = presigned.json["fields"]["key"]
    _, uuid_part = original_key.split(S3_KEY_PARTS_SEPARATOR)
    forged_key = S3_KEY_PARTS_SEPARATOR.join(("hacker", uuid_part))
    presigned.json["fields"]["key"] = forged_key

    denied = await fu_client.make_request(
        ReqBuilder.presigned_url_download(forged_key, "csv_data.csv", require_ok=False),
    )
    assert denied.status != 201, denied.json
    assert denied.json["code"] == "ERR.FILE.PERMISSION_DENIED"


@pytest.mark.asyncio
async def test_upload_presigned_too_large_file(monkeypatch, fu_client, s3_tmp_bucket, csv_data):
    """Uploads exceeding the signed content-length-range are rejected by S3."""
    # Shrink the signed size limit so the sample CSV is guaranteed to exceed it.
    monkeypatch.setattr(MakePresignedUrlView, "PRESIGNED_URL_MAX_BYTES", 32)

    md5_b64 = base64.b64encode(hashlib.md5(csv_data.encode("utf-8")).digest()).decode("utf-8")

    presigned = await fu_client.make_request(ReqBuilder.presigned_url(md5_b64))
    assert presigned.status == 200, presigned.json

    with pytest.raises(aiohttp.ClientResponseError):
        await upload_to_s3_by_presigned(presigned.json, md5_b64, csv_data)


@pytest.mark.asyncio
async def test_upload_presigned_bad_key(monkeypatch, fu_client, s3_tmp_bucket, csv_data):
    """Tampering with the signed object key invalidates the presigned upload."""
    md5_b64 = base64.b64encode(hashlib.md5(csv_data.encode("utf-8")).digest()).decode("utf-8")

    presigned = await fu_client.make_request(ReqBuilder.presigned_url(md5_b64))
    assert presigned.status == 200, presigned.json

    # Replace the signed key: the policy signature no longer matches.
    presigned.json["fields"]["key"] = "hacker/file"

    with pytest.raises(aiohttp.ClientResponseError):
        await upload_to_s3_by_presigned(presigned.json, md5_b64, csv_data)


@pytest.mark.asyncio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from dl_core_testing.connection import make_conn_key
from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder
from dl_file_uploader_lib import exc
from dl_testing.s3_utils import s3_file_exists
from dl_s3.utils import s3_file_exists

from dl_connector_bundle_chs3.chs3_gsheets.core.constants import CONNECTION_TYPE_GSHEETS_V2
from dl_connector_bundle_chs3.chs3_gsheets.core.lifecycle import GSheetsFileS3ConnectionLifecycleManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from dl_core_testing.connection import make_conn_key
from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder
from dl_file_uploader_lib import exc
from dl_testing.s3_utils import s3_file_exists
from dl_s3.utils import s3_file_exists

from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_YADOCS
from dl_connector_bundle_chs3.chs3_yadocs.core.lifecycle import YaDocsFileS3ConnectionLifecycleManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ def presigned_url(cls, content_md5: str, *, require_ok: bool = True) -> Req:
require_ok=require_ok,
)

@classmethod
def presigned_url_download(cls, key: str, filename: str, *, require_ok: bool = True) -> Req:
    """Build a POST request registering an S3-uploaded file via /api/v2/download_presigned_url."""
    payload = {
        "key": key,
        "filename": filename,
    }
    return Req(
        method="post",
        url="/api/v2/download_presigned_url",
        data_json=payload,
        require_ok=require_ok,
    )

@classmethod
def file_status(cls, file_id: str) -> Req:
return Req(
Expand Down
11 changes: 5 additions & 6 deletions lib/dl_file_uploader_api_lib/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ services:
- 51404:6379

s3-storage:
build:
context: ../testenv-common/images
dockerfile: Dockerfile.s3-storage
command: bash /data/entrypoint.sh
image: minio/minio:RELEASE.2024-12-18T13-15-44Z@sha256:1dce27c494a16bae114774f1cec295493f3613142713130c2d22dd5696be6ad3
environment:
S3BACKEND: "mem"
REMOTE_MANAGEMENT_DISABLE: 1
MINIO_ROOT_USER: accessKey1
MINIO_ROOT_PASSWORD: verySecretKey1
MINIO_DOMAIN: local
command: server --address ":8000" /export
ports:
- 51420:8000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ class DataFile(RedisModelUserIdAuth):
def s3_key_old(self) -> str:
    """Return the stored s3_key, falling back to the legacy id-derived key.

    Transition from s3_key generated by self.id to stored self.s3_key;
    to be removed in future releases. See also: DataFileSchema.
    """
    # Explicit None check: `self.s3_key or self.id` would also fall back on an
    # empty-string key, silently changing which S3 object is addressed.
    return self.s3_key if self.s3_key is not None else self.id

def get_secret_keys(self) -> set[DataKey]:
if self.user_source_properties is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
YaDocsUserSourceProperties,
)
from dl_file_uploader_task_interface.tasks import DownloadYaDocsTask
from dl_s3.utils import s3_file_exists
from dl_task_processor.state import wait_task
from dl_testing.s3_utils import s3_file_exists


@pytest.mark.asyncio
Expand Down
1 change: 1 addition & 0 deletions lib/dl_s3/dl_s3/s3_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async def initialize(self) -> None:
aws_secret_access_key=self._secret_access_key,
endpoint_url=self._endpoint_url,
config=AioConfig(signature_version="s3v4"),
# ^ v4 signature is required to generate presigned URLs with restriction policies
)

session = get_session()
Expand Down
Loading
Loading