Skip to content

Commit

Permalink
Metric callback improve (#390)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Dec 21, 2023
1 parent 2c19abf commit c91f4c3
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 203 deletions.
7 changes: 6 additions & 1 deletion include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct aws_s3_meta_request_event {
enum aws_s3_meta_request_event_type {
AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY, /* body_callback */
AWS_S3_META_REQUEST_EVENT_PROGRESS, /* progress_callback */
/* TODO: AWS_S3_META_REQUEST_EVENT_TELEMETRY */
AWS_S3_META_REQUEST_EVENT_TELEMETRY, /* telemetry_callback */
} type;

union {
Expand All @@ -72,6 +72,11 @@ struct aws_s3_meta_request_event {
struct {
struct aws_s3_meta_request_progress info;
} progress;

/* data for AWS_S3_META_REQUEST_EVENT_TELEMETRY */
struct {
struct aws_s3_request_metrics *metrics;
} telemetry;
} u;
};

Expand Down
5 changes: 5 additions & 0 deletions include/aws/s3/private/s3_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ struct aws_http_stream;
struct aws_http_headers;
struct aws_http_message;
struct aws_s3_client;
struct aws_s3_request;
struct aws_s3_meta_request;

struct aws_cached_signing_config_aws {
struct aws_allocator *allocator;
Expand Down Expand Up @@ -276,6 +278,9 @@ void aws_s3_get_part_range(
AWS_S3_API
int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor error_code_string);

AWS_S3_API
void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request);

AWS_EXTERN_C_END

#endif /* AWS_S3_UTIL_H */
6 changes: 1 addition & 5 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ typedef void(aws_s3_meta_request_progress_fn)(
void *user_data);

/**
* Invoked to report the telemetry of the meta request once a single request finishes. Invoked from the thread of the
* connection that request made from.
* Invoked to report the telemetry of the meta request once a single request finishes.
* Note: *metrics is only valid for the duration of the callback. If you need to keep it around, use
* `aws_s3_request_metrics_acquire`
*/
Expand Down Expand Up @@ -690,9 +689,6 @@ struct aws_s3_meta_request_options {
* If set the request will keep track of the metrics from `aws_s3_request_metrics`, and fire the callback when the
* request finishes receiving response.
* See `aws_s3_meta_request_telemetry_fn`
*
* Notes:
* - The callback will be invoked multiple times from different threads.
*/
aws_s3_meta_request_telemetry_fn *telemetry_callback;

Expand Down
7 changes: 6 additions & 1 deletion source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ static void s_s3_auto_ranged_get_request_finished(
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
bool finishing_metrics = true;

/* If the object range was found, then record it. */
if (found_object_size) {
Expand Down Expand Up @@ -722,6 +723,8 @@ static void s_s3_auto_ranged_get_request_finished(
}

aws_s3_meta_request_stream_response_body_synced(meta_request, request);
/* The body of the request is queued to be streamed, don't finish the metrics yet. */
finishing_metrics = false;

AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
Expand Down Expand Up @@ -756,7 +759,9 @@ static void s_s3_auto_ranged_get_request_finished(
meta_request->synced_data.finish_result.validation_algorithm = request->validation_algorithm;
}
}

if (finishing_metrics) {
aws_s3_request_finish_up_metrics_synced(request, meta_request);
}
aws_s3_meta_request_unlock_synced_data(meta_request);
}
/* END CRITICAL SECTION */
Expand Down
298 changes: 129 additions & 169 deletions source/s3_auto_ranged_put.c

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion source/s3_copy_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,6 @@ static void s_s3_copy_object_request_finished(

struct aws_s3_copy_object *copy_object = meta_request->impl;
aws_s3_meta_request_lock_synced_data(meta_request);

switch (request->request_tag) {

case AWS_S3_COPY_OBJECT_REQUEST_TAG_GET_OBJECT_SIZE: {
Expand Down Expand Up @@ -611,6 +610,8 @@ static void s_s3_copy_object_request_finished(
/* Copy all the response headers from this request. */
copy_http_headers(request->send_data.response_headers, final_response_headers);

/* Invoke the callback without lock */
aws_s3_meta_request_unlock_synced_data(meta_request);
/* Notify the user of the headers. */
if (meta_request->headers_callback(
meta_request,
Expand All @@ -621,6 +622,8 @@ static void s_s3_copy_object_request_finished(
error_code = aws_last_error_or_unknown();
}
meta_request->headers_callback = NULL;
/* Grab the lock again after the callback */
aws_s3_meta_request_lock_synced_data(meta_request);

aws_http_headers_release(final_response_headers);
}
Expand Down Expand Up @@ -769,6 +772,8 @@ static void s_s3_copy_object_request_finished(
}

/* Notify the user of the headers. */
/* Invoke the callback without lock */
aws_s3_meta_request_unlock_synced_data(meta_request);
if (meta_request->headers_callback(
meta_request,
final_response_headers,
Expand All @@ -778,6 +783,8 @@ static void s_s3_copy_object_request_finished(
error_code = aws_last_error_or_unknown();
}
meta_request->headers_callback = NULL;
/* Grab the lock again after the callback */
aws_s3_meta_request_lock_synced_data(meta_request);

aws_http_headers_release(final_response_headers);
}
Expand All @@ -797,5 +804,8 @@ static void s_s3_copy_object_request_finished(
break;
}
}

aws_s3_request_finish_up_metrics_synced(request, meta_request);

aws_s3_meta_request_unlock_synced_data(meta_request);
}
8 changes: 8 additions & 0 deletions source/s3_default_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ static void s_s3_meta_request_default_request_finished(
meta_request_default->synced_data.cached_response_status = request->send_data.response_status;
meta_request_default->synced_data.request_completed = true;
meta_request_default->synced_data.request_error_code = error_code;
bool finishing_metrics = true;

if (error_code == AWS_ERROR_SUCCESS) {
/* Send progress_callback for delivery on io_event_loop thread.
Expand All @@ -423,10 +424,17 @@ static void s_s3_meta_request_default_request_finished(
}

aws_s3_meta_request_stream_response_body_synced(meta_request, request);
/* The body of the request is queued to be streamed, don't record the end timestamp for the request
* yet. */
finishing_metrics = false;
} else {
aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
}

if (finishing_metrics) {
aws_s3_request_finish_up_metrics_synced(request, meta_request);
}

aws_s3_meta_request_unlock_synced_data(meta_request);
}
/* END CRITICAL SECTION */
Expand Down
30 changes: 30 additions & 0 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1750,6 +1750,22 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);

++num_parts_delivered;

if (request->send_data.metrics != NULL) {
/* Request is done streaming the body, complete the metrics for the request now. */
struct aws_s3_request_metrics *metrics = request->send_data.metrics;
metrics->crt_info_metrics.error_code = error_code;
aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
metrics->time_metrics.total_duration_ns =
metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;

if (meta_request->telemetry_callback != NULL) {
/* We already in the meta request event thread, invoke the telemetry callback directly */
meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
}
request->send_data.metrics = aws_s3_request_metrics_release(metrics);
}

aws_s3_request_release(request);
} break;

Expand Down Expand Up @@ -1783,6 +1799,20 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
}
} break;

case AWS_S3_META_REQUEST_EVENT_TELEMETRY: {
struct aws_s3_request_metrics *metrics = event.u.telemetry.metrics;
AWS_FATAL_ASSERT(meta_request->telemetry_callback != NULL);
AWS_FATAL_ASSERT(metrics != NULL);

if (metrics->time_metrics.end_timestamp_ns == -1) {
aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
metrics->time_metrics.total_duration_ns =
metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;
}
meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
event.u.telemetry.metrics = aws_s3_request_metrics_release(event.u.telemetry.metrics);
} break;

default:
AWS_FATAL_ASSERT(false);
}
Expand Down
33 changes: 20 additions & 13 deletions source/s3_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@ void aws_s3_request_setup_send_data(struct aws_s3_request *request, struct aws_h
AWS_PRECONDITION(request);
AWS_PRECONDITION(message);

if (request != NULL && request->send_data.metrics != NULL) {
/* If there is a metrics from previous attempt, complete it now. */
struct aws_s3_request_metrics *metric = request->send_data.metrics;
aws_high_res_clock_get_ticks((uint64_t *)&metric->time_metrics.end_timestamp_ns);
metric->time_metrics.total_duration_ns =
metric->time_metrics.end_timestamp_ns - metric->time_metrics.start_timestamp_ns;

struct aws_s3_meta_request *meta_request = request->meta_request;
if (meta_request != NULL && meta_request->telemetry_callback != NULL) {

aws_s3_meta_request_lock_synced_data(meta_request);
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_TELEMETRY};
event.u.telemetry.metrics = aws_s3_request_metrics_acquire(metric);
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
aws_s3_meta_request_unlock_synced_data(meta_request);
}
request->send_data.metrics = aws_s3_request_metrics_release(metric);
}
aws_s3_request_clean_up_send_data(request);

request->send_data.message = message;
Expand All @@ -76,24 +94,13 @@ static void s_s3_request_clean_up_send_data_message(struct aws_s3_request *reque

void aws_s3_request_clean_up_send_data(struct aws_s3_request *request) {
AWS_PRECONDITION(request);
/* The metrics should be collected and provided to user before reaching here */
AWS_FATAL_ASSERT(request->send_data.metrics == NULL);

s_s3_request_clean_up_send_data_message(request);

aws_signable_destroy(request->send_data.signable);
request->send_data.signable = NULL;
if (request->send_data.metrics) {
/* invoke callback */
struct aws_s3_meta_request *meta_request = request->meta_request;
struct aws_s3_request_metrics *metric = request->send_data.metrics;
aws_high_res_clock_get_ticks((uint64_t *)&metric->time_metrics.end_timestamp_ns);
metric->time_metrics.total_duration_ns =
metric->time_metrics.end_timestamp_ns - metric->time_metrics.start_timestamp_ns;
if (meta_request->telemetry_callback) {
meta_request->telemetry_callback(meta_request, metric, meta_request->user_data);
}
request->send_data.metrics = aws_s3_request_metrics_release(metric);
}

aws_http_headers_release(request->send_data.response_headers);
request->send_data.response_headers = NULL;

Expand Down
23 changes: 23 additions & 0 deletions source/s3_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

#include "aws/s3/private/s3_util.h"
#include "aws/s3/private/s3_client_impl.h"
#include "aws/s3/private/s3_meta_request_impl.h"
#include "aws/s3/private/s3_request.h"
#include <aws/auth/credentials.h>
#include <aws/common/clock.h>
#include <aws/common/string.h>
#include <aws/common/xml_parser.h>
#include <aws/http/request_response.h>
Expand Down Expand Up @@ -629,3 +632,23 @@ int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor e
}
return AWS_ERROR_UNKNOWN;
}

void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request) {
AWS_PRECONDITION(meta_request);
AWS_PRECONDITION(request);

if (request->send_data.metrics != NULL) {
/* Request is done, complete the metrics for the request now. */
struct aws_s3_request_metrics *metrics = request->send_data.metrics;
aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
metrics->time_metrics.total_duration_ns =
metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;

if (meta_request->telemetry_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_TELEMETRY};
event.u.telemetry.metrics = aws_s3_request_metrics_acquire(metrics);
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}
request->send_data.metrics = aws_s3_request_metrics_release(metrics);
}
}
20 changes: 7 additions & 13 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ static int s_test_s3_request_create_destroy(struct aws_allocator *allocator, voi

request->send_data.response_headers = aws_http_headers_new(allocator);
ASSERT_TRUE(request->send_data.response_headers != NULL);
ASSERT_TRUE(request->send_data.metrics != NULL);
request->send_data.metrics = aws_s3_request_metrics_release(request->send_data.metrics);

aws_s3_request_clean_up_send_data(request);

Expand Down Expand Up @@ -3964,25 +3966,17 @@ static int s_test_s3_meta_request_default(struct aws_allocator *allocator, void

aws_s3_tester_unlock_synced_data(&tester);

ASSERT_SUCCESS(aws_s3_tester_validate_get_object_results(&meta_request_test_results, 0));

meta_request = aws_s3_meta_request_release(meta_request);

aws_s3_tester_wait_for_meta_request_shutdown(&tester);

/*
* TODO: telemetry is sent from request destructor, http threads hold on to
* req for a little bit after on_req_finished callback and its possible that
* telemetry callback will be invoked after meta reqs on_finished callback.
* Moving the telemetry check to after meta req shutdown callback. Need to
* figure out whether current behavior can be improved.
*/
/* Check the size of the metrics should be the same as the number of
requests, which should be 1 */
ASSERT_UINT_EQUALS(1, aws_array_list_length(&meta_request_test_results.synced_data.metrics));
struct aws_s3_request_metrics *metrics = NULL;
aws_array_list_back(&meta_request_test_results.synced_data.metrics, (void **)&metrics);

ASSERT_SUCCESS(aws_s3_tester_validate_get_object_results(&meta_request_test_results, 0));

meta_request = aws_s3_meta_request_release(meta_request);

aws_s3_tester_wait_for_meta_request_shutdown(&tester);
aws_s3_meta_request_test_results_clean_up(&meta_request_test_results);

aws_http_message_release(message);
Expand Down

0 comments on commit c91f4c3

Please sign in to comment.