diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py index 5af0a6333..ce4ffe5e7 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py @@ -72,6 +72,26 @@ class FileUploadResponseSchema(ma.Schema): title = ma.fields.String() +class MakePresignedUrlRequestSchema(ma.Schema): + content_md5 = ma.fields.String(required=True) + + +class PresignedUrlSchema(ma.Schema): + class PresignedUrlFields(ma.Schema): + class Meta: + unknown = ma.INCLUDE + + key = ma.fields.String() + x_amz_algorithm = ma.fields.String(data_key="x-amz-algorithm") + x_amz_credential = ma.fields.String(data_key="x-amz-credential") + x_amz_date = ma.fields.String(data_key="x-amz-date") + policy = ma.fields.String() + x_amz_signature = ma.fields.String(data_key="x-amz-signature") + + url = ma.fields.String(required=True) + fields = ma.fields.Nested(PresignedUrlFields) + + class FileStatusRequestSchema(BaseRequestSchema): file_id = ma.fields.String(required=True) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py index 9062bd1c1..8192c00b5 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py @@ -9,6 +9,7 @@ ClassVar, Optional, ) +import uuid from aiohttp import web from aiohttp.multipart import BodyPartReader @@ -82,13 +83,13 @@ async def post(self) -> web.StreamResponse: s3 = self.dl_request.get_s3_service() rmm = self.dl_request.get_redis_model_manager() - df = DataFile( + dfile = DataFile( manager=rmm, filename=filename, file_type=file_type, status=FileProcessingStatus.in_progress, ) - LOGGER.info(f"Data file id: {df.id}") + LOGGER.info(f"Data file id: {dfile.id}") async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[bytes, None]: assert isinstance(field, BodyPartReader) @@ -104,7 +105,7 @@ async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[byte data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter()) async with S3RawFileAsyncDataSink( s3=s3.client, - s3_key=df.s3_key, + s3_key=dfile.s3_key_old, bucket_name=s3.tmp_bucket_name, max_file_size_exc=exc.FileLimitError, ) as data_sink: @@ -112,23 +113,49 @@ async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[byte # df.size = size # TODO - await df.save() + await dfile.save() LOGGER.info(f'Uploaded file "{filename}".') task_processor = self.dl_request.get_task_processor() if file_type == FileType.xlsx: - await task_processor.schedule(ProcessExcelTask(file_id=df.id)) - LOGGER.info(f"Scheduled ProcessExcelTask for file_id {df.id}") + await task_processor.schedule(ProcessExcelTask(file_id=dfile.id)) + LOGGER.info(f"Scheduled ProcessExcelTask for file_id {dfile.id}") else: - await task_processor.schedule(ParseFileTask(file_id=df.id)) - LOGGER.info(f"Scheduled ParseFileTask for file_id {df.id}") + await task_processor.schedule(ParseFileTask(file_id=dfile.id)) + LOGGER.info(f"Scheduled ParseFileTask for file_id {dfile.id}") return web.json_response( - files_schemas.FileUploadResponseSchema().dump({"file_id": df.id, "title": df.filename}), + files_schemas.FileUploadResponseSchema().dump({"file_id": dfile.id, "title": dfile.filename}), status=HTTPStatus.CREATED, ) +class MakePresignedUrlView(FileUploaderBaseView): + REQUIRED_RESOURCES: ClassVar[frozenset[RequiredResource]] = frozenset() # Don't skip CSRF check + + async def post(self) -> web.StreamResponse: + req_data = await self._load_post_request_schema_data(files_schemas.MakePresignedUrlRequestSchema) + content_md5: str = req_data["content_md5"] + + s3 = self.dl_request.get_s3_service() + s3_key = "{}_{}".format(self.dl_request.rci.user_id or "unknown", str(uuid.uuid4())) + + url = s3.client.generate_presigned_post( + Bucket=s3.tmp_bucket_name, + Key=s3_key, + ExpiresIn=60 * 60, # 1 hour # TODO config? + Conditions=[ + ["content-length-range", 1, 200 * 1024 * 1024], # 1B .. 200MB # TODO use constant + {"Content-MD5": content_md5}, + ] + ) + + return web.json_response( + files_schemas.PresignedUrlSchema().dump(url), + status=HTTPStatus.OK, + ) + + class LinksView(FileUploaderBaseView): REQUIRED_RESOURCES: ClassVar[frozenset[RequiredResource]] = frozenset() # Don't skip CSRF check diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/db/test_files_api.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/db/test_files_api.py index 2c3e263cf..81b2882a6 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/db/test_files_api.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/db/test_files_api.py @@ -27,6 +27,17 @@ async def test_file_upload_cors(fu_client, s3_tmp_bucket, upload_file_req): assert cors_header in resp.headers +@pytest.mark.asyncio +async def test_make_presigned_url(fu_client, s3_tmp_bucket): + expected_url_fields = ("key", "x-amz-algorithm", "x-amz-credential", "x-amz-date", "policy", "x-amz-signature") + + resp = await fu_client.make_request(ReqBuilder.presigned_url("mymd5")) + assert resp.status == 200 + assert "url" in resp.json, resp.json + assert "fields" in resp.json, resp.json + assert all(field in resp.json["fields"] for field in expected_url_fields), resp.json + + @pytest.mark.asyncio async def test_file_upload( fu_client, diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py index ef472936b..7a946d574 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py @@ -98,6 +98,17 @@ def upload_documents( require_ok=require_ok, ) + @classmethod + def presigned_url(cls, content_md5: str, *, require_ok: bool = True) -> Req: + return Req( + method="post", + url=f"/api/v2/make_presigned_url", + data_json={ + "content_md5": content_md5, + }, + require_ok=require_ok, + ) + @classmethod def file_status(cls, file_id: str) -> Req: return Req( diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py index 5ea465b25..468e2ddf5 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py @@ -185,6 +185,7 @@ def is_applicable(self) -> bool: @attr.s(init=True, kw_only=True) class DataFile(RedisModelUserIdAuth): + s3_key: Optional[str] = attr.ib(default=None) filename: Optional[str] = attr.ib() file_type: Optional[FileType] = attr.ib(default=None) file_settings: Optional[FileSettings] = attr.ib(default=None) @@ -195,10 +196,14 @@ class DataFile(RedisModelUserIdAuth): error: Optional[FileProcessingError] = attr.ib(default=None) KEY_PREFIX: ClassVar[str] = "df" - DEFAULT_TTL: ClassVar[Optional[int]] = 12 * 60 * 60 # 12 hours + DEFAULT_TTL: ClassVar[Optional[int]] = 3 * 60 * 60 # 3 hours @property - def s3_key(self) -> str: + def s3_key_old(self) -> str: + # transition from s3_key generated by self.id to stored self.s3_key, to be removed in future releases + # see also: DataFileSchema + if self.s3_key is not None: + return self.s3_key return self.id def get_secret_keys(self) -> set[DataKey]: diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py index cb4e2eee3..bbd294fd9 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py @@ -224,6 +224,7 @@ class DataFileSchema(BaseModelSchema): class Meta(BaseModelSchema.Meta): target = DataFile + s3_key = fields.String(load_default=None, dump_default=None) # TODO remove defaults after transition filename = fields.String() file_type = fields.Enum(FileType, allow_none=True) file_settings = fields.Nested(FileSettingsSchema, allow_none=True) diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py index e1176b403..e4479b193 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -154,7 +154,7 @@ async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[byte data_stream = RawBytesAsyncDataStream(data_iter=_chunk_iter()) async with S3RawFileAsyncDataSink( s3=s3.client, - s3_key=dfile.s3_key, + s3_key=dfile.s3_key_old, bucket_name=s3.tmp_bucket_name, max_file_size_exc=exc.FileLimitError, ) as data_sink: diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py index 6a41fb383..c54235ba8 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py @@ -78,7 +78,7 @@ async def run(self) -> TaskResult: s3_resp = await s3.client.get_object( Bucket=s3.tmp_bucket_name, - Key=dfile.s3_key, + Key=dfile.s3_key_old, ) file_obj = await s3_resp["Body"].read() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py index 558fa4270..977934943 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py @@ -73,7 +73,7 @@ async def _prepare_file( dst_source_id: str, conn_raw_schema: list[SchemaColumn], ) -> str: - src_filename = dfile.s3_key if dfile.file_type == FileType.csv else src_source.s3_key + src_filename = dfile.s3_key_old if dfile.file_type == FileType.csv else src_source.s3_key tmp_s3_filename = _make_tmp_source_s3_filename(dst_source_id) diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/file_parser.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/file_parser.py index 8e1905675..0809f7c4f 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/file_parser.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/file_parser.py @@ -102,7 +102,7 @@ async def _make_sample_text(self) -> str: s3_resp = await self.s3.client.get_object( Bucket=self.s3.tmp_bucket_name, - Key=self.dfile.s3_key, + Key=self.dfile.s3_key_old, Range=f"bytes=0-{self.sample_size}", ) sample_bytes = await s3_resp["Body"].read() @@ -165,7 +165,7 @@ async def guess_header_and_schema( has_header = self.file_settings["first_line_is_header"] LOGGER.info(f"Overriding `has_header` with user defined: has_header={has_header}") - data_stream = await loop.run_in_executor(self.tpe, self._get_sync_s3_data_stream, self.dfile.s3_key) + data_stream = await loop.run_in_executor(self.tpe, self._get_sync_s3_data_stream, self.dfile.s3_key_old) has_header, raw_schema = await loop.run_in_executor( self.tpe, guess_header_and_schema, diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/conftest.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/conftest.py index b7941222e..1b5da0fee 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/conftest.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/conftest.py @@ -26,7 +26,7 @@ @pytest.fixture(scope="function") async def upload_file(s3_tmp_bucket, s3_persistent_bucket, s3_client, redis_model_manager): async def uploader(csv_data: bytes) -> DataFile: - data_file_desc = DataFile( + dfile = DataFile( manager=redis_model_manager, filename="test_file.csv", file_type=FileType.csv, @@ -36,12 +36,12 @@ async def uploader(csv_data: bytes) -> DataFile: await s3_client.put_object( ACL="private", Bucket=s3_tmp_bucket, - Key=data_file_desc.s3_key, + Key=dfile.s3_key_old, Body=csv_data, ) - await data_file_desc.save() - return data_file_desc + await dfile.save() + return dfile yield uploader @@ -96,7 +96,7 @@ async def uploaded_file_dt_id(uploaded_file_dt) -> str: async def uploaded_10mb_file_id(s3_tmp_bucket, s3_persistent_bucket, s3_client, redis_model_manager) -> str: csv_data = generate_sample_csv_data_str(row_count=10000, str_cols_count=30).encode("utf-8") - data_file_desc = DataFile( + dfile = DataFile( manager=redis_model_manager, filename="test_file_10mb.csv", file_type=FileType.csv, @@ -106,18 +106,18 @@ async def uploaded_10mb_file_id(s3_tmp_bucket, s3_persistent_bucket, s3_client, await s3_client.put_object( ACL="private", Bucket=s3_tmp_bucket, - Key=data_file_desc.s3_key, + Key=dfile.s3_key_old, Body=csv_data, ) - await data_file_desc.save() - yield data_file_desc.id + await dfile.save() + yield dfile.id @pytest.fixture(scope="function") async def uploaded_excel_file(s3_tmp_bucket, s3_persistent_bucket, s3_client, redis_model_manager): async def uploader(filename: str) -> DataFile: - data_file_desc = DataFile( + dfile = DataFile( manager=redis_model_manager, filename=filename, file_type=FileType.xlsx, @@ -131,12 +131,12 @@ async def uploader(filename: str) -> DataFile: await s3_client.put_object( ACL="private", Bucket=s3_tmp_bucket, - Key=data_file_desc.s3_key, + Key=dfile.s3_key_old, Body=fd.read(), ) - await data_file_desc.save() - return data_file_desc + await dfile.save() + return dfile yield uploader diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/test_tasks.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/test_tasks.py index 79eb74772..2a70c592c 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/test_tasks.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/db/test_tasks.py @@ -459,7 +459,7 @@ async def test_datetime64( ) rmm = redis_model_manager - df = DataFile( + dfile = DataFile( manager=rmm, filename="test_file.csv", file_type=FileType.csv, @@ -469,32 +469,32 @@ async def test_datetime64( await s3_client.put_object( ACL="private", Bucket=s3_tmp_bucket, - Key=df.s3_key, + Key=dfile.s3_key_old, Body=csv_data, ) - await df.save() + await dfile.save() - df = await DataFile.get(manager=rmm, obj_id=df.id) - assert df.status == FileProcessingStatus.in_progress + dfile = await DataFile.get(manager=rmm, obj_id=dfile.id) + assert dfile.status == FileProcessingStatus.in_progress - task = await task_processor_client.schedule(ParseFileTask(file_id=df.id)) + task = await task_processor_client.schedule(ParseFileTask(file_id=dfile.id)) result = await wait_task(task, task_state) assert result[-1] == "success" - df = await DataFile.get(manager=rmm, obj_id=df.id) - assert df.status == FileProcessingStatus.ready + dfile = await DataFile.get(manager=rmm, obj_id=dfile.id) + assert dfile.status == FileProcessingStatus.ready - assert len(df.sources) == 1 - source = df.sources[0] + assert len(dfile.sources) == 1 + source = dfile.sources[0] assert source.status == FileProcessingStatus.ready - conn = await create_file_connection(usm, df.id, source.id, source.raw_schema) + conn = await create_file_connection(usm, dfile.id, source.id, source.raw_schema) assert conn.get_file_source_by_id(source.id).status == FileProcessingStatus.in_progress task_save = await task_processor_client.schedule( SaveSourceTask( tenant_id="common", - file_id=df.id, + file_id=dfile.id, src_source_id=source.id, dst_source_id=source.id, connection_id=conn.uuid, @@ -526,7 +526,7 @@ async def test_datetime_tz( usm = default_async_usm_per_test rmm = redis_model_manager - df = DataFile( + dfile = DataFile( manager=rmm, filename="test_file.csv", file_type=FileType.csv, @@ -536,32 +536,32 @@ async def test_datetime_tz( await s3_client.put_object( ACL="private", Bucket=s3_tmp_bucket, - Key=df.s3_key, + Key=dfile.s3_key_old, Body=csv_data, ) - await df.save() + await dfile.save() - df = await DataFile.get(manager=rmm, obj_id=df.id) - assert df.status == FileProcessingStatus.in_progress + dfile = await DataFile.get(manager=rmm, obj_id=dfile.id) + assert dfile.status == FileProcessingStatus.in_progress - task = await task_processor_client.schedule(ParseFileTask(file_id=df.id)) + task = await task_processor_client.schedule(ParseFileTask(file_id=dfile.id)) result = await wait_task(task, task_state) assert result[-1] == "success" - df = await DataFile.get(manager=rmm, obj_id=df.id) - assert df.status == FileProcessingStatus.ready + dfile = await DataFile.get(manager=rmm, obj_id=dfile.id) + assert dfile.status == FileProcessingStatus.ready - assert len(df.sources) == 1 - source = df.sources[0] + assert len(dfile.sources) == 1 + source = dfile.sources[0] assert source.status == FileProcessingStatus.ready - conn = await create_file_connection(usm, df.id, source.id, source.raw_schema) + conn = await create_file_connection(usm, dfile.id, source.id, source.raw_schema) assert conn.get_file_source_by_id(source.id).status == FileProcessingStatus.in_progress task_save = await task_processor_client.schedule( SaveSourceTask( tenant_id="common", - file_id=df.id, + file_id=dfile.id, src_source_id=source.id, dst_source_id=source.id, connection_id=conn.uuid, @@ -625,7 +625,7 @@ async def test_too_many_columns_csv( csv_data = generate_sample_csv_data_str(3, 11).encode("utf-8") rmm = redis_model_manager - df = DataFile( + dfile = DataFile( manager=rmm, filename="too_many_columns.csv", file_type=FileType.csv, @@ -635,24 +635,24 @@ async def test_too_many_columns_csv( await s3_client.put_object( ACL="private", Bucket=s3_tmp_bucket, - Key=df.s3_key, + Key=dfile.s3_key_old, Body=csv_data, ) - await df.save() + await dfile.save() - df = await DataFile.get(manager=rmm, obj_id=df.id) - assert df.status == FileProcessingStatus.in_progress + dfile = await DataFile.get(manager=rmm, obj_id=dfile.id) + assert dfile.status == FileProcessingStatus.in_progress - task = await task_processor_client.schedule(ParseFileTask(file_id=df.id)) + task = await task_processor_client.schedule(ParseFileTask(file_id=dfile.id)) result = await wait_task(task, task_state) assert result[-1] == "success" - df = await DataFile.get(manager=rmm, obj_id=df.id) - assert df.status == FileProcessingStatus.failed - assert df.error is not None and df.error.code == exc.TooManyColumnsError.err_code + dfile = await DataFile.get(manager=rmm, obj_id=dfile.id) + assert dfile.status == FileProcessingStatus.failed + assert dfile.error is not None and dfile.error.code == exc.TooManyColumnsError.err_code - assert len(df.sources) == 1 - assert df.sources[0].error.code == exc.TooManyColumnsError.err_code + assert len(dfile.sources) == 1 + assert dfile.sources[0].error.code == exc.TooManyColumnsError.err_code @pytest.mark.asyncio diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocs.py index 69315c6e6..934e784a6 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocs.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocs.py @@ -20,17 +20,17 @@ async def test_download_yadocs_task( s3_tmp_bucket, reader_app, ): - df = DataFile( + dfile = DataFile( filename="", file_type=FileType.yadocs, manager=redis_model_manager, status=FileProcessingStatus.in_progress, user_source_properties=YaDocsUserSourceProperties(public_link="https://disk.yandex.lt/i/OyzdmFI0MUEEgA"), ) - await df.save() + await dfile.save() - task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=df.id, authorized=False)) + task = await task_processor_client.schedule(DownloadYaDocsTask(file_id=dfile.id, authorized=False)) result = await wait_task(task, task_state) assert result[-1] == "success" - assert await s3_file_exists(key=df.s3_key, bucket=s3_tmp_bucket, s3_client=s3_client) + assert await s3_file_exists(key=dfile.s3_key_old, bucket=s3_tmp_bucket, s3_client=s3_client) diff --git a/lib/dl_s3/dl_s3/s3_service.py b/lib/dl_s3/dl_s3/s3_service.py index 478159a9f..0894680b9 100644 --- a/lib/dl_s3/dl_s3/s3_service.py +++ b/lib/dl_s3/dl_s3/s3_service.py @@ -8,6 +8,7 @@ ) from aiobotocore.client import AioBaseClient +from aiobotocore.config import AioConfig from aiobotocore.session import get_session from aiohttp import web import attr @@ -59,6 +60,7 @@ async def initialize(self) -> None: aws_access_key_id=self._access_key_id, aws_secret_access_key=self._secret_access_key, endpoint_url=self._endpoint_url, + config=AioConfig(signature_version="s3v4") ) session = get_session()