Fix DefaultExecutor destruction deadlock if worker thread owns the Executor #3288
d8d7232 to 0697427
cv.wait_for(lock, std::chrono::seconds(60), [&taskStarted]() { return taskStarted; });
}
ASSERT_TRUE(taskStarted) << "Async task has not started within 60 seconds!";
if (!taskStarted) {
If the task has not started, the assertion on the previous line already ends the test, so I don't think this check is necessary.
But the abandoned thread may still get to run, which would keep the test application running.
This is just a safeguard against stuck CI jobs.
ASSERT_TRUE(taskCanContinue) << "Async task has not been allowed to continue within 60 seconds!";
ASSERT_TRUE(pExec);
ASSERT_EQ(1, pExec.use_count());
I am not sure how this one works: inside the lambda, the shared_ptr captured by value should have increased the reference count to 2. Some clarifying inline comments would be very useful.
OK, I see: it waits for another signal, after which one reference has been released.
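To make the reference counting in this exchange concrete, here is a minimal single-threaded sketch, not the PR's test. `Dummy` and both function names are hypothetical; the only point is that a by-value capture adds one owner and destroying the closure removes it again.

```cpp
#include <cassert>
#include <memory>

struct Dummy {};  // hypothetical stand-in for the executor type

// While a closure that captured the pointer by value is alive, there are two owners.
long countWhileCaptured() {
    auto p = std::make_shared<Dummy>();
    auto task = [p]() { (void)p; };  // the capture copies the shared_ptr
    task();
    return p.use_count();
}

// Once the closure is gone, the original pointer is the sole owner again.
long countAfterClosureDestroyed() {
    auto p = std::make_shared<Dummy>();
    {
        auto task = [p]() { (void)p; };
        task();
    }  // the closure and its captured copy are destroyed here
    return p.use_count();
}
```

In the test under review the roles are reversed (the main thread drops its copy and the task keeps one), but the counting works the same way. Note that `use_count()` is only a reliable number when no other thread is changing ownership at that moment, which is what the extra signal guarantees.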
if (m_executor) {
    m_executor->Detach(std::this_thread::get_id());
}
Aws::Delete(this);
I'm not sure I love this, because a segfault from calling Execute twice seems like a bad failure mode. Could we place a level of indirection above the task that prevents it from ever being called twice?
I had the same thought... but if you think about it more, after the task has been destructed you cannot trust anything under this->, as it is already destroyed.
The indirection here is
using DefaultExecutorTaskPair = std::pair<std::thread, DefaultExecutorTask*>;
Aws::UnorderedMap<std::thread::id, DefaultExecutorTaskPair> m_tasks;
I.e. the indirection is the thread_id used as a key.
We need this structure to update the task after it has been launched, so the Executor needs access to the task context, but we also need to allow the Task to destroy itself.
There is another potential issue: the Executor cannot access or update any DefaultExecutorTask (that would create a race condition) unless it is being called from the Task's own thread.
{
    // If somehow the shared_ptr of the Executor gets owned by a worker thread of that Executor - Executor shutdown must not deadlock
    auto pExec = Aws::MakeShared<PooledThreadExecutor>("WorkerThreadTheOnlyOwner", 4);
Food for thought: is it possible to simplify the test a little more, for ease of maintenance and debugging? Conceptually, I get that you want the detached worker thread to be the sole owner of the pointer to the executor and then shut it down. Instead of signalling back and forth between the main and detached threads, can we not just signal from the main thread to the detached thread once? The main thread creates and submits the task to the detached thread, resets its ownership, and just waits for the test case to finish. The worker thread just waits for the signal, ensures sole ownership, and shuts down. So the main thread has one notify and one wait; the worker thread has one wait and then one notify.
I wrapped the synchronization blocks into sendSignal and waitForSignal functions; that should make the test more readable.
"can we not just signal from the main thread to the detached thread once. The main thread creates and submit task to detached thread, resets its ownership and just wait for test case finish"
This creates a race condition in the test, so it would potentially not test the intended scenario.
Without waiting in the task, there is a chance that the async task will finish before the main thread releases the executor.
The current synchronization logic always forces the tested scenario.
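For readers following the thread, a two-way handshake of this kind might look roughly like the sketch below. The helper names come from the comment above, but their signatures, the `Signal` struct, and the `int` standing in for the executor are all assumptions rather than the PR's actual test code.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical one-shot signal: a flag guarded by a mutex plus a condition variable.
struct Signal {
    std::mutex mtx;
    std::condition_variable cv;
    bool raised = false;
};

void sendSignal(Signal& s) {
    {
        std::lock_guard<std::mutex> lock(s.mtx);
        s.raised = true;
    }
    s.cv.notify_all();
}

// Returns false if the signal was not raised within the timeout.
bool waitForSignal(Signal& s, std::chrono::seconds timeout = std::chrono::seconds(60)) {
    std::unique_lock<std::mutex> lock(s.mtx);
    return s.cv.wait_for(lock, timeout, [&s]() { return s.raised; });
}

// The worker holds a copy of the pointer; the main thread releases its own copy
// only after the worker has started, and the worker checks ownership only after
// the main thread says it may continue.
bool workerEndsUpSoleOwner() {
    auto resource = std::make_shared<int>(42);  // stand-in for the executor
    Signal taskStarted;
    Signal taskCanContinue;
    bool soleOwner = false;

    std::thread worker([resource, &taskStarted, &taskCanContinue, &soleOwner]() {
        sendSignal(taskStarted);
        if (waitForSignal(taskCanContinue)) {
            soleOwner = (resource.use_count() == 1);
        }
    });

    const bool started = waitForSignal(taskStarted);
    resource.reset();  // the main thread gives up ownership
    sendSignal(taskCanContinue);
    worker.join();
    return started && soleOwner;
}
```

With only the second signal, the worker could in principle run to completion before the main thread has released its copy; the first signal removes that ordering ambiguity.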
I would imagine that once the worker thread is started (in immediate dispatch mode) the race condition you mentioned can be avoided, but sure, I see why it's complex.
Well, computers and implementations differ, and "immediate" is, surprisingly, not always immediate.
DefaultExecutorTask(DefaultExecutorTask&&) = default;

void Execute() {
    assert(m_task);
Will m_task ever be null here? It's already validated in the constructor, where it was moved in.
That's a very good comment!
The only issue is that the owning executor class is not aware of whether it lives inside a shared pointer or is an independent object.
@@ -31,14 +31,39 @@ namespace Aws

void WaitUntilStopped() override;
protected:
struct DefaultExecutorTask {
    DefaultExecutorTask(std::function<void()>&& task, DefaultExecutor* executor)
explicit is needed here, right? Otherwise DefaultExecutorTask() can cause an issue in Execute.
It is a multi-argument constructor, so implicit construction can happen only through brace-initialization.
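As an illustration of that point, the sketch below shows the one situation where a non-explicit two-argument constructor is used implicitly: copy-list-initialization from a braced list. `Task`, `Executor`, and the helper functions are hypothetical stand-ins, not the SDK types.

```cpp
#include <cassert>
#include <functional>
#include <utility>

struct Executor {};  // hypothetical placeholder for the owning executor

struct Task {
    // Not marked explicit, mirroring the constructor under review.
    Task(std::function<void()>&& task, Executor* executor)
        : m_task(std::move(task)), m_executor(executor) {}

    std::function<void()> m_task;
    Executor* m_executor = nullptr;
};

bool hasCallable(const Task& task) { return static_cast<bool>(task.m_task); }

bool braceInitConvertsImplicitly() {
    Executor executor;
    // The braced list is implicitly converted to a temporary Task.
    // With an explicit constructor this call would not compile.
    return hasCallable({[]() {}, &executor});
}
```

A single plain argument can never trigger the conversion here, because no one-argument constructor exists.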
}

std::function<void()> m_task;
DefaultExecutor* m_executor = nullptr;
Should we not take a reference to the shared_ptr and then access the executor through a weak_ptr, for more thread safety? Otherwise m_executor->Detach is not really thread safe as of now.
0697427 to 45031d6
    WaitUntilStopped();
    pImpl.reset();
}
return *this;
The executor is in an invalid state after this method; the impl needs to be re-created.
    return false;
}

auto* task = Aws::New<DefaultExecutorTask>(DEFAULT_EXECUTOR_LOG_TAG, std::move(fx), spThis);
If the Executor is in shutdown after this line, the task will be leaked.
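One common way to avoid this kind of leak is to keep the freshly allocated object in an owning smart pointer until it has actually been handed off. The sketch below uses plain `std::unique_ptr` and a hypothetical `submit` function and `Task` type; it does not reflect how the SDK allocates tasks (which goes through Aws::New) or how the PR ended up resolving the comment.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical task type that counts live instances so a leak would be visible.
struct Task {
    static int alive;
    Task() { ++alive; }
    ~Task() { --alive; }
};
int Task::alive = 0;

// Returns false and frees the task if the (hypothetical) executor is shutting down.
bool submit(bool shuttingDown, std::vector<std::unique_ptr<Task>>& queue) {
    std::unique_ptr<Task> task(new Task());
    if (shuttingDown) {
        return false;  // unique_ptr destroys the task on this early return
    }
    queue.push_back(std::move(task));  // ownership moves to the queue
    return true;
}
```

The early-return path needs no explicit cleanup, which is the property the review comment is asking for.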
void Execute() {
    assert(m_task);
    m_task();
    if (const auto spExecutor = m_executor.lock()) {
If the m_executor pimpl is being destructed, lock() will return null, so Detach is skipped and Aws::Delete(this) deletes this task. The destructor of the pimpl may then still try to call
taskItem.second.second->DoNotDetach();
because the task is still present in the task map.
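The underlying `std::weak_ptr` behaviour can be reproduced in isolation: once the last `shared_ptr` is gone, `lock()` already returns null while the object's destructor is still running. The `Impl` type below is a hypothetical stand-in for the pimpl, used only to observe that.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the executor impl; it records what lock() returns
// while its own destructor is running.
struct Impl {
    std::weak_ptr<Impl> self;
    bool* lockFailedInDestructor = nullptr;

    ~Impl() {
        // The strong count is already zero here, so lock() yields an empty pointer.
        *lockFailedInDestructor = (self.lock() == nullptr);
    }
};

bool lockReturnsNullDuringDestruction() {
    bool lockFailed = false;
    {
        auto sp = std::make_shared<Impl>();
        sp->self = sp;
        sp->lockFailedInDestructor = &lockFailed;
    }  // last owner released: ~Impl runs here
    return lockFailed;
}
```

So a task that relies on `lock()` succeeding in order to unregister itself will silently skip that step in exactly the window where the owner is tearing down its task map.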
6e928a7 to 2baebbb
2baebbb to c05068e
…ecutor (#3288)
* Fix Executors destruction deadlock if worker thread owns the Executor
* Refactor DefaultExecutor into PIMPL
* Remove spin lock from thread executor, catch concurrent task submission and shutdown
Issue #, if available:
#3282
Description of changes:
Make the built-in SDK thread executors handle the case where one of their workers/tasks is the only (final) owner of the executor object.
This avoids the deadlock on cyclic destruction.
Additionally, force a log flush after a FATAL log event has been logged, to avoid log buffering.
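The cyclic-destruction case described above can be sketched with the standard library alone. This is only an illustration of the general idea (do not join a thread from itself), with a hypothetical `Owner` class; the actual fix in this PR is structured differently, around the DefaultExecutor PIMPL and its task map.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical owner of a single worker thread (not the SDK's executor).
class Owner {
public:
    ~Owner() {
        if (!m_worker.joinable()) {
            return;
        }
        if (m_worker.get_id() == std::this_thread::get_id()) {
            // We are being destroyed from our own worker: join() would wait on ourselves.
            m_worker.detach();
        } else {
            m_worker.join();
        }
    }

    void Start(std::function<void()> fn) {
        assert(!m_worker.joinable());
        m_worker = std::thread(std::move(fn));
    }

private:
    std::thread m_worker;
};

// The worker ends up holding the last shared_ptr to its own Owner and releases it.
bool workerDestroysOwnerWithoutDeadlock() {
    auto owner = std::make_shared<Owner>();
    std::promise<void> go;
    std::shared_future<void> goFuture = go.get_future().share();
    auto done = std::make_shared<std::promise<bool>>();
    std::future<bool> doneFuture = done->get_future();

    owner->Start([owner, goFuture, done]() mutable {
        goFuture.wait();  // wait until the main thread has dropped its copy
        const bool sole = (owner.use_count() == 1);
        owner.reset();    // ~Owner runs here, on the worker thread
        done->set_value(sole);
    });

    owner.reset();   // the worker is now the only owner
    go.set_value();
    return doneFuture.get();
}
```

Everything the worker touches after the detach is owned by the closure itself (the shared future and the shared promise), so the detached thread never refers to state that the main thread might already have destroyed.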
Check all that applies:
Check which platforms you have built SDK on to verify the correctness of this PR.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.