Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit rare race condition in topo.Server #17165

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions go/cmd/vttablet/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ import (
// When starting, the TabletManager checks if it needs to restore, in tm.handleRestore but this step will
// fail if we do not provide a cnf file and if the flag --restore_from_backup is provided.
func TestRunFailsToStartTabletManager(t *testing.T) {
// Skipping the test for now, the test is unveiling some race conditions in the code.
// While working on a fix, this test will be skipped: https://github.com/vitessio/vitess/pull/17165
t.Skip()
ts, factory := memorytopo.NewServerAndFactory(context.Background(), "cell")
topo.RegisterFactory("test", factory)

Expand Down
19 changes: 18 additions & 1 deletion go/vt/topo/cell_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func pathForCellInfo(cell string) string {
// GetCellInfoNames returns the names of the existing cells. They are
// sorted by name.
func (ts *Server) GetCellInfoNames(ctx context.Context) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
entries, err := ts.globalCell.ListDir(ctx, CellsPath, false /*full*/)
switch {
case IsErrType(err, NoNode):
Expand All @@ -59,10 +62,10 @@ func (ts *Server) GetCellInfoNames(ctx context.Context) ([]string, error) {

// GetCellInfo reads a CellInfo from the global Conn.
func (ts *Server) GetCellInfo(ctx context.Context, cell string, strongRead bool) (*topodatapb.CellInfo, error) {
conn := ts.globalCell
if ctx.Err() != nil {
return nil, ctx.Err()
}
conn := ts.globalCell
if !strongRead {
conn = ts.globalReadOnlyCell
}
Expand All @@ -83,6 +86,9 @@ func (ts *Server) GetCellInfo(ctx context.Context, cell string, strongRead bool)

// CreateCellInfo creates a new CellInfo with the provided content.
func (ts *Server) CreateCellInfo(ctx context.Context, cell string, ci *topodatapb.CellInfo) error {
if ctx.Err() != nil {
return ctx.Err()
}
// Pack the content.
contents, err := ci.MarshalVT()
if err != nil {
Expand All @@ -103,6 +109,10 @@ func (ts *Server) CreateCellInfo(ctx context.Context, cell string, ci *topodatap
func (ts *Server) UpdateCellInfoFields(ctx context.Context, cell string, update func(*topodatapb.CellInfo) error) error {
filePath := pathForCellInfo(cell)
for {
if ctx.Err() != nil {
return ctx.Err()
}

ci := &topodatapb.CellInfo{}

// Read the file, unpack the contents.
Expand Down Expand Up @@ -142,6 +152,9 @@ func (ts *Server) UpdateCellInfoFields(ctx context.Context, cell string, update
// We first try to make sure no Shard record points to the cell,
// but we'll continue regardless if 'force' is true.
func (ts *Server) DeleteCellInfo(ctx context.Context, cell string, force bool) error {
if ctx.Err() != nil {
return ctx.Err()
}
srvKeyspaces, err := ts.GetSrvKeyspaceNames(ctx, cell)
switch {
case err == nil:
Expand Down Expand Up @@ -180,6 +193,10 @@ func (ts *Server) DeleteCellInfo(ctx context.Context, cell string, force bool) e
// TODO(alainjobart) once the cell map is migrated to this generic
// package, we can do better than this.
func (ts *Server) GetKnownCells(ctx context.Context) ([]string, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

// Note we use the global read-only cell here, as the result
// is not time sensitive.
entries, err := ts.globalReadOnlyCell.ListDir(ctx, CellsPath, false /*full*/)
Expand Down
16 changes: 16 additions & 0 deletions go/vt/topo/cells_aliases.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func pathForCellsAlias(alias string) string {
// GetCellsAliases returns the names of the existing cells. They are
// sorted by name.
func (ts *Server) GetCellsAliases(ctx context.Context, strongRead bool) (ret map[string]*topodatapb.CellsAlias, err error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
conn := ts.globalCell
if !strongRead {
conn = ts.globalReadOnlyCell
Expand Down Expand Up @@ -75,6 +78,9 @@ func (ts *Server) GetCellsAliases(ctx context.Context, strongRead bool) (ret map

// GetCellsAlias returns the CellsAlias that matches the given name.
func (ts *Server) GetCellsAlias(ctx context.Context, name string, strongRead bool) (*topodatapb.CellsAlias, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
conn := ts.globalCell
if !strongRead {
conn = ts.globalReadOnlyCell
Expand All @@ -97,6 +103,9 @@ func (ts *Server) GetCellsAlias(ctx context.Context, name string, strongRead boo

// DeleteCellsAlias deletes the specified CellsAlias
func (ts *Server) DeleteCellsAlias(ctx context.Context, alias string) error {
if ctx.Err() != nil {
return ctx.Err()
}
ts.clearCellAliasesCache()

filePath := pathForCellsAlias(alias)
Expand All @@ -123,6 +132,9 @@ func (ts *Server) CreateCellsAlias(ctx context.Context, alias string, cellsAlias
}

// Save it.
if ctx.Err() != nil {
return ctx.Err()
}
filePath := pathForCellsAlias(alias)
_, err = ts.globalCell.Create(ctx, filePath, contents)
return err
Expand All @@ -134,6 +146,10 @@ func (ts *Server) UpdateCellsAlias(ctx context.Context, alias string, update fun

filePath := pathForCellsAlias(alias)
for {
if ctx.Err() != nil {
return ctx.Err()
}

cellsAlias := &topodatapb.CellsAlias{}

// Read the file, unpack the contents.
Expand Down
17 changes: 16 additions & 1 deletion go/vt/topo/external_vitess_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func GetExternalVitessClusterPath(clusterName string) string {

// CreateExternalVitessCluster creates a topo record for the passed vitess cluster
func (ts *Server) CreateExternalVitessCluster(ctx context.Context, clusterName string, value *topodatapb.ExternalVitessCluster) error {
if ctx.Err() != nil {
return ctx.Err()
}
data, err := value.MarshalVT()
if err != nil {
return err
Expand All @@ -66,6 +69,9 @@ func (ts *Server) CreateExternalVitessCluster(ctx context.Context, clusterName s

// GetExternalVitessCluster returns a topo record for the named vitess cluster
func (ts *Server) GetExternalVitessCluster(ctx context.Context, clusterName string) (*ExternalVitessClusterInfo, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
data, version, err := ts.globalCell.Get(ctx, GetExternalVitessClusterPath(clusterName))
switch {
case IsErrType(err, NoNode):
Expand All @@ -88,7 +94,10 @@ func (ts *Server) GetExternalVitessCluster(ctx context.Context, clusterName stri

// UpdateExternalVitessCluster updates the topo record for the named vitess cluster
func (ts *Server) UpdateExternalVitessCluster(ctx context.Context, vc *ExternalVitessClusterInfo) error {
//FIXME: check for cluster lock
if ctx.Err() != nil {
return ctx.Err()
}
// FIXME: check for cluster lock
data, err := vc.ExternalVitessCluster.MarshalVT()
if err != nil {
return err
Expand All @@ -109,6 +118,9 @@ func (ts *Server) UpdateExternalVitessCluster(ctx context.Context, vc *ExternalV

// DeleteExternalVitessCluster deletes the topo record for the named vitess cluster
func (ts *Server) DeleteExternalVitessCluster(ctx context.Context, clusterName string) error {
if ctx.Err() != nil {
return ctx.Err()
}
if err := ts.globalCell.Delete(ctx, GetExternalVitessClusterPath(clusterName), nil); err != nil {
return err
}
Expand All @@ -123,6 +135,9 @@ func (ts *Server) DeleteExternalVitessCluster(ctx context.Context, clusterName s

// GetExternalVitessClusters returns the list of external vitess clusters in the topology.
func (ts *Server) GetExternalVitessClusters(ctx context.Context) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
children, err := ts.globalCell.ListDir(ctx, GetExternalVitessClusterDir(), false /*full*/)
switch {
case err == nil:
Expand Down
28 changes: 28 additions & 0 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func ValidateKeyspaceName(name string) error {
// CreateKeyspace wraps the underlying Conn.Create
// and dispatches the event.
func (ts *Server) CreateKeyspace(ctx context.Context, keyspace string, value *topodatapb.Keyspace) error {
if ctx.Err() != nil {
return ctx.Err()
}

if err := ValidateKeyspaceName(keyspace); err != nil {
return vterrors.Wrapf(err, "CreateKeyspace: %s", err)
}
Expand All @@ -112,6 +116,10 @@ func (ts *Server) CreateKeyspace(ctx context.Context, keyspace string, value *to

// GetKeyspace reads the given keyspace and returns it
func (ts *Server) GetKeyspace(ctx context.Context, keyspace string) (*KeyspaceInfo, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

if err := ValidateKeyspaceName(keyspace); err != nil {
return nil, vterrors.Wrapf(err, "GetKeyspace: %s", err)
}
Expand Down Expand Up @@ -169,6 +177,10 @@ func (ts *Server) GetThrottlerConfig(ctx context.Context, keyspace string) (*top

// UpdateKeyspace updates the keyspace data. It checks the keyspace is locked.
func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error {
if ctx.Err() != nil {
return ctx.Err()
}

// make sure it is locked first
if err := CheckKeyspaceLocked(ctx, ki.keyspace); err != nil {
return err
Expand Down Expand Up @@ -207,6 +219,10 @@ type FindAllShardsInKeyspaceOptions struct {
// If opt is non-nil, it is used to configure the method's behavior. Otherwise,
// the default options are used.
func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt *FindAllShardsInKeyspaceOptions) (map[string]*ShardInfo, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

// Apply any necessary defaults.
if opt == nil {
opt = &FindAllShardsInKeyspaceOptions{}
Expand Down Expand Up @@ -372,6 +388,10 @@ func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo
// DeleteKeyspace wraps the underlying Conn.Delete
// and dispatches the event.
func (ts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error {
if ctx.Err() != nil {
return ctx.Err()
}

keyspacePath := path.Join(KeyspacesPath, keyspace, KeyspaceFile)
if err := ts.globalCell.Delete(ctx, keyspacePath, nil); err != nil {
return err
Expand All @@ -393,6 +413,10 @@ func (ts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error {

// GetKeyspaces returns the list of keyspaces in the topology.
func (ts *Server) GetKeyspaces(ctx context.Context) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

children, err := ts.globalCell.ListDir(ctx, KeyspacesPath, false /*full*/)
switch {
case err == nil:
Expand All @@ -406,6 +430,10 @@ func (ts *Server) GetKeyspaces(ctx context.Context) ([]string, error) {

// GetShardNames returns the list of shards in a keyspace.
func (ts *Server) GetShardNames(ctx context.Context, keyspace string) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath)
children, err := ts.globalCell.ListDir(ctx, shardsPath, false /*full*/)
if IsErrType(err, NoNode) {
Expand Down
7 changes: 7 additions & 0 deletions go/vt/topo/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package topo
import (
"context"
"encoding/json"
"errors"
"os"
"os/user"
"sync"
Expand Down Expand Up @@ -175,6 +176,12 @@ func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, opts ...LockO
return nil, err
}

if err := ctx.Err(); err != nil {
return nil, err
}
if ts.globalCell == nil {
return nil, errors.New("no global cell connection on the topo server")
}
switch l.Options.lockType {
case NonBlocking:
return ts.globalCell.TryLock(ctx, lt.Path(), j)
Expand Down
16 changes: 16 additions & 0 deletions go/vt/topo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (

// UpsertMetadata sets the key/value in the metadata if it doesn't exist, otherwise it updates the content
func (ts *Server) UpsertMetadata(ctx context.Context, key string, val string) error {
if err := ctx.Err(); err != nil {
return err
}

keyPath := path.Join(MetadataPath, key)

_, _, err := ts.globalCell.Get(ctx, keyPath)
Expand All @@ -52,6 +56,10 @@ func (ts *Server) UpsertMetadata(ctx context.Context, key string, val string) er

// GetMetadata retrieves all metadata value that matches the given key regular expression. If empty all values are returned.
func (ts *Server) GetMetadata(ctx context.Context, keyFilter string) (map[string]string, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

keys, err := ts.globalCell.ListDir(ctx, MetadataPath, false)
if err != nil {
return nil, err
Expand All @@ -77,6 +85,10 @@ func (ts *Server) GetMetadata(ctx context.Context, keyFilter string) (map[string

// DeleteMetadata deletes the key in the metadata
func (ts *Server) DeleteMetadata(ctx context.Context, key string) error {
if err := ctx.Err(); err != nil {
return err
}

keyPath := path.Join(MetadataPath, key)

// nil version means that it will insert if keyPath does not exist
Expand All @@ -89,6 +101,10 @@ func (ts *Server) DeleteMetadata(ctx context.Context, key string) error {
}

func (ts *Server) getMetadata(ctx context.Context, key string) (string, error) {
if err := ctx.Err(); err != nil {
return "", err
}

keyPath := path.Join(MetadataPath, key)
contents, _, err := ts.globalCell.Get(ctx, keyPath)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions go/vt/topo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,12 @@ func Open() *Server {
// ConnForCell returns a Conn object for the given cell.
// It caches Conn objects from previously requested cells.
func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

// Global cell is the easy case.
if cell == GlobalCell {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return ts.globalCell, nil
}

Expand Down
20 changes: 20 additions & 0 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (si *ShardInfo) SetPrimaryTermStartTime(t time.Time) {
// GetShard is a high level function to read shard data.
// It generates trace spans.
func (ts *Server) GetShard(ctx context.Context, keyspace, shard string) (*ShardInfo, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

if err := ValidateKeyspaceName(keyspace); err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,6 +205,10 @@ func (ts *Server) updateShard(ctx context.Context, si *ShardInfo) error {
span.Annotate("shard", si.shardName)
defer span.Finish()

if err := ctx.Err(); err != nil {
return err
}

data, err := si.Shard.MarshalVT()
if err != nil {
return err
Expand Down Expand Up @@ -252,6 +260,10 @@ func (ts *Server) UpdateShardFields(ctx context.Context, keyspace, shard string,
// This will lock the Keyspace, as we may be looking at other shard servedTypes.
// Using GetOrCreateShard is probably a better idea for most use cases.
func (ts *Server) CreateShard(ctx context.Context, keyspace, shard string) (err error) {
if err := ctx.Err(); err != nil {
return err
}

if err := ValidateKeyspaceName(keyspace); err != nil {
return err
}
Expand Down Expand Up @@ -355,6 +367,10 @@ func (ts *Server) GetOrCreateShard(ctx context.Context, keyspace, shard string)
// DeleteShard wraps the underlying conn.Delete
// and dispatches the event.
func (ts *Server) DeleteShard(ctx context.Context, keyspace, shard string) error {
if err := ctx.Err(); err != nil {
return err
}

shardPath := shardFilePath(keyspace, shard)
if err := ts.globalCell.Delete(ctx, shardPath, nil); err != nil {
return err
Expand Down Expand Up @@ -728,6 +744,10 @@ type WatchShardData struct {
// It has the same contract as conn.Watch, but it also unpacks the
// contents into a Shard object
func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*WatchShardData, <-chan *WatchShardData, error) {
if err := ctx.Err(); err != nil {
return nil, nil, err
}

shardPath := shardFilePath(keyspace, shard)
ctx, cancel := context.WithCancel(ctx)

Expand Down
Loading
Loading