Skip to content

Commit

Permalink
Fix: Separate Lock for Keyspace to Update Controller Mapping in Schem…
Browse files Browse the repository at this point in the history
…a Tracking (#17873)

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Mar 3, 2025
1 parent 79dc8e3 commit 1582d5b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
26 changes: 19 additions & 7 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
signal func() // a function that we'll call whenever we have new schema data

// map of keyspace currently tracked
trackedMu sync.Mutex
tracked map[keyspaceStr]*updateController
consumeDelay time.Duration

Expand Down Expand Up @@ -96,7 +97,7 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T
return err
}

t.tracked[target.Keyspace].setLoaded(true)
t.setLoaded(target.Keyspace, true)
return nil
}

Expand Down Expand Up @@ -209,8 +210,8 @@ func (t *Tracker) Start() {
// getKeyspaceUpdateController returns the updateController for the given keyspace
// the updateController will be created if there was none.
func (t *Tracker) getKeyspaceUpdateController(th *discovery.TabletHealth) *updateController {
t.mu.Lock()
defer t.mu.Unlock()
t.trackedMu.Lock()
defer t.trackedMu.Unlock()

ksUpdater, exists := t.tracked[th.Target.Keyspace]
if !exists {
Expand All @@ -224,6 +225,16 @@ func (t *Tracker) newUpdateController() *updateController {
return &updateController{update: t.updateSchema, reloadKeyspace: t.initKeyspace, signal: t.signal, consumeDelay: t.consumeDelay}
}

// setLoaded sets the loaded status for the given keyspace.
func (t *Tracker) setLoaded(ks keyspaceStr, loaded bool) {
t.trackedMu.Lock()
defer t.trackedMu.Unlock()

if ksUpdater, exists := t.tracked[ks]; exists {
ksUpdater.setLoaded(loaded)
}
}

func (t *Tracker) initKeyspace(th *discovery.TabletHealth) error {
err := t.LoadKeyspace(th.Conn, th.Target)
if err != nil {
Expand Down Expand Up @@ -343,7 +354,7 @@ func (t *Tracker) updatedTableSchema(th *discovery.TabletHealth) bool {
return nil
})
if err != nil {
t.tracked[th.Target.Keyspace].setLoaded(false)
t.setLoaded(th.Target.Keyspace, false)
// TODO: optimize for the tables that got errored out.
log.Warningf("error fetching new schema for %v, making them non-authoritative: %v", tablesUpdated, err)
return false
Expand Down Expand Up @@ -451,7 +462,7 @@ func (t *Tracker) updatedViewSchema(th *discovery.TabletHealth) bool {
return nil
})
if err != nil {
t.tracked[th.Target.Keyspace].setLoaded(false)
t.setLoaded(th.Target.Keyspace, false)
// TODO: optimize for the views that got errored out.
log.Warningf("error fetching new views definition for %v", viewsUpdated, err)
return false
Expand All @@ -467,8 +478,9 @@ func (t *Tracker) updateViews(keyspace string, res map[string]string) {

// RegisterSignalReceiver allows a function to register to be called when new schema is available
func (t *Tracker) RegisterSignalReceiver(f func()) {
t.mu.Lock()
defer t.mu.Unlock()
t.trackedMu.Lock()
defer t.trackedMu.Unlock()

for _, controller := range t.tracked {
controller.signal = f
}
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtgate/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,38 @@ func TestTrackerGetKeyspaceUpdateController(t *testing.T) {
assert.Nil(t, ks3.reloadKeyspace, "ks3 already initialized")
}

// TestTrackerNoLock tests that processing of health check is not blocked while tracking is making GetSchema rpc calls.
func TestTrackerNoLock(t *testing.T) {
ch := make(chan *discovery.TabletHealth)
tracker := NewTracker(ch, true, false, sqlparser.NewTestParser())
tracker.consumeDelay = 1 * time.Millisecond
tracker.Start()
defer tracker.Stop()

target := &querypb.Target{Cell: cell, Keyspace: keyspace, Shard: "-80", TabletType: topodatapb.TabletType_PRIMARY}
tablet := &topodatapb.Tablet{Keyspace: target.Keyspace, Shard: target.Shard, Type: target.TabletType}

sbc := sandboxconn.NewSandboxConn(tablet)
sbc.GetSchemaDelayResponse = 100 * time.Millisecond

th := &discovery.TabletHealth{
Conn: sbc,
Tablet: tablet,
Target: target,
Serving: true,
Stats: &querypb.RealtimeStats{TableSchemaChanged: []string{"t1"}},
}

for i := 0; i < 500000; i++ {
select {
case ch <- th:
case <-time.After(5 * time.Millisecond):
t.Fatalf("failed to send health check to tracker")
}
}
require.GreaterOrEqual(t, sbc.GetSchemaCount.Load(), int64(1), "GetSchema rpc should be called")
}

type myTable struct {
name, create string
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type SandboxConn struct {
ReserveCount atomic.Int64
ReleaseCount atomic.Int64
GetSchemaCount atomic.Int64
GetSchemaDelayResponse time.Duration

queriesRequireLocking bool
queriesMu sync.Mutex
Expand Down Expand Up @@ -740,6 +741,9 @@ func (sbc *SandboxConn) Release(ctx context.Context, target *querypb.Target, tra
// GetSchema implements the QueryService interface
func (sbc *SandboxConn) GetSchema(ctx context.Context, target *querypb.Target, tableType querypb.SchemaTableType, tableNames []string, callback func(schemaRes *querypb.GetSchemaResponse) error) error {
sbc.GetSchemaCount.Add(1)
if sbc.GetSchemaDelayResponse > 0 {
time.Sleep(sbc.GetSchemaDelayResponse)
}
if len(sbc.getSchemaResult) == 0 {
return nil
}
Expand Down

0 comments on commit 1582d5b

Please sign in to comment.