Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into vrepl_dry_run_perms
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 16, 2024
2 parents 3caa747 + b5eb928 commit 30807f0
Show file tree
Hide file tree
Showing 45 changed files with 8,307 additions and 5,621 deletions.
29 changes: 0 additions & 29 deletions go/cmd/vtctldclient/command/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,6 @@ var (
RunE: commandExecuteMultiFetchAsDBA,
Aliases: []string{"ExecuteMultiFetchAsDba"},
}
// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "GetUnresolvedTransactions <keyspace>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Args: cobra.ExactArgs(1),
RunE: commandGetUnresolvedTransactions,
}
)

var executeFetchAsAppOptions = struct {
Expand Down Expand Up @@ -205,26 +198,6 @@ func commandExecuteMultiFetchAsDBA(cmd *cobra.Command, args []string) error {
return nil
}

func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp.Transactions)
if err != nil {
return err
}
fmt.Printf("%s\n", data)
return nil
}

func init() {
ExecuteFetchAsApp.Flags().Int64Var(&executeFetchAsAppOptions.MaxRows, "max-rows", 10_000, "The maximum number of rows to fetch from the remote tablet.")
ExecuteFetchAsApp.Flags().BoolVar(&executeFetchAsAppOptions.UsePool, "use-pool", false, "Use the tablet connection pool instead of creating a fresh connection.")
Expand All @@ -242,6 +215,4 @@ func init() {
ExecuteMultiFetchAsDBA.Flags().BoolVar(&executeMultiFetchAsDBAOptions.ReloadSchema, "reload-schema", false, "Instructs the tablet to reload its schema after executing the query.")
ExecuteMultiFetchAsDBA.Flags().BoolVarP(&executeMultiFetchAsDBAOptions.JSON, "json", "j", false, "Output the results in JSON instead of a human-readable table.")
Root.AddCommand(ExecuteMultiFetchAsDBA)

Root.AddCommand(GetUnresolvedTransactions)
}
134 changes: 134 additions & 0 deletions go/cmd/vtctldclient/command/transactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package command

import (
"fmt"

"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
querypb "vitess.io/vitess/go/vt/proto/query"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var (
DistributedTransaction = &cobra.Command{
Use: "DistributedTransaction <cmd>",
Short: "Perform commands on distributed transaction",
Args: cobra.MinimumNArgs(2),

DisableFlagsInUseLine: true,
}

// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "list <keyspace>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Aliases: []string{"List"},
Args: cobra.ExactArgs(1),
RunE: commandGetUnresolvedTransactions,

DisableFlagsInUseLine: true,
}

// ConcludeTransaction makes a ConcludeTransaction gRPC call to a vtctld.
ConcludeTransaction = &cobra.Command{
Use: "conclude <dtid> [<keyspace/shard> ...]",
Short: "Concludes the unresolved transaction by rolling back the prepared transaction on each participating shard and removing the transaction metadata record.",
Aliases: []string{"Conclude"},
Args: cobra.MinimumNArgs(1),
RunE: commandConcludeTransaction,

DisableFlagsInUseLine: true,
}
)

type ConcludeTransactionOutput struct {
Dtid string `json:"dtid"`
Message string `json:"message"`
Error string `json:"error,omitempty"`
}

const (
concludeSuccess = "Successfully concluded the distributed transaction"
concludeFailure = "Failed to conclude the distributed transaction"
)

func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp.Transactions)
if err != nil {
return err
}
fmt.Println(string(data))
return nil
}

func commandConcludeTransaction(cmd *cobra.Command, args []string) error {
allArgs := cmd.Flags().Args()
shards, err := cli.ParseKeyspaceShards(allArgs[1:])
if err != nil {
return err
}
cli.FinishedParsing(cmd)

dtid := allArgs[0]
var participants []*querypb.Target
for _, shard := range shards {
participants = append(participants, &querypb.Target{
Keyspace: shard.Keyspace,
Shard: shard.Name,
})
}
output := ConcludeTransactionOutput{
Dtid: dtid,
Message: concludeSuccess,
}

_, err = client.ConcludeTransaction(commandCtx,
&vtctldatapb.ConcludeTransactionRequest{
Dtid: dtid,
Participants: participants,
})
if err != nil {
output.Message = concludeFailure
output.Error = err.Error()
}

data, _ := cli.MarshalJSON(output)
fmt.Println(string(data))

return err
}

func init() {
DistributedTransaction.AddCommand(GetUnresolvedTransactions)
DistributedTransaction.AddCommand(ConcludeTransaction)

Root.AddCommand(DistributedTransaction)
}
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Available Commands:
DeleteShards Deletes the specified shards from the topology.
DeleteSrvVSchema Deletes the SrvVSchema object in the given cell.
DeleteTablets Deletes tablet(s) from the topology.
DistributedTransaction Perform commands on distributed transaction
EmergencyReparentShard Reparents the shard to the new primary. Assumes the old primary is dead and not responding.
ExecuteFetchAsApp Executes the given query as the App user on the remote tablet.
ExecuteFetchAsDBA Executes the given query as the DBA user on the remote tablet.
Expand Down Expand Up @@ -59,7 +60,6 @@ Available Commands:
GetTablets Looks up tablets according to filter criteria.
GetThrottlerStatus Get the throttler status for the given tablet.
GetTopologyPath Gets the value associated with the particular path (key) in the topology server.
GetUnresolvedTransactions Retrieves unresolved transactions for the given keyspace.
GetVSchema Prints a JSON representation of a keyspace's topo record.
GetWorkflows Gets all vreplication workflows (Reshard, MoveTables, etc) in the given keyspace.
LegacyVtctlCommand Invoke a legacy vtctlclient command. Flag parsing is best effort.
Expand Down
2 changes: 2 additions & 0 deletions go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type BinlogEvent interface {

// Timestamp returns the timestamp from the event header.
Timestamp() uint32
// ServerID returns the server ID from the event header.
ServerID() uint32

// Format returns a BinlogFormat struct based on the event data.
// This is only valid if IsFormatDescription() returns true.
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ func (ev filePosFakeEvent) Timestamp() uint32 {
return ev.timestamp
}

func (ev filePosFakeEvent) ServerID() uint32 {
return 1
}

func (ev filePosFakeEvent) Format() (BinlogFormat, error) {
return BinlogFormat{}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func TestRowsEvent(t *testing.T) {
require.True(t, reflect.DeepEqual(gotRows, rows), "NewRowsEvent().Rows() got Rows:\n%v\nexpected:\n%v", gotRows, rows)

assert.NotZero(t, event.Timestamp())
assert.NotZero(t, event.ServerID())
}

func TestHeartbeatEvent(t *testing.T) {
Expand All @@ -384,6 +385,7 @@ func TestHeartbeatEvent(t *testing.T) {
require.NotNil(t, event)
assert.True(t, event.IsHeartbeat())
assert.Zero(t, event.Timestamp())
assert.NotZero(t, event.ServerID())
}

func TestRotateRotateEvent(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions go/test/endtoend/onlineddl/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func CreateTempScript(t *testing.T, content string) (fileName string) {
func MysqlClientExecFile(t *testing.T, mysqlParams *mysql.ConnParams, testDataPath, testName string, fileName string) (output string) {
t.Helper()

errorFile, err := os.CreateTemp("", "onlineddl-test-")
require.NoError(t, err)
defer os.Remove(errorFile.Name())

bashPath, err := exec.LookPath("bash")
require.NoError(t, err)
mysqlPath, err := exec.LookPath("mysql")
Expand All @@ -55,13 +59,15 @@ func MysqlClientExecFile(t *testing.T, mysqlParams *mysql.ConnParams, testDataPa
if !filepath.IsAbs(fileName) {
filePath, _ = filepath.Abs(path.Join(testDataPath, testName, fileName))
}
bashCommand := fmt.Sprintf(`%s -u%s --socket=%s --database=%s -s -s < %s 2> /tmp/error.log`, mysqlPath, mysqlParams.Uname, mysqlParams.UnixSocket, mysqlParams.DbName, filePath)
bashCommand := fmt.Sprintf(`%s -u%s --socket=%s --database=%s -s -s < %s 2> %s`, mysqlPath, mysqlParams.Uname, mysqlParams.UnixSocket, mysqlParams.DbName, filePath, errorFile.Name())
cmd, err := exec.Command(
bashPath,
"-c",
bashCommand,
).Output()

require.NoError(t, err)
errorContent, readerr := os.ReadFile(errorFile.Name())
require.NoError(t, readerr)
require.NoError(t, err, "error details: %s", errorContent)
return string(cmd)
}
12 changes: 11 additions & 1 deletion go/test/endtoend/tabletmanager/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,19 @@ func TestTabletCommands(t *testing.T) {
})

t.Run("GetUnresolvedTransactions", func(t *testing.T) {
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetUnresolvedTransactions", keyspaceName)
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "list", keyspaceName)
require.NoError(t, err)
})
t.Run("ConcludeTransaction", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
})
t.Run("ConcludeTransaction with participants", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234", "ks/0")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
})
// check Ping / RefreshState / RefreshStateByShard
err = clusterInstance.VtctldClientProcess.ExecuteCommand("PingTablet", primaryTablet.Alias)
require.Nil(t, err, "error should be Nil")
Expand Down
Loading

0 comments on commit 30807f0

Please sign in to comment.