Skip to content

Commit

Permalink
Fix Concurrent deadlock (#621)
Browse files Browse the repository at this point in the history
* Fix 'Concurrent' deadlock

* cleanup
  • Loading branch information
onelxj authored May 13, 2024
1 parent ea75f5a commit 1f7acf2
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 16 deletions.
69 changes: 55 additions & 14 deletions eventuals/concurrent.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,7 @@ struct _Concurrent final {
// that have completed but haven't yet been pruned (see
// 'CreateOrReuseFiber()').
struct TypeErasedFiber {
void Reuse() {
done = false;
// Need to reinitialize the interrupt so that the
// previous eventual that registered with this
// interrupt won't get invoked as a handler!
interrupt.~Interrupt();
new (&interrupt) class Interrupt();
}
virtual bool Reuse() = 0;

virtual ~TypeErasedFiber() = default;

Expand Down Expand Up @@ -178,18 +171,18 @@ struct _Concurrent final {
if (!fibers_) {
fibers_.reset(CreateFiber());
fiber = fibers_.get();
} else if (fibers_->done) {
// Need to release next before we reset so it
// doesn't get deallocated as part of reset.
fibers_.reset(fibers_->next.release());
} else {
fiber = fibers_.get();
CHECK_NOTNULL(fiber);
for (;;) {
if (fiber->done) {
fiber->Reuse();
if (fiber->Reuse()) {
break;
} else if (!fiber->next) {
// TODO(benh): we will create an "infinite" number of
// fibers if none are ever done; we should consider
// adding a maximum level of concurrency and never
// create more fibers than that.
fiber->next.reset(CreateFiber());
fiber = fiber->next.get();
break;
Expand Down Expand Up @@ -290,6 +283,12 @@ struct _Concurrent final {
Eventual<void>()
.context(std::move(stopped_or_error))
.start([this, fiber](auto& /* stopped_or_error */, auto& k) {
CHECK_EQ(
&fiber->context.value(),
Scheduler::Context::Get().get());
CHECK(fiber->context->in_use())
<< "Context: " << fiber->context->name();

fiber->done = true;

fibers_done_ = FibersDone();
Expand All @@ -305,6 +304,12 @@ struct _Concurrent final {
auto& stopped_or_error,
auto& k,
auto&& error) {
CHECK_EQ(
&fiber->context.value(),
Scheduler::Context::Get().get());
CHECK(fiber->context->in_use())
<< "Context: " << fiber->context->name();

fiber->done = true;

if (!stopped_or_error->has_value()) {
Expand All @@ -322,6 +327,12 @@ struct _Concurrent final {
k.Start(); // Exits the synchronized block!
})
.stop([this, fiber](auto& stopped_or_error, auto& k) {
CHECK_EQ(
&fiber->context.value(),
Scheduler::Context::Get().get());
CHECK(fiber->context->in_use())
<< "Context: " << fiber->context->name();

fiber->done = true;

if (!stopped_or_error->has_value()) {
Expand Down Expand Up @@ -474,6 +485,34 @@ struct _Concurrent final {
// start for each upstream value.
template <typename E_>
struct Fiber : TypeErasedFiber {
  // Type produced by calling 'Build()' on an 'E_'.
  using K = decltype(Build(std::declval<E_>()));

  // The built continuation for this fiber; emptied again by 'Reuse()'.
  std::optional<K> k;

  // Attempts to recycle this fiber. Returns 'true' if the fiber was
  // reset and may be handed out again, 'false' if it has to be left
  // alone for now.
  bool Reuse() override {
    // A fiber that has not finished can never be recycled.
    if (!done) {
      return false;
    }

    CHECK(context.has_value());

    // Being marked done is not sufficient: as long as the context is
    // still in use or blocked we must not tear anything down.
    const bool busy = context->in_use() || context->blocked();
    if (busy) {
      return false;
    }

    done = false;

    // Destroy and re-create the interrupt in place, otherwise the
    // eventual that previously registered with it would still get
    // invoked as a handler!
    interrupt.~Interrupt();
    new (&interrupt) class Interrupt();

    // Order matters: 'k' may hold a borrowed reference to 'context',
    // so 'k' has to go first or resetting 'context' may deadlock.
    k.reset();
    context.reset();

    return true;
  }
};
Expand Down Expand Up @@ -523,6 +562,8 @@ struct _Concurrent final {
fiber->context->scheduler()->Submit(
[fiber]() {
CHECK_EQ(&fiber->context.value(), Scheduler::Context::Get().get());
CHECK(fiber->context->in_use())
<< "Context: " << fiber->context->name();
static_cast<Fiber<E>*>(fiber)->k->Register(fiber->interrupt);
static_cast<Fiber<E>*>(fiber)->k->Start();
},
Expand Down
3 changes: 3 additions & 0 deletions eventuals/event-loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ void EventLoop::Check() {

context->unblock();

context->use();

stout::borrowed_ref<Context> previous =
Context::Switch(std::move(waiter->context).reference());

Expand All @@ -350,6 +352,7 @@ void EventLoop::Check() {
////////////////////////////////////////////////////

CHECK_EQ(context, Context::Switch(std::move(previous)).get());
context->unuse();
}
} while (waiter != nullptr);
}
Expand Down
6 changes: 6 additions & 0 deletions eventuals/event-loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -1216,9 +1216,11 @@ struct _EventLoopSchedule final {
if (loop()->InEventLoop()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Start(std::forward<Args>(args)...);
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
if constexpr (!std::is_void_v<Arg_>) {
arg_.emplace(std::forward<Args>(args)...);
Expand Down Expand Up @@ -1246,9 +1248,11 @@ struct _EventLoopSchedule final {
if (loop()->InEventLoop()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Fail(std::forward<Error>(error));
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
// TODO(benh): avoid allocating on heap by storing args in
// pre-allocated buffer based on composing with Errors.
Expand Down Expand Up @@ -1280,9 +1284,11 @@ struct _EventLoopSchedule final {
if (loop()->InEventLoop()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Stop();
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
loop()->Submit(
this->Borrow([this]() {
Expand Down
5 changes: 5 additions & 0 deletions eventuals/grpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ class ServerReader {
};
}

EVENTUALS_GRPC_LOG(1)
<< "Reading requests for call (" << context_ << ")"
<< " for host = " << context_->host()
<< " and path = " << context_->method();

context_->stream()->Read(&data.buffer, &callback);
});
}
Expand Down
10 changes: 10 additions & 0 deletions eventuals/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,19 @@ class DefaultScheduler final : public Scheduler {
EVENTUALS_LOG(1)
<< "'" << context.name() << "' preempted '" << previous->name() << "'";

context.use();

callback();

CHECK_EQ(&context, Context::Switch(std::move(previous)).get());

context.unuse();
EVENTUALS_LOG(1)
<< "'"
<< Context::Get()->name()
<< "' switched back with '"
<< context.name()
<< "'";
}

void Clone(Context& context) override {
Expand Down
19 changes: 19 additions & 0 deletions eventuals/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ class Scheduler {
return blocked_;
}

// Reports whether the in-use counter of this context is currently
// non-zero.
bool in_use() const {
  const int count = in_use_.load();
  return count != 0;
}

const std::string& name() const {
return name_;
}
Expand All @@ -114,8 +118,10 @@ class Scheduler {
void Continue(F&& f) {
if (scheduler()->Continuable(*this)) {
auto previous = Switch(Borrow());
use();
f();
Switch(std::move(previous));
unuse();
} else {
scheduler()->Submit(std::forward<F>(f), *this);
}
Expand All @@ -125,13 +131,24 @@ class Scheduler {
void Continue(F&& f, G&& g) {
// Runs 'f' immediately when the scheduler reports this context as
// continuable; otherwise submits the result of calling 'g()' to the
// scheduler instead. Note that 'g' is only invoked on the submit path.
if (scheduler()->Continuable(*this)) {
// Switch to this context, keeping what 'Switch()' returns so that we
// can switch back once 'f' has returned.
auto previous = Switch(Borrow());
// Bump the in-use counter for the duration of 'f()'.
use();
f();
// Switch back first, then drop the in-use mark taken above.
Switch(std::move(previous));
unuse();
} else {
scheduler()->Submit(g(), *this);
}
}

// Atomically increments the in-use counter of this context by one.
void use() {
  ++in_use_;
}

// Atomically decrements the in-use counter of this context by one and
// aborts (via 'CHECK') if the counter was not positive beforehand,
// i.e., if there was no outstanding use to release.
//
// The check is made against the value returned by the very same
// 'fetch_sub()' that performs the decrement. Doing a separate 'load()'
// followed by 'fetch_sub()' would leave a window in which another
// thread could decrement the counter in between, letting it drop below
// zero without the check ever firing.
void unuse() {
  const int previous = in_use_.fetch_sub(1);
  CHECK(previous > 0);
}

// For schedulers that need to store arbitrary data.
void* data = nullptr;

Expand All @@ -144,6 +161,8 @@ class Scheduler {

Scheduler* scheduler_ = nullptr;

std::atomic<int> in_use_ = 0;

// This is the most common set of variables needed to create contexts.
bool blocked_ = false;

Expand Down
4 changes: 4 additions & 0 deletions eventuals/static-thread-pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ StaticThreadPool::StaticThreadPool()

context->unblock();

context->use();

stout::borrowed_ref<Context> previous =
Context::Switch(std::move(waiter->context).reference());

Expand All @@ -92,6 +94,8 @@ StaticThreadPool::StaticThreadPool()
////////////////////////////////////////////////////

CHECK_EQ(context, Context::Switch(std::move(previous)).get());

context->unuse();
}
} while (!shutdown_.load());
});
Expand Down
13 changes: 13 additions & 0 deletions eventuals/static-thread-pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Start(std::forward<Args>(args)...);
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
if constexpr (!std::is_void_v<Arg_>) {
arg_.emplace(std::forward<Args>(args)...);
Expand Down Expand Up @@ -290,9 +292,12 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Fail(std::forward<Error>(error));
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
;
} else {
// TODO(benh): avoid allocating on heap by storing args in
// pre-allocated buffer based on composing with Errors.
Expand Down Expand Up @@ -341,9 +346,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Stop();
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
EVENTUALS_LOG(1)
<< "Schedule submitting '" << context_->name() << "'";
Expand Down Expand Up @@ -378,9 +385,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Begin(*CHECK_NOTNULL(stream_));
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
EVENTUALS_LOG(1)
<< "Schedule submitting '" << context_->name() << "'";
Expand Down Expand Up @@ -417,9 +426,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Body(std::forward<Args>(args)...);
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
if constexpr (!std::is_void_v<Arg_>) {
arg_.emplace(std::forward<Args>(args)...);
Expand Down Expand Up @@ -464,9 +475,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Ended();
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
EVENTUALS_LOG(1)
<< "Schedule submitting '" << context_->name() << "'";
Expand Down
14 changes: 13 additions & 1 deletion test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ cc_library(
],
)

# Header-only target wrapping "event-loop-test.h". It is publicly
# visible, so targets in other packages can depend on it.
cc_library(
name = "event-loop-test",
hdrs = [
"event-loop-test.h",
],
visibility = ["//visibility:public"],
# The eventuals library itself plus googletest.
deps = [
"//eventuals",
"@com_github_google_googletest//:gtest",
],
)

cc_library(
name = "http-mock-server",
testonly = True,
Expand All @@ -52,7 +64,6 @@ cc_test(
"control-loop.cc",
"dns-resolver.cc",
"do-all.cc",
"event-loop-test.h",
"eventual.cc",
"executor.cc",
"expected.cc",
Expand Down Expand Up @@ -97,6 +108,7 @@ cc_test(
":generate-test-task-name",
":http-mock-server",
":promisify-for-test",
":event-loop-test",
"//eventuals",
"//test/concurrent",
"@com_github_google_googletest//:gtest_main",
Expand Down
Loading

0 comments on commit 1f7acf2

Please sign in to comment.