diff --git a/source/bsd/kqueue_event_loop.c b/source/bsd/kqueue_event_loop.c
index cd3354e64..61edd5a0b 100644
--- a/source/bsd/kqueue_event_loop.c
+++ b/source/bsd/kqueue_event_loop.c
@@ -17,6 +17,7 @@
 #include
+#include
 #include
 #include
 #include
@@ -68,6 +69,7 @@ enum pipe_fd_index {

 struct kqueue_loop {
     struct aws_thread thread;
+    struct aws_atomic_var thread_id;
     int kq_fd; /* kqueue file descriptor */

     /* Pipe for signaling to event-thread that cross_thread_data has changed. */
@@ -161,6 +163,8 @@ struct aws_event_loop *aws_event_loop_new_system(struct aws_allocator *alloc, aw
     if (!impl) {
         goto clean_up;
     }
+    /* initialize thread id to 0. It will be set when the event loop thread starts. */
+    aws_atomic_init_int(&impl->thread_id, (size_t)0);
     clean_up_impl_mem = true;
     AWS_ZERO_STRUCT(*impl);

@@ -294,6 +298,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
         assert("Failed to destroy event-thread, resources have been leaked." == NULL);
         return;
     }
+    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());

     /* Clean up task-related stuff first. It's possible the a cancelled task adds further tasks to this event_loop.
      * Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last */
@@ -706,9 +712,9 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc

 static bool s_is_event_thread(struct aws_event_loop *event_loop) {
     struct kqueue_loop *impl = event_loop->impl_data;

-    assert(aws_thread_get_detach_state(&impl->thread) == AWS_THREAD_JOINABLE);
-    return aws_thread_current_thread_id() == aws_thread_get_id(&impl->thread);
+    uint64_t thread_id = aws_atomic_load_int(&impl->thread_id);
+    return aws_thread_current_thread_id() == thread_id;
 }

 /* Called from thread.
@@ -793,6 +799,9 @@ static void s_event_thread_main(void *user_data) {
     AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
     struct kqueue_loop *impl = event_loop->impl_data;

+    /* set thread id to the event-loop's thread. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
+
     assert(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
     impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;

@@ -950,4 +959,6 @@ static void s_event_thread_main(void *user_data) {
     }

     AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
+    /* reset to 0. This should be updated again during destroy before tasks are canceled. */
+    aws_atomic_store_int(&impl->thread_id, 0);
 }
diff --git a/source/libuv/uv_event_loop.c b/source/libuv/uv_event_loop.c
index 7d0a645f7..bb3b5d1e8 100644
--- a/source/libuv/uv_event_loop.c
+++ b/source/libuv/uv_event_loop.c
@@ -15,6 +15,7 @@
 #include
+#include
 #include
 #include
 #include
@@ -106,6 +107,7 @@ enum event_thread_state {
 /** This struct is owned by libuv_loop if the uv_loop is owned by us */
 struct libuv_owned {
     struct aws_thread thread;
+    struct aws_atomic_var thread_id;
     enum event_thread_state state;
 };
 static struct libuv_owned *s_owned(struct libuv_loop *loop) {
@@ -322,6 +324,11 @@ static void s_destroy(struct aws_event_loop *event_loop) {
     /* Wait for completion */
     s_wait_for_stop_completion(event_loop);

+    if (impl->owns_uv_loop) {
+        /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
+        aws_atomic_store_int(&impl->ownership_specific.uv_owned->thread_id, (size_t)aws_thread_current_thread_id());
+    }
+
     /* Tasks in scheduler get cancelled */
     aws_hash_table_foreach(&impl->active_thread_data.running_tasks, s_running_tasks_destroy, NULL);
     aws_hash_table_clean_up(&impl->active_thread_data.running_tasks);
@@ -356,12 +363,16 @@ static void s_thread_loop(void *args) {
     struct libuv_loop *impl = args;

     s_owned(impl)->state = EVENT_THREAD_STATE_RUNNING;
+    /* while thread is running we want to report the event-loop thread */
+    aws_atomic_store_int(&s_owned(impl)->thread_id, (size_t)aws_thread_current_thread_id());

     while (s_owned(impl)->state == EVENT_THREAD_STATE_RUNNING) {
         uv_run(impl->uv_loop, UV_RUN_ONCE);
     }

     s_owned(impl)->state = EVENT_THREAD_STATE_READY_TO_RUN;
+    /* set back to 0 for sanity. it should be updated in destroy before canceling tasks. */
+    aws_atomic_store_int(&s_owned(impl)->thread_id, (size_t)0);
 }

 static void s_uv_task_timer_cb(uv_timer_t *handle UV_STATUS_PARAM) {
@@ -774,8 +785,13 @@ static void s_free_io_event_resources(void *user_data) {
 static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
     struct libuv_loop *impl = event_loop->impl_data;

-    const uint64_t uv_tid =
-        impl->owns_uv_loop ? aws_thread_get_id(&s_owned(impl)->thread) : s_unowned(impl)->uv_thread_id;
+    uint64_t uv_tid = 0;
+
+    if (impl->owns_uv_loop) {
+        uv_tid = aws_atomic_load_int(&s_owned(impl)->thread_id);
+    } else {
+        uv_tid = s_unowned(impl)->uv_thread_id;
+    }

     return uv_tid == aws_thread_current_thread_id();
 }
@@ -806,6 +822,7 @@ static int s_libuv_loop_init(
     }

     impl->uv_loop = uv_loop;
+    /* init thread id to 0 until the event-loop thread starts. */
     aws_atomic_init_int(&impl->num_open_handles, 0);

     /* Init cross-thread data */
@@ -877,6 +894,7 @@ struct aws_event_loop *aws_event_loop_new_libuv(struct aws_allocator *alloc, aws

     impl->owns_uv_loop = true;
     impl->ownership_specific.uv_owned = owned;
+    aws_atomic_init_int(&impl->ownership_specific.uv_owned->thread_id, 0);

 #if UV_VERSION_MAJOR == 0
     uv_loop = uv_loop_new();
diff --git a/source/linux/epoll_event_loop.c b/source/linux/epoll_event_loop.c
index 2c52c280f..612df56f4 100644
--- a/source/linux/epoll_event_loop.c
+++ b/source/linux/epoll_event_loop.c
@@ -15,6 +15,7 @@
 #include
+#include
 #include
 #include
 #include
@@ -88,14 +89,15 @@ static struct aws_event_loop_vtable s_vtable = {
 struct epoll_loop {
     struct aws_task_scheduler scheduler;
     struct aws_thread thread;
+    struct aws_atomic_var thread_id;
     struct aws_io_handle read_task_handle;
     struct aws_io_handle write_task_handle;
     struct aws_mutex task_pre_queue_mutex;
     struct aws_linked_list task_pre_queue;
-    bool should_process_task_pre_queue;
+    struct aws_task stop_task;
     int epoll_fd;
+    bool should_process_task_pre_queue;
     bool should_continue;
-    struct aws_task stop_task;
 };

 struct epoll_event_data {
@@ -135,6 +137,8 @@ struct aws_event_loop *aws_event_loop_new_system(struct aws_allocator *alloc, aw
     }

     AWS_ZERO_STRUCT(*epoll_loop);
+    /* initialize thread id to 0, it should be updated when the event loop thread starts. */
+    aws_atomic_init_int(&epoll_loop->thread_id, 0);

     aws_linked_list_init(&epoll_loop->task_pre_queue);
     epoll_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT;
@@ -232,6 +236,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
     aws_event_loop_stop(event_loop);
     s_wait_for_stop_completion(event_loop);

+    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
+    aws_atomic_store_int(&epoll_loop->thread_id, (size_t)aws_thread_current_thread_id());

     aws_task_scheduler_clean_up(&epoll_loop->scheduler);

     while (!aws_linked_list_empty(&epoll_loop->task_pre_queue)) {
@@ -460,7 +466,8 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc

 static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
     struct epoll_loop *epoll_loop = event_loop->impl_data;
-    return aws_thread_current_thread_id() == aws_thread_get_id(&epoll_loop->thread);
+    uint64_t thread_id = aws_atomic_load_int(&epoll_loop->thread_id);
+    return aws_thread_current_thread_id() == thread_id;
 }

 /* We treat the pipe fd with a subscription to io events just like any other managed file descriptor.
@@ -529,6 +536,9 @@ static void s_main_loop(void *args) {
     AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
     struct epoll_loop *epoll_loop = event_loop->impl_data;

+    /* set thread id to the thread of the event loop */
+    aws_atomic_store_int(&epoll_loop->thread_id, (size_t)aws_thread_current_thread_id());
+
     int err = s_subscribe_to_io_events(
         event_loop, &epoll_loop->read_task_handle, AWS_IO_EVENT_TYPE_READABLE, s_on_tasks_to_schedule, NULL);
     if (err) {
@@ -640,4 +650,6 @@ static void s_main_loop(void *args) {

     AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
     s_unsubscribe_from_io_events(event_loop, &epoll_loop->read_task_handle);
+    /* set thread id back to 0. This should be updated again in destroy, before tasks are canceled. */
+    aws_atomic_store_int(&epoll_loop->thread_id, (size_t)0);
 }
diff --git a/source/windows/iocp/iocp_event_loop.c b/source/windows/iocp/iocp_event_loop.c
index d0313ce52..228ea8a49 100644
--- a/source/windows/iocp/iocp_event_loop.c
+++ b/source/windows/iocp/iocp_event_loop.c
@@ -14,6 +14,7 @@
  */
 #include
+#include
 #include
 #include
 #include
@@ -70,6 +71,7 @@ typedef enum event_thread_state {
 struct iocp_loop {
     HANDLE iocp_handle;
     struct aws_thread thread;
+    struct aws_atomic_var thread_id;

     /* synced_data holds things that must be communicated across threads.
      * When the event-thread is running, the mutex must be locked while anyone touches anything in synced_data.
@@ -190,6 +192,8 @@ struct aws_event_loop *aws_event_loop_new_system(struct aws_allocator *alloc, aw
         goto clean_up;
     }
     AWS_ZERO_STRUCT(*impl);
+    /* initialize thread id to 0. This will be updated once the event loop thread starts. */
+    aws_atomic_init_int(&impl->thread_id, 0);

     impl->iocp_handle = CreateIoCompletionPort(
         INVALID_HANDLE_VALUE, /* FileHandle: passing invalid handle creates a new IOCP */
@@ -285,6 +289,9 @@ static void s_destroy(struct aws_event_loop *event_loop) {
         return;
     }

+    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
+
     /* Clean up task-related stuff first.
      * It's possible the a cancelled task adds further tasks to this event_loop, these new tasks would end up in
      * synced_data.tasks_to_schedule, so clean that up last */
@@ -476,9 +483,9 @@ static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *ta
 static bool s_is_event_thread(struct aws_event_loop *event_loop) {
     struct iocp_loop *impl = event_loop->impl_data;
     assert(impl);

-    assert(aws_thread_get_detach_state(&impl->thread) == AWS_THREAD_JOINABLE);
-    return aws_thread_get_id(&impl->thread) == aws_thread_current_thread_id();
+    uint64_t el_thread_id = aws_atomic_load_int(&impl->thread_id);
+    return el_thread_id == aws_thread_current_thread_id();
 }

 /* Called from any thread */
@@ -610,6 +617,9 @@ static void s_event_thread_main(void *user_data) {

     struct iocp_loop *impl = event_loop->impl_data;

+    /* Set thread id to event loop thread id. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
+
     assert(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
     impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;

@@ -712,4 +722,6 @@ static void s_event_thread_main(void *user_data) {
         }
     }
     AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
+    /* set back to 0. This should be updated again in destroy, right before task cancelation happens. */
+    aws_atomic_store_int(&impl->thread_id, (size_t)0);
 }
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index bdcfa43e6..fcedd7ce3 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -27,6 +27,7 @@ add_pipe_test_case(pipe_writes_are_fifo)
add_pipe_test_case(pipe_clean_up_cancels_pending_writes)

add_test_case(event_loop_xthread_scheduled_tasks_execute)
+add_test_case(event_loop_canceled_tasks_run_in_el_thread)
if (USE_IO_COMPLETION_PORTS)
    add_test_case(event_loop_completion_events)
else ()
diff --git a/tests/event_loop_test.c b/tests/event_loop_test.c
index dffa46efe..d89dbd70a 100644
--- a/tests/event_loop_test.c
+++ b/tests/event_loop_test.c
@@ -19,23 +19,30 @@
 #include
 #include
+#include
 #include

 struct task_args {
     bool invoked;
+    bool was_in_thread;
+    uint64_t thread_id;
+    struct aws_event_loop *loop;
+    enum aws_task_status status;
     struct aws_mutex mutex;
     struct aws_condition_variable condition_variable;
 };

 static void s_test_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
     (void)task;
-    (void)status;

     struct task_args *args = user_data;

     aws_mutex_lock(&args->mutex);
+    args->thread_id = aws_thread_current_thread_id();
     args->invoked = true;
-    aws_condition_variable_notify_one(&args->condition_variable);
+    args->status = status;
+    args->was_in_thread = aws_event_loop_thread_is_callers_thread(args->loop);
     aws_mutex_unlock((&args->mutex));
+    aws_condition_variable_notify_one(&args->condition_variable);
 }

 static bool s_task_ran_predicate(void *args) {
@@ -53,8 +60,13 @@ static int s_test_event_loop_xthread_scheduled_tasks_execute(struct aws_allocato
     ASSERT_NOT_NULL(event_loop, "Event loop creation failed with error: %s", aws_error_debug_str(aws_last_error()));
     ASSERT_SUCCESS(aws_event_loop_run(event_loop));

-    struct task_args task_args = {
-        .condition_variable = AWS_CONDITION_VARIABLE_INIT, .mutex = AWS_MUTEX_INIT, .invoked = false};
+    struct task_args task_args = {.condition_variable = AWS_CONDITION_VARIABLE_INIT,
+                                  .mutex = AWS_MUTEX_INIT,
+                                  .invoked = false,
+                                  .was_in_thread = false,
+                                  .status = -1,
+                                  .loop = event_loop,
+                                  .thread_id = 0};

     struct aws_task task;
     aws_task_init(&task, s_test_task, &task_args);
@@ -71,6 +83,8 @@ static int s_test_event_loop_xthread_scheduled_tasks_execute(struct aws_allocato
     ASSERT_TRUE(task_args.invoked);
     aws_mutex_unlock(&task_args.mutex);

+    ASSERT_FALSE(task_args.thread_id == aws_thread_current_thread_id());
+
     /* Test "now" tasks */
     task_args.invoked = false;
     ASSERT_SUCCESS(aws_mutex_lock(&task_args.mutex));
@@ -89,6 +103,75 @@ static int s_test_event_loop_xthread_scheduled_tasks_execute(struct aws_allocato

 AWS_TEST_CASE(event_loop_xthread_scheduled_tasks_execute, s_test_event_loop_xthread_scheduled_tasks_execute)

+static bool s_test_cancel_thread_task_predicate(void *args) {
+    struct task_args *task_args = args;
+    return task_args->invoked;
+}
+/*
+ * Test that tasks still pending when the event loop is destroyed are canceled, and that the cancellation
+ * callback runs with AWS_TASK_STATUS_CANCELED on what is reported as the event-loop thread.
+ */
+static int s_test_event_loop_canceled_tasks_run_in_el_thread(struct aws_allocator *allocator, void *ctx) {
+
+    (void)ctx;
+    struct aws_event_loop *event_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks);
+
+    ASSERT_NOT_NULL(event_loop, "Event loop creation failed with error: %s", aws_error_debug_str(aws_last_error()));
+    ASSERT_SUCCESS(aws_event_loop_run(event_loop));
+
+    struct task_args task1_args = {.condition_variable = AWS_CONDITION_VARIABLE_INIT,
+                                   .mutex = AWS_MUTEX_INIT,
+                                   .invoked = false,
+                                   .was_in_thread = false,
+                                   .status = -1,
+                                   .loop = event_loop,
+                                   .thread_id = 0};
+    struct task_args task2_args = {.condition_variable = AWS_CONDITION_VARIABLE_INIT,
+                                   .mutex = AWS_MUTEX_INIT,
+                                   .invoked = false,
+                                   .was_in_thread = false,
+                                   .status = -1,
+                                   .loop = event_loop,
+                                   .thread_id = 0};
+
+    struct aws_task task1;
+    aws_task_init(&task1, s_test_task, &task1_args);
+    struct aws_task task2;
+    aws_task_init(&task2, s_test_task, &task2_args);
+
+    aws_event_loop_schedule_task_now(event_loop, &task1);
+    uint64_t now;
+    ASSERT_SUCCESS(aws_event_loop_current_clock_time(event_loop, &now));
+    aws_event_loop_schedule_task_future(event_loop, &task2, now + 10000000000);
+
+    ASSERT_FALSE(aws_event_loop_thread_is_callers_thread(event_loop));
+
+    ASSERT_SUCCESS(aws_mutex_lock(&task1_args.mutex));
+    ASSERT_SUCCESS(aws_condition_variable_wait_pred(
+        &task1_args.condition_variable, &task1_args.mutex, s_task_ran_predicate, &task1_args));
+    ASSERT_TRUE(task1_args.invoked);
+    ASSERT_TRUE(task1_args.was_in_thread);
+    ASSERT_FALSE(task1_args.thread_id == aws_thread_current_thread_id());
+    ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task1_args.status);
+    aws_mutex_unlock(&task1_args.mutex);
+
+    aws_event_loop_destroy(event_loop);
+
+    aws_mutex_lock(&task2_args.mutex);
+
+    ASSERT_SUCCESS(aws_condition_variable_wait_pred(
+        &task2_args.condition_variable, &task2_args.mutex, s_test_cancel_thread_task_predicate, &task2_args));
+    ASSERT_TRUE(task2_args.invoked);
+    aws_mutex_unlock(&task2_args.mutex);
+
+    ASSERT_TRUE(task2_args.was_in_thread);
+    ASSERT_TRUE(task2_args.thread_id == aws_thread_current_thread_id());
+    ASSERT_INT_EQUALS(AWS_TASK_STATUS_CANCELED, task2_args.status);
+
+    return AWS_OP_SUCCESS;
+}
+
+AWS_TEST_CASE(event_loop_canceled_tasks_run_in_el_thread, s_test_event_loop_canceled_tasks_run_in_el_thread)
+
 #if AWS_USE_IO_COMPLETION_PORTS

 int aws_pipe_get_unique_name(char *dst, size_t dst_size);
@@ -896,8 +979,13 @@ static int s_event_loop_test_stop_then_restart(struct aws_allocator *allocator,
     ASSERT_NOT_NULL(event_loop, "Event loop creation failed with error: %s", aws_error_debug_str(aws_last_error()));
     ASSERT_SUCCESS(aws_event_loop_run(event_loop));

-    struct task_args task_args = {
-        .condition_variable = AWS_CONDITION_VARIABLE_INIT, .mutex = AWS_MUTEX_INIT, .invoked = false};
+    struct task_args task_args = {.condition_variable = AWS_CONDITION_VARIABLE_INIT,
+                                  .mutex = AWS_MUTEX_INIT,
+                                  .invoked = false,
+                                  .was_in_thread = false,
+                                  .status = -1,
+                                  .loop = event_loop,
+                                  .thread_id = 0};

     struct aws_task task;
     aws_task_init(&task, s_test_task, &task_args);
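
A note on the pattern every backend above implements: the event-loop thread's id is published through an atomic that is (1) set when the loop thread starts, (2) cleared when it exits, and (3) claimed by the destroying thread after the join but before cancellation callbacks run, so an "am I on the event-loop thread?" query never touches the already-joined aws_thread. The following standalone sketch (not part of the patch) illustrates that hand-off with C11 atomics and pthreads rather than the aws_atomic_* / aws_thread_* wrappers; all names here (loop_state, loop_is_on_callers_thread, s_current_thread_token, loop_destroy) are hypothetical and exist only for this example.

/* Minimal sketch of the thread-id hand-off used by the patch above.
 * Standalone C11 + pthreads illustration; NOT aws-c-io API, names are made up.
 * Assumed build command: cc -std=c11 -pthread sketch.c */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

struct loop_state {
    pthread_t thread;
    _Atomic uintptr_t thread_token; /* 0 == no thread currently owns the loop */
    atomic_bool should_run;
};

/* The address of a thread-local object serves as a per-thread token for this
 * sketch; the real code uses aws_thread_current_thread_id() instead. */
static _Thread_local int s_thread_marker;
static uintptr_t s_current_thread_token(void) {
    return (uintptr_t)&s_thread_marker;
}

static bool loop_is_on_callers_thread(struct loop_state *loop) {
    /* Only the atomic is read; the (possibly already joined) pthread_t is never touched. */
    return atomic_load(&loop->thread_token) == s_current_thread_token();
}

static void *loop_thread_main(void *arg) {
    struct loop_state *loop = arg;
    /* 1) The loop thread claims ownership for as long as it runs. */
    atomic_store(&loop->thread_token, s_current_thread_token());
    while (atomic_load(&loop->should_run)) {
        /* ... poll for I/O and run scheduled tasks ... */
    }
    /* 2) Clear on exit; destroy() claims it again before canceling leftover tasks. */
    atomic_store(&loop->thread_token, 0);
    return NULL;
}

static void loop_destroy(struct loop_state *loop) {
    atomic_store(&loop->should_run, false);
    pthread_join(loop->thread, NULL);
    /* 3) Cancellation callbacks may ask "am I on the event-loop thread?", so the
     * destroying thread takes over the token before running them. */
    atomic_store(&loop->thread_token, s_current_thread_token());
    /* ... cancel remaining tasks; callbacks now see loop_is_on_callers_thread() == true ... */
}

int main(void) {
    struct loop_state loop;
    atomic_init(&loop.thread_token, 0);
    atomic_init(&loop.should_run, true);
    pthread_create(&loop.thread, NULL, loop_thread_main, &loop);
    printf("caller owns loop before destroy? %d\n", loop_is_on_callers_thread(&loop)); /* expect 0 */
    loop_destroy(&loop);
    printf("caller owns loop after destroy?  %d\n", loop_is_on_callers_thread(&loop)); /* expect 1 */
    return 0;
}

The diff expresses the same three touch points per platform with aws_atomic_init_int(), aws_atomic_store_int(), aws_atomic_load_int(), and aws_thread_current_thread_id(), which is what the new event_loop_canceled_tasks_run_in_el_thread test exercises.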