Skip to content

Commit

Permalink
Address deadlock situation while canceling the consumer's processer task
Browse files Browse the repository at this point in the history
- As part of the cancelling the processer task in ExecutorPool's
stopTaskGroup, the consumer's destructor will be invoked as the reference
has been released.
- The consumer's destructor will try to cancel the processer task, which
tries to acquire ExecutorPool::tMutex, but the same lock was already
acquired by this very thread as part of stopTaskGroup.

  #0  __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135
  #1  0x00007fea5d5f5657 in _L_lock_909 () from /lib/x86_64-linux-gnu/libpthread.so.0
  #2  0x00007fea5d5f5480 in __GI___pthread_mutex_lock (mutex=0x7d5000018120) at ../nptl/pthread_mutex_lock.c:79
  #3  0x00007fea5e0c1cd5 in pthread_mutex_lock ()
  #4  0x00007fea5da10ea9 in cb_mutex_enter (mutex=0x7d5000018120) at /home/abhinav/couchbase/platform/src/cb_pthreads.c:115
  #5  0x00007fea588bce2a in Mutex::acquire (this=0x7d5000018118) at /home/abhinav/couchbase/ep-engine/src/mutex.cc:31
  #6  0x00007fea58892277 in lock (this=<optimized out>) at /home/abhinav/couchbase/ep-engine/src/locks.h:70
  #7  LockHolder (m=..., tryLock=false, this=<optimized out>) at /home/abhinav/couchbase/ep-engine/src/locks.h:47
  #8  ExecutorPool::_cancel (this=0x7d5000018000, taskId=41, eraseTask=<optimized out>) at /home/abhinav/couchbase/ep-engine/src/executorpool.cc:295
  #9  0x00007fea58892a38 in ExecutorPool::cancel (this=0x7d5000018120, taskId=128, eraseTask=false) at /home/abhinav/couchbase/ep-engine/src/executorpool.cc:328
  #10 0x00007fea587fe9c7 in cancelTask (this=0x7d5000016800) at /home/abhinav/couchbase/ep-engine/src/dcp/consumer.cc:114
  #11 DcpConsumer::~DcpConsumer (this=0x7d5000016800) at /home/abhinav/couchbase/ep-engine/src/dcp/consumer.cc:104
  #12 0x00007fea587ff126 in DcpConsumer::~DcpConsumer (this=0x7d5000016800) at /home/abhinav/couchbase/ep-engine/src/dcp/consumer.cc:103
  #13 0x00007fea58804e14 in ~SingleThreadedRCPtr (this=<optimized out>) at /home/abhinav/couchbase/ep-engine/src/atomic.h:271
  #14 ~Processer (this=0x7d140000e600) at /home/abhinav/couchbase/ep-engine/src/dcp/consumer.cc:35
  #15 Processer::~Processer (this=0x7d140000e600) at /home/abhinav/couchbase/ep-engine/src/dcp/consumer.cc:35
  #16 0x00007fea58897596 in ~SingleThreadedRCPtr (this=0x7d140000e600) at /home/abhinav/couchbase/ep-engine/src/atomic.h:271
  #17 ExecutorPool::_stopTaskGroup (this=<optimized out>, taskGID=<optimized out>, taskType=<optimized out>, force=<optimized out>) at /home/abhinav/couchbase/ep-engine/src/executorpool.cc:576
  #18 0x00007fea58897d7f in ExecutorPool::stopTaskGroup (this=0x7d5000018120, taskGID=128, taskType=WRITER_TASK_IDX, force=false) at /home/abhinav/couchbase/ep-engine/src/executorpool.cc:585
  #19 0x00007fea588283a7 in EventuallyPersistentStore::~EventuallyPersistentStore (this=0x7d540000fc80) at /home/abhinav/couchbase/ep-engine/src/ep.cc:468
  #20 0x00007fea58878e4b in EventuallyPersistentEngine::~EventuallyPersistentEngine (this=0x7d780000f000) at /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:6336
  #21 0x00007fea58858aad in EvpDestroy (handle=0x7d780000f000, force=<optimized out>) at /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:142
  #22 0x00007fea5e1070e8 in mock_destroy (handle=<optimized out>, force=false) at /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:98
  #23 0x00007fea5e1065ef in destroy_bucket (handle=0x7d480000b880, handle_v1=0x7d480000b880, force=false) at /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:995
  #24 execute_test (engine=<optimized out>, default_cfg=<optimized out>, test=...) at /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:1112
  #25 main (argc=<optimized out>, argv=<optimized out>) at /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:1439

Change-Id: Iaebb3d0ed8d0cfbcef13dafd689e7c08f94069d8
Reviewed-on: http://review.couchbase.org/55884
Reviewed-by: Chiyoung Seo <[email protected]>
Tested-by: buildbot <[email protected]>
  • Loading branch information
abhinavdangeti committed Oct 8, 2015
1 parent 653d919 commit d24f99c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
18 changes: 15 additions & 3 deletions src/dcp/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class Processer : public GlobalTask {

std::string getDescription();

~Processer();

private:
connection_t conn;
};
Expand Down Expand Up @@ -75,11 +77,16 @@ std::string Processer::getDescription() {
return ss.str();
}

Processer::~Processer() {
DcpConsumer* consumer = static_cast<DcpConsumer*>(conn.get());
consumer->taskCancelled();
}

DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
const std::string &name)
: Consumer(engine, cookie, name), opaqueCounter(0), processerTaskId(0),
itemsToProcess(false), lastNoopTime(ep_current_time()), backoffs(0),
taskCancelled(false), flowControl(engine, this)
taskAlreadyCancelled(false), flowControl(engine, this)
{
Configuration& config = engine.getConfiguration();
streams = new passive_stream_t[config.getMaxVbuckets()];
Expand Down Expand Up @@ -109,12 +116,17 @@ DcpConsumer::~DcpConsumer() {


void DcpConsumer::cancelTask() {
if (!taskCancelled) {
bool inverse = false;
if (taskAlreadyCancelled.compare_exchange_strong(inverse, true)) {
ExecutorPool::get()->cancel(processerTaskId);
taskCancelled = true;
}
}

void DcpConsumer::taskCancelled() {
bool inverse = false;
taskAlreadyCancelled.compare_exchange_strong(inverse, true);
}

ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
uint32_t flags) {
LockHolder lh(readyMutex);
Expand Down
4 changes: 3 additions & 1 deletion src/dcp/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ typedef std::map<uint32_t, std::pair<uint32_t, uint16_t> > opaque_map;

void cancelTask();

void taskCancelled();

private:

DcpResponse* getNextItem();
Expand Down Expand Up @@ -153,7 +155,7 @@ typedef std::map<uint32_t, std::pair<uint32_t, uint16_t> > opaque_map;
bool pendingEnableExtMetaData;
bool pendingEnableValueCompression;
bool pendingSupportCursorDropping;
bool taskCancelled;
AtomicValue<bool> taskAlreadyCancelled;

FlowControl flowControl;

Expand Down

0 comments on commit d24f99c

Please sign in to comment.