diff --git a/tosfs/core.py b/tosfs/core.py index 293c27f..94f52cc 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -116,7 +116,7 @@ def __init__( default_fill_cache: bool = True, default_cache_type: str = "readahead", multipart_staging_dirs: str = tempfile.mkdtemp(), - multipart_size: int = 4096, + multipart_size: int = 8 << 20, multipart_thread_pool_size: int = max(2, os.cpu_count() or 1), multipart_staging_buffer_size: int = 4096, multipart_threshold: int = 10485760, @@ -169,15 +169,16 @@ def __init__( directory). Separate the staging dirs with comma if there are many staging dir paths. multipart_size : int, optional - The max byte size which will buffer the staging data in-memory before - flushing to the staging file. Decrease the random write in local staging - disk dramatically if writing plenty of small files. (default is 4096). + The multipart upload part size of the given object storage. + (default is 8MB). multipart_thread_pool_size : int, optional - The size of the thread pool for multipart uploads (default is - max(2, os.cpu_count()). - multipart_staging_buffer_size : int, optional The size of thread pool used for uploading multipart in parallel for the - given object storage. (default is 4096). + given object storage. (default is max(2, os.cpu_count()). + multipart_staging_buffer_size : int, optional + The max byte size which will buffer the staging data in-memory before + flushing to the staging file. It will decrease the random write in local + staging disk dramatically if writing plenty of small files. + (default is 4096). multipart_threshold : int, optional The threshold which control whether enable multipart upload during writing data to the given object storage, if the write data size is less @@ -2043,6 +2044,7 @@ def __init__( self.multipart_threshold = fs.multipart_threshold self.executor = ThreadPoolExecutor(max_workers=self.thread_pool_size) self.staging_files: list[str] = [] + self.staging_buffer: io.BytesIO = io.BytesIO() if "a" in mode and fs.exists(path): head = retryable_func_executor( @@ -2089,7 +2091,7 @@ def _upload_chunk(self, final: bool = False) -> bool: self.autocommit and not self.append_block and final - and self.tell() < self.blocksize + and self.tell() < max(self.blocksize, self.multipart_threshold) ): # only happens when closing small file, use one-shot PUT pass @@ -2133,12 +2135,27 @@ def _upload_multiple_chunks(self, bucket: str, key: str) -> None: if not chunk: break - staging_dir = next(self.staging_dirs) - with tempfile.NamedTemporaryFile(delete=False, dir=staging_dir) as tmp: - tmp.write(chunk) - self.staging_files.append(tmp.name) + self._write_to_staging_buffer(chunk) + + def _write_to_staging_buffer(self, chunk: bytes) -> None: + self.staging_buffer.write(chunk) + if self.staging_buffer.tell() >= self.staging_buffer_size: + self._flush_staging_buffer() + + def _flush_staging_buffer(self) -> None: + if self.staging_buffer.tell() == 0: + return + + self.staging_buffer.seek(0) + staging_dir = next(self.staging_dirs) + with tempfile.NamedTemporaryFile(delete=False, dir=staging_dir) as tmp: + tmp.write(self.staging_buffer.read()) + self.staging_files.append(tmp.name) + + self.staging_buffer = io.BytesIO() def _upload_staged_files(self) -> None: + self._flush_staging_buffer() futures = [] for i, staging_file in enumerate(self.staging_files): part_number = i + 1