Skip to content

Commit

Permalink
topo: Fix GetTabletsByCell returns empty on partial result error
Browse files Browse the repository at this point in the history
Signed-off-by: Brendan Dougherty <[email protected]>
  • Loading branch information
brendar committed Apr 3, 2024
1 parent 0e2f175 commit 05a3a14
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go/vt/topo/keyspace_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestServerGetServingShards(t *testing.T) {
require.NotNil(t, stats)

if tt.fallback {
factory.SetListError(errNoListImpl)
factory.AddOperationError(memorytopo.List, ".*", errNoListImpl)
}

err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})
Expand Down
3 changes: 3 additions & 0 deletions go/vt/topo/memorytopo/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.D
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(ListDir, dirPath); err != nil {
return nil, err
}

isRoot := false
if dirPath == "" || dirPath == "/" {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation
c.factory.mu.Lock()
defer c.factory.mu.Unlock()

if err := c.factory.getOperationError(NewLeaderParticipation, id); err != nil {
return nil, err
}

// Make sure the global path exists.
electionPath := path.Join(electionsPath, name)
if n := c.factory.getOrCreatePath(c.cell, electionPath); n == nil {
Expand Down
16 changes: 14 additions & 2 deletions go/vt/topo/memorytopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Create, filePath); err != nil {
return nil, err
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -92,6 +95,9 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver
if c.factory.err != nil {
return nil, c.factory.err
}
if err := c.factory.getOperationError(Update, filePath); err != nil {
return nil, err
}

// Get the parent dir, we'll need it in case of creation.
dir, file := path.Split(filePath)
Expand Down Expand Up @@ -168,6 +174,9 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version,
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Get, filePath); err != nil {
return nil, nil, err
}

// Get the node.
n := c.factory.nodeByPath(c.cell, filePath)
Expand Down Expand Up @@ -195,8 +204,8 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo,
if c.factory.err != nil {
return nil, c.factory.err
}
if c.factory.listErr != nil {
return nil, c.factory.listErr
if err := c.factory.getOperationError(List, filePathPrefix); err != nil {
return nil, err
}

dir, file := path.Split(filePathPrefix)
Expand Down Expand Up @@ -259,6 +268,9 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version
if c.factory.err != nil {
return c.factory.err
}
if err := c.factory.getOperationError(Delete, filePath); err != nil {
return err
}

// Get the parent dir.
dir, file := path.Split(filePath)
Expand Down
14 changes: 14 additions & 0 deletions go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,27 @@ type memoryTopoLockDescriptor struct {
func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	c.factory.callstats.Add([]string{"TryLock"}, 1)

	// Look up any error injected for TryLock on this path. The factory
	// mutex is held only for the lookup and released before delegating,
	// because c.Lock acquires the same mutex itself.
	c.factory.mu.Lock()
	injected := c.factory.getOperationError(TryLock, dirPath)
	c.factory.mu.Unlock()

	if injected == nil {
		// No injected failure: behave exactly like Lock.
		return c.Lock(ctx, dirPath, contents)
	}
	return nil, injected
}

// Lock is part of the topo.Conn interface.
//
// It counts the call in the factory call stats, returns any error injected
// for the Lock operation on dirPath, and otherwise delegates to c.lock.
func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	c.factory.callstats.Add([]string{"Lock"}, 1)

	// Hold the factory mutex only for the injected-error lookup.
	// NOTE(review): presumably c.lock takes the mutex on its own, which is
	// why it is released here first — confirm against c.lock.
	c.factory.mu.Lock()
	injected := c.factory.getOperationError(Lock, dirPath)
	c.factory.mu.Unlock()

	if injected == nil {
		return c.lock(ctx, dirPath, contents)
	}
	return nil, injected
}

Expand Down
57 changes: 49 additions & 8 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"errors"
"math/rand/v2"
"regexp"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -50,6 +51,25 @@ const (
UnreachableServerAddr = "unreachable"
)

// Operation identifies one of the operations defined by topo.Conn. It is
// the key under which fake errors are registered and looked up.
type Operation int

// The topo.Conn operations, numbered consecutively from zero in the order
// listed.
const (
	// ListDir identifies the ListDir operation.
	ListDir Operation = iota
	// Create identifies the Create operation.
	Create
	// Update identifies the Update operation.
	Update
	// Get identifies the Get operation.
	Get
	// List identifies the List operation.
	List
	// Delete identifies the Delete operation.
	Delete
	// Lock identifies the Lock operation.
	Lock
	// TryLock identifies the TryLock operation.
	TryLock
	// Watch identifies the Watch operation.
	Watch
	// WatchRecursive identifies the WatchRecursive operation.
	WatchRecursive
	// NewLeaderParticipation identifies the NewLeaderParticipation operation.
	NewLeaderParticipation
	// Close identifies the Close operation.
	Close
)

// Factory is a memory-based implementation of topo.Factory. It
// takes a file-system like approach, with directories at each level
// being an actual directory node. This is meant to be closer to
Expand All @@ -72,14 +92,20 @@ type Factory struct {
// err is used for testing purposes to force queries / watches
// to return the given error
err error
// listErr is used for testing purposed to fake errors from
// calls to List.
listErr error
// operationErrors is used for testing purposes to fake errors from
// operations and paths matching the spec
operationErrors map[Operation][]errorSpec
// callstats allows us to keep track of how many topo.Conn calls
// we make (Create, Get, Update, Delete, List, ListDir, etc).
callstats *stats.CountersWithMultiLabels
}

// errorSpec describes one fake error registered through AddOperationError:
// calls to op whose path matches pathPattern fail with err.
type errorSpec struct {
	// op is the operation the spec was registered for.
	op Operation
	// pathPattern is matched against the path passed to the operation.
	pathPattern *regexp.Regexp
	// err is the error returned when pathPattern matches.
	err error
}

// HasGlobalReadOnlyCell is part of the topo.Factory interface.
func (f *Factory) HasGlobalReadOnlyCell(serverAddr, root string) bool {
return false
Expand Down Expand Up @@ -248,9 +274,10 @@ func (n *node) PropagateWatchError(err error) {
// in case of a problem.
func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *Factory) {
f := &Factory{
cells: make(map[string]*node),
generation: uint64(rand.Int64N(1 << 60)),
callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}),
cells: make(map[string]*node),
generation: uint64(rand.Int64N(1 << 60)),
callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}),
operationErrors: make(map[Operation][]errorSpec),
}
f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil)

Expand Down Expand Up @@ -363,9 +390,23 @@ func (f *Factory) recursiveDelete(n *node) {
}
}

func (f *Factory) SetListError(err error) {
func (f *Factory) AddOperationError(op Operation, pathPattern string, err error) {
f.mu.Lock()
defer f.mu.Unlock()

f.listErr = err
f.operationErrors[op] = append(f.operationErrors[op], errorSpec{
op: op,
pathPattern: regexp.MustCompile(pathPattern),
err: err,
})
}

// getOperationError returns the error of the first spec registered for op
// whose pattern matches path, or nil when none matches.
//
// It reads f.operationErrors without locking.
// NOTE(review): callers are presumably expected to hold f.mu, as TryLock and
// Lock do — confirm for the other call sites.
func (f *Factory) getOperationError(op Operation, path string) error {
	for _, candidate := range f.operationErrors[op] {
		if !candidate.pathPattern.MatchString(path) {
			continue
		}
		return candidate.err
	}
	return nil
}
6 changes: 6 additions & 0 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(Watch, filePath); err != nil {
return nil, nil, err
}

n := c.factory.nodeByPath(c.cell, filePath)
if n == nil {
Expand Down Expand Up @@ -89,6 +92,9 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc
if c.factory.err != nil {
return nil, nil, c.factory.err
}
if err := c.factory.getOperationError(WatchRecursive, dirpath); err != nil {
return nil, nil, err
}

n := c.factory.getOrCreatePath(c.cell, dirpath)
if n == nil {
Expand Down
12 changes: 9 additions & 3 deletions go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ type GetTabletsByCellOptions struct {

// GetTabletsByCell returns all the tablets in the cell.
// It returns ErrNoNode if the cell doesn't exist.
// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete.
// It returns (nil, nil) if the cell exists, but there are no tablets in it.
func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) {
// If the cell doesn't exist, this will return ErrNoNode.
Expand Down Expand Up @@ -277,6 +278,7 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G
// GetTabletsIndividuallyByCell returns a sorted list of tablets for topo servers that do not
// directly support the topoConn.List() functionality.
// It returns ErrNoNode if the cell doesn't exist.
// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete.
// It returns (nil, nil) if the cell exists, but there are no tablets in it.
func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) {
// If the cell doesn't exist, this will return ErrNoNode.
Expand All @@ -286,10 +288,14 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string,
}
sort.Sort(topoproto.TabletAliasList(aliases))

var partialResultErr error
tabletMap, err := ts.GetTabletMap(ctx, aliases, opt)
if err != nil {
// we got another error than topo.ErrNoNode
return nil, err
if IsErrType(err, PartialResult) {
partialResultErr = err
} else {
return nil, err
}
}
tablets := make([]*TabletInfo, 0, len(aliases))
for _, tabletAlias := range aliases {
Expand All @@ -303,7 +309,7 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string,
}
}

return tablets, nil
return tablets, partialResultErr
}

// UpdateTablet updates the tablet data only - not associated replication paths.
Expand Down
57 changes: 56 additions & 1 deletion go/vt/topo/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package topo_test

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -76,7 +79,7 @@ func TestServerGetTabletsByCell(t *testing.T) {
ts, factory := memorytopo.NewServerAndFactory(ctx, cell)
defer ts.Close()
if tt.listError != nil {
factory.SetListError(tt.listError)
factory.AddOperationError(memorytopo.List, ".*", tt.listError)
}

// Create an ephemeral keyspace and generate shard records within
Expand Down Expand Up @@ -116,3 +119,55 @@ func TestServerGetTabletsByCell(t *testing.T) {
})
}
}

// TestServerGetTabletsByCellPartialResults verifies that GetTabletsByCell
// returns the tablets it could read, together with a PartialResult error,
// when List is unsupported and the Get for one tablet fails.
func TestServerGetTabletsByCellPartialResults(t *testing.T) {
	const cell = "zone1"
	const keyspace = "keyspace"
	const shard = "shard"

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ts, factory := memorytopo.NewServerAndFactory(ctx, cell)
	defer ts.Close()

	// Create an ephemeral keyspace and generate shard records within
	// the keyspace to fetch later.
	require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}))
	require.NoError(t, ts.CreateShard(ctx, keyspace, shard))

	tablets := make([]*topo.TabletInfo, 3)

	for i := range tablets {
		tablet := &topodatapb.Tablet{
			Alias: &topodatapb.TabletAlias{
				Cell: cell,
				Uid:  uint32(i),
			},
			Hostname: "host1",
			PortMap: map[string]int32{
				"vt": int32(i),
			},
			Keyspace: keyspace,
			Shard:    shard,
		}
		tablets[i] = &topo.TabletInfo{Tablet: tablet}
		require.NoError(t, ts.CreateTablet(ctx, tablet))
	}

	// Force fallback to getting tablets individually.
	factory.AddOperationError(memorytopo.List, ".*", topo.NewError(topo.NoImplementation, "List not supported"))

	// Cause the Get for the second tablet to fail.
	factory.AddOperationError(memorytopo.Get, "tablets/zone1-0000000001/Tablet", errors.New("fake error"))

	// Verify that we return a partial list of tablets and that each
	// tablet matches what we expect.
	out, err := ts.GetTabletsByCell(ctx, cell, nil)
	assert.Error(t, err)
	assert.True(t, topo.IsErrType(err, topo.PartialResult), "Not a partial result: %v", err)

	// require (not assert) so that a wrong length stops the test here
	// instead of panicking on the index expressions below.
	require.Len(t, out, 2)
	assert.True(t, proto.Equal(tablets[0].Tablet, out[0].Tablet), "Got: %v, want %v", out[0].Tablet, tablets[0].Tablet)
	assert.True(t, proto.Equal(tablets[2].Tablet, out[1].Tablet), "Got: %v, want %v", out[1].Tablet, tablets[2].Tablet)
}

0 comments on commit 05a3a14

Please sign in to comment.