Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apply patch 11909 #131

Merged
merged 2 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 165 additions & 3 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) {

const schemaUnsharded = `
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
insert into customer_seq(id, next_id, cache) values(0, 1, 3);
`
const vschemaUnsharded = `
{
Expand Down Expand Up @@ -218,14 +219,19 @@ const vschemaSharded = `
func insertRow(keyspace, table string, id int) {
vtgateConn.ExecuteFetch(fmt.Sprintf("use %s;", keyspace), 1000, false)
vtgateConn.ExecuteFetch("begin", 1000, false)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
if err != nil {
log.Infof("error inserting row %d: %v", id, err)
}
vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (cid, name) values (%d, '%s%d')", table, id+100, table, id), 1000, false)
vtgateConn.ExecuteFetch("commit", 1000, false)
}

type numEvents struct {
numRowEvents, numJournalEvents int64
numLessThan80Events, numGreaterThan80Events int64
numLessThan40Events, numGreaterThan40Events int64
numRowEvents, numJournalEvents int64
numLessThan80Events, numGreaterThan80Events int64
numLessThan40Events, numGreaterThan40Events int64
numShard0BeforeReshardEvents, numShard0AfterReshardEvents int64
}

// tests the StopOnReshard flag
Expand Down Expand Up @@ -375,6 +381,150 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
return &ne
}

// Validate that we can continue streaming from multiple keyspaces after first copying some tables and then resharding one of the keyspaces
// Ensure that there are no missing row events during the resharding process.
func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEvents {
defaultCellName := "zone1"
allCellNames = defaultCellName
allCells := []string{allCellNames}
vc = NewVitessCluster(t, "VStreamCopyMultiKeyspaceReshard", allCells, mainClusterConfig)

require.NotNil(t, vc)
ogdr := defaultReplicas
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func(dr int) { defaultReplicas = dr }(ogdr)

defer vc.TearDown(t)

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "unsharded", "0"), 1)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)

ctx := context.Background()
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
if err != nil {
log.Fatal(err)
}
defer vstreamConn.Close()
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "/.*",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// We want to confirm that the following two tables are streamed.
// 1. the customer_seq in the unsharded keyspace
// 2. the customer table in the sharded keyspace
Match: "/customer.*/",
}},
}
flags := &vtgatepb.VStreamFlags{}
done := false

id := 1000
// First goroutine that keeps inserting rows into the table being streamed until a minute after reshard
// We should keep getting events on the new shards
go func() {
for {
if done {
return
}
id++
time.Sleep(1 * time.Second)
insertRow("sharded", "customer", id)
}
}()
// stream events from the VStream API
var ne numEvents
reshardDone := false
go func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "0":
if reshardDone {
ne.numShard0AfterReshardEvents++
} else {
ne.numShard0BeforeReshardEvents++
}
case "-80":
ne.numLessThan80Events++
case "80-":
ne.numGreaterThan80Events++
case "-40":
ne.numLessThan40Events++
case "40-":
ne.numGreaterThan40Events++
}
ne.numRowEvents++
case binlogdatapb.VEventType_JOURNAL:
ne.numJournalEvents++
}
}
case io.EOF:
log.Infof("Stream Ended")
done = true
default:
log.Errorf("Returned err %v", err)
done = true
}
if done {
return
}
}
}()

ticker := time.NewTicker(1 * time.Second)
tickCount := 0
for {
<-ticker.C
tickCount++
switch tickCount {
case 1:
reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName)
reshardDone = true
case 60:
done = true
}
if done {
break
}
}
log.Infof("ne=%v", ne)

// The number of row events streamed by the VStream API should match the number of rows inserted.
// This is important for sharded tables, where we need to ensure that no row events are missed during the resharding process.
//
// On the other hand, we don't verify the exact number of row events for the unsharded keyspace
// because the keyspace remains unsharded and the number of rows in the customer_seq table is always 1.
// We believe that checking the number of row events for the unsharded keyspace, which should always be greater than 0 before and after resharding,
// is sufficient to confirm that the resharding of one keyspace does not affect another keyspace, while keeping the test straightforward.
customerResult := execVtgateQuery(t, vtgateConn, "sharded", "select count(*) from customer")
insertedCustomerRows, err := evalengine.ToInt64(customerResult.Rows[0][0])
require.NoError(t, err)
require.Equal(t, insertedCustomerRows, ne.numLessThan80Events+ne.numGreaterThan80Events+ne.numLessThan40Events+ne.numGreaterThan40Events)
return ne
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
Expand Down Expand Up @@ -406,3 +556,15 @@ func TestVStreamWithKeyspacesToWatch(t *testing.T) {

testVStreamWithFailover(t, false)
}

func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) {
ne := testVStreamCopyMultiKeyspaceReshard(t, 3000)
require.Equal(t, int64(0), ne.numJournalEvents)
require.NotZero(t, ne.numRowEvents)
require.NotZero(t, ne.numShard0BeforeReshardEvents)
require.NotZero(t, ne.numShard0AfterReshardEvents)
require.NotZero(t, ne.numLessThan80Events)
require.NotZero(t, ne.numGreaterThan80Events)
require.NotZero(t, ne.numLessThan40Events)
require.NotZero(t, ne.numGreaterThan40Events)
}
68 changes: 60 additions & 8 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ create table t1_copy_basic(
primary key(id1)
) Engine=InnoDB;

create table t1_copy_all(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table t1_copy_resume(
id1 bigint,
id2 bigint,
Expand Down Expand Up @@ -151,6 +157,12 @@ create table t1_sharded(
Name: "hash",
}},
},
"t1_copy_all": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_copy_resume": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down Expand Up @@ -218,6 +230,31 @@ create table t1_sharded(
},
},
}

schema2 = `
create table t1_copy_all_ks2(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
`

vschema2 = &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1_copy_all_ks2": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
},
}
)

func TestMain(m *testing.M) {
Expand All @@ -226,21 +263,36 @@ func TestMain(m *testing.M) {
exitCode := func() int {
var cfg vttest.Config
cfg.Topology = &vttestpb.VTTestTopology{
Keyspaces: []*vttestpb.Keyspace{{
Name: "ks",
Shards: []*vttestpb.Shard{{
Name: "-80",
}, {
Name: "80-",
}},
}},
Keyspaces: []*vttestpb.Keyspace{
{
Name: "ks",
Shards: []*vttestpb.Shard{{
Name: "-80",
}, {
Name: "80-",
}},
},
{
Name: "ks2",
Shards: []*vttestpb.Shard{{
Name: "-80",
}, {
Name: "80-",
}},
},
},
}
if err := cfg.InitSchemas("ks", schema, vschema); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.RemoveAll(cfg.SchemaDir)
return 1
}
defer os.RemoveAll(cfg.SchemaDir)
if err := cfg.InitSchemas("ks2", schema2, vschema2); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.RemoveAll(cfg.SchemaDir)
return 1
}

cfg.TabletHostName = *tabletHostName

Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtgate/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package endtoend
import (
"context"
"fmt"
osExec "os/exec"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -55,6 +56,16 @@ func TestCreateAndDropDatabase(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

// cleanup the keyspace from the topology.
defer func() {
// the corresponding database needs to be created in advance.
// a subsequent DeleteKeyspace command returns the error of 'node doesn't exist' without it.
_ = exec(t, conn, "create database testitest")

_, err := osExec.Command("vtctldclient", "--server", grpcAddress, "DeleteKeyspace", "--recursive", "--force", "testitest").CombinedOutput()
require.NoError(t, err)
}()

// run it 3 times.
for count := 0; count < 3; count++ {
t.Run(fmt.Sprintf("exec:%d", count), func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/endtoend/row_count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/utils"
)

func TestRowCount(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()
utils.Exec(t, conn, "use ks")
type tc struct {
query string
expected int
Expand Down
Loading
Loading