From 05a3a1490a2a4ce38a602492472b762b7b89c940 Mon Sep 17 00:00:00 2001
From: Brendan Dougherty
Date: Wed, 3 Apr 2024 18:45:37 +0000
Subject: [PATCH] topo: Fix GetTabletsByCell returns empty on partial result error

Signed-off-by: Brendan Dougherty
---
 go/vt/topo/keyspace_external_test.go |  2 +-
 go/vt/topo/memorytopo/directory.go   |  3 ++
 go/vt/topo/memorytopo/election.go    |  4 ++
 go/vt/topo/memorytopo/file.go        | 16 +++++++-
 go/vt/topo/memorytopo/lock.go        | 14 +++++++
 go/vt/topo/memorytopo/memorytopo.go  | 57 ++++++++++++++++++++++++----
 go/vt/topo/memorytopo/watch.go       |  6 +++
 go/vt/topo/tablet.go                 | 12 ++++--
 go/vt/topo/tablet_test.go            | 57 +++++++++++++++++++++++++++-
 9 files changed, 156 insertions(+), 15 deletions(-)

diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go
index 38ff1c8ce7b..4edb45a411d 100644
--- a/go/vt/topo/keyspace_external_test.go
+++ b/go/vt/topo/keyspace_external_test.go
@@ -142,7 +142,7 @@ func TestServerGetServingShards(t *testing.T) {
 			require.NotNil(t, stats)
 
 			if tt.fallback {
-				factory.SetListError(errNoListImpl)
+				factory.AddOperationError(memorytopo.List, ".*", errNoListImpl)
 			}
 
 			err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})
diff --git a/go/vt/topo/memorytopo/directory.go b/go/vt/topo/memorytopo/directory.go
index 8e673f474a6..b8fa11a9d52 100644
--- a/go/vt/topo/memorytopo/directory.go
+++ b/go/vt/topo/memorytopo/directory.go
@@ -39,6 +39,9 @@ func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.D
 	if c.factory.err != nil {
 		return nil, c.factory.err
 	}
+	if err := c.factory.getOperationError(ListDir, dirPath); err != nil {
+		return nil, err
+	}
 
 	isRoot := false
 	if dirPath == "" || dirPath == "/" {
diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go
index 0a76c202de2..1b6d2292f5c 100644
--- a/go/vt/topo/memorytopo/election.go
+++ b/go/vt/topo/memorytopo/election.go
@@ -35,6 +35,10 @@ func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation
 	c.factory.mu.Lock()
 	defer c.factory.mu.Unlock()
 
+	if err := c.factory.getOperationError(NewLeaderParticipation, id); err != nil {
+		return nil, err
+	}
+
 	// Make sure the global path exists.
 	electionPath := path.Join(electionsPath, name)
 	if n := c.factory.getOrCreatePath(c.cell, electionPath); n == nil {
diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go
index 800e7791afa..86722477e53 100644
--- a/go/vt/topo/memorytopo/file.go
+++ b/go/vt/topo/memorytopo/file.go
@@ -46,6 +46,9 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to
 	if c.factory.err != nil {
 		return nil, c.factory.err
 	}
+	if err := c.factory.getOperationError(Create, filePath); err != nil {
+		return nil, err
+	}
 
 	// Get the parent dir.
 	dir, file := path.Split(filePath)
@@ -92,6 +95,9 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver
 	if c.factory.err != nil {
 		return nil, c.factory.err
 	}
+	if err := c.factory.getOperationError(Update, filePath); err != nil {
+		return nil, err
+	}
 
 	// Get the parent dir, we'll need it in case of creation.
 	dir, file := path.Split(filePath)
@@ -168,6 +174,9 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version,
 	if c.factory.err != nil {
 		return nil, nil, c.factory.err
 	}
+	if err := c.factory.getOperationError(Get, filePath); err != nil {
+		return nil, nil, err
+	}
 
 	// Get the node.
 	n := c.factory.nodeByPath(c.cell, filePath)
@@ -195,8 +204,8 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo,
 	if c.factory.err != nil {
 		return nil, c.factory.err
 	}
-	if c.factory.listErr != nil {
-		return nil, c.factory.listErr
+	if err := c.factory.getOperationError(List, filePathPrefix); err != nil {
+		return nil, err
 	}
 
 	dir, file := path.Split(filePathPrefix)
@@ -259,6 +268,9 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version
 	if c.factory.err != nil {
 		return c.factory.err
 	}
+	if err := c.factory.getOperationError(Delete, filePath); err != nil {
+		return err
+	}
 
 	// Get the parent dir.
 	dir, file := path.Split(filePath)
diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go
index afce7868469..d0943c7058d 100644
--- a/go/vt/topo/memorytopo/lock.go
+++ b/go/vt/topo/memorytopo/lock.go
@@ -44,6 +44,13 @@ type memoryTopoLockDescriptor struct {
 func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
 	c.factory.callstats.Add([]string{"TryLock"}, 1)
 
+	c.factory.mu.Lock()
+	err := c.factory.getOperationError(TryLock, dirPath)
+	c.factory.mu.Unlock()
+	if err != nil {
+		return nil, err
+	}
+
 	return c.Lock(ctx, dirPath, contents)
 }
 
@@ -51,6 +58,13 @@ func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.Lock
 func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
 	c.factory.callstats.Add([]string{"Lock"}, 1)
 
+	c.factory.mu.Lock()
+	err := c.factory.getOperationError(Lock, dirPath)
+	c.factory.mu.Unlock()
+	if err != nil {
+		return nil, err
+	}
+
 	return c.lock(ctx, dirPath, contents)
 }
 
diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go
index 2b5385d8e28..9d703a2869a 100644
--- a/go/vt/topo/memorytopo/memorytopo.go
+++ b/go/vt/topo/memorytopo/memorytopo.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"errors"
 	"math/rand/v2"
+	"regexp"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -50,6 +51,25 @@ const (
 	UnreachableServerAddr = "unreachable"
 )
 
+// Operation is one of the operations defined by topo.Conn
+type Operation int
+
+// The following is the list of topo.Conn operations
+const (
+	ListDir = Operation(iota)
+	Create
+	Update
+	Get
+	List
+	Delete
+	Lock
+	TryLock
+	Watch
+	WatchRecursive
+	NewLeaderParticipation
+	Close
+)
+
 // Factory is a memory-based implementation of topo.Factory. It
 // takes a file-system like approach, with directories at each level
 // being an actual directory node. This is meant to be closer to
@@ -72,14 +92,20 @@ type Factory struct {
 	// err is used for testing purposes to force queries / watches
 	// to return the given error
 	err error
-	// listErr is used for testing purposed to fake errors from
-	// calls to List.
-	listErr error
+	// operationErrors is used for testing purposes to fake errors from
+	// operations and paths matching the spec
+	operationErrors map[Operation][]errorSpec
 	// callstats allows us to keep track of how many topo.Conn calls
 	// we make (Create, Get, Update, Delete, List, ListDir, etc).
 	callstats *stats.CountersWithMultiLabels
 }
 
+type errorSpec struct {
+	op          Operation
+	pathPattern *regexp.Regexp
+	err         error
+}
+
 // HasGlobalReadOnlyCell is part of the topo.Factory interface.
 func (f *Factory) HasGlobalReadOnlyCell(serverAddr, root string) bool {
 	return false
@@ -248,9 +274,10 @@ func (n *node) PropagateWatchError(err error) {
 // in case of a problem.
 func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *Factory) {
 	f := &Factory{
-		cells:      make(map[string]*node),
-		generation: uint64(rand.Int64N(1 << 60)),
-		callstats:  stats.NewCountersWithMultiLabels("", "", []string{"Call"}),
+		cells:           make(map[string]*node),
+		generation:      uint64(rand.Int64N(1 << 60)),
+		callstats:       stats.NewCountersWithMultiLabels("", "", []string{"Call"}),
+		operationErrors: make(map[Operation][]errorSpec),
 	}
 	f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil)
 
@@ -363,9 +390,23 @@ func (f *Factory) recursiveDelete(n *node) {
 	}
 }
 
-func (f *Factory) SetListError(err error) {
+func (f *Factory) AddOperationError(op Operation, pathPattern string, err error) {
 	f.mu.Lock()
 	defer f.mu.Unlock()
 
-	f.listErr = err
+	f.operationErrors[op] = append(f.operationErrors[op], errorSpec{
+		op:          op,
+		pathPattern: regexp.MustCompile(pathPattern),
+		err:         err,
+	})
+}
+
+func (f *Factory) getOperationError(op Operation, path string) error {
+	specs := f.operationErrors[op]
+	for _, spec := range specs {
+		if spec.pathPattern.MatchString(path) {
+			return spec.err
+		}
+	}
+	return nil
 }
diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go
index 3651bcca9ce..dcb90a8f0ef 100644
--- a/go/vt/topo/memorytopo/watch.go
+++ b/go/vt/topo/memorytopo/watch.go
@@ -37,6 +37,9 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c
 	if c.factory.err != nil {
 		return nil, nil, c.factory.err
 	}
+	if err := c.factory.getOperationError(Watch, filePath); err != nil {
+		return nil, nil, err
+	}
 
 	n := c.factory.nodeByPath(c.cell, filePath)
 	if n == nil {
@@ -89,6 +92,9 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc
 	if c.factory.err != nil {
 		return nil, nil, c.factory.err
 	}
+	if err := c.factory.getOperationError(WatchRecursive, dirpath); err != nil {
+		return nil, nil, err
+	}
 
 	n := c.factory.getOrCreatePath(c.cell, dirpath)
 	if n == nil {
diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go
index 493e448752b..671a0f43905 100644
--- a/go/vt/topo/tablet.go
+++ b/go/vt/topo/tablet.go
@@ -240,6 +240,7 @@ type GetTabletsByCellOptions struct {
 
 // GetTabletsByCell returns all the tablets in the cell.
 // It returns ErrNoNode if the cell doesn't exist.
+// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete.
 // It returns (nil, nil) if the cell exists, but there are no tablets in it.
 func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) {
 	// If the cell doesn't exist, this will return ErrNoNode.
@@ -277,6 +278,7 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G
 // GetTabletsIndividuallyByCell returns a sorted list of tablets for topo servers that do not
 // directly support the topoConn.List() functionality.
 // It returns ErrNoNode if the cell doesn't exist.
+// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete.
 // It returns (nil, nil) if the cell exists, but there are no tablets in it.
 func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) {
 	// If the cell doesn't exist, this will return ErrNoNode.
@@ -286,10 +288,14 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string,
 	}
 	sort.Sort(topoproto.TabletAliasList(aliases))
 
+	var partialResultErr error
 	tabletMap, err := ts.GetTabletMap(ctx, aliases, opt)
 	if err != nil {
-		// we got another error than topo.ErrNoNode
-		return nil, err
+		if IsErrType(err, PartialResult) {
+			partialResultErr = err
+		} else {
+			return nil, err
+		}
 	}
 	tablets := make([]*TabletInfo, 0, len(aliases))
 	for _, tabletAlias := range aliases {
@@ -303,7 +309,7 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string,
 		}
 	}
 
-	return tablets, nil
+	return tablets, partialResultErr
 }
 
 // UpdateTablet updates the tablet data only - not associated replication paths.
diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go
index 04eea71a8a2..3a0153a11b5 100644
--- a/go/vt/topo/tablet_test.go
+++ b/go/vt/topo/tablet_test.go
@@ -18,9 +18,12 @@ package topo_test
 
 import (
 	"context"
+	"errors"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"
 
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	"vitess.io/vitess/go/vt/topo"
@@ -76,7 +79,7 @@ func TestServerGetTabletsByCell(t *testing.T) {
 			ts, factory := memorytopo.NewServerAndFactory(ctx, cell)
 			defer ts.Close()
 			if tt.listError != nil {
-				factory.SetListError(tt.listError)
+				factory.AddOperationError(memorytopo.List, ".*", tt.listError)
 			}
 
 			// Create an ephemeral keyspace and generate shard records within
@@ -116,3 +119,55 @@ func TestServerGetTabletsByCell(t *testing.T) {
 		})
 	}
 }
+
+func TestServerGetTabletsByCellPartialResults(t *testing.T) {
+	const cell = "zone1"
+	const keyspace = "keyspace"
+	const shard = "shard"
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ts, factory := memorytopo.NewServerAndFactory(ctx, cell)
+	defer ts.Close()
+
+	// Create an ephemeral keyspace and generate shard records within
+	// the keyspace to fetch later.
+	require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}))
+	require.NoError(t, ts.CreateShard(ctx, keyspace, shard))
+
+	tablets := make([]*topo.TabletInfo, 3)
+
+	for i := 0; i < len(tablets); i++ {
+		tablet := &topodatapb.Tablet{
+			Alias: &topodatapb.TabletAlias{
+				Cell: cell,
+				Uid:  uint32(i),
+			},
+			Hostname: "host1",
+			PortMap: map[string]int32{
+				"vt": int32(i),
+			},
+			Keyspace: keyspace,
+			Shard:    shard,
+		}
+		tInfo := &topo.TabletInfo{Tablet: tablet}
+		tablets[i] = tInfo
+		require.NoError(t, ts.CreateTablet(ctx, tablet))
+	}
+
+	// Force fallback to getting tablets individually.
+	factory.AddOperationError(memorytopo.List, ".*", topo.NewError(topo.NoImplementation, "List not supported"))
+
+	// Cause the Get for the second tablet to fail.
+	factory.AddOperationError(memorytopo.Get, "tablets/zone1-0000000001/Tablet", errors.New("fake error"))
+
+	// Verify that we return a partial list of tablets and that each
+	// tablet matches what we expect.
+	out, err := ts.GetTabletsByCell(ctx, cell, nil)
+	assert.Error(t, err)
+	assert.True(t, topo.IsErrType(err, topo.PartialResult), "Not a partial result: %v", err)
+	assert.Len(t, out, 2)
+	assert.True(t, proto.Equal(tablets[0].Tablet, out[0].Tablet), "Got: %v, want %v", tablets[0].Tablet, out[0].Tablet)
+	assert.True(t, proto.Equal(tablets[2].Tablet, out[1].Tablet), "Got: %v, want %v", tablets[2].Tablet, out[1].Tablet)
+}