Fix DefaultExecutor destruction deadlock if worker thread owns the Executor #3288

Merged
SergeyRyabinin merged 3 commits into main from sr/workerOwnsExecutor
Feb 12, 2025

@SergeyRyabinin SergeyRyabinin commented Feb 5, 2025

Issue #, if available:
#3282
Description of changes:
Make the built-in SDK thread executors handle the case where one of their workers/tasks is the only (final) owner of the executor object.
This avoids a deadlock on cyclic destruction.

Additionally, force a log flush after a FATAL log event has been logged, so that the event is not left sitting in the log buffer.
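To illustrate the cyclic-destruction problem: if the last reference to an executor is dropped on one of its own worker threads, the shutdown path runs on that worker, and a worker cannot wait for itself to finish. With a plain `std::thread`, calling `join()` from the thread itself is reported as `resource_deadlock_would_occur`; a hand-written wait loop would simply never return. The sketch below is only an illustration under that assumption, not the SDK's code; `JoinUnlessSelf` and `WorkerShutsItselfDown` are hypothetical names.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <thread>

// Hypothetical helper (not an SDK API): join a worker unless the caller IS that worker.
// Returns true if the thread was joined, false if it was detached or not joinable.
bool JoinUnlessSelf(std::thread& worker) {
    if (!worker.joinable()) return false;
    if (worker.get_id() == std::this_thread::get_id()) {
        worker.detach();  // we are the worker: waiting for ourselves cannot succeed
        assert(!worker.joinable());
        return false;
    }
    worker.join();
    return true;
}

// Drives the "worker shuts down its own handle" case and reports whether it detached.
bool WorkerShutsItselfDown() {
    struct Shared {
        std::thread worker;
        std::promise<void> handleReady;
        std::promise<bool> joined;
    };
    auto shared = std::make_shared<Shared>();
    std::future<bool> joined = shared->joined.get_future();
    std::shared_future<void> ready = shared->handleReady.get_future().share();

    // The lambda keeps `shared` alive until the worker function has fully returned.
    shared->worker = std::thread([shared, ready] {
        ready.wait();  // do not touch shared->worker before it has been assigned
        shared->joined.set_value(JoinUnlessSelf(shared->worker));
    });
    shared->handleReady.set_value();
    return !joined.get();  // true when the worker detached instead of joining itself
}
```

Called from any other thread, `JoinUnlessSelf` behaves like a normal `join()`; only the self-owned case takes the detach path.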

Check all that apply:

  • Did a review by yourself.
  • Added proper tests to cover this PR. (If tests are not applicable, explain.)
  • Checked if this PR is a breaking (APIs have been changed) change.
  • Checked if this PR will not introduce cross-platform inconsistent behavior.
  • Checked if this PR would require a ReadMe/Wiki update.

Check which platforms you have built SDK on to verify the correctness of this PR.

  • Linux
  • Windows
  • Android
  • MacOS
  • IOS
  • Other Platforms

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@SergeyRyabinin SergeyRyabinin marked this pull request as ready for review February 6, 2025 00:07
@SergeyRyabinin SergeyRyabinin changed the title from "[draft] Fix DefaultExecutor destruction deadlock if worker thread owns the Executor" to "Fix DefaultExecutor destruction deadlock if worker thread owns the Executor" Feb 6, 2025
cv.wait_for(lock, std::chrono::seconds(60), [&taskStarted]() { return taskStarted; });
}
ASSERT_TRUE(taskStarted) << "Async task has not started within 60 seconds!";
if (!taskStarted) {

Contributor

If the task has not started, the assertion on the previous line will end the test, so I don't think this is necessary.

Contributor Author

But it may leave the abandoned thread running, so the test application would keep running.
This is just a safeguard against stuck CI jobs.


ASSERT_TRUE(taskCanContinue) << "Async task has not been allowed to continue within 60 seconds!";
ASSERT_TRUE(pExec);
ASSERT_EQ(1, pExec.use_count());

Contributor

@sbera87 sbera87 Feb 6, 2025

I am not sure how this one works, because inside the lambda the shared_ptr passed by value should have increased the reference count to 2. Some clarifying inline comments would be very useful.

Contributor

OK, I see: it waits for another signal, by which point one reference will have been released.
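The reference counting discussed in this thread can be reproduced in isolation. The sketch below is not the PR's test; `ObserveUseCounts` is a hypothetical function that records `use_count()` before the lambda captures the pointer, after the capture, and after the original owner resets its copy.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Records use_count() at three points in the lifetime of a by-value capture.
std::vector<long> ObserveUseCounts() {
    std::vector<long> counts;

    auto owner = std::make_shared<int>(42);
    counts.push_back(owner.use_count());  // only `owner` holds the object

    // Capturing by value copies the shared_ptr into the lambda object.
    auto task = [owner]() { return owner.use_count(); };
    counts.push_back(owner.use_count());  // `owner` plus the lambda's copy

    owner.reset();                        // the original owner lets go
    counts.push_back(task());             // the lambda's copy is the sole owner

    return counts;
}
```

In the test under review the same effect is what makes `ASSERT_EQ(1, pExec.use_count())` hold inside the task: by the time the task is allowed to continue, the main thread has already dropped its reference.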

if (m_executor) {
m_executor->Detach(std::this_thread::get_id());
}
Aws::Delete(this);

Contributor

I'm not sure I love this, because a segfault from calling Execute twice seems like a bad failure mode. Could we maybe place a level of indirection above the task that prevents it from ever being called twice?

Contributor Author

I had the same thought... but if you think about it more, after the task has been destructed you cannot trust anything under this->, as it is already destroyed.

The indirection here is

using DefaultExecutorTaskPair = std::pair<std::thread, DefaultExecutorTask*>;
Aws::UnorderedMap<std::thread::id, DefaultExecutorTaskPair> m_tasks;

I.e., the indirection is the thread_id used as a key.
We need this structure to update the task after it has been launched, so the Executor needs access to the task context, but we also need to allow the Task to destroy itself.
There is another potential issue: the Executor cannot safely access or update an arbitrary DefaultExecutorTask (that would create a race condition); it can do so only when called from the Task's own thread.
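For comparison, one possible shape of the indirection the reviewer asked about is sketched below. It is not what this PR implements (the PR keys tasks by thread id, as described above); `SelfDeletingTask` and `OneShotHandle` are hypothetical names. The point it illustrates is the one made in the reply: the "already ran" state has to live outside the task, because nothing under `this->` can be trusted once the task has deleted itself.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical task that destroys itself at the end of Execute().
struct SelfDeletingTask {
    explicit SelfDeletingTask(std::function<void()> fn) : m_fn(std::move(fn)) {
        assert(m_fn);
    }
    void Execute() {
        m_fn();
        delete this;  // nothing under this-> is valid after this line
    }
    std::function<void()> m_fn;
};

// Hypothetical guard: the "has it run?" state lives here, not in the task.
class OneShotHandle {
public:
    explicit OneShotHandle(SelfDeletingTask* task) : m_task(task) {}
    ~OneShotHandle() { delete m_task.exchange(nullptr); }  // never run: free it here

    // Returns true only for the single call that actually ran the task.
    bool Run() {
        SelfDeletingTask* task = m_task.exchange(nullptr);
        if (task == nullptr) return false;  // already executed (and deleted)
        task->Execute();
        return true;
    }

private:
    std::atomic<SelfDeletingTask*> m_task;
};
```

A second `Run()` call becomes a no-op instead of a use-after-free. The trade-off is that every caller must go through the handle, which does not by itself give the Executor the per-thread access the reply describes.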

{
// If somehow the shared_ptr of the Executor gets owned by a worker thread of that Executor - Executor shutdown must not deadlock
auto pExec = Aws::MakeShared<PooledThreadExecutor>("WorkerThreadTheOnlyOwner", 4);

Contributor

Food for thought: is it possible to simplify the test a little for ease of maintenance and debugging? Conceptually, I get that you want the detached worker thread to be the sole owner of the pointer to the executor and then shut it down. Instead of signalling back and forth between the main and detached threads, can we not just signal from the main thread to the detached thread once? The main thread creates and submits the task to the detached thread, resets its ownership, and just waits for the test case to finish. The worker thread waits for the signal, ensures sole ownership, and shuts down. So the main thread has one notify and one wait; the worker thread has one wait and then one notify.

Contributor Author

I wrapped the synchronization blocks into sendSignal and waitForSignal functions; this should make the test more readable.

> can we not just signal from the main thread to the detached thread once. The main thread creates and submit task to detached thread, resets its ownership and just wait for test case finish

This creates a race condition in the test, so it might not test the intended scenario.
Without waiting in the task, there is a chance that the async task will finish before the main thread releases the executor.
The current synchronization logic always forces the tested scenario.
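A minimal sketch of what such helpers and the two-way handshake might look like follows. Only the names `sendSignal` and `waitForSignal` come from the comment above; the `Signal` struct, the signatures, and `RunHandshake` are assumptions made for illustration, not the code in the PR.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Assumed shape: one flag guarded by a mutex and a condition variable.
struct Signal {
    std::mutex mutex;
    std::condition_variable cv;
    bool raised = false;
};

void sendSignal(Signal& signal) {
    {
        std::lock_guard<std::mutex> lock(signal.mutex);
        signal.raised = true;
    }
    signal.cv.notify_all();
}

// Returns false if the signal was not raised within the timeout.
bool waitForSignal(Signal& signal, std::chrono::seconds timeout) {
    std::unique_lock<std::mutex> lock(signal.mutex);
    return signal.cv.wait_for(lock, timeout, [&signal] { return signal.raised; });
}

// Two-way handshake: the task announces that it is running, then blocks until
// the main thread says it may continue.
bool RunHandshake() {
    Signal taskStarted;
    Signal taskCanContinue;
    bool allowedToContinue = false;

    std::thread worker([&] {
        sendSignal(taskStarted);
        allowedToContinue = waitForSignal(taskCanContinue, std::chrono::seconds(60));
        // In the real test, the task would now be the sole owner of the executor.
    });

    const bool started = waitForSignal(taskStarted, std::chrono::seconds(60));
    // In the real test, the main thread would release its executor reference here.
    sendSignal(taskCanContinue);
    worker.join();
    return started && allowedToContinue;
}
```

The second signal is what closes the race described above: the task cannot finish before the main thread has done its part, no matter how the two threads are scheduled.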

Contributor

I would imagine that once the worker thread has started (in immediate dispatch mode), the race condition you mentioned can be avoided, but sure, I see why it is complex.

Contributor Author

Well, computers and implementations differ, and "immediate" is, surprisingly, not always immediate.

DefaultExecutorTask(DefaultExecutorTask&&) = default;

void Execute() {
assert(m_task);

Contributor

Will m_task ever be null here? It is already validated in the constructor that it was moved into.

Contributor Author

That's a very good comment!
The only issue is that... the owning executor class is not aware of whether it lives inside a shared pointer or is an independent object.

@@ -31,14 +31,39 @@ namespace Aws

void WaitUntilStopped() override;
protected:
struct DefaultExecutorTask {
DefaultExecutorTask(std::function<void()>&& task, DefaultExecutor* executor)

Contributor

explicit is needed, right? Otherwise DefaultExecutorTask() can cause an issue in Execute.

Contributor Author

It is a multi-argument constructor; implicit construction will happen only in the case of brace-init.
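The brace-init point can be shown with two toy types. These are illustrative stand-ins, not the SDK's `DefaultExecutorTask`: for a constructor taking two arguments, `explicit` only affects copy-list-initialization, i.e. passing `{a, b}` where the type is expected.

```cpp
#include <cassert>

// Hypothetical types for illustration only.
struct ImplicitPair {
    ImplicitPair(int a, int b) : sum(a + b) {}
    int sum;
};

struct ExplicitPair {
    explicit ExplicitPair(int a, int b) : sum(a + b) {}
    int sum;
};

int SumImplicit(ImplicitPair p) { return p.sum; }
int SumExplicit(ExplicitPair p) { return p.sum; }

void CheckConstruction() {
    // Non-explicit: a braced list converts implicitly at the call site.
    assert(SumImplicit({1, 2}) == 3);

    // Explicit: the type must be spelled out.
    assert(SumExplicit(ExplicitPair{1, 2}) == 3);

    // SumExplicit({1, 2});  // would not compile: copy-list-initialization
    //                       // cannot select an explicit constructor
}
```

Neither type has a default constructor, because declaring any constructor suppresses the implicit one; the same language rule applies to any struct that declares only a two-argument constructor.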

}

std::function<void()> m_task;
DefaultExecutor* m_executor = nullptr;

Contributor

@sbera87 sbera87 Feb 6, 2025

Should we not hold a reference to a shared_ptr and access it through a weak_ptr for more thread safety?
Otherwise, m_executor->Detach is not really thread-safe as of now.

WaitUntilStopped();
pImpl.reset();
}
return *this;

Contributor Author

The executor is in an invalid state after this method; the impl needs to be re-created.

return false;
}

auto* task = Aws::New<DefaultExecutorTask>(DEFAULT_EXECUTOR_LOG_TAG, std::move(fx), spThis);

Contributor Author

If the Executor is shut down after this line, the task will be leaked.

void Execute() {
assert(m_task);
m_task();
if (const auto spExecutor = m_executor.lock()) {

Contributor Author

If the m_executor pimpl is being destructed, lock() will return null, so Detach is skipped and Aws::Delete(this) deletes this task.

The pimpl destructor may then still try to call the following on the deleted task, because the task is still present in the task map:

taskItem.second.second->DoNotDetach();
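The `lock()` behaviour this comment relies on can be checked in isolation. The sketch below uses a hypothetical `Impl` type rather than the SDK's pimpl: once the last `shared_ptr` is released, a `weak_ptr` to the object can no longer be locked, even while the object's destructor is still running.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for an implementation object observed through a weak_ptr.
struct Impl {
    std::weak_ptr<Impl> self;          // assigned after construction
    bool* lockFailedInDtor = nullptr;  // where the destructor reports what it saw

    ~Impl() {
        // The strong count is already zero here, so lock() yields an empty pointer.
        *lockFailedInDtor = (self.lock() == nullptr);
    }
};

bool LockFailsDuringDestruction() {
    bool lockFailed = false;
    auto impl = std::make_shared<Impl>();
    impl->self = impl;
    impl->lockFailedInDtor = &lockFailed;

    assert(impl.use_count() == 1);  // the weak_ptr does not count as an owner
    impl.reset();                   // last owner gone: ~Impl runs now
    return lockFailed;
}
```

So a task that sees `lock()` fail cannot tell "the executor is gone" apart from "the executor is in the middle of its destructor and may still reach me through its task map", which is the window this comment describes.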

@SergeyRyabinin SergeyRyabinin force-pushed the sr/workerOwnsExecutor branch 4 times, most recently from 6e928a7 to 2baebbb Compare February 11, 2025 21:21
@SergeyRyabinin SergeyRyabinin merged commit a35ebd9 into main Feb 12, 2025
2 of 3 checks passed
@SergeyRyabinin SergeyRyabinin deleted the sr/workerOwnsExecutor branch February 12, 2025 17:28
sbera87 pushed a commit that referenced this pull request Feb 13, 2025
…ecutor (#3288)

* Fix Executors destruction deadlock if worker thread owns the Executor

* Refactor DefaultExecutor into PIMPL

* Remove spin lock from thread executor, catch concurrent task submission and shutdown

3 participants