From 38e1f8440e4b70359ee076e1ff9775090cc63014 Mon Sep 17 00:00:00 2001
From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com>
Date: Fri, 15 Nov 2024 11:19:10 +0530
Subject: [PATCH] [release-21.0] Fix deadlock in messager and health streamer
 (#17230) (#17235)

Signed-off-by: Manan Gupta
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
---
 .../vttablet/tabletserver/health_streamer.go | 60 +++++++++++--------
 .../tabletserver/health_streamer_test.go     | 40 +++++++++++++
 .../vttablet/tabletserver/messager/engine.go | 30 ++++++----
 .../tabletserver/messager/engine_test.go     | 33 +++++++++-
 go/vt/vttablet/tabletserver/schema/engine.go |  8 +++
 5 files changed, 134 insertions(+), 37 deletions(-)

diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go
index cfc5ea5e974..f9f65d197b2 100644
--- a/go/vt/vttablet/tabletserver/health_streamer.go
+++ b/go/vt/vttablet/tabletserver/health_streamer.go
@@ -66,11 +66,18 @@ type healthStreamer struct {
 	degradedThreshold  time.Duration
 	unhealthyThreshold atomic.Int64
 
-	mu      sync.Mutex
-	ctx     context.Context
-	cancel  context.CancelFunc
-	clients map[chan *querypb.StreamHealthResponse]struct{}
-	state   *querypb.StreamHealthResponse
+	// cancelMu is a mutex used to protect the cancel variable
+	// and for ensuring we don't call setup functions in parallel.
+	cancelMu sync.Mutex
+	ctx      context.Context
+	cancel   context.CancelFunc
+
+	// fieldsMu is used to protect access to the fields below.
+	// We require two separate mutexes, so that we don't have to acquire the same mutex
+	// in Close and reload that can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
+	fieldsMu sync.Mutex
+	clients  map[chan *querypb.StreamHealthResponse]struct{}
+	state    *querypb.StreamHealthResponse
 
 	// isServingPrimary stores if this tablet is currently the serving primary or not.
 	isServingPrimary bool
@@ -110,8 +117,8 @@ func (hs *healthStreamer) InitDBConfig(target *querypb.Target) {
 }
 
 func (hs *healthStreamer) Open() {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.cancelMu.Lock()
+	defer hs.cancelMu.Unlock()
 
 	if hs.cancel != nil {
 		return
@@ -120,8 +127,8 @@ func (hs *healthStreamer) Open() {
 }
 
 func (hs *healthStreamer) Close() {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.cancelMu.Lock()
+	defer hs.cancelMu.Unlock()
 
 	if hs.cancel != nil {
 		hs.se.UnregisterNotifier("healthStreamer")
@@ -158,13 +165,16 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str
 }
 
 func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, context.Context) {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.cancelMu.Lock()
+	defer hs.cancelMu.Unlock()
 
 	if hs.cancel == nil {
 		return nil, nil
 	}
 
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
+
 	ch := make(chan *querypb.StreamHealthResponse, streamHealthBufferSize)
 	hs.clients[ch] = struct{}{}
 
@@ -174,15 +184,15 @@ func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, contex
 }
 
 func (hs *healthStreamer) unregister(ch chan *querypb.StreamHealthResponse) {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 
 	delete(hs.clients, ch)
 }
 
 func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, ptsTimestamp time.Time, lag time.Duration, err error, serving bool) {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 
 	hs.state.Target.TabletType = tabletType
 	if tabletType == topodatapb.TabletType_PRIMARY {
@@ -236,8 +246,8 @@ func (hs *healthStreamer) broadCastToClients(shr *querypb.StreamHealthResponse)
 }
 
 func (hs *healthStreamer) AppendDetails(details []*kv) []*kv {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 	if hs.state.Target.TabletType == topodatapb.TabletType_PRIMARY {
 		return details
 	}
@@ -282,8 +292,8 @@ func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) {
 // MakePrimary tells the healthstreamer that the current tablet is now the primary,
 // so it can read and write to the MySQL instance for schema-tracking.
 func (hs *healthStreamer) MakePrimary(serving bool) {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 	hs.isServingPrimary = serving
 	// We register for notifications from the schema Engine only when schema tracking is enabled,
 	// and we are going to a serving primary state.
@@ -298,15 +308,15 @@ func (hs *healthStreamer) MakePrimary(serving bool) {
 
 // MakeNonPrimary tells the healthstreamer that the current tablet is now not a primary.
 func (hs *healthStreamer) MakeNonPrimary() {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 	hs.isServingPrimary = false
 }
 
 // reload reloads the schema from the underlying mysql for the tables that we get the alert on.
 func (hs *healthStreamer) reload(created, altered, dropped []*schema.Table, udfsChanged bool) error {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 	// Schema Reload to happen only on primary when it is serving.
 	// We can be in a state when the primary is not serving after we have run DemotePrimary. In that case,
 	// we don't want to run any queries in MySQL, so we shouldn't reload anything in the healthStreamer.
@@ -349,8 +359,8 @@ func (hs *healthStreamer) reload(created, altered, dropped []*schema.Table, udfs
 
 // sendUnresolvedTransactionSignal sends broadcast message about unresolved transactions.
 func (hs *healthStreamer) sendUnresolvedTransactionSignal() {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 	// send signal only when primary is serving.
 	if !hs.isServingPrimary {
 		return
diff --git a/go/vt/vttablet/tabletserver/health_streamer_test.go b/go/vt/vttablet/tabletserver/health_streamer_test.go
index 95517880339..09867e7a51b 100644
--- a/go/vt/vttablet/tabletserver/health_streamer_test.go
+++ b/go/vt/vttablet/tabletserver/health_streamer_test.go
@@ -543,3 +543,43 @@ func testStream(hs *healthStreamer) (<-chan *querypb.StreamHealthResponse, conte
 func testBlpFunc() (int64, int32) {
 	return 1, 2
 }
+
+// TestDeadlockBwCloseAndReload tests the deadlock observed between Close and Reload
+// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
+func TestDeadlockBwCloseAndReload(t *testing.T) {
+	cfg := newConfig(nil)
+	env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TestNotServingPrimary")
+	alias := &topodatapb.TabletAlias{
+		Cell: "cell",
+		Uid:  1,
+	}
+	se := schema.NewEngineForTests()
+	// Create a new health streamer and set it to a serving primary state
+	hs := newHealthStreamer(env, alias, se)
+	hs.signalWhenSchemaChange = true
+	hs.Open()
+	hs.MakePrimary(true)
+	defer hs.Close()
+
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	// Try running Close and reload in parallel multiple times.
+	// This reproduces the deadlock quite readily.
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			hs.Close()
+			hs.Open()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			se.BroadcastForTesting(nil, nil, nil, true)
+		}
+	}()
+
+	// Wait for wait group to finish.
+	wg.Wait()
+}
diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go
index 2e526fcdc3d..9d8b09e819a 100644
--- a/go/vt/vttablet/tabletserver/messager/engine.go
+++ b/go/vt/vttablet/tabletserver/messager/engine.go
@@ -52,9 +52,16 @@ type VStreamer interface {
 
 // Engine is the engine for handling messages.
 type Engine struct {
-	mu       sync.Mutex
-	isOpen   bool
-	managers map[string]*messageManager
+	// mu is a mutex used to protect the isOpen variable
+	// and for ensuring we don't call setup functions in parallel.
+	mu     sync.Mutex
+	isOpen bool
+
+	// managersMu is a mutex used to protect the managers field.
+	// We require two separate mutexes, so that we don't have to acquire the same mutex
+	// in Close and schemaChanged which can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229.
+	managersMu sync.Mutex
+	managers   map[string]*messageManager
 
 	tsv TabletService
 	se  *schema.Engine
@@ -76,15 +83,12 @@ func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer) *Engine {
 
 // Open starts the Engine service.
 func (me *Engine) Open() {
 	me.mu.Lock()
+	defer me.mu.Unlock()
 	if me.isOpen {
-		me.mu.Unlock()
 		return
 	}
 	me.isOpen = true
-	me.mu.Unlock()
 	log.Info("Messager: opening")
-	// Unlock before invoking RegisterNotifier because it
-	// obtains the same lock.
 	me.se.RegisterNotifier("messages", me.schemaChanged, true)
 }
@@ -102,6 +106,8 @@ func (me *Engine) Close() {
 	log.Infof("messager Engine - unregistering notifiers")
 	me.se.UnregisterNotifier("messages")
 	log.Infof("messager Engine - closing all managers")
+	me.managersMu.Lock()
+	defer me.managersMu.Unlock()
 	for _, mm := range me.managers {
 		mm.Close()
 	}
@@ -110,8 +116,8 @@ func (me *Engine) GetGenerator(name string) (QueryGenerator, error) {
-	me.mu.Lock()
-	defer me.mu.Unlock()
+	me.managersMu.Lock()
+	defer me.managersMu.Unlock()
 	mm := me.managers[name]
 	if mm == nil {
 		return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found in schema", name)
 	}
@@ -132,6 +138,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
 	if !me.isOpen {
 		return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "messager engine is closed, probably because this is not a primary any more")
 	}
+	me.managersMu.Lock()
+	defer me.managersMu.Unlock()
 	mm := me.managers[name]
 	if mm == nil {
 		return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found", name)
 	}
@@ -140,8 +148,8 @@ func (me *Engine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table, _ bool) {
-	me.mu.Lock()
-	defer me.mu.Unlock()
+	me.managersMu.Lock()
+	defer me.managersMu.Unlock()
 	for _, table := range append(dropped, altered...) {
 		name := table.Name.String()
 		mm := me.managers[name]
diff --git a/go/vt/vttablet/tabletserver/messager/engine_test.go b/go/vt/vttablet/tabletserver/messager/engine_test.go
index 30e849ac73b..124e6a9a380 100644
--- a/go/vt/vttablet/tabletserver/messager/engine_test.go
+++ b/go/vt/vttablet/tabletserver/messager/engine_test.go
@@ -19,6 +19,7 @@ package messager
 import (
 	"context"
 	"reflect"
+	"sync"
 	"testing"
 
 	"vitess.io/vitess/go/sqltypes"
@@ -156,7 +157,7 @@ func newTestEngine() *Engine {
 	tsv := &fakeTabletServer{
 		Env: tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "MessagerTest"),
 	}
-	se := schema.NewEngine(tsv)
+	se := schema.NewEngineForTests()
 	te := NewEngine(tsv, se, newFakeVStreamer())
 	te.Open()
 	return te
@@ -169,3 +170,33 @@ func newEngineReceiver() (f func(qr *sqltypes.Result) error, ch chan *sqltypes.R
 		return nil
 	}, ch
 }
+
+// TestDeadlockBwCloseAndSchemaChange tests the deadlock observed between Close and schemaChanged
+// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229.
+func TestDeadlockBwCloseAndSchemaChange(t *testing.T) {
+	engine := newTestEngine()
+	defer engine.Close()
+	se := engine.se
+
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	// Try running Close and schemaChanged in parallel multiple times.
+	// This reproduces the deadlock quite readily.
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			engine.Close()
+			engine.Open()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			se.BroadcastForTesting(nil, nil, nil, true)
+		}
+	}()
+
+	// Wait for wait group to finish.
+	wg.Wait()
+}
diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go
index aadba5739c8..bf137886e37 100644
--- a/go/vt/vttablet/tabletserver/schema/engine.go
+++ b/go/vt/vttablet/tabletserver/schema/engine.go
@@ -803,6 +803,13 @@ func (se *Engine) broadcast(created, altered, dropped []*Table, udfsChanged bool
 	}
 }
 
+// BroadcastForTesting is meant to be a testing function that triggers a broadcast call.
+func (se *Engine) BroadcastForTesting(created, altered, dropped []*Table, udfsChanged bool) {
+	se.mu.Lock()
+	defer se.mu.Unlock()
+	se.broadcast(created, altered, dropped, udfsChanged)
+}
+
 // GetTable returns the info for a table.
 func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table {
 	se.mu.Lock()
@@ -889,6 +896,7 @@ func NewEngineForTests() *Engine {
 		tables:    make(map[string]*Table),
 		historian: newHistorian(false, 0, nil),
 		env:       tabletenv.NewEnv(vtenv.NewTestEnv(), tabletenv.NewDefaultConfig(), "SchemaEngineForTests"),
+		notifiers: make(map[string]notifier),
 	}
 	return se
 }
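
Note for reviewers: below is a minimal, self-contained Go sketch (not Vitess code; the Notifier and Subscriber types and the dataMu name are invented for illustration) of the lock-ordering cycle described in https://github.com/vitessio/vitess/issues/17229 and how splitting a single mutex into a lifecycle mutex and a data mutex removes it. In the buggy shape, Close holds the subscriber's only mutex while unregistering from the notifier (subscriber lock -> notifier lock), while the notifier invokes callbacks with its own lock held and the callback re-acquires the subscriber's mutex (notifier lock -> subscriber lock), completing the cycle. In this patch, hs.fieldsMu and me.managersMu play the role of dataMu below.

package main

import (
	"fmt"
	"sync"
)

// Notifier mimics the schema engine: it holds its own lock while broadcasting.
type Notifier struct {
	mu        sync.Mutex
	callbacks map[string]func()
}

func (n *Notifier) Register(name string, cb func()) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.callbacks[name] = cb
}

func (n *Notifier) Unregister(name string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	delete(n.callbacks, name)
}

func (n *Notifier) Broadcast() {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, cb := range n.callbacks {
		cb() // callbacks run while n.mu is held
	}
}

// Subscriber mimics the messager engine / health streamer. With a single mutex,
// Close (s.mu -> n.mu via Unregister) and Broadcast (n.mu -> s.mu via the callback)
// take the two locks in opposite orders and can deadlock; splitting the state into
// a lifecycle mutex (mu) and a data mutex (dataMu) breaks that cycle.
type Subscriber struct {
	mu     sync.Mutex // lifecycle only: isOpen, registration
	dataMu sync.Mutex // data only: what the callback touches
	isOpen bool
	state  int
}

func (s *Subscriber) Open(n *Notifier) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.isOpen {
		return
	}
	s.isOpen = true
	n.Register("sub", s.onChange)
}

func (s *Subscriber) Close(n *Notifier) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.isOpen {
		return
	}
	s.isOpen = false
	n.Unregister("sub") // takes n.mu while s.mu is held: s.mu -> n.mu
	s.dataMu.Lock()     // safe: Broadcast never takes s.mu, so no cycle is possible
	s.state = 0
	s.dataMu.Unlock()
}

// onChange is invoked by Broadcast with n.mu held. It only takes dataMu,
// never s.mu, so the n.mu -> s.mu edge that completed the deadlock cycle is gone.
func (s *Subscriber) onChange() {
	s.dataMu.Lock()
	defer s.dataMu.Unlock()
	s.state++
}

func main() {
	n := &Notifier{callbacks: map[string]func(){}}
	s := &Subscriber{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // lifecycle churn, like the Close/Open loop in the new tests
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			s.Close(n)
			s.Open(n)
		}
	}()
	go func() { // notifications, like BroadcastForTesting in the new tests
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			n.Broadcast()
		}
	}()
	wg.Wait()
	fmt.Println("no deadlock")
}

Running the two goroutines concurrently mirrors the tests added in this patch: with a single subscriber mutex the program can eventually hang, while with the split mutexes it runs to completion.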