Skip to content

Commit

Permalink
cleaned up event loop shutdown leak in iocp sockets. Channel now shut… (
Browse files Browse the repository at this point in the history
#149)

cleaned up event loop shutdown leak in iocp sockets. Channel now shuts down without specifying free_scarce_resources. We'll add a new api later when we're ready for DOS mitigations.
  • Loading branch information
JonathanHenson authored Jun 18, 2019
1 parent 815c4f6 commit 9d70079
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 43 deletions.
1 change: 1 addition & 0 deletions include/aws/io/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ enum aws_io_errors {
AWS_IO_CHANNEL_UNKNOWN_MESSAGE_TYPE,
AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW,
AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED,
AWS_IO_EVENT_LOOP_SHUTDOWN,
AWS_IO_SYS_CALL_FAILURE,
AWS_IO_TLS_ERROR_NEGOTIATION_FAILURE,
AWS_IO_TLS_ERROR_NOT_NEGOTIATED,
Expand Down
14 changes: 11 additions & 3 deletions source/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct shutdown_task {
struct aws_channel_task task;
struct aws_channel *channel;
int error_code;
bool shutdown_immediately;
};

struct aws_channel {
Expand Down Expand Up @@ -296,20 +297,22 @@ struct channel_shutdown_task_args {
struct aws_task task;
};

static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately);

static void s_shutdown_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {

(void)task;

struct shutdown_task *shutdown_task = arg;

if (status == AWS_TASK_STATUS_RUN_READY) {
aws_channel_shutdown(shutdown_task->channel, shutdown_task->error_code);
s_channel_shutdown(shutdown_task->channel, shutdown_task->error_code, shutdown_task->shutdown_immediately);
}
}

static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status);

int aws_channel_shutdown(struct aws_channel *channel, int error_code) {
static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately) {
if (aws_channel_thread_is_callers_thread(channel)) {
if (channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: beginning shutdown process", (void *)channel);
Expand All @@ -324,7 +327,7 @@ int aws_channel_shutdown(struct aws_channel *channel, int error_code) {
(void *)channel,
(void *)slot);

return aws_channel_slot_shutdown(slot, AWS_CHANNEL_DIR_READ, error_code, error_code != AWS_OP_SUCCESS);
return aws_channel_slot_shutdown(slot, AWS_CHANNEL_DIR_READ, error_code, shutdown_immediately);
}

channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
Expand Down Expand Up @@ -362,6 +365,7 @@ int aws_channel_shutdown(struct aws_channel *channel, int error_code) {
&channel->cross_thread_tasks.shutdown_task.task,
s_shutdown_task,
&channel->cross_thread_tasks.shutdown_task);
channel->cross_thread_tasks.shutdown_task.shutdown_immediately = shutdown_immediately;
channel->cross_thread_tasks.shutdown_task.channel = channel;
channel->cross_thread_tasks.shutdown_task.error_code = error_code;
}
Expand All @@ -376,6 +380,10 @@ int aws_channel_shutdown(struct aws_channel *channel, int error_code) {
return AWS_OP_SUCCESS;
}

int aws_channel_shutdown(struct aws_channel *channel, int error_code) {
return s_channel_shutdown(channel, error_code, false);
}

struct aws_io_message *aws_channel_acquire_message_from_pool(
struct aws_channel *channel,
enum aws_io_message_type message_type,
Expand Down
3 changes: 3 additions & 0 deletions source/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_IO(
AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED,
"An attempt was made to assign an io handle to an event loop, but the handle was already assigned."),
AWS_DEFINE_ERROR_INFO_IO(
AWS_IO_EVENT_LOOP_SHUTDOWN,
"Event loop has shutdown and a resource was still using it, the resource has been removed from the loop."),
AWS_DEFINE_ERROR_INFO_IO(
AWS_IO_SYS_CALL_FAILURE,
"System call failure"),
Expand Down
7 changes: 5 additions & 2 deletions source/posix/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,20 +444,23 @@ static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_
socket_args->socket->io_handle.data.fd);

socket_args->socket->state = TIMEDOUT;
int error_code = AWS_IO_SOCKET_TIMEOUT;

if (status == AWS_TASK_STATUS_RUN_READY) {
aws_event_loop_unsubscribe_from_io_events(socket_args->socket->event_loop, &socket_args->socket->io_handle);
} else {
error_code = AWS_IO_EVENT_LOOP_SHUTDOWN;
aws_event_loop_free_io_event_resources(socket_args->socket->event_loop, &socket_args->socket->io_handle);
}
socket_args->socket->event_loop = NULL;
struct posix_socket *socket_impl = socket_args->socket->impl;
socket_impl->currently_subscribed = false;
aws_raise_error(AWS_IO_SOCKET_TIMEOUT);
aws_raise_error(error_code);
struct aws_socket *socket = socket_args->socket;
/*socket close sets socket_args->socket to NULL and
* socket_impl->connect_args to NULL. */
aws_socket_close(socket);
s_on_connection_error(socket, AWS_IO_SOCKET_TIMEOUT);
s_on_connection_error(socket, error_code);
}

aws_mem_release(socket_args->allocator, socket_args);
Expand Down
90 changes: 52 additions & 38 deletions source/windows/iocp/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -853,23 +853,33 @@ void s_socket_connection_completion(
connection is considered timedout. */
static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_status status) {
(void)task;
(void)status;
struct socket_connect_args *socket_args = args;

AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task);
if (status == AWS_TASK_STATUS_RUN_READY) {
if (socket_args->socket) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: timed out, shutting down.",
(void *)socket_args->socket,
(void *)socket_args->socket->io_handle.data.handle);
socket_args->socket->state = TIMEDOUT;
aws_raise_error(AWS_IO_SOCKET_TIMEOUT);
struct aws_socket *socket = socket_args->socket;
/* socket close will set the connection args to NULL etc...*/
aws_socket_close(socket);
socket->connection_result_fn(socket, AWS_IO_SOCKET_TIMEOUT, socket->connect_accept_user_data);
if (socket_args->socket) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: timed out, shutting down.",
(void *)socket_args->socket,
(void *)socket_args->socket->io_handle.data.handle);
socket_args->socket->state = TIMEDOUT;
struct aws_socket *socket = socket_args->socket;
int error_code = AWS_IO_SOCKET_TIMEOUT;

/* since the task is canceled the event-loop is gone and the iocp will not trigger, so go ahead
and tell the socket cleanup stuff that the iocp handle is no longer pending operations. */
if (status == AWS_TASK_STATUS_CANCELED) {
struct iocp_socket *iocp_socket = socket->impl;
iocp_socket->read_io_data->in_use = false;
error_code = AWS_IO_EVENT_LOOP_SHUTDOWN;
}

aws_raise_error(error_code);

/* socket close will set the connection args to NULL etc...*/
aws_socket_close(socket);
socket->connection_result_fn(socket, error_code, socket->connect_accept_user_data);
}

struct aws_allocator *allocator = socket_args->allocator;
Expand Down Expand Up @@ -1090,6 +1100,8 @@ static int s_ipv6_stream_connect(
/* simply moves the connection_success notification into the event-loop's thread. */
static void s_connection_success_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
(void)task;
(void)task_status;

struct io_operation_data *io_data = arg;

if (!io_data->socket) {
Expand All @@ -1101,11 +1113,9 @@ static void s_connection_success_task(struct aws_task *task, void *arg, enum aws
io_data->sequential_task_storage.arg = NULL;
io_data->in_use = false;

if (task_status == AWS_TASK_STATUS_RUN_READY) {
struct aws_socket *socket = io_data->socket;
struct iocp_socket *socket_impl = socket->impl;
socket_impl->vtable->connection_success(socket);
}
struct aws_socket *socket = io_data->socket;
struct iocp_socket *socket_impl = socket->impl;
socket_impl->vtable->connection_success(socket);
}

/* initiate the client end of a named pipe. */
Expand Down Expand Up @@ -1958,19 +1968,18 @@ static int s_stream_stop_accept(struct aws_socket *socket);

static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) {
(void)task;
(void)status;

if (status == AWS_TASK_STATUS_RUN_READY) {
struct stop_accept_args *stop_accept_args = arg;
aws_mutex_lock(&stop_accept_args->mutex);
stop_accept_args->ret_code = AWS_OP_SUCCESS;
struct stop_accept_args *stop_accept_args = arg;
aws_mutex_lock(&stop_accept_args->mutex);
stop_accept_args->ret_code = AWS_OP_SUCCESS;

if (aws_socket_stop_accept(stop_accept_args->socket)) {
stop_accept_args->ret_code = aws_last_error();
}
stop_accept_args->invoked = true;
aws_condition_variable_notify_one(&stop_accept_args->condition_var);
aws_mutex_unlock(&stop_accept_args->mutex);
if (aws_socket_stop_accept(stop_accept_args->socket)) {
stop_accept_args->ret_code = aws_last_error();
}
stop_accept_args->invoked = true;
aws_condition_variable_notify_one(&stop_accept_args->condition_var);
aws_mutex_unlock(&stop_accept_args->mutex);
}

static int s_stream_stop_accept(struct aws_socket *socket) {
Expand Down Expand Up @@ -2274,18 +2283,23 @@ static int s_socket_close(struct aws_socket *socket);
static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) {
(void)task;

if (status == AWS_TASK_STATUS_RUN_READY) {
struct close_args *close_args = arg;
aws_mutex_lock(&close_args->mutex);
close_args->ret_code = AWS_OP_SUCCESS;
struct close_args *close_args = arg;
aws_mutex_lock(&close_args->mutex);
close_args->ret_code = AWS_OP_SUCCESS;

if (aws_socket_close(close_args->socket)) {
close_args->ret_code = aws_last_error();
}
close_args->invoked = true;
aws_condition_variable_notify_one(&close_args->condition_var);
aws_mutex_unlock(&close_args->mutex);
/* since the task is canceled the event-loop is gone and the iocp will not trigger, so go ahead
and tell the socket cleanup stuff that the iocp handle is no longer pending operations. */
if (status == AWS_TASK_STATUS_CANCELED) {
struct iocp_socket *iocp_socket = close_args->socket->impl;
iocp_socket->read_io_data->in_use = false;
}

if (aws_socket_close(close_args->socket)) {
close_args->ret_code = aws_last_error();
}
close_args->invoked = true;
aws_condition_variable_notify_one(&close_args->condition_var);
aws_mutex_unlock(&close_args->mutex);
}

static int s_wait_on_close(struct aws_socket *socket) {
Expand Down
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ add_test_case(local_socket_communication)
add_test_case(tcp_socket_communication)
add_test_case(udp_socket_communication)
add_net_test_case(connect_timeout)

# we're redoing libuv support, and this will be fixed then.
if (NOT USE_LIBUV)
add_net_test_case(connect_timeout_cancelation)
endif()
add_test_case(outgoing_local_sock_errors)
add_test_case(outgoing_tcp_sock_error)
add_test_case(incoming_tcp_sock_errors)
Expand Down
71 changes: 71 additions & 0 deletions tests/socket_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,77 @@ static int s_test_connect_timeout(struct aws_allocator *allocator, void *ctx) {

AWS_TEST_CASE(connect_timeout, s_test_connect_timeout)

static int s_test_connect_timeout_cancelation(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

struct aws_event_loop_group el_group;
ASSERT_SUCCESS(aws_event_loop_group_default_init(&el_group, allocator, 1));
struct aws_event_loop *event_loop = aws_event_loop_group_get_next_loop(&el_group);
ASSERT_NOT_NULL(event_loop, "Event loop creation failed with error: %s", aws_error_debug_str(aws_last_error()));

struct aws_mutex mutex = AWS_MUTEX_INIT;
struct aws_condition_variable condition_variable = AWS_CONDITION_VARIABLE_INIT;

struct aws_socket_options options;
AWS_ZERO_STRUCT(options);
options.connect_timeout_ms = 1000;
options.type = AWS_SOCKET_STREAM;
options.domain = AWS_SOCKET_IPV4;

struct aws_host_resolver resolver;
ASSERT_SUCCESS(aws_host_resolver_init_default(&resolver, allocator, 2, &el_group));

struct aws_host_resolution_config resolution_config = {
.impl = aws_default_dns_resolve, .impl_data = NULL, .max_ttl = 1};

struct test_host_callback_data host_callback_data = {
.condition_variable = AWS_CONDITION_VARIABLE_INIT,
.invoked = false,
.has_a_address = false,
.mutex = &mutex,
};

/* This ec2 instance sits in a VPC that makes sure port 81 is black-holed (no TCP SYN should be received). */
struct aws_string *host_name = aws_string_new_from_c_str(allocator, "ec2-54-158-231-48.compute-1.amazonaws.com");
aws_mutex_lock(&mutex);
ASSERT_SUCCESS(aws_host_resolver_resolve_host(
&resolver, host_name, s_test_host_resolved_test_callback, &resolution_config, &host_callback_data));

aws_condition_variable_wait_pred(
&host_callback_data.condition_variable, &mutex, s_test_host_resolved_predicate, &host_callback_data);

aws_host_resolver_clean_up(&resolver);

ASSERT_TRUE(host_callback_data.has_a_address);

struct aws_socket_endpoint endpoint = {.port = 81};
sprintf(endpoint.address, "%s", aws_string_bytes(host_callback_data.a_address.address));

aws_string_destroy((void *)host_name);
aws_host_address_clean_up(&host_callback_data.a_address);

struct local_outgoing_args outgoing_args = {
.mutex = &mutex,
.condition_variable = &condition_variable,
.connect_invoked = false,
.error_invoked = false,
};

aws_mutex_unlock(&mutex);
struct aws_socket outgoing;
ASSERT_SUCCESS(aws_socket_init(&outgoing, allocator, &options));
ASSERT_SUCCESS(aws_socket_connect(&outgoing, &endpoint, event_loop, s_local_outgoing_connection, &outgoing_args));

aws_event_loop_group_clean_up(&el_group);

ASSERT_INT_EQUALS(AWS_IO_EVENT_LOOP_SHUTDOWN, outgoing_args.last_error);
aws_socket_clean_up(&outgoing);

return 0;
}

AWS_TEST_CASE(connect_timeout_cancelation, s_test_connect_timeout_cancelation)

struct error_test_args {
int error_code;
struct aws_mutex mutex;
Expand Down

0 comments on commit 9d70079

Please sign in to comment.