Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to prevent stopping buffering prematurely #17013

Merged
merged 10 commits into from
Nov 11, 2024
66 changes: 66 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package newfeaturetest
import (
"context"
"fmt"
"math/rand/v2"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -177,3 +180,66 @@ func TestERSWithWriteInPromoteReplica(t *testing.T) {
_, err := utils.Ers(clusterInstance, tablets[3], "60s", "30s")
require.NoError(t, err, "ERS should not fail even if there is a sidecardb change")
}

func TestSimultaneousPRS(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupShardedReparentCluster(t, "semi_sync")
defer utils.TeardownCluster(clusterInstance)

// Start by reparenting all the shards to the first tablet.
keyspace := clusterInstance.Keyspaces[0]
shards := keyspace.Shards
for _, shard := range shards {
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias)
require.NoError(t, err)
}

rowCount := 1000
vtParams := clusterInstance.GetVTParams(keyspace.Name)
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
// Now, we need to insert some data into the cluster.
for i := 1; i <= rowCount; i++ {
_, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false)
require.NoError(t, err)
}

// Now we start a goroutine that continues to read the data until we've finished the test.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
go func() {
conn, err := mysql.Connect(context.Background(), &vtParams)
if err != nil {
return
}
// We're running queries every 100 millisecond and verifying the results are all correct.
res, err := conn.ExecuteFetch(utils.GetSelectionQuery(), rowCount+10, false)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
require.Len(t, res.Rows, rowCount)
}()
}
}
}()

// Now, we run go routines to run PRS calls on all the shards simultaneously.
wg := sync.WaitGroup{}
for _, shard := range shards {
wg.Add(1)
go func() {
time.Sleep(time.Second * time.Duration(rand.IntN(6)))
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[1].Alias)
require.NoError(t, err)
}()
}
wg.Wait()
cancel()
}
45 changes: 45 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,51 @@ func SetupRangeBasedCluster(ctx context.Context, t *testing.T) *cluster.LocalPro
return setupCluster(ctx, t, ShardName, []string{cell1}, []int{2}, "semi_sync")
}

// SetupShardedReparentCluster is used to setup a sharded cluster for testing
func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.LocalProcessCluster {
clusterInstance := cluster.NewCluster(cell1, Hostname)
// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)

clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--lock_tables_timeout", "5s",
"--track_schema_versions=true",
"--queryserver_enable_online_ddl=false")
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--enable_buffer",
// Long timeout in case failover is slow.
"--buffer_window", "10m",
"--buffer_max_failover_duration", "10m",
"--buffer_min_time_between_failovers", "20m",
)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: sqlSchema,
VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`,
DurabilityPolicy: durability,
}
err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false)
require.NoError(t, err)

// Start Vtgate
err = clusterInstance.StartVtgate()
require.NoError(t, err)
return clusterInstance
}

// GetInsertQuery returns a built insert query to insert a row.
func GetInsertQuery(idx int) string {
return fmt.Sprintf(insertSQL, idx, idx)
}

// GetSelectionQuery returns a built selection query read the data.
func GetSelectionQuery() string {
return `select * from vt_insert_test`
}

// TeardownCluster is used to teardown the reparent cluster. When
// run in a CI environment -- which is considered true when the
// "CI" env variable is set to "true" -- the teardown also removes
Expand Down
Loading