diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 6d09c174a4d..58fe45547a5 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -143,6 +143,9 @@ func cleanup(t *testing.T) { twopcutil.ClearOutTable(t, vtParams, "twopc_lookup") twopcutil.ClearOutTable(t, vtParams, "lookup_unique") twopcutil.ClearOutTable(t, vtParams, "lookup") + twopcutil.ClearOutTable(t, vtParams, "twopc_consistent_lookup") + twopcutil.ClearOutTable(t, vtParams, "consistent_lookup_unique") + twopcutil.ClearOutTable(t, vtParams, "consistent_lookup") sm.reset() } diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index aff839eabe9..ceff8c16af5 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -30,15 +30,38 @@ create table twopc_lookup create table lookup ( - col varchar(128), + col bigint, id bigint, keyspace_id varbinary(100), - primary key (id) + primary key (col, id) ) Engine = InnoDB; create table lookup_unique ( - col_unique varchar(128), + col_unique bigint, + keyspace_id varbinary(100), + primary key (col_unique) +) Engine = InnoDB; + +create table twopc_consistent_lookup +( + id bigint, + col bigint, + col_unique bigint, + primary key (id) +) Engine=InnoDB; + +create table consistent_lookup +( + col bigint, + id bigint, + keyspace_id varbinary(100), + primary key (col, id) +) Engine = InnoDB; + +create table consistent_lookup_unique +( + col_unique bigint, keyspace_id varbinary(100), primary key (col_unique) ) Engine = InnoDB; diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 72a4f950a5a..a760cfb24b3 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -563,7 +563,11 @@ func compareMaps(t *testing.T, expected, actual map[string][]string, flexibleExp // TestDTResolveAfterMMCommit tests that transaction is committed on recovery // failure after MM commit. func TestDTResolveAfterMMCommit(t *testing.T) { - defer cleanup(t) + initconn, closer := start(t) + defer closer() + + // Do an insertion into a table that has a consistent lookup vindex. + utils.Exec(t, initconn, "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)") vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") require.NoError(t, err) @@ -589,6 +593,10 @@ func TestDTResolveAfterMMCommit(t *testing.T) { require.NoError(t, err) _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'apa')", nil) require.NoError(t, err) + // Also do an update to a table that has a consistent lookup vindex. + // We expect to see only the pre-session changes in the logs. + _, err = conn.Execute(qCtx, "update twopc_consistent_lookup set col = 22 where id = 4", nil) + require.NoError(t, err) // The caller ID is used to simulate the failure at the desired point. newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil) @@ -625,7 +633,9 @@ func TestDTResolveAfterMMCommit(t *testing.T) { }, "ks.redo_statement:-40": { "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]", "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]", }, "ks.redo_statement:40-80": { "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", @@ -641,6 +651,12 @@ func TestDTResolveAfterMMCommit(t *testing.T) { `insert:[INT64(7) VARCHAR("foo")]`, `insert:[INT64(9) VARCHAR("baz")]`, }, + "ks.consistent_lookup:-40": { + "insert:[INT64(22) INT64(4) VARBINARY(\" \\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.twopc_consistent_lookup:-40": { + "update:[INT64(4) INT64(22) INT64(6)]", + }, } assert.Equal(t, expectations, logTable, "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) @@ -649,7 +665,11 @@ func TestDTResolveAfterMMCommit(t *testing.T) { // TestDTResolveAfterRMPrepare tests that transaction is rolled back on recovery // failure after RM prepare and before MM commit. func TestDTResolveAfterRMPrepare(t *testing.T) { - defer cleanup(t) + initconn, closer := start(t) + defer closer() + + // Do an insertion into a table that has a consistent lookup vindex. + utils.Exec(t, initconn, "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)") vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") require.NoError(t, err) @@ -671,6 +691,10 @@ func TestDTResolveAfterRMPrepare(t *testing.T) { require.NoError(t, err) _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil) require.NoError(t, err) + // Also do an update to a table that has a consistent lookup vindex. + // We expect to see only the pre-session changes in the logs. + _, err = conn.Execute(qCtx, "update twopc_consistent_lookup set col = 22 where id = 4", nil) + require.NoError(t, err) // The caller ID is used to simulate the failure at the desired point. newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("RMPrepared_FailNow", "", ""), nil) @@ -693,16 +717,29 @@ func TestDTResolveAfterRMPrepare(t *testing.T) { }, "ks.dt_participant:80-": { "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", }, "ks.redo_state:40-80": { "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", }, + "ks.redo_state:-40": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, "ks.redo_statement:40-80": { "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", }, + "ks.redo_statement:-40": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]", + }, + "ks.consistent_lookup:-40": { + "insert:[INT64(22) INT64(4) VARBINARY(\" \\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, } assert.Equal(t, expectations, logTable, "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) @@ -1491,8 +1528,8 @@ func TestVindexes(t *testing.T) { "update:[INT64(6) INT64(9) INT64(9)]", }, "ks.lookup:80-": { - "delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", - "insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", }, }, }, @@ -1519,8 +1556,8 @@ func TestVindexes(t *testing.T) { "update:[INT64(6) INT64(4) INT64(20)]", }, "ks.lookup_unique:80-": { - "delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", - "insert:[VARCHAR(\"20\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[INT64(20) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", }, }, }, @@ -1547,10 +1584,10 @@ func TestVindexes(t *testing.T) { "delete:[INT64(6) INT64(4) INT64(9)]", }, "ks.lookup_unique:80-": { - "delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", }, "ks.lookup:80-": { - "delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", }, }, }, @@ -1572,10 +1609,10 @@ func TestVindexes(t *testing.T) { "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", }, "ks.lookup:80-": { - "insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[INT64(4) INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", }, "ks.lookup_unique:-40": { - "insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[INT64(22) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", }, "ks.twopc_lookup:-40": { "insert:[INT64(20) INT64(4) INT64(22)]", @@ -1625,16 +1662,130 @@ func TestVindexes(t *testing.T) { "delete:[INT64(9) INT64(4) INT64(4)]", }, "ks.lookup_unique:-40": { - "insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[INT64(22) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", }, "ks.lookup_unique:80-": { - "delete:[VARCHAR(\"4\") VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(4) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", }, "ks.lookup:80-": { - "insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", - "delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", - "insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", - "delete:[VARCHAR(\"4\") INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[INT64(4) INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(4) INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Consistent Lookup Single Update", + initQueries: []string{ + "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "update twopc_consistent_lookup set col = 9 where col_unique = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.twopc_consistent_lookup:40-80": { + "update:[INT64(6) INT64(9) INT64(9)]", + }, + "ks.consistent_lookup:80-": { + "insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Consistent Lookup-Unique Single Update", + initQueries: []string{ + "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "update twopc_consistent_lookup set col_unique = 20 where col_unique = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.twopc_consistent_lookup:40-80": { + "update:[INT64(6) INT64(4) INT64(20)]", + }, + "ks.consistent_lookup_unique:80-": { + "insert:[INT64(20) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Consistent Lookup And Consistent Lookup-Unique Single Delete", + initQueries: []string{ + "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "delete from twopc_consistent_lookup where col_unique = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.twopc_consistent_lookup:40-80": { + "delete:[INT64(6) INT64(4) INT64(9)]", + }, + "ks.consistent_lookup_unique:80-": { + "delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.consistent_lookup:80-": { + "delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Consistent Lookup And Consistent Lookup-Unique Mix", + initQueries: []string{ + "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "insert into twopc_consistent_lookup(id, col, col_unique) values(20, 4, 22)", + "update twopc_consistent_lookup set col = 9 where col_unique = 9", + "delete from twopc_consistent_lookup where id = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"delete from twopc_consistent_lookup where id = 9 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"delete from twopc_consistent_lookup where id = 9 limit 10001 /* INT64 */\")]", + }, + "ks.redo_statement:40-80": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]", + }, + "ks.twopc_consistent_lookup:-40": { + "insert:[INT64(20) INT64(4) INT64(22)]", + }, + "ks.twopc_consistent_lookup:40-80": { + "update:[INT64(6) INT64(9) INT64(9)]", + }, + "ks.twopc_consistent_lookup:80-": { + "delete:[INT64(9) INT64(4) INT64(4)]", + }, + "ks.consistent_lookup_unique:-40": { + "insert:[INT64(22) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.consistent_lookup_unique:80-": { + "delete:[INT64(4) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.consistent_lookup:80-": { + "insert:[INT64(4) INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[INT64(4) INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", }, }, }, diff --git a/go/test/endtoend/transaction/twopc/vschema.json b/go/test/endtoend/transaction/twopc/vschema.json index 0c22f40d54b..e6bbbfdb9a4 100644 --- a/go/test/endtoend/transaction/twopc/vschema.json +++ b/go/test/endtoend/transaction/twopc/vschema.json @@ -24,6 +24,24 @@ "to": "keyspace_id" }, "owner": "twopc_lookup" + }, + "consistent_lookup_vdx": { + "type": "consistent_lookup", + "params": { + "table": "consistent_lookup", + "from": "col,id", + "to": "keyspace_id" + }, + "owner": "twopc_consistent_lookup" + }, + "consistent_lookup_unique_vdx": { + "type": "consistent_lookup_unique", + "params": { + "table": "consistent_lookup_unique", + "from": "col_unique", + "to": "keyspace_id" + }, + "owner": "twopc_consistent_lookup" } }, "tables": { @@ -85,6 +103,41 @@ "name": "xxhash" } ] + }, + "twopc_consistent_lookup": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + }, + { + "columns": [ + "col", + "id" + ], + "name": "consistent_lookup_vdx" + }, + { + "column": "col_unique", + "name": "consistent_lookup_unique_vdx" + } + ] + }, + "consistent_lookup": { + "column_vindexes": [ + { + "column": "col", + "name": "xxhash" + } + ] + }, + "consistent_lookup_unique": { + "column_vindexes": [ + { + "column": "col_unique", + "name": "xxhash" + } + ] } } } \ No newline at end of file diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index d3ab28d6600..74bfb710582 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -2862,57 +2862,6 @@ func TestExecutorSettingsInTwoPC(t *testing.T) { } } -// TestExecutorRejectTwoPC test all the unsupported cases for multi-shard atomic commit. -func TestExecutorRejectTwoPC(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) - tcases := []struct { - sqls []string - testRes []*sqltypes.Result - - expErr string - }{ - { - sqls: []string{ - `update t1 set unq_col = 1 where id = 1`, - `update t1 set unq_col = 1 where id = 3`, - }, - testRes: []*sqltypes.Result{ - sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|unq_col|unchanged", "int64|int64|int64"), - "1|2|0"), - }, - expErr: "VT12001: unsupported: atomic distributed transaction commit with consistent lookup vindex", - }, - } - - for _, tcase := range tcases { - t.Run(fmt.Sprintf("%v", tcase.sqls), func(t *testing.T) { - sbc1.SetResults(tcase.testRes) - sbc2.SetResults(tcase.testRes) - - // create a new session - session := econtext.NewSafeSession(&vtgatepb.Session{ - TargetString: KsTestSharded, - TransactionMode: vtgatepb.TransactionMode_TWOPC, - EnableSystemSettings: true, - }) - - // start transaction - _, err := executor.Execute(ctx, nil, "TestExecutorRejectTwoPC", session, "begin", nil) - require.NoError(t, err) - - // execute queries - for _, sql := range tcase.sqls { - _, err = executor.Execute(ctx, nil, "TestExecutorRejectTwoPC", session, sql, nil) - require.NoError(t, err) - } - - // commit 2pc - _, err = executor.Execute(ctx, nil, "TestExecutorRejectTwoPC", session, "commit", nil) - require.ErrorContains(t, err, tcase.expErr) - }) - } -} - func TestExecutorTruncateErrors(t *testing.T) { executor, _, _, _, ctx := createExecutorEnv(t) diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 3ce138bc0e4..cadb1392eca 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -118,10 +118,34 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er } defer recordCommitTime(session, twopc, time.Now()) + + err := txc.runSessions(ctx, session.PreSessions, session.GetLogger(), txc.commitShard) + if err != nil { + _ = txc.Release(ctx, session) + return err + } + if twopc { - return txc.commit2PC(ctx, session) + err = txc.commit2PC(ctx, session) + } else { + err = txc.commitNormal(ctx, session) + } + + if err != nil { + _ = txc.Release(ctx, session) + return err } - return txc.commitNormal(ctx, session) + + err = txc.runSessions(ctx, session.PostSessions, session.GetLogger(), txc.commitShard) + if err != nil { + // If last commit fails, there will be nothing to rollback. + session.RecordWarning(&querypb.QueryWarning{Message: fmt.Sprintf("post-operation transaction had an error: %v", err)}) + // With reserved connection we should release them. + if session.InReservedConn() { + _ = txc.Release(ctx, session) + } + } + return nil } func recordCommitTime(session *econtext.SafeSession, twopc bool, startTime time.Time) { @@ -165,11 +189,6 @@ func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSes } func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSession) error { - if err := txc.runSessions(ctx, session.PreSessions, session.GetLogger(), txc.commitShard); err != nil { - _ = txc.Release(ctx, session) - return err - } - // Retain backward compatibility on commit order for the normal session. for i, shardSession := range session.ShardSessions { if err := txc.commitShard(ctx, shardSession, session.GetLogger()); err != nil { @@ -193,19 +212,9 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSessi }) warnings.Add("NonAtomicCommit", 1) } - _ = txc.Release(ctx, session) return err } } - - if err := txc.runSessions(ctx, session.PostSessions, session.GetLogger(), txc.commitShard); err != nil { - // If last commit fails, there will be nothing to rollback. - session.RecordWarning(&querypb.QueryWarning{Message: fmt.Sprintf("post-operation transaction had an error: %v", err)}) - // With reserved connection we should release them. - if session.InReservedConn() { - _ = txc.Release(ctx, session) - } - } return nil } @@ -216,11 +225,6 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession) return txc.commitNormal(ctx, session) } - if err := txc.checkValidCondition(session); err != nil { - _ = txc.Rollback(ctx, session) - return err - } - mmShard := session.ShardSessions[0] rmShards := session.ShardSessions[1:] dtid := dtids.New(mmShard) @@ -301,13 +305,6 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession) return nil } -func (txc *TxConn) checkValidCondition(session *econtext.SafeSession) error { - if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 { - return vterrors.VT12001("atomic distributed transaction commit with consistent lookup vindex") - } - return nil -} - func (txc *TxConn) errActionAndLogWarn( ctx context.Context, session *econtext.SafeSession,