diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index cdb6b61f91a..c891142f8f6 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -1028,19 +1028,22 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) { // cleanup all the old data. conn, closer := start(t) defer closer() +<<<<<<< HEAD out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none") require.NoError(t, err, out) defer clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") +======= + defer func() { + reparentAllShards(t, clusterInstance, 0) + }() +>>>>>>> a3d400177f (Ensure PRS runs for all the shards in `TestSemiSyncRequiredWithTwoPC` (#17384)) - // After changing the durability policy for the given keyspace to none, we run PRS. - shard := clusterInstance.Keyspaces[0].Shards[2] - newPrimary := shard.Vttablets[1] - _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput( - "PlannedReparentShard", - fmt.Sprintf("%s/%s", keyspaceName, shard.Name), - "--new-primary", newPrimary.Alias) - require.NoError(t, err) + reparentAllShards(t, clusterInstance, 0) + out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none") + require.NoError(t, err, out) + // After changing the durability policy for the given keyspace to none, we run PRS to ensure the changes have taken effect. + reparentAllShards(t, clusterInstance, 1) // A new distributed transaction should fail. utils.Exec(t, conn, "begin") @@ -1050,4 +1053,326 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) { _, err = utils.ExecAllowError(t, conn, "commit") require.Error(t, err) require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not") +<<<<<<< HEAD +======= + + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") + require.NoError(t, err) + reparentAllShards(t, clusterInstance, 0) + + // Transaction should now succeed. + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") + _, err = utils.ExecAllowError(t, conn, "commit") + require.NoError(t, err) +} + +// reparentAllShards reparents all the shards to the given tablet index for that shard. +func reparentAllShards(t *testing.T, clusterInstance *cluster.LocalProcessCluster, idx int) { + for _, shard := range clusterInstance.Keyspaces[0].Shards { + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, shard.Vttablets[idx].Alias) + require.NoError(t, err) + } +} + +// TestReadTransactionStatus tests that read transaction state rpc works as expected. +func TestReadTransactionStatus(t *testing.T) { + conn, closer := start(t) + defer closer() + defer conn.Close() + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard) + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime) + + // We create a multi shard commit and delay its commit on one of the shards. + // This allows us to query to transaction status and actually get some data back. + var wg sync.WaitGroup + twopcutil.RunMultiShardCommitWithDelay(t, conn, "10", &wg, []string{ + "begin", + "insert into twopc_t1(id, col) values(4, 4)", + "insert into twopc_t1(id, col) values(6, 4)", + "insert into twopc_t1(id, col) values(9, 4)", + }) + + // Create a tablet manager client and use it to read the transaction state. + tmc := grpctmclient.NewClient() + defer tmc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + primaryTablet := getTablet(clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().GrpcPort) + // Wait for the transaction to show up in the unresolved list. + var unresTransaction *querypb.TransactionMetadata + timeout := time.After(10 * time.Second) + for { + for _, shard := range clusterInstance.Keyspaces[0].Shards { + urtRes, err := tmc.GetUnresolvedTransactions(ctx, getTablet(shard.FindPrimaryTablet().GrpcPort), 1) + require.NoError(t, err) + if len(urtRes) > 0 { + unresTransaction = urtRes[0] + } + } + if unresTransaction != nil { + break + } + select { + case <-timeout: + require.Fail(t, "timed out waiting for unresolved transaction") + default: + } + } + require.NotNil(t, unresTransaction) + res, err := tmc.GetTransactionInfo(ctx, primaryTablet, unresTransaction.Dtid) + require.NoError(t, err) + assert.Equal(t, "PREPARED", res.State) + assert.Equal(t, "", res.Message) + assert.Equal(t, []string{"insert into twopc_t1(id, col) values (9, 4)"}, res.Statements) + + // Also try running the RPC from vtctld and verify we see the same values. + out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", + "Read", + fmt.Sprintf(`--dtid=%s`, unresTransaction.Dtid), + ) + require.NoError(t, err) + require.Contains(t, out, "insert into twopc_t1(id, col) values (9, 4)") + require.Contains(t, out, unresTransaction.Dtid) + + // Read the data from vtadmin API, and verify that too has the same information. + apiRes := clusterInstance.VtadminProcess.MakeAPICallRetry(t, fmt.Sprintf("/api/transaction/local/%v/info", unresTransaction.Dtid)) + require.Contains(t, apiRes, "insert into twopc_t1(id, col) values (9, 4)") + require.Contains(t, apiRes, unresTransaction.Dtid) + require.Contains(t, apiRes, strconv.FormatInt(res.TimeCreated, 10)) + + // Wait for the commit to have returned. + wg.Wait() +} + +// TestVindexes tests that different vindexes work well with two-phase commit. +func TestVindexes(t *testing.T) { + testcases := []struct { + name string + initQueries []string + testQueries []string + logExpected map[string][]string + }{ + { + name: "Lookup Single Update", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "update twopc_lookup set col = 9 where col_unique = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + }, + "ks.twopc_lookup:40-80": { + "update:[INT64(6) INT64(9) INT64(9)]", + }, + "ks.lookup:80-": { + "delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Lookup-Unique Single Update", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "update twopc_lookup set col_unique = 20 where col_unique = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup_unique(col_unique, keyspace_id) values (20, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup_unique(col_unique, keyspace_id) values (20, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + }, + "ks.twopc_lookup:40-80": { + "update:[INT64(6) INT64(4) INT64(20)]", + }, + "ks.lookup_unique:80-": { + "delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[VARCHAR(\"20\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Lookup And Lookup-Unique Single Delete", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "delete from twopc_lookup where col_unique = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + }, + "ks.twopc_lookup:40-80": { + "delete:[INT64(6) INT64(4) INT64(9)]", + }, + "ks.lookup_unique:80-": { + "delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.lookup:80-": { + "delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Lookup And Lookup-Unique Single Insertion", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "insert into twopc_lookup(id, col, col_unique) values(20, 4, 22)", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + }, + "ks.lookup:80-": { + "insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.lookup_unique:-40": { + "insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.twopc_lookup:-40": { + "insert:[INT64(20) INT64(4) INT64(22)]", + }, + }, + }, + { + name: "Lookup And Lookup-Unique Mix", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "insert into twopc_lookup(id, col, col_unique) values(20, 4, 22)", + "update twopc_lookup set col = 9 where col_unique = 9", + "delete from twopc_lookup where id = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(3) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(4) BLOB(\"delete from lookup where col = 4 and id = 9 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(5) BLOB(\"delete from lookup_unique where col_unique = 4 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(6) BLOB(\"delete from twopc_lookup where id = 9 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(3) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(4) BLOB(\"delete from lookup where col = 4 and id = 9 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(5) BLOB(\"delete from lookup_unique where col_unique = 4 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(6) BLOB(\"delete from twopc_lookup where id = 9 limit 10001 /* INT64 */\")]", + }, + "ks.redo_statement:40-80": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"update twopc_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"update twopc_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]", + }, + "ks.twopc_lookup:-40": { + "insert:[INT64(20) INT64(4) INT64(22)]", + }, + "ks.twopc_lookup:40-80": { + "update:[INT64(6) INT64(9) INT64(9)]", + }, + "ks.twopc_lookup:80-": { + "delete:[INT64(9) INT64(4) INT64(4)]", + }, + "ks.lookup_unique:-40": { + "insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.lookup_unique:80-": { + "delete:[VARCHAR(\"4\") VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.lookup:80-": { + "insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[VARCHAR(\"4\") INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + } + + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + defer cleanup(t) + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + conn := vtgateConn.Session("", nil) + qCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // initial insert + for _, query := range tt.initQueries { + execute(qCtx, t, conn, query) + } + + // ignore initial change + tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + _ = retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second) + + // Insert into multiple shards + for _, query := range tt.testQueries { + execute(qCtx, t, conn, query) + } + + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. + logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second) + for key, val := range tt.logExpected { + assert.EqualValues(t, val, logTable[key], key) + } + }) + } +} + +func getTablet(tabletGrpcPort int) *tabletpb.Tablet { + portMap := make(map[string]int32) + portMap["grpc"] = int32(tabletGrpcPort) + return &tabletpb.Tablet{Hostname: hostname, PortMap: portMap} +>>>>>>> a3d400177f (Ensure PRS runs for all the shards in `TestSemiSyncRequiredWithTwoPC` (#17384)) }