Skip to content

Commit

Permalink
Limit rare race condition in topo.Server (vitessio#17165)
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
Signed-off-by: Renan Rangel <[email protected]>
  • Loading branch information
frouioui authored and rvrangel committed Nov 21, 2024
1 parent a3255b7 commit 3dc1d40
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 8 deletions.
3 changes: 0 additions & 3 deletions go/cmd/vttablet/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ import (
// When starting, the TabletManager checks if it needs to restore, in tm.handleRestore but this step will
// fail if we do not provide a cnf file and if the flag --restore_from_backup is provided.
func TestRunFailsToStartTabletManager(t *testing.T) {
// Skipping the test for now, the test is unveiling some race conditions in the code.
// While working on a fix, this test will be skipped: https://github.com/vitessio/vitess/pull/17165
t.Skip()
ts, factory := memorytopo.NewServerAndFactory(context.Background(), "cell")
topo.RegisterFactory("test", factory)

Expand Down
19 changes: 18 additions & 1 deletion go/vt/topo/cell_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func pathForCellInfo(cell string) string {
// GetCellInfoNames returns the names of the existing cells. They are
// sorted by name.
func (ts *Server) GetCellInfoNames(ctx context.Context) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
entries, err := ts.globalCell.ListDir(ctx, CellsPath, false /*full*/)
switch {
case IsErrType(err, NoNode):
Expand All @@ -59,10 +62,10 @@ func (ts *Server) GetCellInfoNames(ctx context.Context) ([]string, error) {

// GetCellInfo reads a CellInfo from the global Conn.
func (ts *Server) GetCellInfo(ctx context.Context, cell string, strongRead bool) (*topodatapb.CellInfo, error) {
conn := ts.globalCell
if ctx.Err() != nil {
return nil, ctx.Err()
}
conn := ts.globalCell
if !strongRead {
conn = ts.globalReadOnlyCell
}
Expand All @@ -83,6 +86,9 @@ func (ts *Server) GetCellInfo(ctx context.Context, cell string, strongRead bool)

// CreateCellInfo creates a new CellInfo with the provided content.
func (ts *Server) CreateCellInfo(ctx context.Context, cell string, ci *topodatapb.CellInfo) error {
if ctx.Err() != nil {
return ctx.Err()
}
// Pack the content.
contents, err := ci.MarshalVT()
if err != nil {
Expand All @@ -103,6 +109,10 @@ func (ts *Server) CreateCellInfo(ctx context.Context, cell string, ci *topodatap
func (ts *Server) UpdateCellInfoFields(ctx context.Context, cell string, update func(*topodatapb.CellInfo) error) error {
filePath := pathForCellInfo(cell)
for {
if ctx.Err() != nil {
return ctx.Err()
}

ci := &topodatapb.CellInfo{}

// Read the file, unpack the contents.
Expand Down Expand Up @@ -142,6 +152,9 @@ func (ts *Server) UpdateCellInfoFields(ctx context.Context, cell string, update
// We first try to make sure no Shard record points to the cell,
// but we'll continue regardless if 'force' is true.
func (ts *Server) DeleteCellInfo(ctx context.Context, cell string, force bool) error {
if ctx.Err() != nil {
return ctx.Err()
}
srvKeyspaces, err := ts.GetSrvKeyspaceNames(ctx, cell)
switch {
case err == nil:
Expand Down Expand Up @@ -180,6 +193,10 @@ func (ts *Server) DeleteCellInfo(ctx context.Context, cell string, force bool) e
// TODO(alainjobart) once the cell map is migrated to this generic
// package, we can do better than this.
func (ts *Server) GetKnownCells(ctx context.Context) ([]string, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

// Note we use the global read-only cell here, as the result
// is not time sensitive.
entries, err := ts.globalReadOnlyCell.ListDir(ctx, CellsPath, false /*full*/)
Expand Down
16 changes: 16 additions & 0 deletions go/vt/topo/cells_aliases.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func pathForCellsAlias(alias string) string {
// GetCellsAliases returns the names of the existing cells. They are
// sorted by name.
func (ts *Server) GetCellsAliases(ctx context.Context, strongRead bool) (ret map[string]*topodatapb.CellsAlias, err error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
conn := ts.globalCell
if !strongRead {
conn = ts.globalReadOnlyCell
Expand Down Expand Up @@ -75,6 +78,9 @@ func (ts *Server) GetCellsAliases(ctx context.Context, strongRead bool) (ret map

// GetCellsAlias returns the CellsAlias that matches the given name.
func (ts *Server) GetCellsAlias(ctx context.Context, name string, strongRead bool) (*topodatapb.CellsAlias, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
conn := ts.globalCell
if !strongRead {
conn = ts.globalReadOnlyCell
Expand All @@ -97,6 +103,9 @@ func (ts *Server) GetCellsAlias(ctx context.Context, name string, strongRead boo

// DeleteCellsAlias deletes the specified CellsAlias
func (ts *Server) DeleteCellsAlias(ctx context.Context, alias string) error {
if ctx.Err() != nil {
return ctx.Err()
}
ts.clearCellAliasesCache()

filePath := pathForCellsAlias(alias)
Expand All @@ -123,6 +132,9 @@ func (ts *Server) CreateCellsAlias(ctx context.Context, alias string, cellsAlias
}

// Save it.
if ctx.Err() != nil {
return ctx.Err()
}
filePath := pathForCellsAlias(alias)
_, err = ts.globalCell.Create(ctx, filePath, contents)
return err
Expand All @@ -134,6 +146,10 @@ func (ts *Server) UpdateCellsAlias(ctx context.Context, alias string, update fun

filePath := pathForCellsAlias(alias)
for {
if ctx.Err() != nil {
return ctx.Err()
}

cellsAlias := &topodatapb.CellsAlias{}

// Read the file, unpack the contents.
Expand Down
17 changes: 16 additions & 1 deletion go/vt/topo/external_vitess_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func GetExternalVitessClusterPath(clusterName string) string {

// CreateExternalVitessCluster creates a topo record for the passed vitess cluster
func (ts *Server) CreateExternalVitessCluster(ctx context.Context, clusterName string, value *topodatapb.ExternalVitessCluster) error {
if ctx.Err() != nil {
return ctx.Err()
}
data, err := value.MarshalVT()
if err != nil {
return err
Expand All @@ -66,6 +69,9 @@ func (ts *Server) CreateExternalVitessCluster(ctx context.Context, clusterName s

// GetExternalVitessCluster returns a topo record for the named vitess cluster
func (ts *Server) GetExternalVitessCluster(ctx context.Context, clusterName string) (*ExternalVitessClusterInfo, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
data, version, err := ts.globalCell.Get(ctx, GetExternalVitessClusterPath(clusterName))
switch {
case IsErrType(err, NoNode):
Expand All @@ -88,7 +94,10 @@ func (ts *Server) GetExternalVitessCluster(ctx context.Context, clusterName stri

// UpdateExternalVitessCluster updates the topo record for the named vitess cluster
func (ts *Server) UpdateExternalVitessCluster(ctx context.Context, vc *ExternalVitessClusterInfo) error {
//FIXME: check for cluster lock
if ctx.Err() != nil {
return ctx.Err()
}
// FIXME: check for cluster lock
data, err := vc.ExternalVitessCluster.MarshalVT()
if err != nil {
return err
Expand All @@ -109,6 +118,9 @@ func (ts *Server) UpdateExternalVitessCluster(ctx context.Context, vc *ExternalV

// DeleteExternalVitessCluster deletes the topo record for the named vitess cluster
func (ts *Server) DeleteExternalVitessCluster(ctx context.Context, clusterName string) error {
if ctx.Err() != nil {
return ctx.Err()
}
if err := ts.globalCell.Delete(ctx, GetExternalVitessClusterPath(clusterName), nil); err != nil {
return err
}
Expand All @@ -123,6 +135,9 @@ func (ts *Server) DeleteExternalVitessCluster(ctx context.Context, clusterName s

// GetExternalVitessClusters returns the list of external vitess clusters in the topology.
func (ts *Server) GetExternalVitessClusters(ctx context.Context) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
children, err := ts.globalCell.ListDir(ctx, GetExternalVitessClusterDir(), false /*full*/)
switch {
case err == nil:
Expand Down
28 changes: 28 additions & 0 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func ValidateKeyspaceName(name string) error {
// CreateKeyspace wraps the underlying Conn.Create
// and dispatches the event.
func (ts *Server) CreateKeyspace(ctx context.Context, keyspace string, value *topodatapb.Keyspace) error {
if ctx.Err() != nil {
return ctx.Err()
}

if err := ValidateKeyspaceName(keyspace); err != nil {
return vterrors.Wrapf(err, "CreateKeyspace: %s", err)
}
Expand All @@ -112,6 +116,10 @@ func (ts *Server) CreateKeyspace(ctx context.Context, keyspace string, value *to

// GetKeyspace reads the given keyspace and returns it
func (ts *Server) GetKeyspace(ctx context.Context, keyspace string) (*KeyspaceInfo, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

if err := ValidateKeyspaceName(keyspace); err != nil {
return nil, vterrors.Wrapf(err, "GetKeyspace: %s", err)
}
Expand Down Expand Up @@ -169,6 +177,10 @@ func (ts *Server) GetThrottlerConfig(ctx context.Context, keyspace string) (*top

// UpdateKeyspace updates the keyspace data. It checks the keyspace is locked.
func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error {
if ctx.Err() != nil {
return ctx.Err()
}

// make sure it is locked first
if err := CheckKeyspaceLocked(ctx, ki.keyspace); err != nil {
return err
Expand Down Expand Up @@ -207,6 +219,10 @@ type FindAllShardsInKeyspaceOptions struct {
// If opt is non-nil, it is used to configure the method's behavior. Otherwise,
// the default options are used.
func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt *FindAllShardsInKeyspaceOptions) (map[string]*ShardInfo, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

// Apply any necessary defaults.
if opt == nil {
opt = &FindAllShardsInKeyspaceOptions{}
Expand Down Expand Up @@ -372,6 +388,10 @@ func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo
// DeleteKeyspace wraps the underlying Conn.Delete
// and dispatches the event.
func (ts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error {
if ctx.Err() != nil {
return ctx.Err()
}

keyspacePath := path.Join(KeyspacesPath, keyspace, KeyspaceFile)
if err := ts.globalCell.Delete(ctx, keyspacePath, nil); err != nil {
return err
Expand All @@ -393,6 +413,10 @@ func (ts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error {

// GetKeyspaces returns the list of keyspaces in the topology.
func (ts *Server) GetKeyspaces(ctx context.Context) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

children, err := ts.globalCell.ListDir(ctx, KeyspacesPath, false /*full*/)
switch {
case err == nil:
Expand All @@ -406,6 +430,10 @@ func (ts *Server) GetKeyspaces(ctx context.Context) ([]string, error) {

// GetShardNames returns the list of shards in a keyspace.
func (ts *Server) GetShardNames(ctx context.Context, keyspace string) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath)
children, err := ts.globalCell.ListDir(ctx, shardsPath, false /*full*/)
if IsErrType(err, NoNode) {
Expand Down
7 changes: 7 additions & 0 deletions go/vt/topo/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package topo
import (
"context"
"encoding/json"
"errors"
"os"
"os/user"
"sync"
Expand Down Expand Up @@ -175,6 +176,12 @@ func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, opts ...LockO
return nil, err
}

if err := ctx.Err(); err != nil {
return nil, err
}
if ts.globalCell == nil {
return nil, errors.New("no global cell connection on the topo server")
}
switch l.Options.lockType {
case NonBlocking:
return ts.globalCell.TryLock(ctx, lt.Path(), j)
Expand Down
16 changes: 16 additions & 0 deletions go/vt/topo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (

// UpsertMetadata sets the key/value in the metadata if it doesn't exist, otherwise it updates the content
func (ts *Server) UpsertMetadata(ctx context.Context, key string, val string) error {
if err := ctx.Err(); err != nil {
return err
}

keyPath := path.Join(MetadataPath, key)

_, _, err := ts.globalCell.Get(ctx, keyPath)
Expand All @@ -52,6 +56,10 @@ func (ts *Server) UpsertMetadata(ctx context.Context, key string, val string) er

// GetMetadata retrieves all metadata value that matches the given key regular expression. If empty all values are returned.
func (ts *Server) GetMetadata(ctx context.Context, keyFilter string) (map[string]string, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

keys, err := ts.globalCell.ListDir(ctx, MetadataPath, false)
if err != nil {
return nil, err
Expand All @@ -77,6 +85,10 @@ func (ts *Server) GetMetadata(ctx context.Context, keyFilter string) (map[string

// DeleteMetadata deletes the key in the metadata
func (ts *Server) DeleteMetadata(ctx context.Context, key string) error {
if err := ctx.Err(); err != nil {
return err
}

keyPath := path.Join(MetadataPath, key)

// nil version means that it will insert if keyPath does not exist
Expand All @@ -89,6 +101,10 @@ func (ts *Server) DeleteMetadata(ctx context.Context, key string) error {
}

func (ts *Server) getMetadata(ctx context.Context, key string) (string, error) {
if err := ctx.Err(); err != nil {
return "", err
}

keyPath := path.Join(MetadataPath, key)
contents, _, err := ts.globalCell.Get(ctx, keyPath)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions go/vt/topo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,12 @@ func Open() *Server {
// ConnForCell returns a Conn object for the given cell.
// It caches Conn objects from previously requested cells.
func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

// Global cell is the easy case.
if cell == GlobalCell {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return ts.globalCell, nil
}

Expand Down
20 changes: 20 additions & 0 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (si *ShardInfo) SetPrimaryTermStartTime(t time.Time) {
// GetShard is a high level function to read shard data.
// It generates trace spans.
func (ts *Server) GetShard(ctx context.Context, keyspace, shard string) (*ShardInfo, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

if err := ValidateKeyspaceName(keyspace); err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,6 +205,10 @@ func (ts *Server) updateShard(ctx context.Context, si *ShardInfo) error {
span.Annotate("shard", si.shardName)
defer span.Finish()

if err := ctx.Err(); err != nil {
return err
}

data, err := si.Shard.MarshalVT()
if err != nil {
return err
Expand Down Expand Up @@ -252,6 +260,10 @@ func (ts *Server) UpdateShardFields(ctx context.Context, keyspace, shard string,
// This will lock the Keyspace, as we may be looking at other shard servedTypes.
// Using GetOrCreateShard is probably a better idea for most use cases.
func (ts *Server) CreateShard(ctx context.Context, keyspace, shard string) (err error) {
if err := ctx.Err(); err != nil {
return err
}

if err := ValidateKeyspaceName(keyspace); err != nil {
return err
}
Expand Down Expand Up @@ -355,6 +367,10 @@ func (ts *Server) GetOrCreateShard(ctx context.Context, keyspace, shard string)
// DeleteShard wraps the underlying conn.Delete
// and dispatches the event.
func (ts *Server) DeleteShard(ctx context.Context, keyspace, shard string) error {
if err := ctx.Err(); err != nil {
return err
}

shardPath := shardFilePath(keyspace, shard)
if err := ts.globalCell.Delete(ctx, shardPath, nil); err != nil {
return err
Expand Down Expand Up @@ -728,6 +744,10 @@ type WatchShardData struct {
// It has the same contract as conn.Watch, but it also unpacks the
// contents into a Shard object
func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*WatchShardData, <-chan *WatchShardData, error) {
if err := ctx.Err(); err != nil {
return nil, nil, err
}

shardPath := shardFilePath(keyspace, shard)
ctx, cancel := context.WithCancel(ctx)

Expand Down
Loading

0 comments on commit 3dc1d40

Please sign in to comment.