Skip to content

Commit

Permalink
reject atomic distributed transaction on savepoints and modified syst…
Browse files Browse the repository at this point in the history
…em settings (vitessio#16835)

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Sep 27, 2024
1 parent 670192d commit 3e53713
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 6 deletions.
70 changes: 70 additions & 0 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2780,6 +2780,76 @@ func TestExecutorPrepareExecute(t *testing.T) {
require.Error(t, err)
}

// TestExecutorRejectTwoPC test all the unsupported cases for multi-shard atomic commit.
func TestExecutorRejectTwoPC(t *testing.T) {
executor, sbc1, sbc2, _, ctx := createExecutorEnv(t)
tcases := []struct {
sqls []string
testRes []*sqltypes.Result

expErr string
}{
{
sqls: []string{
`set time_zone = "+08:00"`,
`insert into user_extra(user_id) values (1)`,
`insert into user_extra(user_id) values (2)`,
`insert into user_extra(user_id) values (3)`,
},
expErr: "VT12001: unsupported: atomic distributed transaction commit with system settings",
}, {
sqls: []string{
`update t1 set unq_col = 1 where id = 1`,
`update t1 set unq_col = 1 where id = 3`,
},
testRes: []*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|unq_col|unchanged", "int64|int64|int64"),
"1|2|0"),
},
expErr: "VT12001: unsupported: atomic distributed transaction commit with consistent lookup vindex",
}, {
sqls: []string{
`savepoint x`,
`insert into user_extra(user_id) values (1)`,
`insert into user_extra(user_id) values (3)`,
},
testRes: []*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|unq_col|unchanged", "int64|int64|int64"),
"1|2|0"),
},
expErr: "VT12001: unsupported: atomic distributed transaction commit with savepoint",
},
}

for _, tcase := range tcases {
t.Run(fmt.Sprintf("%v", tcase.sqls), func(t *testing.T) {
sbc1.SetResults(tcase.testRes)
sbc2.SetResults(tcase.testRes)

// create a new session
session := NewSafeSession(&vtgatepb.Session{
TargetString: KsTestSharded,
TransactionMode: vtgatepb.TransactionMode_TWOPC,
EnableSystemSettings: true,
})

// start transaction
_, err := executor.Execute(ctx, nil, "TestExecutorRejectTwoPC", session, "begin", nil)
require.NoError(t, err)

// execute queries
for _, sql := range tcase.sqls {
_, err = executor.Execute(ctx, nil, "TestExecutorRejectTwoPC", session, sql, nil)
require.NoError(t, err)
}

// commit 2pc
_, err = executor.Execute(ctx, nil, "TestExecutorRejectTwoPC", session, "commit", nil)
require.ErrorContains(t, err, tcase.expErr)
})
}
}

func TestExecutorTruncateErrors(t *testing.T) {
executor, _, _, _, ctx := createExecutorEnv(t)

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/legacy_scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,6 @@ func newTestScatterConn(ctx context.Context, hc discovery.HealthCheck, serv srvt
// in '-cells_to_watch' command line parameter, which is
// empty by default. So it's unused in this test, set to nil.
gw := NewTabletGateway(ctx, hc, serv, cell)
tc := NewTxConn(gw, vtgatepb.TransactionMode_TWOPC)
tc := NewTxConn(gw, vtgatepb.TransactionMode_MULTI)
return NewScatterConn("", tc, gw)
}
23 changes: 18 additions & 5 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,16 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error

// commit2PC will not used the pinned tablets - to make sure we use the current source, we need to use the gateway's queryservice
func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err error) {
if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 {
_ = txc.Rollback(ctx, session)
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "pre or post actions not allowed for 2PC commits")
}

// If the number of participants is one or less, then it's a normal commit.
if len(session.ShardSessions) <= 1 {
return txc.commitNormal(ctx, session)
}

if err := txc.checkValidCondition(session); err != nil {
_ = txc.Rollback(ctx, session)
return err
}

mmShard := session.ShardSessions[0]
rmShards := session.ShardSessions[1:]
dtid := dtids.New(mmShard)
Expand Down Expand Up @@ -276,6 +276,19 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err
return nil
}

func (txc *TxConn) checkValidCondition(session *SafeSession) error {
if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 {
return vterrors.VT12001("atomic distributed transaction commit with consistent lookup vindex")
}
if len(session.GetSavepoints()) != 0 {
return vterrors.VT12001("atomic distributed transaction commit with savepoint")
}
if session.GetInReservedConn() {
return vterrors.VT12001("atomic distributed transaction commit with system settings")
}
return nil
}

func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession, txPhase commitPhase, dtid string, mmShard *vtgatepb.Session_ShardSession, rmShards []*vtgatepb.Session_ShardSession) {
switch txPhase {
case Commit2pcCreateTransaction:
Expand Down

0 comments on commit 3e53713

Please sign in to comment.