From 596515efe3e1db7c34c1cb9cef9c7b7374409765 Mon Sep 17 00:00:00 2001
From: "Jonathan M. Henson" <jonathan.michael.henson@gmail.com>
Date: Mon, 8 Apr 2019 17:10:17 -0700
Subject: [PATCH] Event loop shutdown fixes (#122)

Event loops now report thread id accurately during shutdown.
---
 source/bsd/kqueue_event_loop.c        |  15 +++-
 source/libuv/uv_event_loop.c          |  22 +++++-
 source/linux/epoll_event_loop.c       |  18 ++++-
 source/windows/iocp/iocp_event_loop.c |  16 ++++-
 tests/CMakeLists.txt                  |   1 +
 tests/event_loop_test.c               | 100 ++++++++++++++++++++++++--
 6 files changed, 157 insertions(+), 15 deletions(-)

diff --git a/source/bsd/kqueue_event_loop.c b/source/bsd/kqueue_event_loop.c
index cd3354e64..61edd5a0b 100644
--- a/source/bsd/kqueue_event_loop.c
+++ b/source/bsd/kqueue_event_loop.c
@@ -17,6 +17,7 @@
 
 #include <aws/io/logging.h>
 
+#include <aws/common/atomics.h>
 #include <aws/common/clock.h>
 #include <aws/common/mutex.h>
 #include <aws/common/task_scheduler.h>
@@ -68,6 +69,7 @@ enum pipe_fd_index {
 
 struct kqueue_loop {
     struct aws_thread thread;
+    struct aws_atomic_var thread_id;
     int kq_fd; /* kqueue file descriptor */
 
     /* Pipe for signaling to event-thread that cross_thread_data has changed. */
@@ -161,6 +163,8 @@ struct aws_event_loop *aws_event_loop_new_system(struct aws_allocator *alloc, aw
     if (!impl) {
         goto clean_up;
     }
+    /* initialize thread id to 0. It will be set when the event loop thread starts. */
+    aws_atomic_init_int(&impl->thread_id, (size_t)0);
     clean_up_impl_mem = true;
     AWS_ZERO_STRUCT(*impl);
 
@@ -294,6 +298,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
         assert("Failed to destroy event-thread, resources have been leaked." == NULL);
         return;
     }
+    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
 
     /* Clean up task-related stuff first. It's possible the a cancelled task adds further tasks to this event_loop.
      * Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last */
@@ -706,9 +712,9 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
 
 static bool s_is_event_thread(struct aws_event_loop *event_loop) {
     struct kqueue_loop *impl = event_loop->impl_data;
-    assert(aws_thread_get_detach_state(&impl->thread) == AWS_THREAD_JOINABLE);
 
-    return aws_thread_current_thread_id() == aws_thread_get_id(&impl->thread);
+    uint64_t thread_id = aws_atomic_load_int(&impl->thread_id);
+    return aws_thread_current_thread_id() == thread_id;
 }
 
 /* Called from thread.
@@ -793,6 +799,9 @@ static void s_event_thread_main(void *user_data) {
     AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
     struct kqueue_loop *impl = event_loop->impl_data;
 
+    /* set thread id to the event-loop's thread. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
+
     assert(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
     impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;
 
@@ -950,4 +959,6 @@ static void s_event_thread_main(void *user_data) {
     }
 
     AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
+    /* reset to 0. This should be updated again during destroy before tasks are canceled. */
+    aws_atomic_store_int(&impl->thread_id, 0);
 }
diff --git a/source/libuv/uv_event_loop.c b/source/libuv/uv_event_loop.c
index 7d0a645f7..bb3b5d1e8 100644
--- a/source/libuv/uv_event_loop.c
+++ b/source/libuv/uv_event_loop.c
@@ -15,6 +15,7 @@
 
 #include <aws/io/event_loop.h>
 
+#include <aws/common/atomics.h>
 #include <aws/common/clock.h>
 #include <aws/common/mutex.h>
 #include <aws/common/task_scheduler.h>
@@ -106,6 +107,7 @@ enum event_thread_state {
 /** This struct is owned by libuv_loop if the uv_loop is owned by us */
 struct libuv_owned {
     struct aws_thread thread;
+    struct aws_atomic_var thread_id;
     enum event_thread_state state;
 };
 static struct libuv_owned *s_owned(struct libuv_loop *loop) {
@@ -322,6 +324,11 @@ static void s_destroy(struct aws_event_loop *event_loop) {
     /* Wait for completion */
     s_wait_for_stop_completion(event_loop);
 
+    if (impl->owns_uv_loop) {
+        /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
+        aws_atomic_store_int(&impl->ownership_specific.uv_owned->thread_id, (size_t)aws_thread_current_thread_id());
+    }
+
     /* Tasks in scheduler get cancelled */
     aws_hash_table_foreach(&impl->active_thread_data.running_tasks, s_running_tasks_destroy, NULL);
     aws_hash_table_clean_up(&impl->active_thread_data.running_tasks);
@@ -356,12 +363,16 @@ static void s_thread_loop(void *args) {
     struct libuv_loop *impl = args;
 
     s_owned(impl)->state = EVENT_THREAD_STATE_RUNNING;
+    /* while thread is running we want to report the event-loop thread */
+    aws_atomic_store_int(&s_owned(impl)->thread_id, (size_t)aws_thread_current_thread_id());
 
     while (s_owned(impl)->state == EVENT_THREAD_STATE_RUNNING) {
         uv_run(impl->uv_loop, UV_RUN_ONCE);
     }
 
     s_owned(impl)->state = EVENT_THREAD_STATE_READY_TO_RUN;
+    /* set back to 0 for sanity. it should be updated in destroy before canceling tasks. */
+    aws_atomic_store_int(&s_owned(impl)->thread_id, (size_t)0);
 }
 
 static void s_uv_task_timer_cb(uv_timer_t *handle UV_STATUS_PARAM) {
@@ -774,8 +785,13 @@ static void s_free_io_event_resources(void *user_data) {
 static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
     struct libuv_loop *impl = event_loop->impl_data;
 
-    const uint64_t uv_tid =
-        impl->owns_uv_loop ? aws_thread_get_id(&s_owned(impl)->thread) : s_unowned(impl)->uv_thread_id;
+    uint64_t uv_tid = 0;
+
+    if (impl->owns_uv_loop) {
+        uv_tid = aws_atomic_load_int(&s_owned(impl)->thread_id);
+    } else {
+        uv_tid = s_unowned(impl)->uv_thread_id;
+    }
 
     return uv_tid == aws_thread_current_thread_id();
 }
@@ -806,6 +822,7 @@ static int s_libuv_loop_init(
     }
 
     impl->uv_loop = uv_loop;
+    /* no handles are open yet; thread id is initialized where the owned loop struct is set up. */
     aws_atomic_init_int(&impl->num_open_handles, 0);
 
     /* Init cross-thread data */
@@ -877,6 +894,7 @@ struct aws_event_loop *aws_event_loop_new_libuv(struct aws_allocator *alloc, aws
 
     impl->owns_uv_loop = true;
     impl->ownership_specific.uv_owned = owned;
+    aws_atomic_init_int(&impl->ownership_specific.uv_owned->thread_id, 0);
 
 #if UV_VERSION_MAJOR == 0
     uv_loop = uv_loop_new();
diff --git a/source/linux/epoll_event_loop.c b/source/linux/epoll_event_loop.c
index 2c52c280f..612df56f4 100644
--- a/source/linux/epoll_event_loop.c
+++ b/source/linux/epoll_event_loop.c
@@ -15,6 +15,7 @@
 
 #include <aws/io/event_loop.h>
 
+#include <aws/common/atomics.h>
 #include <aws/common/clock.h>
 #include <aws/common/mutex.h>
 #include <aws/common/task_scheduler.h>
@@ -88,14 +89,15 @@ static struct aws_event_loop_vtable s_vtable = {
 struct epoll_loop {
     struct aws_task_scheduler scheduler;
     struct aws_thread thread;
+    struct aws_atomic_var thread_id;
     struct aws_io_handle read_task_handle;
     struct aws_io_handle write_task_handle;
     struct aws_mutex task_pre_queue_mutex;
     struct aws_linked_list task_pre_queue;
-    bool should_process_task_pre_queue;
+    struct aws_task stop_task;
     int epoll_fd;
+    bool should_process_task_pre_queue;
     bool should_continue;
-    struct aws_task stop_task;
 };
 
 struct epoll_event_data {
@@ -135,6 +137,8 @@ struct aws_event_loop *aws_event_loop_new_system(struct aws_allocator *alloc, aw
     }
 
     AWS_ZERO_STRUCT(*epoll_loop);
+    /* initialize thread id to 0, it should be updated when the event loop thread starts. */
+    aws_atomic_init_int(&epoll_loop->thread_id, 0);
 
     aws_linked_list_init(&epoll_loop->task_pre_queue);
     epoll_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT;
@@ -232,6 +236,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
     aws_event_loop_stop(event_loop);
     s_wait_for_stop_completion(event_loop);
 
+    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
+    aws_atomic_store_int(&epoll_loop->thread_id, (size_t)aws_thread_current_thread_id());
     aws_task_scheduler_clean_up(&epoll_loop->scheduler);
 
     while (!aws_linked_list_empty(&epoll_loop->task_pre_queue)) {
@@ -460,7 +466,8 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
 static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
     struct epoll_loop *epoll_loop = event_loop->impl_data;
 
-    return aws_thread_current_thread_id() == aws_thread_get_id(&epoll_loop->thread);
+    uint64_t thread_id = aws_atomic_load_int(&epoll_loop->thread_id);
+    return aws_thread_current_thread_id() == thread_id;
 }
 
 /* We treat the pipe fd with a subscription to io events just like any other managed file descriptor.
@@ -529,6 +536,9 @@ static void s_main_loop(void *args) {
     AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
     struct epoll_loop *epoll_loop = event_loop->impl_data;
 
+    /* set thread id to the thread of the event loop */
+    aws_atomic_store_int(&epoll_loop->thread_id, (size_t)aws_thread_current_thread_id());
+
     int err = s_subscribe_to_io_events(
         event_loop, &epoll_loop->read_task_handle, AWS_IO_EVENT_TYPE_READABLE, s_on_tasks_to_schedule, NULL);
     if (err) {
@@ -640,4 +650,6 @@ static void s_main_loop(void *args) {
 
     AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
     s_unsubscribe_from_io_events(event_loop, &epoll_loop->read_task_handle);
+    /* set thread id back to 0. This should be updated again in destroy, before tasks are canceled. */
+    aws_atomic_store_int(&epoll_loop->thread_id, (size_t)0);
 }
diff --git a/source/windows/iocp/iocp_event_loop.c b/source/windows/iocp/iocp_event_loop.c
index d0313ce52..228ea8a49 100644
--- a/source/windows/iocp/iocp_event_loop.c
+++ b/source/windows/iocp/iocp_event_loop.c
@@ -14,6 +14,7 @@
  */
 #include <aws/io/event_loop.h>
 
+#include <aws/common/atomics.h>
 #include <aws/common/clock.h>
 #include <aws/common/mutex.h>
 #include <aws/common/task_scheduler.h>
@@ -70,6 +71,7 @@ typedef enum event_thread_state {
 struct iocp_loop {
     HANDLE iocp_handle;
     struct aws_thread thread;
+    struct aws_atomic_var thread_id;
 
     /* synced_data holds things that must be communicated across threads.
      * When the event-thread is running, the mutex must be locked while anyone touches anything in synced_data.
@@ -190,6 +192,8 @@ struct aws_event_loop *aws_event_loop_new_system(struct aws_allocator *alloc, aw
         goto clean_up;
     }
     AWS_ZERO_STRUCT(*impl);
+    /* initialize thread id to 0. This will be updated once the event loop thread starts. */
+    aws_atomic_init_int(&impl->thread_id, 0);
 
     impl->iocp_handle = CreateIoCompletionPort(
         INVALID_HANDLE_VALUE, /* FileHandle: passing invalid handle creates a new IOCP */
@@ -285,6 +289,9 @@ static void s_destroy(struct aws_event_loop *event_loop) {
         return;
     }
 
+    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
+
     /* Clean up task-related stuff first.
      * It's possible the a cancelled task adds further tasks to this event_loop, these new tasks would end up in
      * synced_data.tasks_to_schedule, so clean that up last */
@@ -476,9 +483,9 @@ static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *ta
 static bool s_is_event_thread(struct aws_event_loop *event_loop) {
     struct iocp_loop *impl = event_loop->impl_data;
     assert(impl);
-    assert(aws_thread_get_detach_state(&impl->thread) == AWS_THREAD_JOINABLE);
 
-    return aws_thread_get_id(&impl->thread) == aws_thread_current_thread_id();
+    uint64_t el_thread_id = aws_atomic_load_int(&impl->thread_id);
+    return el_thread_id == aws_thread_current_thread_id();
 }
 
 /* Called from any thread */
@@ -610,6 +617,9 @@ static void s_event_thread_main(void *user_data) {
 
     struct iocp_loop *impl = event_loop->impl_data;
 
+    /* Set thread id to event loop thread id. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
+
     assert(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
     impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;
 
@@ -712,4 +722,6 @@ static void s_event_thread_main(void *user_data) {
         }
     }
     AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
+    /* set back to 0. This should be updated again in destroy, right before task cancelation happens. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)0);
 }
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index bdcfa43e6..fcedd7ce3 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -27,6 +27,7 @@ add_pipe_test_case(pipe_writes_are_fifo)
 add_pipe_test_case(pipe_clean_up_cancels_pending_writes)
 
 add_test_case(event_loop_xthread_scheduled_tasks_execute)
+add_test_case(event_loop_canceled_tasks_run_in_el_thread)
 if (USE_IO_COMPLETION_PORTS)
     add_test_case(event_loop_completion_events)
 else ()
diff --git a/tests/event_loop_test.c b/tests/event_loop_test.c
index dffa46efe..d89dbd70a 100644
--- a/tests/event_loop_test.c
+++ b/tests/event_loop_test.c
@@ -19,23 +19,30 @@
 #include <aws/common/task_scheduler.h>
 #include <aws/io/event_loop.h>
 
+#include <aws/common/thread.h>
 #include <aws/testing/aws_test_harness.h>
 
 struct task_args {
     bool invoked;
+    bool was_in_thread;
+    uint64_t thread_id;
+    struct aws_event_loop *loop;
+    enum aws_task_status status;
     struct aws_mutex mutex;
     struct aws_condition_variable condition_variable;
 };
 
 static void s_test_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
     (void)task;
-    (void)status;
     struct task_args *args = user_data;
 
     aws_mutex_lock(&args->mutex);
+    args->thread_id = aws_thread_current_thread_id();
     args->invoked = true;
-    aws_condition_variable_notify_one(&args->condition_variable);
+    args->status = status;
+    args->was_in_thread = aws_event_loop_thread_is_callers_thread(args->loop);
     aws_mutex_unlock((&args->mutex));
+    aws_condition_variable_notify_one(&args->condition_variable);
 }
 
 static bool s_task_ran_predicate(void *args) {
@@ -53,8 +60,13 @@ static int s_test_event_loop_xthread_scheduled_tasks_execute(struct aws_allocato
     ASSERT_NOT_NULL(event_loop, "Event loop creation failed with error: %s", aws_error_debug_str(aws_last_error()));
     ASSERT_SUCCESS(aws_event_loop_run(event_loop));
 
-    struct task_args task_args = {
-        .condition_variable = AWS_CONDITION_VARIABLE_INIT, .mutex = AWS_MUTEX_INIT, .invoked = false};
+    struct task_args task_args = {.condition_variable = AWS_CONDITION_VARIABLE_INIT,
+                                  .mutex = AWS_MUTEX_INIT,
+                                  .invoked = false,
+                                  .was_in_thread = false,
+                                  .status = -1,
+                                  .loop = event_loop,
+                                  .thread_id = 0};
 
     struct aws_task task;
     aws_task_init(&task, s_test_task, &task_args);
@@ -71,6 +83,8 @@ static int s_test_event_loop_xthread_scheduled_tasks_execute(struct aws_allocato
     ASSERT_TRUE(task_args.invoked);
     aws_mutex_unlock(&task_args.mutex);
 
+    ASSERT_FALSE(task_args.thread_id == aws_thread_current_thread_id());
+
     /* Test "now" tasks */
     task_args.invoked = false;
     ASSERT_SUCCESS(aws_mutex_lock(&task_args.mutex));
@@ -89,6 +103,75 @@ static int s_test_event_loop_xthread_scheduled_tasks_execute(struct aws_allocato
 
 AWS_TEST_CASE(event_loop_xthread_scheduled_tasks_execute, s_test_event_loop_xthread_scheduled_tasks_execute)
 
+static bool s_test_cancel_thread_task_predicate(void *args) {
+    struct task_args *task_args = args;
+    return task_args->invoked;
+}
+/*
+ * Test that tasks canceled during event-loop destruction still run, and report running on the event-loop's thread.
+ */
+static int s_test_event_loop_canceled_tasks_run_in_el_thread(struct aws_allocator *allocator, void *ctx) {
+
+    (void)ctx;
+    struct aws_event_loop *event_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks);
+
+    ASSERT_NOT_NULL(event_loop, "Event loop creation failed with error: %s", aws_error_debug_str(aws_last_error()));
+    ASSERT_SUCCESS(aws_event_loop_run(event_loop));
+
+    struct task_args task1_args = {.condition_variable = AWS_CONDITION_VARIABLE_INIT,
+                                   .mutex = AWS_MUTEX_INIT,
+                                   .invoked = false,
+                                   .was_in_thread = false,
+                                   .status = -1,
+                                   .loop = event_loop,
+                                   .thread_id = 0};
+    struct task_args task2_args = {.condition_variable = AWS_CONDITION_VARIABLE_INIT,
+                                   .mutex = AWS_MUTEX_INIT,
+                                   .invoked = false,
+                                   .was_in_thread = false,
+                                   .status = -1,
+                                   .loop = event_loop,
+                                   .thread_id = 0};
+
+    struct aws_task task1;
+    aws_task_init(&task1, s_test_task, &task1_args);
+    struct aws_task task2;
+    aws_task_init(&task2, s_test_task, &task2_args);
+
+    aws_event_loop_schedule_task_now(event_loop, &task1);
+    uint64_t now;
+    ASSERT_SUCCESS(aws_event_loop_current_clock_time(event_loop, &now));
+    aws_event_loop_schedule_task_future(event_loop, &task2, now + 10000000000);
+
+    ASSERT_FALSE(aws_event_loop_thread_is_callers_thread(event_loop));
+
+    ASSERT_SUCCESS(aws_mutex_lock(&task1_args.mutex));
+    ASSERT_SUCCESS(aws_condition_variable_wait_pred(
+        &task1_args.condition_variable, &task1_args.mutex, s_task_ran_predicate, &task1_args));
+    ASSERT_TRUE(task1_args.invoked);
+    ASSERT_TRUE(task1_args.was_in_thread);
+    ASSERT_FALSE(task1_args.thread_id == aws_thread_current_thread_id());
+    ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task1_args.status);
+    aws_mutex_unlock(&task1_args.mutex);
+
+    aws_event_loop_destroy(event_loop);
+
+    aws_mutex_lock(&task2_args.mutex);
+
+    ASSERT_SUCCESS(aws_condition_variable_wait_pred(
+        &task2_args.condition_variable, &task2_args.mutex, s_test_cancel_thread_task_predicate, &task2_args));
+    ASSERT_TRUE(task2_args.invoked);
+    aws_mutex_unlock(&task2_args.mutex);
+
+    ASSERT_TRUE(task2_args.was_in_thread);
+    ASSERT_TRUE(task2_args.thread_id == aws_thread_current_thread_id());
+    ASSERT_INT_EQUALS(AWS_TASK_STATUS_CANCELED, task2_args.status);
+
+    return AWS_OP_SUCCESS;
+}
+
+AWS_TEST_CASE(event_loop_canceled_tasks_run_in_el_thread, s_test_event_loop_canceled_tasks_run_in_el_thread)
+
 #if AWS_USE_IO_COMPLETION_PORTS
 
 int aws_pipe_get_unique_name(char *dst, size_t dst_size);
@@ -896,8 +979,13 @@ static int s_event_loop_test_stop_then_restart(struct aws_allocator *allocator,
     ASSERT_NOT_NULL(event_loop, "Event loop creation failed with error: %s", aws_error_debug_str(aws_last_error()));
     ASSERT_SUCCESS(aws_event_loop_run(event_loop));
 
-    struct task_args task_args = {
-        .condition_variable = AWS_CONDITION_VARIABLE_INIT, .mutex = AWS_MUTEX_INIT, .invoked = false};
+    struct task_args task_args = {.condition_variable = AWS_CONDITION_VARIABLE_INIT,
+                                  .mutex = AWS_MUTEX_INIT,
+                                  .invoked = false,
+                                  .was_in_thread = false,
+                                  .status = -1,
+                                  .loop = event_loop,
+                                  .thread_id = 0};
 
     struct aws_task task;
     aws_task_init(&task, s_test_task, &task_args);