Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

[do not merge] Slack vitess v11.0.4.vtctld.pre #247

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions go/cmd/vtctldclient/internal/command/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ var GetBackups = &cobra.Command{
}

var getBackupsOptions = struct {
Limit uint32
OutputJSON bool
Limit uint32
Detailed bool
DetailedLimit uint32
OutputJSON bool
}{}

func commandGetBackups(cmd *cobra.Command, args []string) error {
Expand All @@ -49,15 +51,17 @@ func commandGetBackups(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

resp, err := client.GetBackups(commandCtx, &vtctldatapb.GetBackupsRequest{
Keyspace: keyspace,
Shard: shard,
Limit: getBackupsOptions.Limit,
Keyspace: keyspace,
Shard: shard,
Limit: getBackupsOptions.Limit,
Detailed: getBackupsOptions.Detailed,
DetailedLimit: getBackupsOptions.DetailedLimit,
})
if err != nil {
return err
}

if getBackupsOptions.OutputJSON {
if getBackupsOptions.OutputJSON || getBackupsOptions.Detailed {
data, err := cli.MarshalJSON(resp)
if err != nil {
return err
Expand All @@ -80,5 +84,7 @@ func commandGetBackups(cmd *cobra.Command, args []string) error {
func init() {
GetBackups.Flags().Uint32VarP(&getBackupsOptions.Limit, "limit", "l", 0, "Retrieve only the most recent N backups")
GetBackups.Flags().BoolVarP(&getBackupsOptions.OutputJSON, "json", "j", false, "Output backup info in JSON format rather than a list of backups")
GetBackups.Flags().BoolVar(&getBackupsOptions.Detailed, "detailed", false, "Get detailed backup info, such as the engine used for each backup, and its status. Implies --json.")
GetBackups.Flags().Uint32Var(&getBackupsOptions.DetailedLimit, "detailed-limit", 0, "Get detailed backup info for only the most recent N backups. Ignored if --detailed is not passed.")
Root.AddCommand(GetBackups)
}
13 changes: 12 additions & 1 deletion go/vt/grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
package grpcclient

import (
"context"
"flag"
"time"

Expand Down Expand Up @@ -58,6 +59,16 @@ func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([
// failFast is a non-optional parameter because callers are required to specify
// what that should be.
func Dial(target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return DialContext(context.Background(), target, failFast, opts...)
}

// DialContext creates a grpc connection to the given target. Setup steps are
// covered by the context deadline, and, if WithBlock is specified in the dial
// options, connection establishment steps are covered by the context as well.
//
// failFast is a non-optional parameter because callers are required to specify
// what that should be.
func DialContext(ctx context.Context, target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
grpccommon.EnableTracingOpt()
newopts := []grpc.DialOption{
grpc.WithDefaultCallOptions(
Expand Down Expand Up @@ -98,7 +109,7 @@ func Dial(target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.Clie

newopts = append(newopts, interceptors()...)

return grpc.Dial(target, newopts...)
return grpc.DialContext(ctx, target, newopts...)
}

func interceptors() []grpc.DialOption {
Expand Down
12 changes: 12 additions & 0 deletions go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,18 @@ func KeyRangeStartEqual(left, right *topodatapb.KeyRange) bool {
return bytes.Equal(addPadding(left.Start), addPadding(right.Start))
}

// KeyRangeContiguous returns true if the end of the left key range exactly
// matches the start of the right key range (i.e they are contigious)
func KeyRangeContiguous(left, right *topodatapb.KeyRange) bool {
if left == nil {
return right == nil || (len(right.Start) == 0 && len(right.End) == 0)
}
if right == nil {
return len(left.Start) == 0 && len(left.End) == 0
}
return bytes.Equal(addPadding(left.End), addPadding(right.Start))
}

// KeyRangeEndEqual returns true if both key ranges have the same end
func KeyRangeEndEqual(left, right *topodatapb.KeyRange) bool {
if left == nil {
Expand Down
116 changes: 60 additions & 56 deletions go/vt/key/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,20 +212,6 @@ func TestKeyRangeAdd(t *testing.T) {
out: "40-c0",
ok: true,
}}
stringToKeyRange := func(spec string) *topodatapb.KeyRange {
if spec == "" {
return nil
}
parts := strings.Split(spec, "-")
if len(parts) != 2 {
panic("invalid spec")
}
kr, err := ParseKeyRangeParts(parts[0], parts[1])
if err != nil {
panic(err)
}
return kr
}
keyRangeToString := func(kr *topodatapb.KeyRange) string {
if kr == nil {
return ""
Expand Down Expand Up @@ -271,20 +257,6 @@ func TestKeyRangeEndEqual(t *testing.T) {
second: "-8000",
out: true,
}}
stringToKeyRange := func(spec string) *topodatapb.KeyRange {
if spec == "" {
return nil
}
parts := strings.Split(spec, "-")
if len(parts) != 2 {
panic("invalid spec")
}
kr, err := ParseKeyRangeParts(parts[0], parts[1])
if err != nil {
panic(err)
}
return kr
}

for _, tcase := range testcases {
first := stringToKeyRange(tcase.first)
Expand Down Expand Up @@ -326,20 +298,6 @@ func TestKeyRangeStartEqual(t *testing.T) {
second: "-8000",
out: true,
}}
stringToKeyRange := func(spec string) *topodatapb.KeyRange {
if spec == "" {
return nil
}
parts := strings.Split(spec, "-")
if len(parts) != 2 {
panic("invalid spec")
}
kr, err := ParseKeyRangeParts(parts[0], parts[1])
if err != nil {
panic(err)
}
return kr
}

for _, tcase := range testcases {
first := stringToKeyRange(tcase.first)
Expand Down Expand Up @@ -377,20 +335,6 @@ func TestKeyRangeEqual(t *testing.T) {
second: "-8000",
out: true,
}}
stringToKeyRange := func(spec string) *topodatapb.KeyRange {
if spec == "" {
return nil
}
parts := strings.Split(spec, "-")
if len(parts) != 2 {
panic("invalid spec")
}
kr, err := ParseKeyRangeParts(parts[0], parts[1])
if err != nil {
panic(err)
}
return kr
}

for _, tcase := range testcases {
first := stringToKeyRange(tcase.first)
Expand All @@ -402,6 +346,51 @@ func TestKeyRangeEqual(t *testing.T) {
}
}

func TestKeyRangeContiguous(t *testing.T) {
testcases := []struct {
first string
second string
out bool
}{{
first: "-40",
second: "40-80",
out: true,
}, {
first: "40-80",
second: "-40",
out: false,
}, {
first: "-",
second: "-40",
out: true,
}, {
first: "40-80",
second: "c0-",
out: false,
}, {
first: "40-80",
second: "80-c0",
out: true,
}, {
first: "40-80",
second: "8000000000000000-c000000000000000",
out: true,
}, {
first: "4000000000000000-8000000000000000",
second: "80-c0",
out: true,
}}

for _, tcase := range testcases {
first := stringToKeyRange(tcase.first)
second := stringToKeyRange(tcase.second)
out := KeyRangeContiguous(first, second)
if out != tcase.out {
t.Fatalf("KeyRangeContiguous(%q, %q) expected %t, got %t", tcase.first, tcase.second, tcase.out, out)
}
}
}

func TestEvenShardsKeyRange_Error(t *testing.T) {
testCases := []struct {
i, n int
Expand Down Expand Up @@ -813,3 +802,18 @@ func TestShardCalculatorForShardsGreaterThan512(t *testing.T) {

assert.Equal(t, want, got[511], "Invalid mapping for a 512-shard keyspace. Expected %v, got %v", want, got[511])
}

func stringToKeyRange(spec string) *topodatapb.KeyRange {
if spec == "" {
return nil
}
parts := strings.Split(spec, "-")
if len(parts) != 2 {
panic("invalid spec")
}
kr, err := ParseKeyRangeParts(parts[0], parts[1])
if err != nil {
panic(err)
}
return kr
}
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/azblobbackupstorage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ func (bh *AZBlobBackupHandle) ReadFile(ctx context.Context, filename string) (io
}), nil
}

// CheckFile is part of the BackupHandle interface. It is currently unimplemented.
func (bh *AZBlobBackupHandle) CheckFile(ctx context.Context, filename string) (bool, error) {
return false, nil
}

// AZBlobBackupStorage structs implements the BackupStorage interface for AZBlob
type AZBlobBackupStorage struct {
}
Expand Down
50 changes: 50 additions & 0 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package mysqlctl

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand All @@ -34,6 +36,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -140,6 +143,53 @@ func Backup(ctx context.Context, params BackupParams) error {
return finishErr
}

// GetBackupInfo returns the name of the backupengine used to produce a given
// backup, based on the MANIFEST file from the backup, and the Status of the
// backup, based on the engine-specific definition of what makes a complete or
// valid backup.
func GetBackupInfo(ctx context.Context, bh backupstorage.BackupHandle) (engine string, status mysqlctlpb.BackupInfo_Status, err error) {
mfest, err := bh.ReadFile(ctx, backupManifestFileName)
if err != nil {
// (TODO|@ajm88): extend (backupstorage.BackupHandle).ReadFile to wrap
// certain errors as fs.ErrNotExist, and distinguish between INCOMPLETE
// (MANIFEST has not been written to storage) and INVALID (MANIFEST
// exists but can't be read/parsed).
return "", mysqlctlpb.BackupInfo_INCOMPLETE, err
}
defer mfest.Close()

mfestBytes, err := ioutil.ReadAll(mfest)
if err != nil {
return "", mysqlctlpb.BackupInfo_INVALID, err
}

// We unmarshal into a map here rather than using the GetBackupManifest
// because we are going to pass the raw mfestBytes to the particular
// backupengine implementation for further unmarshalling and processing.
//
// As a result, some of this code is duplicated with other functions in this
// package, but doing things this way has the benefit of minimizing extra
// calls to backupstorage.BackupHandle methods (which can be network-y and
// slow, or subject to external rate limits, etc).
var manifest map[string]interface{}
if err := json.Unmarshal(mfestBytes, &manifest); err != nil {
return "", mysqlctlpb.BackupInfo_INVALID, err
}

engine, ok := manifest["BackupMethod"].(string)
if !ok {
return "", mysqlctlpb.BackupInfo_INVALID, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "missing BackupMethod field in MANIFEST")
}

be, err := getBackupEngine(engine)
if err != nil {
return engine, mysqlctlpb.BackupInfo_COMPLETE, err
}

status, err = be.GetBackupStatus(ctx, bh, mfestBytes)
return engine, status, err
}

// ParseBackupName parses the backup name for a given dir/name, according to
// the format generated by mysqlctl.Backup. An error is returned only if the
// backup name does not have the expected number of parts; errors parsing the
Expand Down
10 changes: 10 additions & 0 deletions go/vt/mysqlctl/backupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"

mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
)

var (
Expand All @@ -44,6 +46,10 @@ var (
// BackupEngine is the interface to take a backup with a given engine.
type BackupEngine interface {
ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error)
// GetBackupStatus returns the status of a given backup, according to the
// specifics of the particular backupengine implementation. See the comments
// on the various implementations for more information.
GetBackupStatus(ctx context.Context, bh backupstorage.BackupHandle, mfestBytes []byte) (mysqlctlpb.BackupInfo_Status, error)
ShouldDrainForBackup() bool
}

Expand Down Expand Up @@ -119,6 +125,10 @@ var BackupRestoreEngineMap = make(map[string]BackupRestoreEngine)
// This must only be called after flags have been parsed.
func GetBackupEngine() (BackupEngine, error) {
name := *backupEngineImplementation
return getBackupEngine(name)
}

func getBackupEngine(name string) (BackupEngine, error) {
be, ok := BackupRestoreEngineMap[name]
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "unknown BackupEngine implementation %q", name)
Expand Down
6 changes: 6 additions & 0 deletions go/vt/mysqlctl/backupstorage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ type BackupHandle interface {
// ReadCloser is closed.
ReadFile(ctx context.Context, filename string) (io.ReadCloser, error)

// CheckFile checks if a file is included in a backup. Only works for
// read-only backups (created by ListBackups). Returns a boolean to indicate
// if the file exists, and an error. Variants of "file not found" errors do
// result in an error, but instead result in (false, nil).
CheckFile(ctx context.Context, filename string) (bool, error)

// concurrency.ErrorRecorder is embedded here to coordinate reporting and
// handling of errors among all the components involved in taking a backup.
concurrency.ErrorRecorder
Expand Down
Loading