Skip to content

Commit

Permalink
VReplication: use new topo named locks and TTL override for workflow …
Browse files Browse the repository at this point in the history
…coordination (#16260)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Jul 10, 2024
1 parent eb29999 commit 16b05c1
Show file tree
Hide file tree
Showing 30 changed files with 724 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func GetSwitchTrafficCommand(opts *SubCommandsOpts) *cobra.Command {
topodatapb.TabletType_RDONLY,
}
}
if SwitchTrafficOptions.Timeout.Seconds() < 1 {
return fmt.Errorf("timeout value must be at least 1 second")
}
return nil
},
RunE: commandSwitchTraffic,
Expand Down
3 changes: 1 addition & 2 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ var (
}
onDDLDefault = binlogdatapb.OnDDLAction_IGNORE.String()
MaxReplicationLagDefault = 30 * time.Second
TimeoutDefault = 30 * time.Second

BaseOptions = struct {
Workflow string
Expand Down Expand Up @@ -245,7 +244,7 @@ var SwitchTrafficOptions = struct {
func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences bool) {
cmd.Flags().StringSliceVarP(&SwitchTrafficOptions.Cells, "cells", "c", nil, "Cells and/or CellAliases to switch traffic in.")
cmd.Flags().Var((*topoproto.TabletTypeListFlag)(&SwitchTrafficOptions.TabletTypes), "tablet-types", "Tablet types to switch traffic for.")
cmd.Flags().DurationVar(&SwitchTrafficOptions.Timeout, "timeout", TimeoutDefault, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.")
cmd.Flags().DurationVar(&SwitchTrafficOptions.Timeout, "timeout", workflow.DefaultTimeout, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.")
cmd.Flags().DurationVar(&SwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", MaxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.")
Expand Down
3 changes: 2 additions & 1 deletion go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"

Expand Down Expand Up @@ -874,7 +875,7 @@ func registerCommands(root *cobra.Command) {
create.Flags().StringSliceVar(&createOptions.TargetCells, "target-cells", nil, "The target cell(s) to compare with; default is any available cell.")
create.Flags().Var((*topoprotopb.TabletTypeListFlag)(&createOptions.TabletTypes), "tablet-types", "Tablet types to use on the source and target.")
create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.")
create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", workflow.DefaultTimeout, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.")
create.Flags().Int64Var(&createOptions.Limit, "limit", math.MaxInt64, "Max rows to stop comparing after.")
create.Flags().BoolVar(&createOptions.DebugQuery, "debug-query", false, "Adds a mysql query to the report that can be used for further debugging.")
create.Flags().Int64Var(&createOptions.MaxReportSampleRows, "max-report-sample-rows", 10, "Maximum number of row differences to report (0 for all differences). NOTE: when increasing this value it is highly recommended to also specify --only-pks")
Expand Down
69 changes: 69 additions & 0 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ectd2
import (
"context"
"flag"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -201,6 +202,74 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestLockingWithTTL tests that locking with the TTL override works as intended.
func TestLockingWithTTL(t *testing.T) {
// Create the topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

ctx := context.Background()

// Acquire a keyspace lock with a short custom TTL.
ttl := 1 * time.Second
ctx, unlock, err := ts.LockKeyspace(ctx, KeyspaceName, "TestLockingWithTTL", topo.WithTTL(ttl))
require.NoError(t, err)
defer unlock(&err)

// Check that CheckKeyspaceLocked DOES return an error after waiting more than
// the specified TTL as we should have lost our lock.
time.Sleep(ttl * 2)
err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
require.Error(t, err)
}

// TestNamedLocking tests that named locking works as intended.
func TestNamedLocking(t *testing.T) {
// Create topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

ctx := context.Background()
lockName := "TestNamedLocking"
action := "Testing"

// Acquire a named lock.
ctx, unlock, err := ts.LockName(ctx, lockName, action)
require.NoError(t, err)

// Check that we can't reacquire it from the same context.
_, _, err = ts.LockName(ctx, lockName, action)
require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName))

// Check that CheckNameLocked doesn't return an error as we should still be
// holding the lock.
err = topo.CheckNameLocked(ctx, lockName)
require.NoError(t, err)

// We'll now try to acquire the lock from a different goroutine.
secondCallerAcquired := false
go func() {
_, unlock, err := ts.LockName(context.Background(), lockName, action)
defer unlock(&err)
require.NoError(t, err)
secondCallerAcquired = true
}()

// Wait for some time and ensure that the second attempt at acquiring the lock
// is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondCallerAcquired)

// Unlock the name.
unlock(&err)
// Check that we no longer have the named lock.
err = topo.CheckNameLocked(ctx, lockName)
require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName))

// Wait to see that the second goroutine WAS now able to acquire the named lock.
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package vreplication

var dryRunResultsSwitchWritesCustomerShard = []string{
"Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [PRIMARY]",
"Lock keyspace product",
"Lock keyspace customer",
"Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [PRIMARY]",
"/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:",
"Wait for vreplication on stopped streams to catchup for up to 30s",
"Create reverse vreplication workflow p2c_reverse",
Expand All @@ -36,8 +36,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{
}

var dryRunResultsReadCustomerShard = []string{
"Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [RDONLY,REPLICA]",
"Lock keyspace product",
"Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [RDONLY,REPLICA]",
"Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Serving VSchema will be rebuilt for the customer keyspace",
Expand Down
16 changes: 16 additions & 0 deletions go/vt/topo/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package topo
import (
"context"
"sort"
"time"
)

// Conn defines the interface that must be implemented by topology
Expand Down Expand Up @@ -120,6 +121,21 @@ type Conn interface {
// Returns ErrInterrupted if ctx is canceled.
Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error)

// LockWithTTL is similar to `Lock` but the difference is that it allows
// you to override the global default TTL that is configured for the
// implementation (--topo_etcd_lease_ttl and --topo_consul_lock_session_ttl).
// Note: this is no different than `Lock` for ZooKeeper as it does not
// support lock TTLs and they exist until released or the session ends.
LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (LockDescriptor, error)

// LockName is similar to `Lock` but the difference is that it does not require
// the path to exist and have children in order to lock it. This is because with
// named locks you are NOT locking an actual topo entity such as a Keyspace record.
// Because this lock is not blocking any Vitess operations OTHER than another
// caller that is trying to get the same named lock, there is a static 24 hour
// TTL on them to ensure that they are eventually cleaned up.
LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error)

// TryLock takes lock on the given directory with a fail-fast approach.
// It is similar to `Lock` but the difference is it attempts to acquire the lock
// if it is likely to succeed. If there is already a lock on given path, then unlike `Lock`
Expand Down
40 changes: 34 additions & 6 deletions go/vt/topo/consultopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"path"
"time"

"github.com/hashicorp/consul/api"

Expand Down Expand Up @@ -49,7 +50,27 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD
return nil, convertError(err, dirPath)
}

return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, s.lockTTL)
}

// LockWithTTL is part of the topo.Conn interface.
func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) {
// We list the directory first to make sure it exists.
if _, err := s.ListDir(ctx, dirPath, false /*full*/); err != nil {
// We need to return the right error codes, like
// topo.ErrNoNode and topo.ErrInterrupted, and the
// easiest way to do this is to return convertError(err).
// It may lose some of the context, if this is an issue,
// maybe logging the error would work here.
return nil, convertError(err, dirPath)
}

return s.lock(ctx, dirPath, contents, ttl.String())
}

// LockName is part of the topo.Conn interface.
func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
return s.lock(ctx, dirPath, contents, topo.NamedLockTTL.String())
}

// TryLock is part of the topo.Conn interface.
Expand All @@ -74,11 +95,11 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo
}

// everything is good let's acquire the lock.
return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, s.lockTTL)
}

// Lock is part of the topo.Conn interface.
func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
func (s *Server) lock(ctx context.Context, dirPath, contents, ttl string) (topo.LockDescriptor, error) {
lockPath := path.Join(s.root, dirPath, locksFilename)

lockOpts := &api.LockOptions{
Expand All @@ -90,12 +111,19 @@ func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockD
},
}
lockOpts.SessionOpts.Checks = s.lockChecks
if s.lockDelay > 0 {
lockOpts.SessionOpts.LockDelay = s.lockDelay
}
if s.lockTTL != "" {
// Override the API default with the global default from
// --topo_consul_lock_session_ttl.
lockOpts.SessionOpts.TTL = s.lockTTL
}
if ttl != "" {
// Override the global default with the one provided by the
// caller.
lockOpts.SessionOpts.TTL = ttl
}
if s.lockDelay > 0 {
lockOpts.SessionOpts.LockDelay = s.lockDelay
}
// Build the lock structure.
l, err := s.client.LockOpts(lockOpts)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/consultopo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Server struct {
locks map[string]*lockInstance

lockChecks []string
lockTTL string
lockTTL string // This is the default used for all non-named locks
lockDelay time.Duration
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/etcd2topo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error)

// Try to get the primaryship, by getting a lock.
var err error
ld, err = mp.s.lock(lockCtx, electionPath, mp.id)
ld, err = mp.s.lock(lockCtx, electionPath, mp.id, leaseTTL)
if err != nil {
// It can be that we were interrupted.
return nil, err
Expand Down
31 changes: 26 additions & 5 deletions go/vt/topo/etcd2topo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"path"
"time"

"github.com/spf13/pflag"

Expand All @@ -34,7 +35,7 @@ import (
)

var (
leaseTTL = 30
leaseTTL = 30 // This is the default used for all non-named locks
)

func init() {
Expand Down Expand Up @@ -153,7 +154,7 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo
}

// everything is good let's acquire the lock.
return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, leaseTTL)
}

// Lock is part of the topo.Conn interface.
Expand All @@ -168,15 +169,35 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD
return nil, convertError(err, dirPath)
}

return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, leaseTTL)
}

// LockWithTTL is part of the topo.Conn interface.
func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) {
// We list the directory first to make sure it exists.
if _, err := s.ListDir(ctx, dirPath, false /*full*/); err != nil {
// We need to return the right error codes, like
// topo.ErrNoNode and topo.ErrInterrupted, and the
// easiest way to do this is to return convertError(err).
// It may lose some of the context, if this is an issue,
// maybe logging the error would work here.
return nil, convertError(err, dirPath)
}

return s.lock(ctx, dirPath, contents, int(ttl.Seconds()))
}

// LockName is part of the topo.Conn interface.
func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
return s.lock(ctx, dirPath, contents, int(topo.NamedLockTTL.Seconds()))
}

// lock is used by both Lock() and primary election.
func (s *Server) lock(ctx context.Context, nodePath, contents string) (topo.LockDescriptor, error) {
func (s *Server) lock(ctx context.Context, nodePath, contents string, ttl int) (topo.LockDescriptor, error) {
nodePath = path.Join(s.root, nodePath, locksPath)

// Get a lease, set its KeepAlive.
lease, err := s.cli.Grant(ctx, int64(leaseTTL))
lease, err := s.cli.Grant(ctx, int64(ttl))
if err != nil {
return nil, convertError(err, nodePath)
}
Expand Down
15 changes: 15 additions & 0 deletions go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"strings"
"sync"
"time"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -291,6 +292,20 @@ func (f *FakeConn) Lock(ctx context.Context, dirPath, contents string) (topo.Loc
return &fakeLockDescriptor{}, nil
}

// LockWithTTL implements the Conn interface.
func (f *FakeConn) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (topo.LockDescriptor, error) {
f.mu.Lock()
defer f.mu.Unlock()
return &fakeLockDescriptor{}, nil
}

// LockName implements the Conn interface.
func (f *FakeConn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
f.mu.Lock()
defer f.mu.Unlock()
return &fakeLockDescriptor{}, nil
}

// TryLock is part of the topo.Conn interface. Its implementation is same as Lock
func (f *FakeConn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
return f.Lock(ctx, dirPath, contents)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/keyspace_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func (s *keyspaceLock) Path() string {
// - a context with a locksInfo structure for future reference.
// - an unlock method
// - an error if anything failed.
func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) {
func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string, opts ...LockOption) (context.Context, func(*error), error) {
return ts.internalLock(ctx, &keyspaceLock{
keyspace: keyspace,
}, action, true)
}, action, opts...)
}

// CheckKeyspaceLocked can be called on a context to make sure we have the lock
Expand Down
Loading

0 comments on commit 16b05c1

Please sign in to comment.