Event loop shutdown fixes (#122)
Event loops now report thread id accurately during shutdown.
JonathanHenson authored Apr 9, 2019
1 parent 95ad1ca commit 596515e
Showing 6 changed files with 157 additions and 15 deletions.
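
All four event-loop backends touched below (kqueue, libuv, epoll, IOCP) apply the same pattern: the loop's thread id is kept in an atomic variable instead of being read from the possibly joined aws_thread. The following is a minimal, hypothetical sketch of that pattern; struct and function names such as my_loop are illustrative, and only the aws_atomic_* and aws_thread_* calls are taken from the diff.

#include <aws/common/atomics.h>
#include <aws/common/thread.h>

#include <stdbool.h>

struct my_loop {
    /* id of whichever thread currently "owns" the loop; 0 means no owner. */
    struct aws_atomic_var thread_id;
};

static void my_loop_init(struct my_loop *loop) {
    aws_atomic_init_int(&loop->thread_id, 0);
}

static void my_loop_thread_main(struct my_loop *loop) {
    /* The event-loop thread publishes its own id when it starts running... */
    aws_atomic_store_int(&loop->thread_id, (size_t)aws_thread_current_thread_id());

    /* ... run the loop ... */

    /* ... and clears it again on the way out. */
    aws_atomic_store_int(&loop->thread_id, 0);
}

static void my_loop_destroy(struct my_loop *loop) {
    /* After the loop thread has stopped and been joined, the destroying thread
     * takes ownership, so cancelled tasks that ask "am I on the event-loop
     * thread?" still get a sensible answer. */
    aws_atomic_store_int(&loop->thread_id, (size_t)aws_thread_current_thread_id());

    /* ... cancel remaining tasks, free resources ... */
}

static bool my_loop_is_on_loop_thread(struct my_loop *loop) {
    /* Replaces comparisons against aws_thread_get_id(&impl->thread), which
     * asserted that the thread was still joinable. */
    return aws_thread_current_thread_id() == aws_atomic_load_int(&loop->thread_id);
}

The one exception is the unowned-libuv path, which keeps using the uv_thread_id it already tracked (see s_is_on_callers_thread in source/libuv/uv_event_loop.c).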
15 changes: 13 additions & 2 deletions source/bsd/kqueue_event_loop.c
@@ -17,6 +17,7 @@

#include <aws/io/logging.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
@@ -68,6 +69,7 @@ enum pipe_fd_index {

struct kqueue_loop {
struct aws_thread thread;
struct aws_atomic_var thread_id;
int kq_fd; /* kqueue file descriptor */

/* Pipe for signaling to event-thread that cross_thread_data has changed. */
@@ -161,6 +163,8 @@ struct aws_event_loop *aws_event_loop_new_system(struct aws_allocator *alloc, aw
if (!impl) {
goto clean_up;
}
/* initialize thread id to 0. It will be set when the event loop thread starts. */
aws_atomic_init_int(&impl->thread_id, (size_t)0);
clean_up_impl_mem = true;
AWS_ZERO_STRUCT(*impl);

@@ -294,6 +298,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
assert("Failed to destroy event-thread, resources have been leaked." == NULL);
return;
}
/* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());

/* Clean up task-related stuff first. It's possible that a cancelled task adds further tasks to this event_loop.
* Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last */
@@ -706,9 +712,9 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc

static bool s_is_event_thread(struct aws_event_loop *event_loop) {
struct kqueue_loop *impl = event_loop->impl_data;
assert(aws_thread_get_detach_state(&impl->thread) == AWS_THREAD_JOINABLE);

return aws_thread_current_thread_id() == aws_thread_get_id(&impl->thread);
uint64_t thread_id = aws_atomic_load_int(&impl->thread_id);
return aws_thread_current_thread_id() == thread_id;
}

/* Called from thread.
@@ -793,6 +799,9 @@ static void s_event_thread_main(void *user_data) {
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
struct kqueue_loop *impl = event_loop->impl_data;

/* set thread id to the event-loop's thread. */
aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());

assert(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;

@@ -950,4 +959,6 @@ static void s_event_thread_main(void *user_data) {
}

AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
/* reset to 0. This should be updated again during destroy before tasks are canceled. */
aws_atomic_store_int(&impl->thread_id, 0);
}
22 changes: 20 additions & 2 deletions source/libuv/uv_event_loop.c
@@ -15,6 +15,7 @@

#include <aws/io/event_loop.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
@@ -106,6 +107,7 @@ enum event_thread_state {
/** This struct is owned by libuv_loop if the uv_loop is owned by us */
struct libuv_owned {
struct aws_thread thread;
struct aws_atomic_var thread_id;
enum event_thread_state state;
};
static struct libuv_owned *s_owned(struct libuv_loop *loop) {
@@ -322,6 +324,11 @@ static void s_destroy(struct aws_event_loop *event_loop) {
/* Wait for completion */
s_wait_for_stop_completion(event_loop);

if (impl->owns_uv_loop) {
/* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
aws_atomic_store_int(&impl->ownership_specific.uv_owned->thread_id, (size_t)aws_thread_current_thread_id());
}

/* Tasks in scheduler get cancelled */
aws_hash_table_foreach(&impl->active_thread_data.running_tasks, s_running_tasks_destroy, NULL);
aws_hash_table_clean_up(&impl->active_thread_data.running_tasks);
@@ -356,12 +363,16 @@ static void s_thread_loop(void *args) {
struct libuv_loop *impl = args;

s_owned(impl)->state = EVENT_THREAD_STATE_RUNNING;
/* while thread is running we want to report the event-loop thread */
aws_atomic_store_int(&s_owned(impl)->thread_id, (size_t)aws_thread_current_thread_id());

while (s_owned(impl)->state == EVENT_THREAD_STATE_RUNNING) {
uv_run(impl->uv_loop, UV_RUN_ONCE);
}

s_owned(impl)->state = EVENT_THREAD_STATE_READY_TO_RUN;
/* set back to 0 for sanity. It should be updated in destroy before canceling tasks. */
aws_atomic_store_int(&s_owned(impl)->thread_id, (size_t)0);
}

static void s_uv_task_timer_cb(uv_timer_t *handle UV_STATUS_PARAM) {
@@ -774,8 +785,13 @@ static void s_free_io_event_resources(void *user_data) {
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
struct libuv_loop *impl = event_loop->impl_data;

const uint64_t uv_tid =
impl->owns_uv_loop ? aws_thread_get_id(&s_owned(impl)->thread) : s_unowned(impl)->uv_thread_id;
uint64_t uv_tid = 0;

if (impl->owns_uv_loop) {
uv_tid = aws_atomic_load_int(&s_owned(impl)->thread_id);
} else {
uv_tid = s_unowned(impl)->uv_thread_id;
}

return uv_tid == aws_thread_current_thread_id();
}
@@ -806,6 +822,7 @@ static int s_libuv_loop_init(
}

impl->uv_loop = uv_loop;
/* init thread id to 0 until the event-loop thread starts. */
aws_atomic_init_int(&impl->num_open_handles, 0);

/* Init cross-thread data */
@@ -877,6 +894,7 @@ struct aws_event_loop *aws_event_loop_new_libuv(struct aws_allocator *alloc, aws

impl->owns_uv_loop = true;
impl->ownership_specific.uv_owned = owned;
aws_atomic_init_int(&impl->ownership_specific.uv_owned->thread_id, 0);

#if UV_VERSION_MAJOR == 0
uv_loop = uv_loop_new();
18 changes: 15 additions & 3 deletions source/linux/epoll_event_loop.c
@@ -15,6 +15,7 @@

#include <aws/io/event_loop.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
@@ -88,14 +89,15 @@ static struct aws_event_loop_vtable s_vtable = {
struct epoll_loop {
struct aws_task_scheduler scheduler;
struct aws_thread thread;
struct aws_atomic_var thread_id;
struct aws_io_handle read_task_handle;
struct aws_io_handle write_task_handle;
struct aws_mutex task_pre_queue_mutex;
struct aws_linked_list task_pre_queue;
bool should_process_task_pre_queue;
struct aws_task stop_task;
int epoll_fd;
bool should_process_task_pre_queue;
bool should_continue;
struct aws_task stop_task;
};

struct epoll_event_data {
@@ -135,6 +137,8 @@ struct aws_event_loop *aws_event_loop_new_system(struct aws_allocator *alloc, aw
}

AWS_ZERO_STRUCT(*epoll_loop);
/* initialize thread id to 0; it should be updated when the event loop thread starts. */
aws_atomic_init_int(&epoll_loop->thread_id, 0);

aws_linked_list_init(&epoll_loop->task_pre_queue);
epoll_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT;
@@ -232,6 +236,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
aws_event_loop_stop(event_loop);
s_wait_for_stop_completion(event_loop);

/* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
aws_atomic_store_int(&epoll_loop->thread_id, (size_t)aws_thread_current_thread_id());
aws_task_scheduler_clean_up(&epoll_loop->scheduler);

while (!aws_linked_list_empty(&epoll_loop->task_pre_queue)) {
@@ -460,7 +466,8 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
struct epoll_loop *epoll_loop = event_loop->impl_data;

return aws_thread_current_thread_id() == aws_thread_get_id(&epoll_loop->thread);
uint64_t thread_id = aws_atomic_load_int(&epoll_loop->thread_id);
return aws_thread_current_thread_id() == thread_id;
}

/* We treat the pipe fd with a subscription to io events just like any other managed file descriptor.
@@ -529,6 +536,9 @@ static void s_main_loop(void *args) {
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
struct epoll_loop *epoll_loop = event_loop->impl_data;

/* set thread id to the thread of the event loop */
aws_atomic_store_int(&epoll_loop->thread_id, (size_t)aws_thread_current_thread_id());

int err = s_subscribe_to_io_events(
event_loop, &epoll_loop->read_task_handle, AWS_IO_EVENT_TYPE_READABLE, s_on_tasks_to_schedule, NULL);
if (err) {
@@ -640,4 +650,6 @@ static void s_main_loop(void *args) {

AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
s_unsubscribe_from_io_events(event_loop, &epoll_loop->read_task_handle);
/* set thread id back to 0. This should be updated again in destroy, before tasks are canceled. */
aws_atomic_store_int(&epoll_loop->thread_id, (size_t)0);
}
16 changes: 14 additions & 2 deletions source/windows/iocp/iocp_event_loop.c
@@ -14,6 +14,7 @@
*/
#include <aws/io/event_loop.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
@@ -70,6 +71,7 @@ typedef enum event_thread_state {
struct iocp_loop {
HANDLE iocp_handle;
struct aws_thread thread;
struct aws_atomic_var thread_id;

/* synced_data holds things that must be communicated across threads.
* When the event-thread is running, the mutex must be locked while anyone touches anything in synced_data.
@@ -190,6 +192,8 @@ struct aws_event_loop *aws_event_loop_new_system(struct aws_allocator *alloc, aw
goto clean_up;
}
AWS_ZERO_STRUCT(*impl);
/* initialize thread id to 0. This will be updated once the event loop thread starts. */
aws_atomic_init_int(&impl->thread_id, 0);

impl->iocp_handle = CreateIoCompletionPort(
INVALID_HANDLE_VALUE, /* FileHandle: passing invalid handle creates a new IOCP */
@@ -285,6 +289,9 @@ static void s_destroy(struct aws_event_loop *event_loop) {
return;
}

/* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());

/* Clean up task-related stuff first.
* It's possible that a cancelled task adds further tasks to this event_loop; these new tasks would end up in
* synced_data.tasks_to_schedule, so clean that up last */
@@ -476,9 +483,9 @@ static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *ta
static bool s_is_event_thread(struct aws_event_loop *event_loop) {
struct iocp_loop *impl = event_loop->impl_data;
assert(impl);
assert(aws_thread_get_detach_state(&impl->thread) == AWS_THREAD_JOINABLE);

return aws_thread_get_id(&impl->thread) == aws_thread_current_thread_id();
uint64_t el_thread_id = aws_atomic_load_int(&impl->thread_id);
return el_thread_id == aws_thread_current_thread_id();
}

/* Called from any thread */
@@ -610,6 +617,9 @@ static void s_event_thread_main(void *user_data) {

struct iocp_loop *impl = event_loop->impl_data;

/* Set thread id to event loop thread id. */
aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());

assert(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;

@@ -712,4 +722,6 @@ static void s_event_thread_main(void *user_data) {
}
}
AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
/* set back to 0. This should be updated again in destroy, right before task cancelation happens. */
aws_atomic_store_int(&impl->thread_id, (size_t)0);
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
@@ -27,6 +27,7 @@ add_pipe_test_case(pipe_writes_are_fifo)
add_pipe_test_case(pipe_clean_up_cancels_pending_writes)

add_test_case(event_loop_xthread_scheduled_tasks_execute)
add_test_case(event_loop_canceled_tasks_run_in_el_thread)
if (USE_IO_COMPLETION_PORTS)
add_test_case(event_loop_completion_events)
else ()