From df88b1c2f5d3242138d59c503715b942682d2017 Mon Sep 17 00:00:00 2001 From: yanghua Date: Tue, 24 Sep 2024 16:39:12 +0800 Subject: [PATCH] Bug: Part is not full when multiple write --- tosfs/mpu.py | 17 ++++++++++++----- tosfs/tests/test_tosfs.py | 12 ++++++++---- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/tosfs/mpu.py b/tosfs/mpu.py index 0fcef97..7719e7c 100644 --- a/tosfs/mpu.py +++ b/tosfs/mpu.py @@ -76,20 +76,27 @@ def upload_multiple_chunks(self, buffer: Optional[io.BytesIO]) -> None: def _write_to_staging_buffer(self, chunk: bytes) -> None: self.staging_buffer.write(chunk) - if self.staging_buffer.tell() >= self.staging_buffer_size: + if self.staging_buffer.tell() >= self.part_size: self._flush_staging_buffer() def _flush_staging_buffer(self) -> None: if self.staging_buffer.tell() == 0: return + buffer_size = self.staging_buffer.tell() self.staging_buffer.seek(0) - staging_dir = next(self.staging_dirs) - with tempfile.NamedTemporaryFile(delete=False, dir=staging_dir) as tmp: - tmp.write(self.staging_buffer.read()) - self.staging_files.append(tmp.name) + while buffer_size >= self.part_size: + staging_dir = next(self.staging_dirs) + with tempfile.NamedTemporaryFile(delete=False, dir=staging_dir) as tmp: + tmp.write(self.staging_buffer.read()) + self.staging_files.append(tmp.name) + buffer_size -= self.part_size + + # Move remaining data to a new buffer + remaining_data = self.staging_buffer.read() self.staging_buffer = io.BytesIO() + self.staging_buffer.write(remaining_data) def _upload_staged_files(self) -> None: self._flush_staging_buffer() diff --git a/tosfs/tests/test_tosfs.py b/tosfs/tests/test_tosfs.py index 5f1bd31..f24f8e1 100644 --- a/tosfs/tests/test_tosfs.py +++ b/tosfs/tests/test_tosfs.py @@ -767,19 +767,23 @@ def test_file_write_mpu( file_name = random_str() # mock a content let the write logic trigger mpu: - content = random_str(13 * 1024 * 1024) + first_part = random_str(5 * 1024 * 1024) + second_part = random_str(5 * 1024 * 1024) + third_part = random_str(3 * 1024 * 1024) block_size = 4 * 1024 * 1024 with tosfs.open( f"{bucket}/{temporary_workspace}/{file_name}", "w", block_size=block_size ) as f: - f.write(content) + f.write(first_part) + f.write(second_part) + f.write(third_part) assert tosfs.info(f"{bucket}/{temporary_workspace}/{file_name}")["size"] == len( - content + first_part + second_part + third_part ) with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f: - assert f.read() == content + assert f.read() == first_part + second_part + third_part def test_file_write_mpu_threshold_check(