Skip to content

Commit

Permalink
Support cluster bootstrapping in vtctldclient (vitessio#14315)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored and ejortegau committed Dec 13, 2023
1 parent ccf3017 commit b6ce0a0
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 16 deletions.
8 changes: 4 additions & 4 deletions examples/common/scripts/consul-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ sleep 5

# Add the CellInfo description for the cell.
# If the node already exists, it's fine, means we used existing data.
echo "add $cell CellInfo"
echo "add ${cell} CellInfo"
set +e
# shellcheck disable=SC2086
vtctl $TOPOLOGY_FLAGS VtctldCommand AddCellInfo \
--root "vitess/$cell" \
command vtctldclient --server internal --topo-implementation consul --topo-global-server "${CONSUL_SERVER}:${consul_http_port}" AddCellInfo \
--root "/vitess/${cell}" \
--server-address "${CONSUL_SERVER}:${consul_http_port}" \
"$cell"
"${cell}"
set -e

echo "consul start done..."
9 changes: 4 additions & 5 deletions examples/common/scripts/etcd-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ sleep 5

# And also add the CellInfo description for the cell.
# If the node already exists, it's fine, means we used existing data.
echo "add $cell CellInfo"
echo "add ${cell} CellInfo"
set +e
# shellcheck disable=SC2086
vtctl $TOPOLOGY_FLAGS VtctldCommand AddCellInfo \
--root /vitess/$cell \
command vtctldclient --server internal AddCellInfo \
--root "/vitess/${cell}" \
--server-address "${ETCD_SERVER}" \
$cell
"${cell}"
set -e

echo "etcd is running!"
Expand Down
8 changes: 4 additions & 4 deletions examples/common/scripts/zk-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ echo "Started zk servers."
# If the node already exists, it's fine, means we used existing data.
set +e
# shellcheck disable=SC2086
vtctl $TOPOLOGY_FLAGS VtctldCommand AddCellInfo \
--root /vitess/$cell \
--server-address $ZK_SERVER \
$cell
command vtctldclient --server internal --topo-implementation zk2 --topo-global-server "${ZK_SERVER}" AddCellInfo \
--root "/vitess/${cell}" \
--server-address "${ZK_SERVER}" \
"${cell}"
set -e

echo "Configured zk servers."
65 changes: 65 additions & 0 deletions go/cmd/vtctldclient/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@ import (
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/spf13/cobra"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver"
"vitess.io/vitess/go/vt/vtctl/localvtctldclient"
"vitess.io/vitess/go/vt/vtctl/vtctldclient"
"vitess.io/vitess/go/vt/vttablet/tmclient"

// These imports ensure init()s within them get called and they register their commands/subcommands.
"vitess.io/vitess/go/cmd/vtctldclient/cli"
Expand All @@ -42,8 +48,16 @@ import (
_ "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/reshard"
_ "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/vdiff"
_ "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/workflow"

// These imports register the topo factories to use when --server=internal.
_ "vitess.io/vitess/go/vt/topo/consultopo"
_ "vitess.io/vitess/go/vt/topo/etcd2topo"
_ "vitess.io/vitess/go/vt/topo/zk2topo"
)

// The --server value if you want to use a "local" vtctld server.
const useInternalVtctld = "internal"

var (
// VtctldClientProtocol is the protocol to use when creating the vtctldclient.VtctldClient.
VtctldClientProtocol = "grpc"
Expand All @@ -54,14 +68,37 @@ var (
commandCtx context.Context
commandCancel func()

// Register functions to be called when the command completes.
onTerm = []func(){}

// Register our nil tmclient grpc handler only one time.
// This is primarily for tests where we execute the root
// command multiple times.
once = sync.Once{}

server string
actionTimeout time.Duration
compactOutput bool

topoOptions = struct {
implementation string
globalServerAddresses []string
globalRoot string
}{ // Set defaults
implementation: "etcd2",
globalServerAddresses: []string{"localhost:2379"},
globalRoot: "/vitess/global",
}

// Root is the main entrypoint to the vtctldclient CLI.
Root = &cobra.Command{
Use: "vtctldclient",
Short: "Executes a cluster management command on the remote vtctld server.",
Long: fmt.Sprintf(`Executes a cluster management command on the remote vtctld server.
If there are no running vtctld servers -- for example when bootstrapping
a new Vitess cluster -- you can specify a --server value of '%s'.
When doing so, you would use the --topo* flags so that the client can
connect directly to the topo server(s).`, useInternalVtctld),
// We use PersistentPreRun to set up the tracer, grpc client, and
// command context for every command.
PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
Expand All @@ -87,6 +124,10 @@ var (
if client != nil {
err = client.Close()
}
// Execute any registered onTerm functions.
for _, f := range onTerm {
f()
}
trace.LogErrorsWhenClosing(traceCloser)
return err
},
Expand Down Expand Up @@ -152,12 +193,36 @@ func getClientForCommand(cmd *cobra.Command) (vtctldclient.VtctldClient, error)
return nil, errNoServer
}

if server == useInternalVtctld {
ts, err := topo.OpenServer(topoOptions.implementation, strings.Join(topoOptions.globalServerAddresses, ","), topoOptions.globalRoot)
if err != nil {
return nil, fmt.Errorf("failed to connect to the topology server: %v", err)
}
onTerm = append(onTerm, ts.Close)

// Use internal vtctld server implementation.
// Register a nil grpc handler -- we will not use tmclient at all but
// a factory still needs to be registered.
once.Do(func() {
tmclient.RegisterTabletManagerClientFactory("grpc", func() tmclient.TabletManagerClient {
return nil
})
})
vtctld := grpcvtctldserver.NewVtctldServer(ts)
localvtctldclient.SetServer(vtctld)
VtctldClientProtocol = "local"
server = ""
}

return vtctldclient.New(VtctldClientProtocol, server)
}

func init() {
Root.PersistentFlags().StringVar(&server, "server", "", "server to use for the connection (required)")
Root.PersistentFlags().DurationVar(&actionTimeout, "action_timeout", time.Hour, "timeout to use for the command")
Root.PersistentFlags().BoolVar(&compactOutput, "compact", false, "use compact format for otherwise verbose outputs")
Root.PersistentFlags().StringVar(&topoOptions.implementation, "topo-implementation", topoOptions.implementation, "the topology implementation to use")
Root.PersistentFlags().StringSliceVar(&topoOptions.globalServerAddresses, "topo-global-server-address", topoOptions.globalServerAddresses, "the address of the global topology server(s)")
Root.PersistentFlags().StringVar(&topoOptions.globalRoot, "topo-global-root", topoOptions.globalRoot, "the path of the global topology data in the global topology server")
vreplcommon.RegisterCommands(Root)
}
67 changes: 67 additions & 0 deletions go/cmd/vtctldclient/command/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ limitations under the License.
package command_test

import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/cmd/vtctldclient/command"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtctl/localvtctldclient"

vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice"
Expand Down Expand Up @@ -52,3 +58,64 @@ func TestRoot(t *testing.T) {
assert.Contains(t, err.Error(), "unknown command")
})
}

// TestRootWithInternalVtctld tests that the internal VtctldServer
// implementation -- used with --server=internal -- works for
// commands as expected.
func TestRootWithInternalVtctld(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cell := "zone1"
ts, factory := memorytopo.NewServerAndFactory(ctx, cell)
topo.RegisterFactory("test", factory)
command.VtctldClientProtocol = "local"
baseArgs := []string{"vtctldclient", "--server", "internal", "--topo-implementation", "test"}

args := append([]string{}, os.Args...)
protocol := command.VtctldClientProtocol
t.Cleanup(func() {
ts.Close()
os.Args = append([]string{}, args...)
command.VtctldClientProtocol = protocol
})

testCases := []struct {
command string
args []string
expectErr string
}{
{
command: "AddCellInfo",
args: []string{"--root", fmt.Sprintf("/vitess/%s", cell), "--server-address", "", cell},
expectErr: "node already exists", // Cell already exists
},
{
command: "GetTablets",
},
{
command: "NoCommandDrJones",
expectErr: "unknown command", // Invalid command
},
}

for _, tc := range testCases {
t.Run(tc.command, func(t *testing.T) {
defer func() {
// Reset the OS args.
os.Args = append([]string{}, args...)
}()

os.Args = append(baseArgs, tc.command)
os.Args = append(os.Args, tc.args...)

err := command.Root.Execute()
if tc.expectErr != "" {
if !strings.Contains(err.Error(), tc.expectErr) {
t.Errorf(fmt.Sprintf("%s error = %v, expectErr = %v", tc.command, err, tc.expectErr))
}
} else {
require.NoError(t, err, "unexpected error: %v", err)
}
})
}
}
7 changes: 7 additions & 0 deletions go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
Executes a cluster management command on the remote vtctld server.
If there are no running vtctld servers -- for example when bootstrapping
a new Vitess cluster -- you can specify a --server value of 'internal'.
When doing so, you would use the --topo* flags so that the client can
connect directly to the topo server(s).

Usage:
vtctldclient [flags]
Expand Down Expand Up @@ -126,6 +130,9 @@ Flags:
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--server string server to use for the connection (required)
--stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1)
--topo-global-root string the path of the global topology data in the global topology server (default "/vitess/global")
--topo-global-server-address strings the address of the global topology server(s) (default [localhost:2379])
--topo-implementation string the topology implementation to use (default "etcd2")
-v, --v Level log level for V logs
--version version for vtctldclient
--vmodule vModuleFlag comma-separated list of pattern=N settings for file-filtered logging
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2018,7 +2018,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable

tablets := make([]*topodatapb.Tablet, 0, len(tabletMap))
for _, ti := range tabletMap {
if req.TabletType != 0 && ti.Type != req.TabletType {
if req.TabletType != topodatapb.TabletType_UNKNOWN && ti.Type != req.TabletType {
continue
}
adjustTypeForStalePrimary(ti, truePrimaryTimestamp)
Expand Down Expand Up @@ -2086,7 +2086,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable
if req.Keyspace != "" && tablet.Keyspace != req.Keyspace {
continue
}
if req.TabletType != 0 && tablet.Type != req.TabletType {
if req.TabletType != topodatapb.TabletType_UNKNOWN && tablet.Type != req.TabletType {
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
// deleteCopyState deletes the copy state entry for a table, signifying that the copy phase is complete for that table.
func (vc *vcopier) deleteCopyState(tableName string) error {
log.Infof("Deleting copy state for table %s", tableName)
//FIXME get sidecar db name
delQuery := fmt.Sprintf("delete from _vt.copy_state where table_name=%s and vrepl_id = %d", encodeString(tableName), vc.vr.id)
if _, err := vc.vr.dbClient.Execute(delQuery); err != nil {
return err
Expand Down

0 comments on commit b6ce0a0

Please sign in to comment.