Optimize: Introduce multiple disk write for MPU
yanghua committed Sep 20, 2024
1 parent a229c49 commit 2d81dda
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions tosfs/core.py
@@ -116,7 +116,7 @@ def __init__(
         default_fill_cache: bool = True,
         default_cache_type: str = "readahead",
         multipart_staging_dirs: str = tempfile.mkdtemp(),
-        multipart_size: int = 4096,
+        multipart_size: int = 8 << 20,
         multipart_thread_pool_size: int = max(2, os.cpu_count() or 1),
         multipart_staging_buffer_size: int = 4096,
         multipart_threshold: int = 10485760,
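
The new default is written as a bit shift: `8 << 20` is 8 × 2²⁰ = 8,388,608 bytes, i.e. 8 MiB, replacing the old 4096-byte default part size. A quick check:

```python
>>> 8 << 20    # new multipart_size default
8388608
>>> 8 * 2**20  # same value: 8 MiB
8388608
```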
@@ -169,15 +169,16 @@ def __init__(
             directory). Separate the staging dirs with comma if there are many
             staging dir paths.
         multipart_size : int, optional
-            The max byte size which will buffer the staging data in-memory before
-            flushing to the staging file. Decrease the random write in local staging
-            disk dramatically if writing plenty of small files. (default is 4096).
+            The multipart upload part size of the given object storage.
+            (default is 8MB).
         multipart_thread_pool_size : int, optional
-            The size of the thread pool for multipart uploads (default is
-            max(2, os.cpu_count()).
-        multipart_staging_buffer_size : int, optional
             The size of thread pool used for uploading multipart in parallel for the
-            given object storage. (default is 4096).
+            given object storage. (default is max(2, os.cpu_count()).
+        multipart_staging_buffer_size : int, optional
+            The max byte size which will buffer the staging data in-memory before
+            flushing to the staging file. It will decrease the random write in local
+            staging disk dramatically if writing plenty of small files.
+            (default is 4096).
         multipart_threshold : int, optional
             The threshold which control whether enable multipart upload during
             writing data to the given object storage, if the write data size is less
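
The reworked docstring separates two concerns the old text conflated: `multipart_size` is now the upload part size sent to object storage, while `multipart_staging_buffer_size` controls how much data is held in memory before each flush to a local staging file. A minimal sketch of tuning these knobs, assuming the constructor above belongs to `TosFileSystem` and that the connection keywords shown are accepted (they are not part of this diff and are hypothetical):

```python
from tosfs.core import TosFileSystem

# Only the multipart_* keywords come from the diff above; the connection
# arguments (endpoint_url, region) are assumed placeholder values.
fs = TosFileSystem(
    endpoint_url="tos-cn-beijing.volces.com",              # assumed kwarg
    region="cn-beijing",                                   # assumed kwarg
    multipart_staging_dirs="/mnt/ssd0/tmp,/mnt/ssd1/tmp",  # comma-separated, used round-robin
    multipart_size=8 << 20,                 # 8 MiB per upload part
    multipart_staging_buffer_size=1 << 20,  # buffer 1 MiB in memory before each disk flush
    multipart_threshold=10485760,           # below ~10 MB, use one-shot PUT instead of MPU
)
```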
@@ -2043,6 +2044,7 @@ def __init__(
         self.multipart_threshold = fs.multipart_threshold
         self.executor = ThreadPoolExecutor(max_workers=self.thread_pool_size)
         self.staging_files: list[str] = []
+        self.staging_buffer: io.BytesIO = io.BytesIO()
 
         if "a" in mode and fs.exists(path):
             head = retryable_func_executor(
@@ -2089,7 +2091,7 @@ def _upload_chunk(self, final: bool = False) -> bool:
             self.autocommit
             and not self.append_block
            and final
-            and self.tell() < self.blocksize
+            and self.tell() < max(self.blocksize, self.multipart_threshold)
         ):
             # only happens when closing small file, use one-shot PUT
             pass
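
The widened comparison means the one-shot PUT path now covers any file smaller than the multipart threshold, not only files smaller than one block. A worked check, with an assumed blocksize for illustration:

```python
# multipart_threshold default from the diff; blocksize is an assumed value.
blocksize = 5 * 2**20
multipart_threshold = 10485760
bytes_written = 6 * 2**20  # closing a 6 MiB file

old_one_shot = bytes_written < blocksize                             # False: would start MPU
new_one_shot = bytes_written < max(blocksize, multipart_threshold)   # True: single PUT
print(old_one_shot, new_one_shot)  # False True
```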
@@ -2133,12 +2135,27 @@ def _upload_multiple_chunks(self, bucket: str, key: str) -> None:
             if not chunk:
                 break
 
-            staging_dir = next(self.staging_dirs)
-            with tempfile.NamedTemporaryFile(delete=False, dir=staging_dir) as tmp:
-                tmp.write(chunk)
-                self.staging_files.append(tmp.name)
+            self._write_to_staging_buffer(chunk)
+
+    def _write_to_staging_buffer(self, chunk: bytes) -> None:
+        self.staging_buffer.write(chunk)
+        if self.staging_buffer.tell() >= self.staging_buffer_size:
+            self._flush_staging_buffer()
+
+    def _flush_staging_buffer(self) -> None:
+        if self.staging_buffer.tell() == 0:
+            return
+
+        self.staging_buffer.seek(0)
+        staging_dir = next(self.staging_dirs)
+        with tempfile.NamedTemporaryFile(delete=False, dir=staging_dir) as tmp:
+            tmp.write(self.staging_buffer.read())
+            self.staging_files.append(tmp.name)
+
+        self.staging_buffer = io.BytesIO()
 
     def _upload_staged_files(self) -> None:
+        self._flush_staging_buffer()
         futures = []
         for i, staging_file in enumerate(self.staging_files):
             part_number = i + 1
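
The core of the change is a classic buffered-writer pattern: many small writes accumulate in an in-memory `BytesIO`, and only when the buffer crosses `multipart_staging_buffer_size` is it flushed as one sequential write to a staging file, cycling round-robin over the staging dirs. A standalone sketch of the same idea, with hypothetical names (not the tosfs API):

```python
import io
import tempfile
from itertools import cycle

class StagingWriter:
    """Hypothetical standalone model of the buffering introduced above."""

    def __init__(self, staging_dirs: list[str], buffer_size: int = 4096) -> None:
        self.staging_dirs = cycle(staging_dirs)  # round-robin across disks
        self.buffer_size = buffer_size
        self.buffer = io.BytesIO()
        self.staging_files: list[str] = []

    def write(self, chunk: bytes) -> None:
        # Accumulate in memory; defer disk I/O until the buffer fills up.
        self.buffer.write(chunk)
        if self.buffer.tell() >= self.buffer_size:
            self.flush()

    def flush(self) -> None:
        if self.buffer.tell() == 0:
            return  # nothing buffered
        self.buffer.seek(0)
        with tempfile.NamedTemporaryFile(delete=False, dir=next(self.staging_dirs)) as tmp:
            tmp.write(self.buffer.read())  # one large sequential write
            self.staging_files.append(tmp.name)
        self.buffer = io.BytesIO()  # reset for the next batch

# 1000 tiny writes produce a handful of staging files instead of 1000.
writer = StagingWriter(["/tmp"], buffer_size=64 * 1024)
for _ in range(1000):
    writer.write(b"x" * 512)
writer.flush()  # flush the final partial buffer
print(len(writer.staging_files))  # 8 files of up to 64 KiB, not 1000 tiny ones
```

Batching converts many random 512-byte writes into a few sequential 64 KiB ones, which is where the docstring's claimed reduction in random staging-disk I/O comes from.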
