diff --git a/go/cmd/vttablet/cli/cli_test.go b/go/cmd/vttablet/cli/cli_test.go index 603234b3c64..c88ebd1b8ae 100644 --- a/go/cmd/vttablet/cli/cli_test.go +++ b/go/cmd/vttablet/cli/cli_test.go @@ -32,9 +32,6 @@ import ( // When starting, the TabletManager checks if it needs to restore, in tm.handleRestore but this step will // fail if we do not provide a cnf file and if the flag --restore_from_backup is provided. func TestRunFailsToStartTabletManager(t *testing.T) { - // Skipping the test for now, the test is unveiling some race conditions in the code. - // While working on a fix, this test will be skipped: https://github.com/vitessio/vitess/pull/17165 - t.Skip() ts, factory := memorytopo.NewServerAndFactory(context.Background(), "cell") topo.RegisterFactory("test", factory) diff --git a/go/vt/topo/cell_info.go b/go/vt/topo/cell_info.go index 4a8112084cb..960d0a8e0ae 100644 --- a/go/vt/topo/cell_info.go +++ b/go/vt/topo/cell_info.go @@ -46,6 +46,9 @@ func pathForCellInfo(cell string) string { // GetCellInfoNames returns the names of the existing cells. They are // sorted by name. func (ts *Server) GetCellInfoNames(ctx context.Context) ([]string, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } entries, err := ts.globalCell.ListDir(ctx, CellsPath, false /*full*/) switch { case IsErrType(err, NoNode): @@ -59,10 +62,10 @@ func (ts *Server) GetCellInfoNames(ctx context.Context) ([]string, error) { // GetCellInfo reads a CellInfo from the global Conn. func (ts *Server) GetCellInfo(ctx context.Context, cell string, strongRead bool) (*topodatapb.CellInfo, error) { - conn := ts.globalCell if ctx.Err() != nil { return nil, ctx.Err() } + conn := ts.globalCell if !strongRead { conn = ts.globalReadOnlyCell } @@ -83,6 +86,9 @@ func (ts *Server) GetCellInfo(ctx context.Context, cell string, strongRead bool) // CreateCellInfo creates a new CellInfo with the provided content. func (ts *Server) CreateCellInfo(ctx context.Context, cell string, ci *topodatapb.CellInfo) error { + if ctx.Err() != nil { + return ctx.Err() + } // Pack the content. contents, err := ci.MarshalVT() if err != nil { @@ -103,6 +109,10 @@ func (ts *Server) CreateCellInfo(ctx context.Context, cell string, ci *topodatap func (ts *Server) UpdateCellInfoFields(ctx context.Context, cell string, update func(*topodatapb.CellInfo) error) error { filePath := pathForCellInfo(cell) for { + if ctx.Err() != nil { + return ctx.Err() + } + ci := &topodatapb.CellInfo{} // Read the file, unpack the contents. @@ -142,6 +152,9 @@ func (ts *Server) UpdateCellInfoFields(ctx context.Context, cell string, update // We first try to make sure no Shard record points to the cell, // but we'll continue regardless if 'force' is true. func (ts *Server) DeleteCellInfo(ctx context.Context, cell string, force bool) error { + if ctx.Err() != nil { + return ctx.Err() + } srvKeyspaces, err := ts.GetSrvKeyspaceNames(ctx, cell) switch { case err == nil: @@ -180,6 +193,10 @@ func (ts *Server) DeleteCellInfo(ctx context.Context, cell string, force bool) e // TODO(alainjobart) once the cell map is migrated to this generic // package, we can do better than this. func (ts *Server) GetKnownCells(ctx context.Context) ([]string, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + // Note we use the global read-only cell here, as the result // is not time sensitive. entries, err := ts.globalReadOnlyCell.ListDir(ctx, CellsPath, false /*full*/) diff --git a/go/vt/topo/cells_aliases.go b/go/vt/topo/cells_aliases.go index 683fa08f6ba..4d5161ffe38 100644 --- a/go/vt/topo/cells_aliases.go +++ b/go/vt/topo/cells_aliases.go @@ -40,6 +40,9 @@ func pathForCellsAlias(alias string) string { // GetCellsAliases returns the names of the existing cells. They are // sorted by name. func (ts *Server) GetCellsAliases(ctx context.Context, strongRead bool) (ret map[string]*topodatapb.CellsAlias, err error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } conn := ts.globalCell if !strongRead { conn = ts.globalReadOnlyCell @@ -75,6 +78,9 @@ func (ts *Server) GetCellsAliases(ctx context.Context, strongRead bool) (ret map // GetCellsAlias returns the CellsAlias that matches the given name. func (ts *Server) GetCellsAlias(ctx context.Context, name string, strongRead bool) (*topodatapb.CellsAlias, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } conn := ts.globalCell if !strongRead { conn = ts.globalReadOnlyCell @@ -97,6 +103,9 @@ func (ts *Server) GetCellsAlias(ctx context.Context, name string, strongRead boo // DeleteCellsAlias deletes the specified CellsAlias func (ts *Server) DeleteCellsAlias(ctx context.Context, alias string) error { + if ctx.Err() != nil { + return ctx.Err() + } ts.clearCellAliasesCache() filePath := pathForCellsAlias(alias) @@ -123,6 +132,9 @@ func (ts *Server) CreateCellsAlias(ctx context.Context, alias string, cellsAlias } // Save it. + if ctx.Err() != nil { + return ctx.Err() + } filePath := pathForCellsAlias(alias) _, err = ts.globalCell.Create(ctx, filePath, contents) return err @@ -134,6 +146,10 @@ func (ts *Server) UpdateCellsAlias(ctx context.Context, alias string, update fun filePath := pathForCellsAlias(alias) for { + if ctx.Err() != nil { + return ctx.Err() + } + cellsAlias := &topodatapb.CellsAlias{} // Read the file, unpack the contents. diff --git a/go/vt/topo/external_vitess_cluster.go b/go/vt/topo/external_vitess_cluster.go index a7d0a69bf2c..936a30a4533 100644 --- a/go/vt/topo/external_vitess_cluster.go +++ b/go/vt/topo/external_vitess_cluster.go @@ -47,6 +47,9 @@ func GetExternalVitessClusterPath(clusterName string) string { // CreateExternalVitessCluster creates a topo record for the passed vitess cluster func (ts *Server) CreateExternalVitessCluster(ctx context.Context, clusterName string, value *topodatapb.ExternalVitessCluster) error { + if ctx.Err() != nil { + return ctx.Err() + } data, err := value.MarshalVT() if err != nil { return err @@ -66,6 +69,9 @@ func (ts *Server) CreateExternalVitessCluster(ctx context.Context, clusterName s // GetExternalVitessCluster returns a topo record for the named vitess cluster func (ts *Server) GetExternalVitessCluster(ctx context.Context, clusterName string) (*ExternalVitessClusterInfo, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } data, version, err := ts.globalCell.Get(ctx, GetExternalVitessClusterPath(clusterName)) switch { case IsErrType(err, NoNode): @@ -88,7 +94,10 @@ func (ts *Server) GetExternalVitessCluster(ctx context.Context, clusterName stri // UpdateExternalVitessCluster updates the topo record for the named vitess cluster func (ts *Server) UpdateExternalVitessCluster(ctx context.Context, vc *ExternalVitessClusterInfo) error { - //FIXME: check for cluster lock + if ctx.Err() != nil { + return ctx.Err() + } + // FIXME: check for cluster lock data, err := vc.ExternalVitessCluster.MarshalVT() if err != nil { return err @@ -109,6 +118,9 @@ func (ts *Server) UpdateExternalVitessCluster(ctx context.Context, vc *ExternalV // DeleteExternalVitessCluster deletes the topo record for the named vitess cluster func (ts *Server) DeleteExternalVitessCluster(ctx context.Context, clusterName string) error { + if ctx.Err() != nil { + return ctx.Err() + } if err := ts.globalCell.Delete(ctx, GetExternalVitessClusterPath(clusterName), nil); err != nil { return err } @@ -123,6 +135,9 @@ func (ts *Server) DeleteExternalVitessCluster(ctx context.Context, clusterName s // GetExternalVitessClusters returns the list of external vitess clusters in the topology. func (ts *Server) GetExternalVitessClusters(ctx context.Context) ([]string, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } children, err := ts.globalCell.ListDir(ctx, GetExternalVitessClusterDir(), false /*full*/) switch { case err == nil: diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 08b1c59f4cc..743d8fd6dc0 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -88,6 +88,10 @@ func ValidateKeyspaceName(name string) error { // CreateKeyspace wraps the underlying Conn.Create // and dispatches the event. func (ts *Server) CreateKeyspace(ctx context.Context, keyspace string, value *topodatapb.Keyspace) error { + if ctx.Err() != nil { + return ctx.Err() + } + if err := ValidateKeyspaceName(keyspace); err != nil { return vterrors.Wrapf(err, "CreateKeyspace: %s", err) } @@ -112,6 +116,10 @@ func (ts *Server) CreateKeyspace(ctx context.Context, keyspace string, value *to // GetKeyspace reads the given keyspace and returns it func (ts *Server) GetKeyspace(ctx context.Context, keyspace string) (*KeyspaceInfo, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + if err := ValidateKeyspaceName(keyspace); err != nil { return nil, vterrors.Wrapf(err, "GetKeyspace: %s", err) } @@ -169,6 +177,10 @@ func (ts *Server) GetThrottlerConfig(ctx context.Context, keyspace string) (*top // UpdateKeyspace updates the keyspace data. It checks the keyspace is locked. func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error { + if ctx.Err() != nil { + return ctx.Err() + } + // make sure it is locked first if err := CheckKeyspaceLocked(ctx, ki.keyspace); err != nil { return err @@ -207,6 +219,10 @@ type FindAllShardsInKeyspaceOptions struct { // If opt is non-nil, it is used to configure the method's behavior. Otherwise, // the default options are used. func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt *FindAllShardsInKeyspaceOptions) (map[string]*ShardInfo, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + // Apply any necessary defaults. if opt == nil { opt = &FindAllShardsInKeyspaceOptions{} @@ -372,6 +388,10 @@ func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo // DeleteKeyspace wraps the underlying Conn.Delete // and dispatches the event. func (ts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error { + if ctx.Err() != nil { + return ctx.Err() + } + keyspacePath := path.Join(KeyspacesPath, keyspace, KeyspaceFile) if err := ts.globalCell.Delete(ctx, keyspacePath, nil); err != nil { return err @@ -393,6 +413,10 @@ func (ts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error { // GetKeyspaces returns the list of keyspaces in the topology. func (ts *Server) GetKeyspaces(ctx context.Context) ([]string, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + children, err := ts.globalCell.ListDir(ctx, KeyspacesPath, false /*full*/) switch { case err == nil: @@ -406,6 +430,10 @@ func (ts *Server) GetKeyspaces(ctx context.Context) ([]string, error) { // GetShardNames returns the list of shards in a keyspace. func (ts *Server) GetShardNames(ctx context.Context, keyspace string) ([]string, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath) children, err := ts.globalCell.ListDir(ctx, shardsPath, false /*full*/) if IsErrType(err, NoNode) { diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index f46e5f06e4b..967d3f7f2e4 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -19,6 +19,7 @@ package topo import ( "context" "encoding/json" + "errors" "os" "os/user" "sync" @@ -175,6 +176,12 @@ func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, opts ...LockO return nil, err } + if err := ctx.Err(); err != nil { + return nil, err + } + if ts.globalCell == nil { + return nil, errors.New("no global cell connection on the topo server") + } switch l.Options.lockType { case NonBlocking: return ts.globalCell.TryLock(ctx, lt.Path(), j) diff --git a/go/vt/topo/metadata.go b/go/vt/topo/metadata.go index 4d24630a309..1c569f3fb10 100644 --- a/go/vt/topo/metadata.go +++ b/go/vt/topo/metadata.go @@ -28,6 +28,10 @@ import ( // UpsertMetadata sets the key/value in the metadata if it doesn't exist, otherwise it updates the content func (ts *Server) UpsertMetadata(ctx context.Context, key string, val string) error { + if err := ctx.Err(); err != nil { + return err + } + keyPath := path.Join(MetadataPath, key) _, _, err := ts.globalCell.Get(ctx, keyPath) @@ -52,6 +56,10 @@ func (ts *Server) UpsertMetadata(ctx context.Context, key string, val string) er // GetMetadata retrieves all metadata value that matches the given key regular expression. If empty all values are returned. func (ts *Server) GetMetadata(ctx context.Context, keyFilter string) (map[string]string, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + keys, err := ts.globalCell.ListDir(ctx, MetadataPath, false) if err != nil { return nil, err @@ -77,6 +85,10 @@ func (ts *Server) GetMetadata(ctx context.Context, keyFilter string) (map[string // DeleteMetadata deletes the key in the metadata func (ts *Server) DeleteMetadata(ctx context.Context, key string) error { + if err := ctx.Err(); err != nil { + return err + } + keyPath := path.Join(MetadataPath, key) // nil version means that it will insert if keyPath does not exist @@ -89,6 +101,10 @@ func (ts *Server) DeleteMetadata(ctx context.Context, key string) error { } func (ts *Server) getMetadata(ctx context.Context, key string) (string, error) { + if err := ctx.Err(); err != nil { + return "", err + } + keyPath := path.Join(MetadataPath, key) contents, _, err := ts.globalCell.Get(ctx, keyPath) if err != nil { diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index a0dba42006d..4a3c2e6bb27 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -262,11 +262,12 @@ func Open() *Server { // ConnForCell returns a Conn object for the given cell. // It caches Conn objects from previously requested cells. func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + // Global cell is the easy case. if cell == GlobalCell { - if ctx.Err() != nil { - return nil, ctx.Err() - } return ts.globalCell, nil } diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 974a4fdbc6a..7df6dc64b88 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -165,6 +165,10 @@ func (si *ShardInfo) SetPrimaryTermStartTime(t time.Time) { // GetShard is a high level function to read shard data. // It generates trace spans. func (ts *Server) GetShard(ctx context.Context, keyspace, shard string) (*ShardInfo, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + if err := ValidateKeyspaceName(keyspace); err != nil { return nil, err } @@ -201,6 +205,10 @@ func (ts *Server) updateShard(ctx context.Context, si *ShardInfo) error { span.Annotate("shard", si.shardName) defer span.Finish() + if err := ctx.Err(); err != nil { + return err + } + data, err := si.Shard.MarshalVT() if err != nil { return err @@ -252,6 +260,10 @@ func (ts *Server) UpdateShardFields(ctx context.Context, keyspace, shard string, // This will lock the Keyspace, as we may be looking at other shard servedTypes. // Using GetOrCreateShard is probably a better idea for most use cases. func (ts *Server) CreateShard(ctx context.Context, keyspace, shard string) (err error) { + if err := ctx.Err(); err != nil { + return err + } + if err := ValidateKeyspaceName(keyspace); err != nil { return err } @@ -355,6 +367,10 @@ func (ts *Server) GetOrCreateShard(ctx context.Context, keyspace, shard string) // DeleteShard wraps the underlying conn.Delete // and dispatches the event. func (ts *Server) DeleteShard(ctx context.Context, keyspace, shard string) error { + if err := ctx.Err(); err != nil { + return err + } + shardPath := shardFilePath(keyspace, shard) if err := ts.globalCell.Delete(ctx, shardPath, nil); err != nil { return err @@ -728,6 +744,10 @@ type WatchShardData struct { // It has the same contract as conn.Watch, but it also unpacks the // contents into a Shard object func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*WatchShardData, <-chan *WatchShardData, error) { + if err := ctx.Err(); err != nil { + return nil, nil, err + } + shardPath := shardFilePath(keyspace, shard) ctx, cancel := context.WithCancel(ctx) diff --git a/go/vt/topo/vschema.go b/go/vt/topo/vschema.go index d9802da2c35..21192e1aacb 100644 --- a/go/vt/topo/vschema.go +++ b/go/vt/topo/vschema.go @@ -31,6 +31,10 @@ import ( // SaveVSchema saves a Vschema. A valid Vschema should be passed in. It does not verify its correctness. // If the VSchema is empty, just remove it. func (ts *Server) SaveVSchema(ctx context.Context, keyspace string, vschema *vschemapb.Keyspace) error { + if err := ctx.Err(); err != nil { + return err + } + nodePath := path.Join(KeyspacesPath, keyspace, VSchemaFile) data, err := vschema.MarshalVT() if err != nil { @@ -49,12 +53,19 @@ func (ts *Server) SaveVSchema(ctx context.Context, keyspace string, vschema *vsc // DeleteVSchema delete the keyspace if it exists func (ts *Server) DeleteVSchema(ctx context.Context, keyspace string) error { log.Infof("deleting vschema for keyspace %s", keyspace) + if err := ctx.Err(); err != nil { + return err + } nodePath := path.Join(KeyspacesPath, keyspace, VSchemaFile) return ts.globalCell.Delete(ctx, nodePath, nil) } // GetVSchema fetches the vschema from the topo. func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.Keyspace, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + nodePath := path.Join(KeyspacesPath, keyspace, VSchemaFile) data, _, err := ts.globalCell.Get(ctx, nodePath) if err != nil { @@ -90,6 +101,10 @@ func (ts *Server) EnsureVSchema(ctx context.Context, keyspace string) error { // SaveRoutingRules saves the routing rules into the topo. func (ts *Server) SaveRoutingRules(ctx context.Context, routingRules *vschemapb.RoutingRules) error { + if err := ctx.Err(); err != nil { + return err + } + data, err := routingRules.MarshalVT() if err != nil { return err @@ -109,6 +124,10 @@ func (ts *Server) SaveRoutingRules(ctx context.Context, routingRules *vschemapb. // GetRoutingRules fetches the routing rules from the topo. func (ts *Server) GetRoutingRules(ctx context.Context) (*vschemapb.RoutingRules, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + rr := &vschemapb.RoutingRules{} data, _, err := ts.globalCell.Get(ctx, RoutingRulesFile) if err != nil { @@ -126,6 +145,10 @@ func (ts *Server) GetRoutingRules(ctx context.Context) (*vschemapb.RoutingRules, // SaveShardRoutingRules saves the shard routing rules into the topo. func (ts *Server) SaveShardRoutingRules(ctx context.Context, shardRoutingRules *vschemapb.ShardRoutingRules) error { + if err := ctx.Err(); err != nil { + return err + } + data, err := shardRoutingRules.MarshalVT() if err != nil { return err @@ -144,6 +167,10 @@ func (ts *Server) SaveShardRoutingRules(ctx context.Context, shardRoutingRules * // GetShardRoutingRules fetches the shard routing rules from the topo. func (ts *Server) GetShardRoutingRules(ctx context.Context) (*vschemapb.ShardRoutingRules, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + srr := &vschemapb.ShardRoutingRules{} data, _, err := ts.globalCell.Get(ctx, ShardRoutingRulesFile) if err != nil { @@ -161,6 +188,10 @@ func (ts *Server) GetShardRoutingRules(ctx context.Context) (*vschemapb.ShardRou // CreateKeyspaceRoutingRules wraps the underlying Conn.Create. func (ts *Server) CreateKeyspaceRoutingRules(ctx context.Context, value *vschemapb.KeyspaceRoutingRules) error { + if err := ctx.Err(); err != nil { + return err + } + data, err := value.MarshalVT() if err != nil { return err @@ -189,6 +220,10 @@ func (ts *Server) CreateKeyspaceRoutingRules(ctx context.Context, value *vschema // we may come up with a better model and apply it to the keyspace routing rules // as well. func (ts *Server) SaveKeyspaceRoutingRules(ctx context.Context, rules *vschemapb.KeyspaceRoutingRules) error { + if err := ctx.Err(); err != nil { + return err + } + data, err := rules.MarshalVT() if err != nil { return err @@ -198,6 +233,10 @@ func (ts *Server) SaveKeyspaceRoutingRules(ctx context.Context, rules *vschemapb } func (ts *Server) GetKeyspaceRoutingRules(ctx context.Context) (*vschemapb.KeyspaceRoutingRules, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + rules := &vschemapb.KeyspaceRoutingRules{} data, _, err := ts.globalCell.Get(ctx, ts.GetKeyspaceRoutingRulesPath()) if err != nil { @@ -215,6 +254,10 @@ func (ts *Server) GetKeyspaceRoutingRules(ctx context.Context) (*vschemapb.Keysp // GetMirrorRules fetches the mirror rules from the topo. func (ts *Server) GetMirrorRules(ctx context.Context) (*vschemapb.MirrorRules, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + rr := &vschemapb.MirrorRules{} data, _, err := ts.globalCell.Get(ctx, MirrorRulesFile) if err != nil { @@ -232,6 +275,10 @@ func (ts *Server) GetMirrorRules(ctx context.Context) (*vschemapb.MirrorRules, e // SaveMirrorRules saves the mirror rules into the topo. func (ts *Server) SaveMirrorRules(ctx context.Context, mirrorRules *vschemapb.MirrorRules) error { + if err := ctx.Err(); err != nil { + return err + } + data, err := mirrorRules.MarshalVT() if err != nil { return err