Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topology Server Locking Refactor #16005

Merged
merged 11 commits into from
Jun 12, 2024
83 changes: 83 additions & 0 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"testing"
"time"

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -140,6 +142,87 @@ func TestTopoRestart(t *testing.T) {
}
}

// TestShardLocking tests that shard locking works as intended.
func TestShardLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a shard lock.
ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.NoError(t, err)
// Check that we can't reacquire it from the same context.
_, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "lock for shard customer/0 is already held")
// Also check that TryLockShard is non-blocking and returns an error.
_, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0")
// Check that CheckShardLocked doesn't return an error.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.NoError(t, err)

// We'll now try to acquire the lock from a different thread.
secondThreadLockAcquired := false
go func() {
_, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
defer unlock(&err)
require.NoError(t, err)
secondThreadLockAcquired = true
}()

// Wait for some time and ensure that the second acquiring of lock shard is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondThreadLockAcquired)

// Unlock the shard.
unlock(&err)
// Check that we no longer have shard lock acquired.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)")

// Wait to see that the second thread was able to acquire the shard lock.
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestKeyspaceLocking tests that keyspace locking works as intended.
func TestKeyspaceLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a keyspace lock.
ctx, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
require.NoError(t, err)
// Check that we can't reacquire it from the same context.
_, _, err = ts.LockKeyspace(ctx, KeyspaceName, "TestKeyspaceLocking")
require.ErrorContains(t, err, "lock for keyspace customer is already held")
// Check that CheckKeyspaceLocked doesn't return an error.
err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
require.NoError(t, err)

// We'll now try to acquire the lock from a different thread.
secondThreadLockAcquired := false
go func() {
_, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
defer unlock(&err)
require.NoError(t, err)
secondThreadLockAcquired = true
}()

// Wait for some time and ensure that the second acquiring of lock shard is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondThreadLockAcquired)

// Unlock the keyspace.
unlock(&err)
// Check that we no longer have keyspace lock acquired.
err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
require.ErrorContains(t, err, "keyspace customer is not locked (no lockInfo in map)")

// Wait to see that the second thread was able to acquire the shard lock.
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand Down
86 changes: 86 additions & 0 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"testing"
"time"

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/topo"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -111,10 +113,94 @@ func TestTopoDownServingQuery(t *testing.T) {
execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`)
utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString())
defer func() {
_ = clusterInstance.TopoProcess.SetupEtcd()
}()
time.Sleep(3 * time.Second)
utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

// TestShardLocking tests that shard locking works as intended.
func TestShardLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a shard lock.
ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.NoError(t, err)
// Check that we can't reacquire it from the same context.
_, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "lock for shard customer/0 is already held")
// Also check that TryLockShard is non-blocking and returns an error.
_, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0")
// Check that CheckShardLocked doesn't return an error.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.NoError(t, err)

// We'll now try to acquire the lock from a different thread.
secondThreadLockAcquired := false
go func() {
_, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
defer unlock(&err)
require.NoError(t, err)
secondThreadLockAcquired = true
}()

// Wait for some time and ensure that the second acquiring of lock shard is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondThreadLockAcquired)

// Unlock the shard.
unlock(&err)
// Check that we no longer have shard lock acquired.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)")

// Wait to see that the second thread was able to acquire the shard lock.
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestKeyspaceLocking tests that keyspace locking works as intended.
func TestKeyspaceLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a keyspace lock.
ctx, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
require.NoError(t, err)
// Check that we can't reacquire it from the same context.
_, _, err = ts.LockKeyspace(ctx, KeyspaceName, "TestKeyspaceLocking")
require.ErrorContains(t, err, "lock for keyspace customer is already held")
// Check that CheckKeyspaceLocked doesn't return an error.
err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
require.NoError(t, err)

// We'll now try to acquire the lock from a different thread.
secondThreadLockAcquired := false
go func() {
_, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
defer unlock(&err)
require.NoError(t, err)
secondThreadLockAcquired = true
}()

// Wait for some time and ensure that the second acquiring of lock shard is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondThreadLockAcquired)

// Unlock the keyspace.
unlock(&err)
// Check that we no longer have keyspace lock acquired.
err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
require.ErrorContains(t, err, "keyspace customer is not locked (no lockInfo in map)")

// Wait to see that the second thread was able to acquire the shard lock.
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
39 changes: 39 additions & 0 deletions go/test/endtoend/topotest/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"testing"
"time"
)

// WaitForBoolValue takes a pointer to a boolean and waits for it to reach a certain value.
func WaitForBoolValue(t *testing.T, val *bool, waitFor bool) {
timeout := time.After(15 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("Timed out waiting for the boolean to become %v", waitFor)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
return
default:
if *val == waitFor {
return
}
time.Sleep(100 * time.Millisecond)
}
}
}
83 changes: 83 additions & 0 deletions go/test/endtoend/topotest/zk2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"testing"
"time"

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/topo"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -116,6 +118,87 @@ func TestTopoDownServingQuery(t *testing.T) {
utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

// TestShardLocking tests that shard locking works as intended.
func TestShardLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a shard lock.
ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.NoError(t, err)
// Check that we can't reacquire it from the same context.
_, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "lock for shard customer/0 is already held")
// Also check that TryLockShard is non-blocking and returns an error.
_, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0")
// Check that CheckShardLocked doesn't return an error.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.NoError(t, err)

// We'll now try to acquire the lock from a different thread.
secondThreadLockAcquired := false
go func() {
_, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
defer unlock(&err)
require.NoError(t, err)
secondThreadLockAcquired = true
}()

// Wait for some time and ensure that the second acquiring of lock shard is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondThreadLockAcquired)

// Unlock the shard.
unlock(&err)
// Check that we no longer have shard lock acquired.
err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)")

// Wait to see that the second thread was able to acquire the shard lock.
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestKeyspaceLocking tests that keyspace locking works as intended.
func TestKeyspaceLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a keyspace lock.
ctx, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
require.NoError(t, err)
// Check that we can't reacquire it from the same context.
_, _, err = ts.LockKeyspace(ctx, KeyspaceName, "TestKeyspaceLocking")
require.ErrorContains(t, err, "lock for keyspace customer is already held")
// Check that CheckKeyspaceLocked doesn't return an error.
err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
require.NoError(t, err)

// We'll now try to acquire the lock from a different thread.
secondThreadLockAcquired := false
go func() {
_, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
defer unlock(&err)
require.NoError(t, err)
secondThreadLockAcquired = true
}()

// Wait for some time and ensure that the second acquiring of lock shard is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondThreadLockAcquired)

// Unlock the keyspace.
unlock(&err)
// Check that we no longer have keyspace lock acquired.
err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
require.ErrorContains(t, err, "keyspace customer is not locked (no lockInfo in map)")

// Wait to see that the second thread was able to acquire the shard lock.
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
2 changes: 1 addition & 1 deletion go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute
}
for index, sql := range sqls {
// Attempt to renew lease:
if err := rl.Do(func() error { return topo.CheckKeyspaceLockedAndRenew(ctx, exec.keyspace) }); err != nil {
if err := rl.Do(func() error { return topo.CheckKeyspaceLocked(ctx, exec.keyspace) }); err != nil {
return errorExecResult(vterrors.Wrapf(err, "CheckKeyspaceLocked in ApplySchemaKeyspace %v", exec.keyspace))
}
execResult.CurSQLIndex = index
Expand Down
Loading
Loading