Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: added test to check binlogs to contain the cascade events #13970

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 127 additions & 1 deletion go/test/endtoend/vtgate/foreignkey/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@ limitations under the License.
package foreignkey

import (
"context"
"io"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// TestInsertWithFK tests that insertions work as expected when foreign key management is enabled in Vitess.
Expand Down Expand Up @@ -101,7 +108,7 @@ func TestDeleteWithFK(t *testing.T) {
utils.AssertMatches(t, conn, `select * from u_t2`, `[[INT64(342) NULL] [INT64(19) INT64(1234)]]`)
}

// TestUpdations tests that update work as expected when foreign key management is enabled in Vitess.
// TestUpdateWithFK tests that update work as expected when foreign key management is enabled in Vitess.
func TestUpdateWithFK(t *testing.T) {
mcmp, closer := start(t)
conn := mcmp.VtConn
Expand Down Expand Up @@ -162,6 +169,125 @@ func TestUpdateWithFK(t *testing.T) {
utils.AssertMatches(t, conn, `select * from u_t3 order by id`, `[[INT64(1) INT64(12)] [INT64(32) INT64(13)]]`)
}

// TestVstreamForFKBinLog tests that dml queries with fks are written with child row first approach in the binary logs.
func TestVstreamForFKBinLog(t *testing.T) {
vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "fk_user", "")
require.NoError(t, err)
defer vtgateConn.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan *binlogdatapb.VEvent)
runVStream(t, ctx, ch, vtgateConn)

mcmp, closer := start(t)
conn := mcmp.VtConn
defer closer()
defer cancel()

utils.Exec(t, conn, `use uks`)

// insert some data.
utils.Exec(t, conn, `insert into u_t1(id, col1) values (1,2), (11,4), (111,6)`)
utils.Exec(t, conn, `insert into u_t2(id, col2) values (2,2), (22,4)`)
utils.Exec(t, conn, `insert into u_t3(id, col3) values (33,4), (333,6)`)
// drain 3 row events.
_ = drainEvents(t, ch, 3)

tcases := []struct {
query string
events int
rowEvents []string
}{{
query: `update u_t1 set col1 = 3 where id = 11`,
events: 3,
rowEvents: []string{
`table_name:"uks.u_t3" row_changes:{before:{lengths:2 lengths:1 values:"334"} after:{lengths:2 lengths:1 values:"333"}} keyspace:"uks" shard:"0" flags:3`,
`table_name:"uks.u_t2" row_changes:{before:{lengths:2 lengths:1 values:"224"} after:{lengths:2 lengths:-1 values:"22"}} keyspace:"uks" shard:"0" flags:1`,
`table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:1 values:"114"} after:{lengths:2 lengths:1 values:"113"}} keyspace:"uks" shard:"0" flags:1`,
},
}, {
query: `update u_t1 set col1 = 5 where id = 11`,
events: 2,
rowEvents: []string{
`table_name:"uks.u_t3" row_changes:{before:{lengths:2 lengths:1 values:"333"} after:{lengths:2 lengths:1 values:"335"}} keyspace:"uks" shard:"0" flags:3`,
`table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:1 values:"113"} after:{lengths:2 lengths:1 values:"115"}} keyspace:"uks" shard:"0" flags:1`,
},
}, {
query: `delete from u_t1 where col1 = 6`,
events: 2,
rowEvents: []string{
`table_name:"uks.u_t3" row_changes:{before:{lengths:3 lengths:1 values:"3336"}} keyspace:"uks" shard:"0" flags:1`,
`table_name:"uks.u_t1" row_changes:{before:{lengths:3 lengths:1 values:"1116"}} keyspace:"uks" shard:"0" flags:1`,
},
}, {
query: `update u_t1 set col1 = null where id = 11`,
events: 2,
rowEvents: []string{
`table_name:"uks.u_t3" row_changes:{before:{lengths:2 lengths:1 values:"335"} after:{lengths:2 lengths:-1 values:"33"}} keyspace:"uks" shard:"0" flags:3`,
`table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:1 values:"115"} after:{lengths:2 lengths:-1 values:"11"}} keyspace:"uks" shard:"0" flags:1`,
},
}, {
query: `delete from u_t1 where id = 11`,
events: 1,
rowEvents: []string{
`table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:-1 values:"11"}} keyspace:"uks" shard:"0" flags:1`,
},
}}
for _, tcase := range tcases {
t.Run(tcase.query, func(t *testing.T) {
utils.Exec(t, conn, tcase.query)
// drain row events.
rowEvents := drainEvents(t, ch, tcase.events)
assert.ElementsMatch(t, tcase.rowEvents, rowEvents)
})
}
}

func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent, vtgateConn *vtgateconn.VTGateConn) {
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{Keyspace: unshardedKs, Shard: "0", Gtid: "current"},
}}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/u.*",
}},
}
vReader, err := vtgateConn.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, filter, nil)
require.NoError(t, err)

go func() {
for {
evs, err := vReader.Recv()
if err == io.EOF || ctx.Err() != nil {
return
}
require.NoError(t, err)

for _, ev := range evs {
if ev.Type == binlogdatapb.VEventType_ROW {
ch <- ev
}
}
}
}()
}

func drainEvents(t *testing.T, ch chan *binlogdatapb.VEvent, count int) []string {
var rowEvents []string
for i := 0; i < count; i++ {
select {
case re := <-ch:
rowEvents = append(rowEvents, re.RowEvent.String())
case <-time.After(10 * time.Second):
t.Fatalf("timeout waiting for event number: %d", i+1)
}
}
return rowEvents
}

// TestFkScenarios tests the various foreign key scenarios with different constraints
// and makes sure that Vitess works with them as expected. All the tables are present in both sharded and unsharded keyspace
// and all the foreign key constraints are cross-shard ones for the sharded keyspace.
Expand Down
19 changes: 10 additions & 9 deletions go/test/endtoend/vtgate/foreignkey/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/utils"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
shardedKs = "ks"
unshardedKs = "uks"
Cell = "test"
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
vtgateGrpcAddress string
shardedKs = "ks"
unshardedKs = "uks"
Cell = "test"
//go:embed sharded_schema.sql
shardedSchemaSQL string

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestMain(m *testing.M) {
SchemaSQL: unshardedSchemaSQL,
VSchema: unshardedVSchema,
}
err = clusterInstance.StartUnshardedKeyspace(*uKs, 0, false)
err = clusterInstance.StartUnshardedKeyspace(*uKs, 1, false)
if err != nil {
return 1
}
Expand All @@ -101,6 +101,7 @@ func TestMain(m *testing.M) {
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)

connParams, closer, err := utils.NewMySQL(clusterInstance, shardedKs, shardedSchemaSQL)
if err != nil {
Expand Down