Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the binding to use features from aws-c-s3 #509

Merged
merged 6 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 21 additions & 187 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ struct s3_meta_request_binding {
**/
FILE *recv_file;

struct aws_http_message *copied_message;

/* Batch up the transferred size in one sec. */
uint64_t size_transferred;
/* The time stamp when the progress reported */
Expand All @@ -47,9 +45,6 @@ static void s_destroy(struct s3_meta_request_binding *meta_request) {
if (meta_request->recv_file) {
fclose(meta_request->recv_file);
}
if (meta_request->copied_message) {
aws_http_message_release(meta_request->copied_message);
}
Py_XDECREF(meta_request->py_core);
aws_mem_release(aws_py_get_allocator(), meta_request);
}
Expand Down Expand Up @@ -122,6 +117,7 @@ static int s_s3_request_on_headers(
}
}

/* To avoid reporting progress to python too often. We cache it up and only report to python after at least 1 sec. */
static int s_record_progress(struct s3_meta_request_binding *request_binding, uint64_t length, bool *report_progress) {
if (aws_add_u64_checked(request_binding->size_transferred, length, &request_binding->size_transferred)) {
/* Wow */
graebm marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -151,10 +147,6 @@ static int s_s3_request_on_body(
(void)meta_request;
struct s3_meta_request_binding *request_binding = user_data;

bool report_progress;
if (s_record_progress(request_binding, (uint64_t)body->len, &report_progress)) {
return AWS_OP_ERR;
}
if (request_binding->recv_file) {
/* The callback will be invoked with the right order, so we don't need to seek first. */
if (fwrite((void *)body->ptr, body->len, 1, request_binding->recv_file) < 1) {
Expand All @@ -168,9 +160,7 @@ static int s_s3_request_on_body(
aws_error_name(aws_last_error()));
return AWS_OP_ERR;
}
if (!report_progress) {
return AWS_OP_SUCCESS;
}
return AWS_OP_SUCCESS;
}
bool error = true;
/*************** GIL ACQUIRE ***************/
Expand All @@ -179,32 +169,15 @@ static int s_s3_request_on_body(
if (aws_py_gilstate_ensure(&state)) {
return AWS_OP_ERR; /* Python has shut down. Nothing matters anymore, but don't crash */
}
if (!request_binding->recv_file) {
result = PyObject_CallMethod(
request_binding->py_core,
"_on_body",
"(y#K)",
(const char *)(body->ptr),
(Py_ssize_t)body->len,
range_start);

if (!result) {
PyErr_WriteUnraisable(request_binding->py_core);
goto done;
}
Py_DECREF(result);
}
if (report_progress) {
/* Hold the GIL before enterring here */
result =
PyObject_CallMethod(request_binding->py_core, "_on_progress", "(K)", request_binding->size_transferred);
if (!result) {
PyErr_WriteUnraisable(request_binding->py_core);
} else {
Py_DECREF(result);
}
request_binding->size_transferred = 0;
result = PyObject_CallMethod(
request_binding->py_core, "_on_body", "(y#K)", (const char *)(body->ptr), (Py_ssize_t)body->len, range_start);

if (!result) {
PyErr_WriteUnraisable(request_binding->py_core);
goto done;
}
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
Py_DECREF(result);
error = false;
done:
PyGILState_Release(state);
Expand Down Expand Up @@ -252,8 +225,6 @@ static void s_s3_request_on_finish(
PyObject *header_list = NULL;
PyObject *result = NULL;

request_binding->copied_message = aws_http_message_release(request_binding->copied_message);

if (request_binding->size_transferred && (error_code == 0)) {
/* report the remaining progress */
result =
Expand Down Expand Up @@ -343,39 +314,21 @@ static void s_s3_request_on_shutdown(void *user_data) {
/*************** GIL RELEASE ***************/
}

/*
* file-based python input stream for reporting the progress
*/
struct aws_input_py_stream_file_impl {
struct aws_input_stream base;
struct aws_input_stream *actual_stream;
struct s3_meta_request_binding *binding;
};

static int s_aws_input_stream_file_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
size_t pre_len = dest->len;

if (aws_input_stream_read(impl->actual_stream, dest)) {
return AWS_OP_ERR;
}
static void s_s3_request_on_progress(
struct aws_s3_meta_request *meta_request,
const struct aws_s3_meta_request_progress *progress,
void *user_data) {

size_t actually_read = 0;
if (aws_sub_size_checked(dest->len, pre_len, &actually_read)) {
return AWS_OP_ERR;
}
struct s3_meta_request_binding *request_binding = user_data;

bool report_progress;
struct s3_meta_request_binding *request_binding = impl->binding;
if (s_record_progress(request_binding, (uint64_t)actually_read, &report_progress)) {
return AWS_OP_ERR;
}
bool report_progress = false;
s_record_progress(request_binding, progress->bytes_transferred, &report_progress);

if (report_progress) {
/*************** GIL ACQUIRE ***************/
PyGILState_STATE state;
if (aws_py_gilstate_ensure(&state)) {
return AWS_OP_ERR; /* Python has shut down. Nothing matters anymore, but don't crash */
return; /* Python has shut down. Nothing matters anymore, but don't crash */
}
PyObject *result =
PyObject_CallMethod(request_binding->py_core, "_on_progress", "(K)", request_binding->size_transferred);
Expand All @@ -385,113 +338,7 @@ static int s_aws_input_stream_file_read(struct aws_input_stream *stream, struct
request_binding->size_transferred = 0;
PyGILState_Release(state);
/*************** GIL RELEASE ***************/
if (!result) {
return aws_py_raise_error();
}
}
return AWS_OP_SUCCESS;
}
static int s_aws_input_stream_file_seek(
struct aws_input_stream *stream,
int64_t offset,
enum aws_stream_seek_basis basis) {
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
return aws_input_stream_seek(impl->actual_stream, offset, basis);
}

static int s_aws_input_stream_file_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
return aws_input_stream_get_status(impl->actual_stream, status);
}

static int s_aws_input_stream_file_get_length(struct aws_input_stream *stream, int64_t *length) {
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
return aws_input_stream_get_length(impl->actual_stream, length);
}

static void s_aws_input_stream_file_destroy(struct aws_input_py_stream_file_impl *impl) {
struct aws_allocator *allocator = aws_py_get_allocator();
aws_input_stream_release(impl->actual_stream);
aws_mem_release(allocator, impl);
}

static struct aws_input_stream_vtable s_aws_input_stream_file_vtable = {
.seek = s_aws_input_stream_file_seek,
.read = s_aws_input_stream_file_read,
.get_status = s_aws_input_stream_file_get_status,
.get_length = s_aws_input_stream_file_get_length,
};

static struct aws_input_stream *s_input_stream_new_from_file(
struct aws_allocator *allocator,
const char *file_name,
struct s3_meta_request_binding *request_binding) {
struct aws_input_py_stream_file_impl *impl =
aws_mem_calloc(allocator, 1, sizeof(struct aws_input_py_stream_file_impl));

impl->base.vtable = &s_aws_input_stream_file_vtable;
aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_file_destroy);

impl->actual_stream = aws_input_stream_new_from_file(allocator, file_name);
if (!impl->actual_stream) {
aws_mem_release(allocator, impl);
return NULL;
}
impl->binding = request_binding;

return &impl->base;
}

/* Copy an existing HTTP message without body. */
struct aws_http_message *s_copy_http_message(struct aws_allocator *allocator, struct aws_http_message *base_message) {
AWS_PRECONDITION(allocator);
AWS_PRECONDITION(base_message);

struct aws_http_message *message = aws_http_message_new_request(allocator);

if (message == NULL) {
return NULL;
}

struct aws_byte_cursor request_method;
if (aws_http_message_get_request_method(base_message, &request_method)) {
goto error_clean_up;
}

if (aws_http_message_set_request_method(message, request_method)) {
goto error_clean_up;
}

struct aws_byte_cursor request_path;
if (aws_http_message_get_request_path(base_message, &request_path)) {
goto error_clean_up;
}

if (aws_http_message_set_request_path(message, request_path)) {
goto error_clean_up;
}

size_t num_headers = aws_http_message_get_header_count(base_message);
for (size_t header_index = 0; header_index < num_headers; ++header_index) {
struct aws_http_header header;
if (aws_http_message_get_header(base_message, &header, header_index)) {
goto error_clean_up;
}
if (aws_http_message_add_header(message, header)) {
goto error_clean_up;
}
}

return message;

error_clean_up:

if (message != NULL) {
aws_http_message_release(message);
message = NULL;
}

return NULL;
}

PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
Expand Down Expand Up @@ -579,37 +426,24 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
Py_INCREF(meta_request->py_core);

if (recv_filepath) {
meta_request->recv_file = aws_fopen(recv_filepath, "wb+");
meta_request->recv_file = aws_fopen(recv_filepath, "wb");
if (!meta_request->recv_file) {
aws_translate_and_raise_io_error(errno);
PyErr_SetAwsLastError();
goto error;
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
}
}
if (send_filepath) {
if (type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
/* Copy the http request from python object and replace the old pointer with new pointer */
meta_request->copied_message = s_copy_http_message(allocator, http_request);
struct aws_input_stream *input_body = s_input_stream_new_from_file(allocator, send_filepath, meta_request);
if (!input_body) {
PyErr_SetAwsLastError();
goto error;
}
/* rewrite the input stream of the original request */
aws_http_message_set_body_stream(meta_request->copied_message, input_body);
/* Input body is owned by copied message */
aws_input_stream_release(input_body);
}
}

struct aws_s3_meta_request_options s3_meta_request_opt = {
.type = type,
.message = meta_request->copied_message ? meta_request->copied_message : http_request,
.message = http_request,
.signing_config = signing_config,
.send_filepath = aws_byte_cursor_from_c_str(send_filepath),
.headers_callback = s_s3_request_on_headers,
.body_callback = s_s3_request_on_body,
.finish_callback = s_s3_request_on_finish,
.shutdown_callback = s_s3_request_on_shutdown,
.progress_callback = s_s3_request_on_progress,
.user_data = meta_request,
};

Expand Down
Loading
Loading