Skip to content

Commit

Permalink
Bug: Part is not full when multiple write (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua authored Sep 25, 2024
1 parent be549f3 commit 81df130
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
17 changes: 12 additions & 5 deletions tosfs/mpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,27 @@ def upload_multiple_chunks(self, buffer: Optional[io.BytesIO]) -> None:

def _write_to_staging_buffer(self, chunk: bytes) -> None:
    """Append ``chunk`` to the staging buffer and spill complete parts.

    Once at least ``self.part_size`` bytes are buffered, every complete part
    is written to a staging file. A shorter tail stays in the buffer so that
    the next write can top it up to a full part.

    Args:
        chunk: Raw bytes to add to the in-memory staging buffer.
    """
    self.staging_buffer.write(chunk)
    if self.staging_buffer.tell() >= self.part_size:
        # More data may still arrive, so only full parts may leave the buffer.
        self._flush_staging_buffer(final=False)


def _flush_staging_buffer(self, final: bool = True) -> None:
    """Write buffered data to staging files, one file per part.

    The buffer is cut into pieces of exactly ``self.part_size`` bytes and each
    piece is written to its own temporary file, whose path is appended to
    ``self.staging_files``. Target directories are taken in turn from
    ``self.staging_dirs``.

    Args:
        final: When true (the default, used by callers that flush without
            arguments), a tail shorter than ``self.part_size`` is written
            out as one last, shorter part so no buffered byte is left
            behind. When false, that tail is kept in a fresh staging buffer
            for later writes to extend.
    """
    buffered = self.staging_buffer.tell()
    if buffered == 0:
        return

    full_parts, tail_size = divmod(buffered, self.part_size)
    sizes = [self.part_size] * full_parts
    if final and tail_size:
        sizes.append(tail_size)

    self.staging_buffer.seek(0)
    for size in sizes:
        staging_dir = next(self.staging_dirs)
        with tempfile.NamedTemporaryFile(delete=False, dir=staging_dir) as tmp:
            # Read a bounded slice: an unbounded read() would drain the whole
            # buffer into this one file and leave nothing for later parts.
            tmp.write(self.staging_buffer.read(size))
        self.staging_files.append(tmp.name)

    # Whatever was not staged (empty on a final flush) seeds the new buffer;
    # writing it leaves tell() equal to the number of bytes still pending.
    leftover = self.staging_buffer.read()
    self.staging_buffer = io.BytesIO()
    self.staging_buffer.write(leftover)

def _upload_staged_files(self) -> None:
self._flush_staging_buffer()
Expand Down
12 changes: 8 additions & 4 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,19 +767,23 @@ def test_file_write_mpu(
file_name = random_str()

# mock a content let the write logic trigger mpu:
content = random_str(13 * 1024 * 1024)
first_part = random_str(5 * 1024 * 1024)
second_part = random_str(5 * 1024 * 1024)
third_part = random_str(3 * 1024 * 1024)
block_size = 4 * 1024 * 1024
with tosfs.open(
f"{bucket}/{temporary_workspace}/{file_name}", "w", block_size=block_size
) as f:
f.write(content)
f.write(first_part)
f.write(second_part)
f.write(third_part)

assert tosfs.info(f"{bucket}/{temporary_workspace}/{file_name}")["size"] == len(
content
first_part + second_part + third_part
)

with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f:
assert f.read() == content
assert f.read() == first_part + second_part + third_part


def test_file_write_mpu_threshold_check(
Expand Down

0 comments on commit 81df130

Please sign in to comment.