Skip to content

Commit

Permalink
Consistent lookup vindex tests for atomic distributed transactions (v…
Browse files Browse the repository at this point in the history
…itessio#17393)

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Dec 18, 2024
1 parent c48d761 commit 8652490
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 99 deletions.
3 changes: 3 additions & 0 deletions go/test/endtoend/transaction/twopc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func cleanup(t *testing.T) {
twopcutil.ClearOutTable(t, vtParams, "twopc_lookup")
twopcutil.ClearOutTable(t, vtParams, "lookup_unique")
twopcutil.ClearOutTable(t, vtParams, "lookup")
twopcutil.ClearOutTable(t, vtParams, "twopc_consistent_lookup")
twopcutil.ClearOutTable(t, vtParams, "consistent_lookup_unique")
twopcutil.ClearOutTable(t, vtParams, "consistent_lookup")
sm.reset()
}

Expand Down
29 changes: 26 additions & 3 deletions go/test/endtoend/transaction/twopc/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,38 @@ create table twopc_lookup

create table lookup
(
col varchar(128),
col bigint,
id bigint,
keyspace_id varbinary(100),
primary key (id)
primary key (col, id)
) Engine = InnoDB;

create table lookup_unique
(
col_unique varchar(128),
col_unique bigint,
keyspace_id varbinary(100),
primary key (col_unique)
) Engine = InnoDB;

create table twopc_consistent_lookup
(
id bigint,
col bigint,
col_unique bigint,
primary key (id)
) Engine=InnoDB;

create table consistent_lookup
(
col bigint,
id bigint,
keyspace_id varbinary(100),
primary key (col, id)
) Engine = InnoDB;

create table consistent_lookup_unique
(
col_unique bigint,
keyspace_id varbinary(100),
primary key (col_unique)
) Engine = InnoDB;
183 changes: 167 additions & 16 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,11 @@ func compareMaps(t *testing.T, expected, actual map[string][]string, flexibleExp
// TestDTResolveAfterMMCommit tests that transaction is committed on recovery
// failure after MM commit.
func TestDTResolveAfterMMCommit(t *testing.T) {
defer cleanup(t)
initconn, closer := start(t)
defer closer()

// Do an insertion into a table that has a consistent lookup vindex.
utils.Exec(t, initconn, "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)")

vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
Expand All @@ -589,6 +593,10 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'apa')", nil)
require.NoError(t, err)
// Also do an update to a table that has a consistent lookup vindex.
// We expect to see only the pre-session changes in the logs.
_, err = conn.Execute(qCtx, "update twopc_consistent_lookup set col = 22 where id = 4", nil)
require.NoError(t, err)

// The caller ID is used to simulate the failure at the desired point.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil)
Expand Down Expand Up @@ -625,7 +633,9 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
},
"ks.redo_statement:-40": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
Expand All @@ -641,6 +651,12 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
`insert:[INT64(7) VARCHAR("foo")]`,
`insert:[INT64(9) VARCHAR("baz")]`,
},
"ks.consistent_lookup:-40": {
"insert:[INT64(22) INT64(4) VARBINARY(\" \\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.twopc_consistent_lookup:-40": {
"update:[INT64(4) INT64(22) INT64(6)]",
},
}
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
Expand All @@ -649,7 +665,11 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
// TestDTResolveAfterRMPrepare tests that transaction is rolled back on recovery
// failure after RM prepare and before MM commit.
func TestDTResolveAfterRMPrepare(t *testing.T) {
defer cleanup(t)
initconn, closer := start(t)
defer closer()

// Do an insertion into a table that has a consistent lookup vindex.
utils.Exec(t, initconn, "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)")

vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
Expand All @@ -671,6 +691,10 @@ func TestDTResolveAfterRMPrepare(t *testing.T) {
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil)
require.NoError(t, err)
// Also do an update to a table that has a consistent lookup vindex.
// We expect to see only the pre-session changes in the logs.
_, err = conn.Execute(qCtx, "update twopc_consistent_lookup set col = 22 where id = 4", nil)
require.NoError(t, err)

// The caller ID is used to simulate the failure at the desired point.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("RMPrepared_FailNow", "", ""), nil)
Expand All @@ -693,16 +717,29 @@ func TestDTResolveAfterRMPrepare(t *testing.T) {
},
"ks.dt_participant:80-": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]",
},
"ks.redo_state:40-80": {
"insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
"delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_state:-40": {
"insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
"delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
},
"ks.redo_statement:-40": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]",
},
"ks.consistent_lookup:-40": {
"insert:[INT64(22) INT64(4) VARBINARY(\" \\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
}
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
Expand Down Expand Up @@ -1491,8 +1528,8 @@ func TestVindexes(t *testing.T) {
"update:[INT64(6) INT64(9) INT64(9)]",
},
"ks.lookup:80-": {
"delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
Expand All @@ -1519,8 +1556,8 @@ func TestVindexes(t *testing.T) {
"update:[INT64(6) INT64(4) INT64(20)]",
},
"ks.lookup_unique:80-": {
"delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[VARCHAR(\"20\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[INT64(20) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
Expand All @@ -1547,10 +1584,10 @@ func TestVindexes(t *testing.T) {
"delete:[INT64(6) INT64(4) INT64(9)]",
},
"ks.lookup_unique:80-": {
"delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.lookup:80-": {
"delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
Expand All @@ -1572,10 +1609,10 @@ func TestVindexes(t *testing.T) {
"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
},
"ks.lookup:80-": {
"insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[INT64(4) INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.lookup_unique:-40": {
"insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[INT64(22) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.twopc_lookup:-40": {
"insert:[INT64(20) INT64(4) INT64(22)]",
Expand Down Expand Up @@ -1625,16 +1662,130 @@ func TestVindexes(t *testing.T) {
"delete:[INT64(9) INT64(4) INT64(4)]",
},
"ks.lookup_unique:-40": {
"insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[INT64(22) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.lookup_unique:80-": {
"delete:[VARCHAR(\"4\") VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.lookup:80-": {
"insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[VARCHAR(\"4\") INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[INT64(4) INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
{
name: "Consistent Lookup Single Update",
initQueries: []string{
"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
},
testQueries: []string{
"begin",
"update twopc_consistent_lookup set col = 9 where col_unique = 9",
"commit",
},
logExpected: map[string][]string{
"ks.twopc_consistent_lookup:40-80": {
"update:[INT64(6) INT64(9) INT64(9)]",
},
"ks.consistent_lookup:80-": {
"insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
{
name: "Consistent Lookup-Unique Single Update",
initQueries: []string{
"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
},
testQueries: []string{
"begin",
"update twopc_consistent_lookup set col_unique = 20 where col_unique = 9",
"commit",
},
logExpected: map[string][]string{
"ks.twopc_consistent_lookup:40-80": {
"update:[INT64(6) INT64(4) INT64(20)]",
},
"ks.consistent_lookup_unique:80-": {
"insert:[INT64(20) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
{
name: "Consistent Lookup And Consistent Lookup-Unique Single Delete",
initQueries: []string{
"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
},
testQueries: []string{
"begin",
"delete from twopc_consistent_lookup where col_unique = 9",
"commit",
},
logExpected: map[string][]string{
"ks.twopc_consistent_lookup:40-80": {
"delete:[INT64(6) INT64(4) INT64(9)]",
},
"ks.consistent_lookup_unique:80-": {
"delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.consistent_lookup:80-": {
"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
{
name: "Consistent Lookup And Consistent Lookup-Unique Mix",
initQueries: []string{
"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
},
testQueries: []string{
"begin",
"insert into twopc_consistent_lookup(id, col, col_unique) values(20, 4, 22)",
"update twopc_consistent_lookup set col = 9 where col_unique = 9",
"delete from twopc_consistent_lookup where id = 9",
"commit",
},
logExpected: map[string][]string{
"ks.redo_statement:80-": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"delete from twopc_consistent_lookup where id = 9 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"delete from twopc_consistent_lookup where id = 9 limit 10001 /* INT64 */\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]",
},
"ks.twopc_consistent_lookup:-40": {
"insert:[INT64(20) INT64(4) INT64(22)]",
},
"ks.twopc_consistent_lookup:40-80": {
"update:[INT64(6) INT64(9) INT64(9)]",
},
"ks.twopc_consistent_lookup:80-": {
"delete:[INT64(9) INT64(4) INT64(4)]",
},
"ks.consistent_lookup_unique:-40": {
"insert:[INT64(22) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.consistent_lookup_unique:80-": {
"delete:[INT64(4) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.consistent_lookup:80-": {
"insert:[INT64(4) INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
Expand Down
Loading

0 comments on commit 8652490

Please sign in to comment.