Skip to content

Commit

Permalink
expose S3 multipart-threshold, and status-code for errors (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
graebm authored Nov 1, 2023
1 parent d62491f commit fbf82f2
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 77 deletions.
41 changes: 32 additions & 9 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,17 @@ class S3Client(NativeResource):
for each connection, unless `tls_mode` is :attr:`S3RequestTlsMode.DISABLED`
part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
Note: for :attr:`S3RequestType.PUT_OBJECT` request, S3 requires the part size greater than 5MB.
(5*1024*1024 by default)
Note: for :attr:`S3RequestType.PUT_OBJECT` request, S3 requires the part size greater than 5 MiB.
(8*1024*1024 by default)
throughput_target_gbps (Optional[float]): Throughput target in Gbps that we are trying to reach.
(5 Gbps by default)
multipart_upload_threshold (Optional[int]): The size threshold in bytes, for when to use multipart uploads.
Uploads over this size will use the multipart upload strategy.
Uploads this size or less will use a single request.
If not set, `part_size` is used as the threshold.
throughput_target_gbps (Optional[float]): Throughput target in
Gigabits per second (Gbps) that we are trying to reach.
(10.0 Gbps by default)
"""

__slots__ = ('shutdown_event', '_region')
Expand All @@ -156,6 +162,7 @@ def __init__(
credential_provider=None,
tls_connection_options=None,
part_size=None,
multipart_upload_threshold=None,
throughput_target_gbps=None):
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(region, str)
Expand Down Expand Up @@ -193,6 +200,8 @@ def on_shutdown():
tls_mode = 0
if part_size is None:
part_size = 0
if multipart_upload_threshold is None:
multipart_upload_threshold = 0
if throughput_target_gbps is None:
throughput_target_gbps = 0

Expand All @@ -205,6 +214,7 @@ def on_shutdown():
region,
tls_mode,
part_size,
multipart_upload_threshold,
throughput_target_gbps,
s3_client_core)

Expand Down Expand Up @@ -287,10 +297,16 @@ def make_request(
failed because server side sent an unsuccessful response, the headers
of the response is provided here. Else None will be returned.
* `error_body` (Optional[Bytes]): If request failed because server
* `error_body` (Optional[bytes]): If request failed because server
side sent an unsuccessful response, the body of the response is
provided here. Else None will be returned.
* `status_code` (Optional[int]): HTTP response status code (if available).
If request failed because server side sent an unsuccessful response,
this is its status code. If the operation was successful,
this is the final response's status code. If the operation
failed for another reason, None is returned.
* `**kwargs` (dict): Forward-compatibility kwargs.
on_progress: Optional callback invoked when part of the transfer is done to report the progress.
Expand Down Expand Up @@ -461,19 +477,26 @@ def _on_body(self, chunk, offset):
def _on_shutdown(self):
self._shutdown_event.set()

def _on_finish(self, error_code, error_headers, error_body):
def _on_finish(self, error_code, status_code, error_headers, error_body):
# If C layer gives status_code 0, that means "unknown"
if status_code == 0:
status_code = None

error = None
if error_code:
error = awscrt.exceptions.from_code(error_code)
if error_body:
# TODO The error body is XML, will need to parse it to something prettier.
extra_message = ". Body from error request is: " + str(error_body)
error.message = error.message + extra_message
try:
extra_message = ". Body from error request is: " + str(error_body)
error.message = error.message + extra_message
except BaseException:
pass
self._finished_future.set_exception(error)
else:
self._finished_future.set_result(None)
if self._on_done_cb:
self._on_done_cb(error=error, error_headers=error_headers, error_body=error_body)
self._on_done_cb(error=error, error_headers=error_headers, error_body=error_body, status_code=status_code)

def _on_progress(self, progress):
if self._on_progress_cb:
Expand Down
25 changes: 14 additions & 11 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,20 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
double throughput_target_gbps; /* d */
PyObject *py_core; /* O */
PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
PyObject *py_core; /* O */
if (!PyArg_ParseTuple(
args,
"OOOOOs#iKdO",
"OOOOOs#iKKdO",
&bootstrap_py,
&signing_config_py,
&credential_provider_py,
Expand All @@ -120,6 +121,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
&region.len,
&tls_mode,
&part_size,
&multipart_upload_threshold,
&throughput_target_gbps,
&py_core)) {
return NULL;
Expand Down Expand Up @@ -185,6 +187,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
.tls_mode = tls_mode,
.signing_config = signing_config,
.part_size = part_size,
.multipart_upload_threshold = multipart_upload_threshold,
.tls_connection_options = tls_options,
.throughput_target_gbps = throughput_target_gbps,
.shutdown_callback = s_s3_client_shutdown,
Expand Down
3 changes: 2 additions & 1 deletion source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,9 @@ static void s_s3_request_on_finish(
result = PyObject_CallMethod(
request_binding->py_core,
"_on_finish",
"(iOy#)",
"(iiOy#)",
error_code,
meta_request_result->response_status,
header_list ? header_list : Py_None,
(const char *)(error_body.buffer),
(Py_ssize_t)error_body.len);
Expand Down
Loading

0 comments on commit fbf82f2

Please sign in to comment.