From b621bae660ac380fe41bb71f6578fcddfe67aa5a Mon Sep 17 00:00:00 2001
From: yanghua
Date: Fri, 20 Sep 2024 11:10:29 +0800
Subject: [PATCH] Optimize: Introduce multiple disk write for MPU

---
 tosfs/core.py | 114 +++++++++++++++++---------------------------------
 1 file changed, 38 insertions(+), 76 deletions(-)

diff --git a/tosfs/core.py b/tosfs/core.py
index d202204..7b3d5b6 100644
--- a/tosfs/core.py
+++ b/tosfs/core.py
@@ -2102,82 +2102,6 @@ def _upload_chunk(self, final: bool = False) -> bool:
 
         return not final
 
-    def _fetch_range(self, start: int, end: int) -> bytes:
-        if start == end:
-            logger.debug(
-                "skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
-                self.bucket,
-                self.key,
-                start,
-                end,
-            )
-            return b""
-        logger.debug("Fetch: %s/%s, %s-%s", self.bucket, self.key, start, end)
-
-        def fetch() -> bytes:
-            temp_buffer = io.BytesIO()
-            for chunk in self.fs.tos_client.get_object(
-                self.bucket, self.key, self.version_id, range_start=start, range_end=end
-            ):
-                temp_buffer.write(chunk)
-            temp_buffer.seek(0)
-            return temp_buffer.read()
-
-        return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num)
-
-    # def commit(self) -> None:
-    #     """Complete multipart upload or PUT."""
-    #     logger.debug("Commit %s", self)
-    #     if self.tell() == 0:
-    #         if self.buffer is not None:
-    #             logger.debug("Empty file committed %s", self)
-    #             self._abort_mpu()
-    #             self.fs.touch(self.path, **self.kwargs)
-    #     elif not self.parts:
-    #         if self.buffer is not None:
-    #             logger.debug("One-shot upload of %s", self)
-    #             self.buffer.seek(0)
-    #             data = self.buffer.read()
-    #             write_result = retryable_func_executor(
-    #                 lambda: self.fs.tos_client.put_object(
-    #                     self.bucket, self.key, content=data
-    #                 ),
-    #                 max_retry_num=self.fs.max_retry_num,
-    #             )
-    #         else:
-    #             raise RuntimeError
-    #     else:
-    #         logger.debug("Complete multi-part upload for %s ", self)
-    #         write_result = retryable_func_executor(
-    #             lambda: self.fs.tos_client.complete_multipart_upload(
-    #                 self.bucket,
-    #                 self.key,
-    #                 upload_id=self.mpu.upload_id,
-    #                 parts=self.parts,
-    #             ),
-    #             max_retry_num=self.fs.max_retry_num,
-    #         )
-    #
-    #     if self.fs.version_aware:
-    #         self.version_id = write_result.version_id
-    #
-    #     self.buffer = None
-
-    def discard(self) -> None:
-        """Close the file without writing."""
-        self._abort_mpu()
-        self.buffer = None  # file becomes unusable
-
-    def _abort_mpu(self) -> None:
-        if self.mpu:
-            retryable_func_executor(
-                lambda: self.fs.tos_client.abort_multipart_upload(
-                    self.bucket, self.key, self.mpu.upload_id
-                ),
-                max_retry_num=self.fs.max_retry_num,
-            )
-            self.mpu = None
-
     def _upload_multiple_chunks(self, bucket: str, key: str) -> None:
         if self.buffer:
             self.buffer.seek(0)
@@ -2230,6 +2154,44 @@ def _upload_part_from_file(self, staging_file: str, part_number: int) -> PartInfo
             is_completed=None,
         )
 
+    def _fetch_range(self, start: int, end: int) -> bytes:
+        if start == end:
+            logger.debug(
+                "skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
+                self.bucket,
+                self.key,
+                start,
+                end,
+            )
+            return b""
+        logger.debug("Fetch: %s/%s, %s-%s", self.bucket, self.key, start, end)
+
+        def fetch() -> bytes:
+            temp_buffer = io.BytesIO()
+            for chunk in self.fs.tos_client.get_object(
+                self.bucket, self.key, self.version_id, range_start=start, range_end=end
+            ):
+                temp_buffer.write(chunk)
+            temp_buffer.seek(0)
+            return temp_buffer.read()
+
+        return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num)
+
+    def discard(self) -> None:
+        """Close the file without writing."""
+        self._abort_mpu()
+        self.buffer = None  # file becomes unusable
+
+    def _abort_mpu(self) -> None:
+        if self.mpu:
+            retryable_func_executor(
+                lambda: self.fs.tos_client.abort_multipart_upload(
+                    self.bucket, self.key, self.mpu.upload_id
+                ),
+                max_retry_num=self.fs.max_retry_num,
+            )
+            self.mpu = None
+
     def commit(self) -> None:
         """Complete multipart upload or PUT."""
         logger.debug("Commit %s", self)