From 8646b1ff835e641ddaa690024bd7e61eccc0a229 Mon Sep 17 00:00:00 2001 From: vinoyang Date: Wed, 25 Sep 2024 11:03:48 +0800 Subject: [PATCH] Add test cases for stability (#129) --- tosfs/mpu.py | 23 ++++++++++----- tosfs/retry.py | 13 +++++++-- tosfs/tests/test_retry.py | 34 +++++++++++++--------- tosfs/tests/test_stability.py | 55 +++++++++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+), 24 deletions(-) create mode 100644 tosfs/tests/test_stability.py diff --git a/tosfs/mpu.py b/tosfs/mpu.py index 7719e7c..10b03e7 100644 --- a/tosfs/mpu.py +++ b/tosfs/mpu.py @@ -77,9 +77,9 @@ def upload_multiple_chunks(self, buffer: Optional[io.BytesIO]) -> None: def _write_to_staging_buffer(self, chunk: bytes) -> None: self.staging_buffer.write(chunk) if self.staging_buffer.tell() >= self.part_size: - self._flush_staging_buffer() + self._flush_staging_buffer(False) - def _flush_staging_buffer(self) -> None: + def _flush_staging_buffer(self, final: bool = False) -> None: if self.staging_buffer.tell() == 0: return @@ -93,13 +93,22 @@ def _flush_staging_buffer(self) -> None: self.staging_files.append(tmp.name) buffer_size -= self.part_size - # Move remaining data to a new buffer - remaining_data = self.staging_buffer.read() - self.staging_buffer = io.BytesIO() - self.staging_buffer.write(remaining_data) + if not final: + # Move remaining data to a new buffer + remaining_data = self.staging_buffer.read() + self.staging_buffer = io.BytesIO() + self.staging_buffer.write(remaining_data) + else: + staging_dir = next(self.staging_dirs) + with tempfile.NamedTemporaryFile(delete=False, dir=staging_dir) as tmp: + tmp.write(self.staging_buffer.read()) + self.staging_files.append(tmp.name) + buffer_size -= self.part_size + + self.staging_buffer = io.BytesIO() def _upload_staged_files(self) -> None: - self._flush_staging_buffer() + self._flush_staging_buffer(True) futures = [] for i, staging_file in enumerate(self.staging_files): part_number = i + 1 diff --git a/tosfs/retry.py b/tosfs/retry.py index fc0d5f2..7879ead 100644 --- a/tosfs/retry.py +++ b/tosfs/retry.py @@ -18,6 +18,7 @@ from typing import Any, Optional, Tuple import requests +from requests import RequestException from requests.exceptions import ( ChunkedEncodingError, ConnectTimeout, @@ -70,6 +71,7 @@ ConnectionResetError, ConnectionError, ChunkedEncodingError, + RequestException, } MAX_RETRY_NUM = 20 @@ -142,9 +144,14 @@ def _is_retryable_tos_server_exception(e: TosError) -> bool: def _is_retryable_tos_client_exception(e: TosError) -> bool: - return isinstance(e, TosClientError) and any( - isinstance(e.cause, excp) for excp in TOS_CLIENT_RETRYABLE_EXCEPTIONS - ) + if isinstance(e, TosClientError): + cause = e.cause + while cause is not None: + for excp in TOS_CLIENT_RETRYABLE_EXCEPTIONS: + if isinstance(cause, excp): + return True + cause = getattr(cause, "cause", None) + return False def _get_sleep_time(err: TosError, retry_count: int) -> float: diff --git a/tosfs/tests/test_retry.py b/tosfs/tests/test_retry.py index ab13630..6cb0a25 100644 --- a/tosfs/tests/test_retry.py +++ b/tosfs/tests/test_retry.py @@ -12,20 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# ByteDance Volcengine EMR, Copyright 2024. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - from unittest.mock import Mock import pytest @@ -73,6 +59,26 @@ ), True, ), + ( + TosClientError( + "{'message': 'http request timeout', " + "'case': \"('Connection aborted.', " + "ConnectionResetError(104, 'Connection reset by peer'))\", " + "'request_url': " + "'http://proton-ci.tos-cn-beijing.volces.com/" + "nHnbR/yAlen'}", + TosClientError( + "http request timeout", + ConnectionError( + ProtocolError( + "Connection aborted.", + ConnectionResetError(104, "Connection reset by peer"), + ) + ), + ), + ), + True, + ), ], ) def test_is_retry_exception( diff --git a/tosfs/tests/test_stability.py b/tosfs/tests/test_stability.py new file mode 100644 index 0000000..140f196 --- /dev/null +++ b/tosfs/tests/test_stability.py @@ -0,0 +1,55 @@ +# ByteDance Volcengine EMR, Copyright 2024. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from time import sleep + +from tosfs.utils import random_str + + +def test_write_breakpoint_continuation(tosfs, bucket, temporary_workspace): + file_name = f"{random_str()}.txt" + first_part = random_str(10 * 1024 * 1024) + second_part = random_str(10 * 1024 * 1024) + + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "w") as f: + f.write(first_part) + # mock a very long block(business processing or network issue) + sleep(60) + f.write(second_part) + + assert tosfs.info(f"{bucket}/{temporary_workspace}/{file_name}")["size"] == len( + first_part + second_part + ) + + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f: + assert f.read() == first_part + second_part + + +def test_read_breakpoint_continuation(tosfs, bucket, temporary_workspace): + file_name = f"{random_str()}.txt" + first_part = random_str(10 * 1024 * 1024) + second_part = random_str(10 * 1024 * 1024) + + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "w") as f: + f.write(first_part) + f.write(second_part) + + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f: + read_first_part = f.read(10 * 1024 * 1024) + assert read_first_part == first_part + # mock a very long block(business processing or network issue) + sleep(60) + read_second_part = f.read(10 * 1024 * 1024) + assert read_second_part == second_part + assert read_first_part + read_second_part == first_part + second_part