From a871442dc3bde545494cdb9a3be4cb92270cd5e7 Mon Sep 17 00:00:00 2001 From: James Harrison Date: Fri, 15 Oct 2021 11:55:40 +0100 Subject: [PATCH] MB-48816: Avoid unsafe use of cookie from background tasks Previously, StatCheckpointTask and StatDCPTask immediately wrote responses when collecting stats while on a background thread. TSAN reported this as unsafe; no locks prevent potential racing with a frontend thread manipulating the cookie. Change both tasks to accumulate task values, but leave the frontend thread to actually write the responses when it resumes the ewouldblock'ed operation. TSAN Report: WARNING: ThreadSanitizer: data race (pid=24371) Read of size 8 at 0x7b54000a2df0 by thread T62: #0 Cookie::getHeader() const kv_engine/daemon/cookie.cc:201 (memcached+0x6508ac) #1 append_stats kv_engine/daemon/protocol/mcbp/stats_context.cc:95 (memcached+0x71fd6c) .... #19 void StatCollector::addStat(cb::stats::Key&&, unsigned long const&) const ../kv_engine/include/statistics/collector.h:336 (memcached+0x7e50e5) #20 EventuallyPersistentEngine::addAggregatedProducerStats(BucketStatCollector const&, ConnCounter const&) kv_engine/engines/ep/src/ep_engine.cc:4038 (memcached+0x7e50e5) #21 EventuallyPersistentEngine::doDcpStatsInner(CookieIface const*, std::function >, std::basic_string_view >, void const*)> const&, std::basic_string_view >) kv_engine/engines/ep/src/ep_engine.cc:4030 (memcached+0x81bd05) Previous write of size 8 at 0x7b54000a2df0 by thread T21 (mutexes: write M3843): #0 Cookie::setPacket(cb::mcbp::Header const&, bool) kv_engine/daemon/cookie.cc:186 (memcached+0x65080e) #1 Cookie::preserveRequest() kv_engine/daemon/cookie.h:225 (memcached+0x696aa7) #2 Connection::executeCommandPipeline() kv_engine/daemon/connection.cc:581 (memcached+0x696aa7) #3 Connection::executeCommandsCallback() kv_engine/daemon/connection.cc:793 (memcached+0x696be8) #4 Connection::rw_callback(bufferevent*, void*) kv_engine/daemon/connection.cc:942 (memcached+0x697851) #5 
bufferevent_run_deferred_callbacks_unlocked /home/couchbase/jenkins/workspace/cbdeps-platform-build-old/deps/packages/build/libevent/libevent-prefix/src/libevent/bufferevent.c:208 (libevent_core-2.1.so.7+0xf71d) #6 folly::EventBase::loopBody(int, bool) folly/io/async/EventBase.cpp:397 (memcached+0xfc9b52) #7 folly::EventBase::loop() folly/io/async/EventBase.cpp:315 (memcached+0xfcb06b) #8 folly::EventBase::loopForever() folly/io/async/EventBase.cpp:538 (memcached+0xfcb06b) #9 worker_libevent kv_engine/daemon/thread.cc:115 (memcached+0x6c16af) #10 CouchbaseThread::run() platform/src/cb_pthreads.cc:51 (memcached+0xf217d5) #11 platform_thread_wrap platform/src/cb_pthreads.cc:64 (memcached+0xf217d5) Change-Id: I3fbd8d51e174a7d19c5cb608a969795e445b8e86 Reviewed-on: http://review.couchbase.org/c/kv_engine/+/163709 Tested-by: Build Bot Reviewed-by: Dave Rigby --- engines/ep/CMakeLists.txt | 1 + engines/ep/src/background_stat_task.cc | 58 +++++++++++++++ engines/ep/src/background_stat_task.h | 79 +++++++++++++++++++++ engines/ep/src/ep_engine.cc | 97 ++++++++++++++------------ engines/ep/src/ep_engine.h | 30 ++++++++ 5 files changed, 220 insertions(+), 45 deletions(-) create mode 100644 engines/ep/src/background_stat_task.cc create mode 100644 engines/ep/src/background_stat_task.h diff --git a/engines/ep/CMakeLists.txt b/engines/ep/CMakeLists.txt index a57f2ae9de..5695167563 100644 --- a/engines/ep/CMakeLists.txt +++ b/engines/ep/CMakeLists.txt @@ -292,6 +292,7 @@ set_source_files_properties(src/crc32.c ADD_LIBRARY(ep_objs OBJECT src/access_scanner.cc + src/background_stat_task.cc src/bgfetcher.cc src/blob.cc src/bloomfilter.cc diff --git a/engines/ep/src/background_stat_task.cc b/engines/ep/src/background_stat_task.cc new file mode 100644 index 0000000000..4e776edb1e --- /dev/null +++ b/engines/ep/src/background_stat_task.cc @@ -0,0 +1,58 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2021-Present Couchbase, Inc. 
+ * + * Use of this software is governed by the Business Source License included + * in the file licenses/BSL-Couchbase.txt. As of the Change Date specified + * in that file, in accordance with the Business Source License, use of this + * software will be governed by the Apache License, Version 2.0, included in + * the file licenses/APL2.txt. + */ + +#include "background_stat_task.h" + +#include "bucket_logger.h" +#include "ep_engine.h" +#include +#include + +BackgroundStatTask::BackgroundStatTask(EventuallyPersistentEngine* e, + const CookieIface* cookie, + TaskId taskId) + : GlobalTask(e, taskId, 0, false), e(e), cookie(cookie) { +} + +bool BackgroundStatTask::run() { + TRACE_EVENT0("ep-engine/task", "BackgroundStatTask"); + try { + status = collectStats(); + } catch (const std::exception& e) { + EP_LOG_WARN( + "BackgroundStatTask: callback threw exception: \"{}\" task " + "desc:\"{}\"", + e.what(), + getDescription()); + } + // _must_ notify success to ensure the frontend calls back into + // getStats - the second call is required to avoid leaking data + // allocated to store in the engine specific. + // The real status is stored and shall be retrieved later. 
+ e->notifyIOComplete(cookie, cb::engine_errc::success); + return false; +} + +cb::engine_errc BackgroundStatTask::maybeWriteResponse( + const AddStatFn& addStat) const { + if (status == cb::engine_errc::success) { + for (const auto& [key, value] : stats) { + addStat(key, value, cookie); + } + } + return status; +} + +AddStatFn BackgroundStatTask::getDeferredAddStat() { + return [this](std::string_view key, std::string_view value, const void*) { + this->stats.emplace_back(key, value); + }; +} \ No newline at end of file diff --git a/engines/ep/src/background_stat_task.h b/engines/ep/src/background_stat_task.h new file mode 100644 index 0000000000..f6a35049fa --- /dev/null +++ b/engines/ep/src/background_stat_task.h @@ -0,0 +1,79 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2021-Present Couchbase, Inc. + * + * Use of this software is governed by the Business Source License included + * in the file licenses/BSL-Couchbase.txt. As of the Change Date specified + * in that file, in accordance with the Business Source License, use of this + * software will be governed by the Apache License, Version 2.0, included in + * the file licenses/APL2.txt. + */ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include + +// forward decl +enum class TaskId : int; + +/** + * Base type for tasks which gather stats on a background task. + * + * For use when generating stats is likely to be expensive, to avoid + * taking up frontend thread time. + * Users should construct and schedule the task, then store it in the + * cookie engine specific with + * EventuallyPersistentEngine::storeStatTask(...) + * Once the task has notified the frontend, the task should be retrieved with + * EventuallyPersistentEngine::retrieveStatTask(...) 
+ */ +class BackgroundStatTask : public GlobalTask { +public: + using Callback = std::function; + BackgroundStatTask(EventuallyPersistentEngine* e, + const CookieIface* cookie, + TaskId taskId); + + bool run() override; + + /** + * If the task was successful, write all gathered stats as responses. + * + * Should only be called from a frontend thread. + * @param addStat frontend provided callback + * @return errc reflecting status of the operation + */ + cb::engine_errc maybeWriteResponse(const AddStatFn& addStat) const; + +protected: + /** + * Do potentially expensive work to collect stats in a background task. + * @return status of operation + */ + virtual cb::engine_errc collectStats() = 0; + + /** + * Get a callback used to store stats which have been computed by the + * background task. + * + * Stats cannot be immediately written as responses from the background + * task as doing so could be racy. Instead, store them for when the + * frontend thread revisits this operation. + */ + AddStatFn getDeferredAddStat(); + + EventuallyPersistentEngine* e; + const CookieIface* cookie; + + // stats which have been collected while running in a background task + std::vector> stats; + cb::engine_errc status = cb::engine_errc::success; +}; \ No newline at end of file diff --git a/engines/ep/src/ep_engine.cc b/engines/ep/src/ep_engine.cc index 8298a62a50..92f8526f30 100644 --- a/engines/ep/src/ep_engine.cc +++ b/engines/ep/src/ep_engine.cc @@ -1848,6 +1848,31 @@ void* EventuallyPersistentEngine::getEngineSpecific(const CookieIface* cookie) { return cookie->getEngineStorage(); } +void EventuallyPersistentEngine::storeStatTask( + const CookieIface* cookie, std::shared_ptr task) { + // store a ptr to a shared_ptr to the task (must ensure the task is not + // destroyed before the connection retrieves the result). + // Once the frontend thread is notified, it must call retrieveStatTask, + // which will free the heap-allocated shared_ptr. 
+ auto wrapper = std::make_unique>(task); + storeEngineSpecific(cookie, wrapper.release()); +} + +std::shared_ptr +EventuallyPersistentEngine::retrieveStatTask(const CookieIface* cookie) { + void* data = getEngineSpecific(cookie); + if (data) { + // take back ownership of the task ptr + auto ptr = std::unique_ptr>( + reinterpret_cast*>(data)); + // clear the engine specific - it's not valid to retrieve the + // task twice (would lead to a double free) + storeEngineSpecific(cookie, nullptr); + return *ptr; + } + return nullptr; +} + bool EventuallyPersistentEngine::isDatatypeSupported( const CookieIface* cookie, protocol_binary_datatype_t datatype) { return cookie->isDatatypeSupported(datatype); @@ -3756,23 +3781,20 @@ class StatCheckpointVisitor : public VBucketVisitor { AddStatFn add_stat; }; - -class StatCheckpointTask : public GlobalTask { +class StatCheckpointTask : public BackgroundStatTask { public: StatCheckpointTask(EventuallyPersistentEngine* e, const CookieIface* c, AddStatFn a) - : GlobalTask(e, TaskId::StatCheckpointTask, 0, false), - ep(e), - cookie(c), - add_stat(std::move(a)) { + : BackgroundStatTask(e, c, TaskId::StatCheckpointTask) { } - bool run() override { + + cb::engine_errc collectStats() override { TRACE_EVENT0("ep-engine/task", "StatsCheckpointTask"); - StatCheckpointVisitor scv(ep->getKVBucket(), cookie, add_stat); - ep->getKVBucket()->visit(scv); - ep->notifyIOComplete(cookie, cb::engine_errc::success); - return false; + StatCheckpointVisitor scv( + e->getKVBucket(), cookie, getDeferredAddStat()); + e->getKVBucket()->visit(scv); + return cb::engine_errc::success; } std::string getDescription() const override { @@ -3785,11 +3807,6 @@ class StatCheckpointTask : public GlobalTask { // take /too/ long, so set limit of 100ms. 
return std::chrono::milliseconds(100); } - -private: - EventuallyPersistentEngine *ep; - const CookieIface* cookie; - AddStatFn add_stat; }; /// @endcond @@ -3799,15 +3816,14 @@ cb::engine_errc EventuallyPersistentEngine::doCheckpointStats( const char* stat_key, int nkey) { if (nkey == 10) { - void* es = getEngineSpecific(cookie); - if (es == nullptr) { - ExTask task = std::make_shared( - this, cookie, add_stat); + auto task = retrieveStatTask(cookie); + if (!task) { + task = std::make_shared(this, cookie, add_stat); ExecutorPool::get()->schedule(task); - storeEngineSpecific(cookie, this); + storeStatTask(cookie, task); return cb::engine_errc::would_block; } else { - storeEngineSpecific(cookie, nullptr); + return task->maybeWriteResponse(add_stat); } } else if (nkey > 11) { std::string vbid(&stat_key[11], nkey - 11); @@ -4034,7 +4050,7 @@ static void showConnAggStat(const std::string& connType, } } -class StatDCPTask : public GlobalTask { +class StatDCPTask : public BackgroundStatTask { public: using Callback = std::function; @@ -4042,13 +4058,11 @@ class StatDCPTask : public GlobalTask { const CookieIface* cookie, std::string_view description, Callback callback) - : GlobalTask(e, TaskId::StatDCPTask, 0, false), - e(e), - cookie(cookie), + : BackgroundStatTask(e, cookie, TaskId::StatDCPTask), description(description), callback(std::move(callback)) { } - bool run() override { + cb::engine_errc collectStats() override { TRACE_EVENT0("ep-engine/task", "StatDCPTask"); cb::engine_errc result = cb::engine_errc::failed; try { @@ -4060,8 +4074,7 @@ class StatDCPTask : public GlobalTask { e.what(), getDescription()); } - e->notifyIOComplete(cookie, result); - return false; + return result; } std::string getDescription() const override { @@ -4075,8 +4088,6 @@ class StatDCPTask : public GlobalTask { } private: - EventuallyPersistentEngine* e; - const CookieIface* cookie; const std::string description; Callback callback; }; @@ -4085,9 +4096,9 @@ cb::engine_errc 
EventuallyPersistentEngine::doConnAggStats( const CookieIface* cookie, const AddStatFn& add_stat, std::string_view sep) { - void* engineSpecific = getEngineSpecific(cookie); - if (engineSpecific == nullptr) { - ExTask task = std::make_shared( + auto task = retrieveStatTask(cookie); + if (!task) { + task = std::make_shared( this, cookie, "Aggregated DCP stats", @@ -4100,13 +4111,11 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats( return cb::engine_errc::success; }); ExecutorPool::get()->schedule(task); - storeEngineSpecific(cookie, this); + storeStatTask(cookie, task); return cb::engine_errc::would_block; } else { - storeEngineSpecific(cookie, nullptr); + return task->maybeWriteResponse(add_stat); } - - return cb::engine_errc::success; } cb::engine_errc EventuallyPersistentEngine::doConnAggStatsInner( @@ -4137,9 +4146,9 @@ cb::engine_errc EventuallyPersistentEngine::doDcpStats( const CookieIface* cookie, const AddStatFn& add_stat, std::string_view value) { - void* engineSpecific = getEngineSpecific(cookie); - if (engineSpecific == nullptr) { - ExTask task = std::make_shared( + auto task = retrieveStatTask(cookie); + if (!task) { + task = std::make_shared( this, cookie, "Summarised bucket-wide DCP stats", @@ -4150,13 +4159,11 @@ cb::engine_errc EventuallyPersistentEngine::doDcpStats( return cb::engine_errc::success; }); ExecutorPool::get()->schedule(task); - storeEngineSpecific(cookie, this); + storeStatTask(cookie, task); return cb::engine_errc::would_block; } else { - storeEngineSpecific(cookie, nullptr); + return task->maybeWriteResponse(add_stat); } - - return cb::engine_errc::success; } void EventuallyPersistentEngine::doDcpStatsInner(const CookieIface* cookie, diff --git a/engines/ep/src/ep_engine.h b/engines/ep/src/ep_engine.h index 2c8c8b5ca2..aef3d78c08 100644 --- a/engines/ep/src/ep_engine.h +++ b/engines/ep/src/ep_engine.h @@ -11,6 +11,7 @@ #pragma once +#include "background_stat_task.h" #include "configuration.h" #include 
"ep_engine_public.h" #include "error_handler.h" @@ -538,6 +539,35 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface { void* getEngineSpecific(const CookieIface* cookie); + /** + * Store a task pointer as the engine specific data. + * + * Allocates a wrapper holding a shared_ptr; storing the raw task ptr + * would be insufficient - the task needs to be owned to ensure it has not + * been destroyed when the frontend thread needs to retrieve its result. + * + * Caller is responsible for scheduling the task, and later calling + * retrieveStatTask to destroy the wrapper and retrieve the task. + * + * @param cookie cookie from frontend + * @param task shared_ptr to background task + */ + void storeStatTask(const CookieIface* cookie, + std::shared_ptr task); + + /** + * Get and clear the engine specific data, then attempt to + * extract a BackgroundStatTask. + * + * Frees the wrapper allocated by storeStatTask, if one was found in the + * engine specific data. + * + * @param cookie cookie from frontend + * @return (possibly null) shared_ptr to background task + */ + std::shared_ptr retrieveStatTask( + const CookieIface* cookie); + bool isDatatypeSupported(const CookieIface* cookie, protocol_binary_datatype_t datatype);