diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h index 0fcf32f08..9738eb180 100644 --- a/include/aws/s3/private/s3_meta_request_impl.h +++ b/include/aws/s3/private/s3_meta_request_impl.h @@ -59,7 +59,7 @@ struct aws_s3_meta_request_event { enum aws_s3_meta_request_event_type { AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY, /* body_callback */ AWS_S3_META_REQUEST_EVENT_PROGRESS, /* progress_callback */ - /* TODO: AWS_S3_META_REQUEST_EVENT_TELEMETRY */ + AWS_S3_META_REQUEST_EVENT_TELEMETRY, /* telemetry_callback */ } type; union { @@ -72,6 +72,11 @@ struct aws_s3_meta_request_event { struct { struct aws_s3_meta_request_progress info; } progress; + + /* data for AWS_S3_META_REQUEST_EVENT_TELEMETRY */ + struct { + struct aws_s3_request_metrics *metrics; + } telemetry; } u; }; diff --git a/include/aws/s3/private/s3_util.h b/include/aws/s3/private/s3_util.h index f92c17d79..67f7be7d8 100644 --- a/include/aws/s3/private/s3_util.h +++ b/include/aws/s3/private/s3_util.h @@ -34,6 +34,8 @@ struct aws_http_stream; struct aws_http_headers; struct aws_http_message; struct aws_s3_client; +struct aws_s3_request; +struct aws_s3_meta_request; struct aws_cached_signing_config_aws { struct aws_allocator *allocator; @@ -276,6 +278,9 @@ void aws_s3_get_part_range( AWS_S3_API int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor error_code_string); +AWS_S3_API +void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request); + AWS_EXTERN_C_END #endif /* AWS_S3_UTIL_H */ diff --git a/include/aws/s3/s3_client.h b/include/aws/s3/s3_client.h index 091b6d871..baa5faff0 100644 --- a/include/aws/s3/s3_client.h +++ b/include/aws/s3/s3_client.h @@ -209,8 +209,7 @@ typedef void(aws_s3_meta_request_progress_fn)( void *user_data); /** - * Invoked to report the telemetry of the meta request once a single request finishes. Invoked from the thread of the - * connection that request made from. + * Invoked to report the telemetry of the meta request once a single request finishes. * Note: *metrics is only valid for the duration of the callback. If you need to keep it around, use * `aws_s3_request_metrics_acquire` */ @@ -690,9 +689,6 @@ struct aws_s3_meta_request_options { * If set the request will keep track of the metrics from `aws_s3_request_metrics`, and fire the callback when the * request finishes receiving response. * See `aws_s3_meta_request_telemetry_fn` - * - * Notes: - * - The callback will be invoked multiple times from different threads. */ aws_s3_meta_request_telemetry_fn *telemetry_callback; diff --git a/source/s3_auto_ranged_get.c b/source/s3_auto_ranged_get.c index 7e9c2b27f..3366322eb 100644 --- a/source/s3_auto_ranged_get.c +++ b/source/s3_auto_ranged_get.c @@ -673,6 +673,7 @@ static void s_s3_auto_ranged_get_request_finished( /* BEGIN CRITICAL SECTION */ { aws_s3_meta_request_lock_synced_data(meta_request); + bool finishing_metrics = true; /* If the object range was found, then record it. */ if (found_object_size) { @@ -722,6 +723,8 @@ static void s_s3_auto_ranged_get_request_finished( } aws_s3_meta_request_stream_response_body_synced(meta_request, request); + /* The body of the request is queued to be streamed, don't finish the metrics yet. */ + finishing_metrics = false; AWS_LOGF_DEBUG( AWS_LS_S3_META_REQUEST, @@ -756,7 +759,9 @@ static void s_s3_auto_ranged_get_request_finished( meta_request->synced_data.finish_result.validation_algorithm = request->validation_algorithm; } } - + if (finishing_metrics) { + aws_s3_request_finish_up_metrics_synced(request, meta_request); + } aws_s3_meta_request_unlock_synced_data(meta_request); } /* END CRITICAL SECTION */ diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c index b43e3f1a1..e0f7ef972 100644 --- a/source/s3_auto_ranged_put.c +++ b/source/s3_auto_ranged_put.c @@ -1334,95 +1334,88 @@ static void s_s3_auto_ranged_put_request_finished( AWS_PRECONDITION(request); struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; + aws_s3_meta_request_lock_synced_data(meta_request); switch (request->request_tag) { case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_LIST_PARTS: { - /* BEGIN CRITICAL SECTION */ - { - aws_s3_meta_request_lock_synced_data(meta_request); - bool has_more_results = false; + bool has_more_results = false; - if (error_code == AWS_ERROR_SUCCESS) { + if (error_code == AWS_ERROR_SUCCESS) { - struct aws_byte_cursor body_cursor = aws_byte_cursor_from_buf(&request->send_data.response_body); - /* Clear the token before */ - aws_string_destroy(auto_ranged_put->synced_data.list_parts_continuation_token); - auto_ranged_put->synced_data.list_parts_continuation_token = NULL; - if (aws_s3_paginated_operation_on_response( - auto_ranged_put->synced_data.list_parts_operation, - &body_cursor, - &auto_ranged_put->synced_data.list_parts_continuation_token, - &has_more_results)) { - AWS_LOGF_ERROR( - AWS_LS_S3_META_REQUEST, "id=%p Failed to parse list parts response.", (void *)meta_request); - error_code = AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED; - } else if (!has_more_results) { - uint64_t bytes_previously_uploaded = 0; - int parts_previously_uploaded = 0; - - for (size_t part_index = 0; - part_index < aws_array_list_length(&auto_ranged_put->synced_data.part_list); - part_index++) { - struct aws_s3_mpu_part_info *part = NULL; - aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index); - if (part != NULL) { - /* Update the number of parts sent/completed previously */ - ++parts_previously_uploaded; - bytes_previously_uploaded += part->size; - } + struct aws_byte_cursor body_cursor = aws_byte_cursor_from_buf(&request->send_data.response_body); + /* Clear the token before */ + aws_string_destroy(auto_ranged_put->synced_data.list_parts_continuation_token); + auto_ranged_put->synced_data.list_parts_continuation_token = NULL; + if (aws_s3_paginated_operation_on_response( + auto_ranged_put->synced_data.list_parts_operation, + &body_cursor, + &auto_ranged_put->synced_data.list_parts_continuation_token, + &has_more_results)) { + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, "id=%p Failed to parse list parts response.", (void *)meta_request); + error_code = AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED; + } else if (!has_more_results) { + uint64_t bytes_previously_uploaded = 0; + int parts_previously_uploaded = 0; + + for (size_t part_index = 0; + part_index < aws_array_list_length(&auto_ranged_put->synced_data.part_list); + part_index++) { + struct aws_s3_mpu_part_info *part = NULL; + aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index); + if (part != NULL) { + /* Update the number of parts sent/completed previously */ + ++parts_previously_uploaded; + bytes_previously_uploaded += part->size; } + } - AWS_LOGF_DEBUG( - AWS_LS_S3_META_REQUEST, - "id=%p: Resuming PutObject. %d out of %d parts have completed during previous request.", - (void *)meta_request, - parts_previously_uploaded, - auto_ranged_put->total_num_parts_from_content_length); - - /* Deliver an initial progress_callback to report all previously uploaded parts. */ - if (meta_request->progress_callback != NULL && bytes_previously_uploaded > 0) { - struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; - event.u.progress.info.bytes_transferred = bytes_previously_uploaded; - event.u.progress.info.content_length = auto_ranged_put->content_length; - aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); - } + AWS_LOGF_DEBUG( + AWS_LS_S3_META_REQUEST, + "id=%p: Resuming PutObject. %d out of %d parts have completed during previous request.", + (void *)meta_request, + parts_previously_uploaded, + auto_ranged_put->total_num_parts_from_content_length); + + /* Deliver an initial progress_callback to report all previously uploaded parts. */ + if (meta_request->progress_callback != NULL && bytes_previously_uploaded > 0) { + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; + event.u.progress.info.bytes_transferred = bytes_previously_uploaded; + event.u.progress.info.content_length = auto_ranged_put->content_length; + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); } } + } - if (has_more_results) { - /* If list parts has more result, make sure list parts continues */ - auto_ranged_put->synced_data.list_parts_state.continues = true; - auto_ranged_put->synced_data.list_parts_state.completed = false; - } else { - /* No more result, complete the list parts */ - auto_ranged_put->synced_data.list_parts_state.continues = false; - auto_ranged_put->synced_data.list_parts_state.completed = true; - } - auto_ranged_put->synced_data.list_parts_error_code = error_code; + if (has_more_results) { + /* If list parts has more result, make sure list parts continues */ + auto_ranged_put->synced_data.list_parts_state.continues = true; + auto_ranged_put->synced_data.list_parts_state.completed = false; + } else { + /* No more result, complete the list parts */ + auto_ranged_put->synced_data.list_parts_state.continues = false; + auto_ranged_put->synced_data.list_parts_state.completed = true; + } + auto_ranged_put->synced_data.list_parts_error_code = error_code; - if (error_code != AWS_ERROR_SUCCESS) { - if (request->send_data.response_status == AWS_HTTP_STATUS_CODE_404_NOT_FOUND && - auto_ranged_put->resume_token->num_parts_completed == - auto_ranged_put->resume_token->total_num_parts) { - AWS_LOGF_DEBUG( - AWS_LS_S3_META_REQUEST, - "id=%p: Resuming PutObject ended early, since there is nothing to resume" - "(request finished prior to being paused?)", - (void *)meta_request); + if (error_code != AWS_ERROR_SUCCESS) { + if (request->send_data.response_status == AWS_HTTP_STATUS_CODE_404_NOT_FOUND && + auto_ranged_put->resume_token->num_parts_completed == + auto_ranged_put->resume_token->total_num_parts) { + AWS_LOGF_DEBUG( + AWS_LS_S3_META_REQUEST, + "id=%p: Resuming PutObject ended early, since there is nothing to resume" + "(request finished prior to being paused?)", + (void *)meta_request); - aws_s3_meta_request_set_success_synced(meta_request, AWS_HTTP_STATUS_CODE_200_OK); - } else { - aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); - } + aws_s3_meta_request_set_success_synced(meta_request, AWS_HTTP_STATUS_CODE_200_OK); + } else { + aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); } - - aws_s3_meta_request_unlock_synced_data(meta_request); } - /* END CRITICAL SECTION */ - break; - } + } break; case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_CREATE_MULTIPART_UPLOAD: { struct aws_http_headers *needed_response_headers = NULL; @@ -1464,25 +1457,16 @@ static void s_s3_auto_ranged_put_request_finished( } } - /* BEGIN CRITICAL SECTION */ - { - aws_s3_meta_request_lock_synced_data(meta_request); + AWS_ASSERT(auto_ranged_put->synced_data.needed_response_headers == NULL); + auto_ranged_put->synced_data.needed_response_headers = needed_response_headers; - AWS_ASSERT(auto_ranged_put->synced_data.needed_response_headers == NULL); - auto_ranged_put->synced_data.needed_response_headers = needed_response_headers; + auto_ranged_put->synced_data.create_multipart_upload_completed = true; + auto_ranged_put->synced_data.list_parts_error_code = error_code; - auto_ranged_put->synced_data.create_multipart_upload_completed = true; - auto_ranged_put->synced_data.list_parts_error_code = error_code; - - if (error_code != AWS_ERROR_SUCCESS) { - aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); - } - - aws_s3_meta_request_unlock_synced_data(meta_request); + if (error_code != AWS_ERROR_SUCCESS) { + aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); } - /* END CRITICAL SECTION */ - break; - } + } break; case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART: { size_t part_number = request->part_number; @@ -1516,63 +1500,53 @@ static void s_s3_auto_ranged_put_request_finished( } } - /* BEGIN CRITICAL SECTION */ - { - aws_s3_meta_request_lock_synced_data(meta_request); - - ++auto_ranged_put->synced_data.num_parts_completed; - - if (request_is_noop) { - ++auto_ranged_put->synced_data.num_parts_noop; - } + ++auto_ranged_put->synced_data.num_parts_completed; - if (auto_ranged_put->has_content_length) { - AWS_LOGF_DEBUG( - AWS_LS_S3_META_REQUEST, - "id=%p: %d out of %d parts have completed.", - (void *)meta_request, - auto_ranged_put->synced_data.num_parts_completed, - auto_ranged_put->total_num_parts_from_content_length); - } else { - AWS_LOGF_DEBUG( - AWS_LS_S3_META_REQUEST, - "id=%p: %d parts have completed.", - (void *)meta_request, - auto_ranged_put->synced_data.num_parts_completed); - } + if (request_is_noop) { + ++auto_ranged_put->synced_data.num_parts_noop; + } - if (!request_is_noop) { - if (error_code == AWS_ERROR_SUCCESS) { - AWS_ASSERT(etag != NULL); + if (auto_ranged_put->has_content_length) { + AWS_LOGF_DEBUG( + AWS_LS_S3_META_REQUEST, + "id=%p: %d out of %d parts have completed.", + (void *)meta_request, + auto_ranged_put->synced_data.num_parts_completed, + auto_ranged_put->total_num_parts_from_content_length); + } else { + AWS_LOGF_DEBUG( + AWS_LS_S3_META_REQUEST, + "id=%p: %d parts have completed.", + (void *)meta_request, + auto_ranged_put->synced_data.num_parts_completed); + } - ++auto_ranged_put->synced_data.num_parts_successful; + if (!request_is_noop) { + if (error_code == AWS_ERROR_SUCCESS) { + AWS_ASSERT(etag != NULL); - /* Send progress_callback for delivery on io_event_loop thread */ - if (meta_request->progress_callback != NULL) { - struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; - event.u.progress.info.bytes_transferred = request->request_body.len; - event.u.progress.info.content_length = auto_ranged_put->content_length; - aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); - } + ++auto_ranged_put->synced_data.num_parts_successful; - /* Store part's ETag */ - struct aws_s3_mpu_part_info *part = NULL; - aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index); - AWS_ASSERT(part != NULL); - AWS_ASSERT(part->etag == NULL); - part->etag = etag; - } else { - ++auto_ranged_put->synced_data.num_parts_failed; - aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); + /* Send progress_callback for delivery on io_event_loop thread */ + if (meta_request->progress_callback != NULL) { + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; + event.u.progress.info.bytes_transferred = request->request_body.len; + event.u.progress.info.content_length = auto_ranged_put->content_length; + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); } - } - aws_s3_meta_request_unlock_synced_data(meta_request); + /* Store part's ETag */ + struct aws_s3_mpu_part_info *part = NULL; + aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index); + AWS_ASSERT(part != NULL); + AWS_ASSERT(part->etag == NULL); + part->etag = etag; + } else { + ++auto_ranged_put->synced_data.num_parts_failed; + aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); + } } - /* END CRITICAL SECTION */ - - break; - } + } break; case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_COMPLETE_MULTIPART_UPLOAD: { if (error_code == AWS_ERROR_SUCCESS && meta_request->headers_callback != NULL) { @@ -1584,14 +1558,7 @@ static void s_s3_auto_ranged_put_request_finished( /* Copy over any response headers that we've previously determined are needed for this final * response. */ - - /* BEGIN CRITICAL SECTION */ - { - aws_s3_meta_request_lock_synced_data(meta_request); - copy_http_headers(auto_ranged_put->synced_data.needed_response_headers, final_response_headers); - aws_s3_meta_request_unlock_synced_data(meta_request); - } - /* END CRITICAL SECTION */ + copy_http_headers(auto_ranged_put->synced_data.needed_response_headers, final_response_headers); struct aws_byte_cursor xml_doc = aws_byte_cursor_from_buf(&request->send_data.response_body); @@ -1619,6 +1586,8 @@ static void s_s3_auto_ranged_put_request_finished( aws_byte_buf_clean_up(&etag_header_value_byte_buf); } + /* Invoke the callback without lock */ + aws_s3_meta_request_unlock_synced_data(meta_request); /* Notify the user of the headers. */ if (meta_request->headers_callback( meta_request, @@ -1629,37 +1598,28 @@ static void s_s3_auto_ranged_put_request_finished( error_code = aws_last_error_or_unknown(); } meta_request->headers_callback = NULL; + /* Grab the lock again after the callback */ + aws_s3_meta_request_lock_synced_data(meta_request); aws_http_headers_release(final_response_headers); } - /* BEGIN CRITICAL SECTION */ - { - aws_s3_meta_request_lock_synced_data(meta_request); - auto_ranged_put->synced_data.complete_multipart_upload_completed = true; - auto_ranged_put->synced_data.complete_multipart_upload_error_code = error_code; + auto_ranged_put->synced_data.complete_multipart_upload_completed = true; + auto_ranged_put->synced_data.complete_multipart_upload_error_code = error_code; - if (error_code != AWS_ERROR_SUCCESS) { - aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); - } - aws_s3_meta_request_unlock_synced_data(meta_request); + if (error_code != AWS_ERROR_SUCCESS) { + aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); } - /* END CRITICAL SECTION */ - - break; - } + } break; case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_ABORT_MULTIPART_UPLOAD: { - /* BEGIN CRITICAL SECTION */ - { - aws_s3_meta_request_lock_synced_data(meta_request); - auto_ranged_put->synced_data.abort_multipart_upload_error_code = error_code; - auto_ranged_put->synced_data.abort_multipart_upload_completed = true; - aws_s3_meta_request_unlock_synced_data(meta_request); - } - /* END CRITICAL SECTION */ - break; - } + auto_ranged_put->synced_data.abort_multipart_upload_error_code = error_code; + auto_ranged_put->synced_data.abort_multipart_upload_completed = true; + + } break; } + + aws_s3_request_finish_up_metrics_synced(request, meta_request); + aws_s3_meta_request_unlock_synced_data(meta_request); } static int s_s3_auto_ranged_put_pause( diff --git a/source/s3_copy_object.c b/source/s3_copy_object.c index 4c039a949..ba7e68c12 100644 --- a/source/s3_copy_object.c +++ b/source/s3_copy_object.c @@ -571,7 +571,6 @@ static void s_s3_copy_object_request_finished( struct aws_s3_copy_object *copy_object = meta_request->impl; aws_s3_meta_request_lock_synced_data(meta_request); - switch (request->request_tag) { case AWS_S3_COPY_OBJECT_REQUEST_TAG_GET_OBJECT_SIZE: { @@ -611,6 +610,8 @@ static void s_s3_copy_object_request_finished( /* Copy all the response headers from this request. */ copy_http_headers(request->send_data.response_headers, final_response_headers); + /* Invoke the callback without lock */ + aws_s3_meta_request_unlock_synced_data(meta_request); /* Notify the user of the headers. */ if (meta_request->headers_callback( meta_request, @@ -621,6 +622,8 @@ static void s_s3_copy_object_request_finished( error_code = aws_last_error_or_unknown(); } meta_request->headers_callback = NULL; + /* Grab the lock again after the callback */ + aws_s3_meta_request_lock_synced_data(meta_request); aws_http_headers_release(final_response_headers); } @@ -769,6 +772,8 @@ static void s_s3_copy_object_request_finished( } /* Notify the user of the headers. */ + /* Invoke the callback without lock */ + aws_s3_meta_request_unlock_synced_data(meta_request); if (meta_request->headers_callback( meta_request, final_response_headers, @@ -778,6 +783,8 @@ static void s_s3_copy_object_request_finished( error_code = aws_last_error_or_unknown(); } meta_request->headers_callback = NULL; + /* Grab the lock again after the callback */ + aws_s3_meta_request_lock_synced_data(meta_request); aws_http_headers_release(final_response_headers); } @@ -797,5 +804,8 @@ static void s_s3_copy_object_request_finished( break; } } + + aws_s3_request_finish_up_metrics_synced(request, meta_request); + aws_s3_meta_request_unlock_synced_data(meta_request); } diff --git a/source/s3_default_meta_request.c b/source/s3_default_meta_request.c index 746122311..0e759ff61 100644 --- a/source/s3_default_meta_request.c +++ b/source/s3_default_meta_request.c @@ -401,6 +401,7 @@ static void s_s3_meta_request_default_request_finished( meta_request_default->synced_data.cached_response_status = request->send_data.response_status; meta_request_default->synced_data.request_completed = true; meta_request_default->synced_data.request_error_code = error_code; + bool finishing_metrics = true; if (error_code == AWS_ERROR_SUCCESS) { /* Send progress_callback for delivery on io_event_loop thread. @@ -423,10 +424,17 @@ static void s_s3_meta_request_default_request_finished( } aws_s3_meta_request_stream_response_body_synced(meta_request, request); + /* The body of the request is queued to be streamed, don't record the end timestamp for the request + * yet. */ + finishing_metrics = false; } else { aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); } + if (finishing_metrics) { + aws_s3_request_finish_up_metrics_synced(request, meta_request); + } + aws_s3_meta_request_unlock_synced_data(meta_request); } /* END CRITICAL SECTION */ diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index 4b6d0b21d..88783c27a 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -1750,6 +1750,22 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1); ++num_parts_delivered; + + if (request->send_data.metrics != NULL) { + /* Request is done streaming the body, complete the metrics for the request now. */ + struct aws_s3_request_metrics *metrics = request->send_data.metrics; + metrics->crt_info_metrics.error_code = error_code; + aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns); + metrics->time_metrics.total_duration_ns = + metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns; + + if (meta_request->telemetry_callback != NULL) { + /* We already in the meta request event thread, invoke the telemetry callback directly */ + meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data); + } + request->send_data.metrics = aws_s3_request_metrics_release(metrics); + } + aws_s3_request_release(request); } break; @@ -1783,6 +1799,20 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a } } break; + case AWS_S3_META_REQUEST_EVENT_TELEMETRY: { + struct aws_s3_request_metrics *metrics = event.u.telemetry.metrics; + AWS_FATAL_ASSERT(meta_request->telemetry_callback != NULL); + AWS_FATAL_ASSERT(metrics != NULL); + + if (metrics->time_metrics.end_timestamp_ns == -1) { + aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns); + metrics->time_metrics.total_duration_ns = + metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns; + } + meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data); + event.u.telemetry.metrics = aws_s3_request_metrics_release(event.u.telemetry.metrics); + } break; + default: AWS_FATAL_ASSERT(false); } diff --git a/source/s3_request.c b/source/s3_request.c index 2e40f6723..aab3d9526 100644 --- a/source/s3_request.c +++ b/source/s3_request.c @@ -51,6 +51,24 @@ void aws_s3_request_setup_send_data(struct aws_s3_request *request, struct aws_h AWS_PRECONDITION(request); AWS_PRECONDITION(message); + if (request != NULL && request->send_data.metrics != NULL) { + /* If there is a metrics from previous attempt, complete it now. */ + struct aws_s3_request_metrics *metric = request->send_data.metrics; + aws_high_res_clock_get_ticks((uint64_t *)&metric->time_metrics.end_timestamp_ns); + metric->time_metrics.total_duration_ns = + metric->time_metrics.end_timestamp_ns - metric->time_metrics.start_timestamp_ns; + + struct aws_s3_meta_request *meta_request = request->meta_request; + if (meta_request != NULL && meta_request->telemetry_callback != NULL) { + + aws_s3_meta_request_lock_synced_data(meta_request); + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_TELEMETRY}; + event.u.telemetry.metrics = aws_s3_request_metrics_acquire(metric); + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); + aws_s3_meta_request_unlock_synced_data(meta_request); + } + request->send_data.metrics = aws_s3_request_metrics_release(metric); + } aws_s3_request_clean_up_send_data(request); request->send_data.message = message; @@ -76,24 +94,13 @@ static void s_s3_request_clean_up_send_data_message(struct aws_s3_request *reque void aws_s3_request_clean_up_send_data(struct aws_s3_request *request) { AWS_PRECONDITION(request); + /* The metrics should be collected and provided to user before reaching here */ + AWS_FATAL_ASSERT(request->send_data.metrics == NULL); s_s3_request_clean_up_send_data_message(request); aws_signable_destroy(request->send_data.signable); request->send_data.signable = NULL; - if (request->send_data.metrics) { - /* invoke callback */ - struct aws_s3_meta_request *meta_request = request->meta_request; - struct aws_s3_request_metrics *metric = request->send_data.metrics; - aws_high_res_clock_get_ticks((uint64_t *)&metric->time_metrics.end_timestamp_ns); - metric->time_metrics.total_duration_ns = - metric->time_metrics.end_timestamp_ns - metric->time_metrics.start_timestamp_ns; - if (meta_request->telemetry_callback) { - meta_request->telemetry_callback(meta_request, metric, meta_request->user_data); - } - request->send_data.metrics = aws_s3_request_metrics_release(metric); - } - aws_http_headers_release(request->send_data.response_headers); request->send_data.response_headers = NULL; diff --git a/source/s3_util.c b/source/s3_util.c index eebb6694e..1747a9524 100644 --- a/source/s3_util.c +++ b/source/s3_util.c @@ -5,7 +5,10 @@ #include "aws/s3/private/s3_util.h" #include "aws/s3/private/s3_client_impl.h" +#include "aws/s3/private/s3_meta_request_impl.h" +#include "aws/s3/private/s3_request.h" #include +#include #include #include #include @@ -629,3 +632,23 @@ int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor e } return AWS_ERROR_UNKNOWN; } + +void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request) { + AWS_PRECONDITION(meta_request); + AWS_PRECONDITION(request); + + if (request->send_data.metrics != NULL) { + /* Request is done, complete the metrics for the request now. */ + struct aws_s3_request_metrics *metrics = request->send_data.metrics; + aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns); + metrics->time_metrics.total_duration_ns = + metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns; + + if (meta_request->telemetry_callback != NULL) { + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_TELEMETRY}; + event.u.telemetry.metrics = aws_s3_request_metrics_acquire(metrics); + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); + } + request->send_data.metrics = aws_s3_request_metrics_release(metrics); + } +} diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c index e879a9557..68e1ca4fc 100644 --- a/tests/s3_data_plane_tests.c +++ b/tests/s3_data_plane_tests.c @@ -404,6 +404,8 @@ static int s_test_s3_request_create_destroy(struct aws_allocator *allocator, voi request->send_data.response_headers = aws_http_headers_new(allocator); ASSERT_TRUE(request->send_data.response_headers != NULL); + ASSERT_TRUE(request->send_data.metrics != NULL); + request->send_data.metrics = aws_s3_request_metrics_release(request->send_data.metrics); aws_s3_request_clean_up_send_data(request); @@ -3964,25 +3966,17 @@ static int s_test_s3_meta_request_default(struct aws_allocator *allocator, void aws_s3_tester_unlock_synced_data(&tester); - ASSERT_SUCCESS(aws_s3_tester_validate_get_object_results(&meta_request_test_results, 0)); - - meta_request = aws_s3_meta_request_release(meta_request); - - aws_s3_tester_wait_for_meta_request_shutdown(&tester); - - /* - * TODO: telemetry is sent from request destructor, http threads hold on to - * req for a little bit after on_req_finished callback and its possible that - * telemetry callback will be invoked after meta reqs on_finished callback. - * Moving the telemetry check to after meta req shutdown callback. Need to - * figure out whether current behavior can be improved. - */ /* Check the size of the metrics should be the same as the number of requests, which should be 1 */ ASSERT_UINT_EQUALS(1, aws_array_list_length(&meta_request_test_results.synced_data.metrics)); struct aws_s3_request_metrics *metrics = NULL; aws_array_list_back(&meta_request_test_results.synced_data.metrics, (void **)&metrics); + ASSERT_SUCCESS(aws_s3_tester_validate_get_object_results(&meta_request_test_results, 0)); + + meta_request = aws_s3_meta_request_release(meta_request); + + aws_s3_tester_wait_for_meta_request_shutdown(&tester); aws_s3_meta_request_test_results_clean_up(&meta_request_test_results); aws_http_message_release(message);