From b6ce0a0341abda9eaf6e8e14609039779375d976 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 16 Nov 2023 13:37:21 +0100 Subject: [PATCH] Support cluster bootstrapping in vtctldclient (#14315) Signed-off-by: Matt Lord --- examples/common/scripts/consul-up.sh | 8 +-- examples/common/scripts/etcd-up.sh | 9 ++- examples/common/scripts/zk-up.sh | 8 +-- go/cmd/vtctldclient/command/root.go | 65 ++++++++++++++++++ go/cmd/vtctldclient/command/root_test.go | 67 +++++++++++++++++++ go/flags/endtoend/vtctldclient.txt | 7 ++ go/vt/vtctl/grpcvtctldserver/server.go | 4 +- .../vreplication/vcopier_atomic.go | 1 - 8 files changed, 153 insertions(+), 16 deletions(-) diff --git a/examples/common/scripts/consul-up.sh b/examples/common/scripts/consul-up.sh index 584a25f437a..fb75495b278 100755 --- a/examples/common/scripts/consul-up.sh +++ b/examples/common/scripts/consul-up.sh @@ -40,13 +40,13 @@ sleep 5 # Add the CellInfo description for the cell. # If the node already exists, it's fine, means we used existing data. -echo "add $cell CellInfo" +echo "add ${cell} CellInfo" set +e # shellcheck disable=SC2086 -vtctl $TOPOLOGY_FLAGS VtctldCommand AddCellInfo \ - --root "vitess/$cell" \ +command vtctldclient --server internal --topo-implementation consul --topo-global-server "${CONSUL_SERVER}:${consul_http_port}" AddCellInfo \ + --root "/vitess/${cell}" \ --server-address "${CONSUL_SERVER}:${consul_http_port}" \ - "$cell" + "${cell}" set -e echo "consul start done..." diff --git a/examples/common/scripts/etcd-up.sh b/examples/common/scripts/etcd-up.sh index ac81c1fbd28..1ed22ffce2e 100755 --- a/examples/common/scripts/etcd-up.sh +++ b/examples/common/scripts/etcd-up.sh @@ -32,13 +32,12 @@ sleep 5 # And also add the CellInfo description for the cell. # If the node already exists, it's fine, means we used existing data. -echo "add $cell CellInfo" +echo "add ${cell} CellInfo" set +e -# shellcheck disable=SC2086 -vtctl $TOPOLOGY_FLAGS VtctldCommand AddCellInfo \ - --root /vitess/$cell \ +command vtctldclient --server internal AddCellInfo \ + --root "/vitess/${cell}" \ --server-address "${ETCD_SERVER}" \ - $cell + "${cell}" set -e echo "etcd is running!" diff --git a/examples/common/scripts/zk-up.sh b/examples/common/scripts/zk-up.sh index 3137ed724cc..2b79053d2f6 100755 --- a/examples/common/scripts/zk-up.sh +++ b/examples/common/scripts/zk-up.sh @@ -52,10 +52,10 @@ echo "Started zk servers." # If the node already exists, it's fine, means we used existing data. set +e # shellcheck disable=SC2086 -vtctl $TOPOLOGY_FLAGS VtctldCommand AddCellInfo \ - --root /vitess/$cell \ - --server-address $ZK_SERVER \ - $cell +command vtctldclient --server internal --topo-implementation zk2 --topo-global-server "${ZK_SERVER}" AddCellInfo \ + --root "/vitess/${cell}" \ + --server-address "${ZK_SERVER}" \ + "${cell}" set -e echo "Configured zk servers." diff --git a/go/cmd/vtctldclient/command/root.go b/go/cmd/vtctldclient/command/root.go index 1194b49ec8f..a5848a7b42a 100644 --- a/go/cmd/vtctldclient/command/root.go +++ b/go/cmd/vtctldclient/command/root.go @@ -22,6 +22,8 @@ import ( "fmt" "io" "strconv" + "strings" + "sync" "time" "github.com/spf13/cobra" @@ -29,7 +31,11 @@ import ( "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/grpcvtctldserver" + "vitess.io/vitess/go/vt/vtctl/localvtctldclient" "vitess.io/vitess/go/vt/vtctl/vtctldclient" + "vitess.io/vitess/go/vt/vttablet/tmclient" // These imports ensure init()s within them get called and they register their commands/subcommands. "vitess.io/vitess/go/cmd/vtctldclient/cli" @@ -42,8 +48,16 @@ import ( _ "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/reshard" _ "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/vdiff" _ "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/workflow" + + // These imports register the topo factories to use when --server=internal. + _ "vitess.io/vitess/go/vt/topo/consultopo" + _ "vitess.io/vitess/go/vt/topo/etcd2topo" + _ "vitess.io/vitess/go/vt/topo/zk2topo" ) +// The --server value if you want to use a "local" vtctld server. +const useInternalVtctld = "internal" + var ( // VtctldClientProtocol is the protocol to use when creating the vtctldclient.VtctldClient. VtctldClientProtocol = "grpc" @@ -54,14 +68,37 @@ var ( commandCtx context.Context commandCancel func() + // Register functions to be called when the command completes. + onTerm = []func(){} + + // Register our nil tmclient grpc handler only one time. + // This is primarily for tests where we execute the root + // command multiple times. + once = sync.Once{} + server string actionTimeout time.Duration compactOutput bool + topoOptions = struct { + implementation string + globalServerAddresses []string + globalRoot string + }{ // Set defaults + implementation: "etcd2", + globalServerAddresses: []string{"localhost:2379"}, + globalRoot: "/vitess/global", + } + // Root is the main entrypoint to the vtctldclient CLI. Root = &cobra.Command{ Use: "vtctldclient", Short: "Executes a cluster management command on the remote vtctld server.", + Long: fmt.Sprintf(`Executes a cluster management command on the remote vtctld server. +If there are no running vtctld servers -- for example when bootstrapping +a new Vitess cluster -- you can specify a --server value of '%s'. +When doing so, you would use the --topo* flags so that the client can +connect directly to the topo server(s).`, useInternalVtctld), // We use PersistentPreRun to set up the tracer, grpc client, and // command context for every command. PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { @@ -87,6 +124,10 @@ var ( if client != nil { err = client.Close() } + // Execute any registered onTerm functions. + for _, f := range onTerm { + f() + } trace.LogErrorsWhenClosing(traceCloser) return err }, @@ -152,6 +193,27 @@ func getClientForCommand(cmd *cobra.Command) (vtctldclient.VtctldClient, error) return nil, errNoServer } + if server == useInternalVtctld { + ts, err := topo.OpenServer(topoOptions.implementation, strings.Join(topoOptions.globalServerAddresses, ","), topoOptions.globalRoot) + if err != nil { + return nil, fmt.Errorf("failed to connect to the topology server: %v", err) + } + onTerm = append(onTerm, ts.Close) + + // Use internal vtctld server implementation. + // Register a nil grpc handler -- we will not use tmclient at all but + // a factory still needs to be registered. + once.Do(func() { + tmclient.RegisterTabletManagerClientFactory("grpc", func() tmclient.TabletManagerClient { + return nil + }) + }) + vtctld := grpcvtctldserver.NewVtctldServer(ts) + localvtctldclient.SetServer(vtctld) + VtctldClientProtocol = "local" + server = "" + } + return vtctldclient.New(VtctldClientProtocol, server) } @@ -159,5 +221,8 @@ func init() { Root.PersistentFlags().StringVar(&server, "server", "", "server to use for the connection (required)") Root.PersistentFlags().DurationVar(&actionTimeout, "action_timeout", time.Hour, "timeout to use for the command") Root.PersistentFlags().BoolVar(&compactOutput, "compact", false, "use compact format for otherwise verbose outputs") + Root.PersistentFlags().StringVar(&topoOptions.implementation, "topo-implementation", topoOptions.implementation, "the topology implementation to use") + Root.PersistentFlags().StringSliceVar(&topoOptions.globalServerAddresses, "topo-global-server-address", topoOptions.globalServerAddresses, "the address of the global topology server(s)") + Root.PersistentFlags().StringVar(&topoOptions.globalRoot, "topo-global-root", topoOptions.globalRoot, "the path of the global topology data in the global topology server") vreplcommon.RegisterCommands(Root) } diff --git a/go/cmd/vtctldclient/command/root_test.go b/go/cmd/vtctldclient/command/root_test.go index 155fac78705..5efe844e1a1 100644 --- a/go/cmd/vtctldclient/command/root_test.go +++ b/go/cmd/vtctldclient/command/root_test.go @@ -17,13 +17,19 @@ limitations under the License. package command_test import ( + "context" + "fmt" "os" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/cmd/vtctldclient/command" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vtctl/localvtctldclient" vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice" @@ -52,3 +58,64 @@ func TestRoot(t *testing.T) { assert.Contains(t, err.Error(), "unknown command") }) } + +// TestRootWithInternalVtctld tests that the internal VtctldServer +// implementation -- used with --server=internal -- works for +// commands as expected. +func TestRootWithInternalVtctld(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + cell := "zone1" + ts, factory := memorytopo.NewServerAndFactory(ctx, cell) + topo.RegisterFactory("test", factory) + command.VtctldClientProtocol = "local" + baseArgs := []string{"vtctldclient", "--server", "internal", "--topo-implementation", "test"} + + args := append([]string{}, os.Args...) + protocol := command.VtctldClientProtocol + t.Cleanup(func() { + ts.Close() + os.Args = append([]string{}, args...) + command.VtctldClientProtocol = protocol + }) + + testCases := []struct { + command string + args []string + expectErr string + }{ + { + command: "AddCellInfo", + args: []string{"--root", fmt.Sprintf("/vitess/%s", cell), "--server-address", "", cell}, + expectErr: "node already exists", // Cell already exists + }, + { + command: "GetTablets", + }, + { + command: "NoCommandDrJones", + expectErr: "unknown command", // Invalid command + }, + } + + for _, tc := range testCases { + t.Run(tc.command, func(t *testing.T) { + defer func() { + // Reset the OS args. + os.Args = append([]string{}, args...) + }() + + os.Args = append(baseArgs, tc.command) + os.Args = append(os.Args, tc.args...) + + err := command.Root.Execute() + if tc.expectErr != "" { + if !strings.Contains(err.Error(), tc.expectErr) { + t.Errorf(fmt.Sprintf("%s error = %v, expectErr = %v", tc.command, err, tc.expectErr)) + } + } else { + require.NoError(t, err, "unexpected error: %v", err) + } + }) + } +} diff --git a/go/flags/endtoend/vtctldclient.txt b/go/flags/endtoend/vtctldclient.txt index f70f17f8136..86c1bb3bfa2 100644 --- a/go/flags/endtoend/vtctldclient.txt +++ b/go/flags/endtoend/vtctldclient.txt @@ -1,4 +1,8 @@ Executes a cluster management command on the remote vtctld server. +If there are no running vtctld servers -- for example when bootstrapping +a new Vitess cluster -- you can specify a --server value of 'internal'. +When doing so, you would use the --topo* flags so that the client can +connect directly to the topo server(s). Usage: vtctldclient [flags] @@ -126,6 +130,9 @@ Flags: --security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only) --server string server to use for the connection (required) --stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1) + --topo-global-root string the path of the global topology data in the global topology server (default "/vitess/global") + --topo-global-server-address strings the address of the global topology server(s) (default [localhost:2379]) + --topo-implementation string the topology implementation to use (default "etcd2") -v, --v Level log level for V logs --version version for vtctldclient --vmodule vModuleFlag comma-separated list of pattern=N settings for file-filtered logging diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 25b711e1019..21c4dc272f6 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -2018,7 +2018,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable tablets := make([]*topodatapb.Tablet, 0, len(tabletMap)) for _, ti := range tabletMap { - if req.TabletType != 0 && ti.Type != req.TabletType { + if req.TabletType != topodatapb.TabletType_UNKNOWN && ti.Type != req.TabletType { continue } adjustTypeForStalePrimary(ti, truePrimaryTimestamp) @@ -2086,7 +2086,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable if req.Keyspace != "" && tablet.Keyspace != req.Keyspace { continue } - if req.TabletType != 0 && tablet.Type != req.TabletType { + if req.TabletType != topodatapb.TabletType_UNKNOWN && tablet.Type != req.TabletType { continue } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go index 4da072e3955..fe92f284ce8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go @@ -303,7 +303,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings // deleteCopyState deletes the copy state entry for a table, signifying that the copy phase is complete for that table. func (vc *vcopier) deleteCopyState(tableName string) error { log.Infof("Deleting copy state for table %s", tableName) - //FIXME get sidecar db name delQuery := fmt.Sprintf("delete from _vt.copy_state where table_name=%s and vrepl_id = %d", encodeString(tableName), vc.vr.id) if _, err := vc.vr.dbClient.Execute(delQuery); err != nil { return err