Skip to content

Commit

Permalink
Track shard session affecting change inside the transaction (#17266)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Dec 4, 2024
1 parent 551a5f7 commit 1cb39b0
Show file tree
Hide file tree
Showing 7 changed files with 535 additions and 314 deletions.
373 changes: 192 additions & 181 deletions go/vt/proto/vtgate/vtgate.pb.go

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions go/vt/proto/vtgate/vtgate_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3140,3 +3140,62 @@ func TestDeleteMultiTable(t *testing.T) {
// delete from `user` where (`user`.id) in ::dml_vals - 1 shard
testQueryLog(t, executor, logChan, "TestExecute", "DELETE", "delete `user` from `user` join music on `user`.col = music.col where music.user_id = 1", 18)
}

// TestSessionRowsAffected test that rowsAffected is set correctly for each shard session.
func TestSessionRowsAffected(t *testing.T) {
method := t.Name()
executor, _, sbc4060, _, ctx := createExecutorEnv(t)

session := NewAutocommitSession(&vtgatepb.Session{})

// start the transaction
_, err := executor.Execute(ctx, nil, method, session, "begin", nil)
require.NoError(t, err)

// -20 - select query
_, err = executor.Execute(ctx, nil, method, session, "select * from user where id = 1", nil)
require.NoError(t, err)
require.Len(t, session.ShardSessions, 1)
require.False(t, session.ShardSessions[0].RowsAffected)

// -20 - update query (rows affected)
_, err = executor.Execute(ctx, nil, method, session, "update user set foo = 41 where id = 1", nil)
require.NoError(t, err)
require.True(t, session.ShardSessions[0].RowsAffected)

// e0- - select query
_, err = executor.Execute(ctx, nil, method, session, "select * from user where id = 7", nil)
require.NoError(t, err)
assert.Len(t, session.ShardSessions, 2)
require.False(t, session.ShardSessions[1].RowsAffected)

// c0-e0 - update query (rows affected)
_, err = executor.Execute(ctx, nil, method, session, "update user set foo = 42 where id = 5", nil)
require.NoError(t, err)
require.Len(t, session.ShardSessions, 3)
require.True(t, session.ShardSessions[2].RowsAffected)

// 40-60 - update query (no rows affected)
sbc4060.SetResults([]*sqltypes.Result{{RowsAffected: 0}})
_, err = executor.Execute(ctx, nil, method, session, "update user set foo = 42 where id = 3", nil)
require.NoError(t, err)
assert.Len(t, session.ShardSessions, 4)
require.False(t, session.ShardSessions[3].RowsAffected)

// 40-60 - select query
_, err = executor.Execute(ctx, nil, method, session, "select * from user where id = 3", nil)
require.NoError(t, err)
require.False(t, session.ShardSessions[3].RowsAffected)

// 40-60 - delete query (rows affected)
_, err = executor.Execute(ctx, nil, method, session, "delete from user where id = 3", nil)
require.NoError(t, err)
require.True(t, session.ShardSessions[0].RowsAffected)
require.False(t, session.ShardSessions[1].RowsAffected)
require.True(t, session.ShardSessions[2].RowsAffected)
require.True(t, session.ShardSessions[3].RowsAffected)

_, err = executor.Execute(ctx, nil, method, session, "commit", nil)
require.NoError(t, err)
require.Zero(t, session.ShardSessions)
}
192 changes: 107 additions & 85 deletions go/vt/vtgate/safe_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,19 @@ import (
"sync"
"time"

"vitess.io/vitess/go/sqltypes"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/datetime"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/sysvars"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type (
Expand Down Expand Up @@ -347,73 +344,81 @@ func (session *SafeSession) InTransaction() bool {
return session.Session.InTransaction
}

// FindAndChangeSessionIfInSingleTxMode returns the transactionId and tabletAlias, if any, for a session
// modifies the shard session in a specific case for single mode transaction.
func (session *SafeSession) FindAndChangeSessionIfInSingleTxMode(keyspace, shard string, tabletType topodatapb.TabletType, txMode vtgatepb.TransactionMode) (int64, int64, *topodatapb.TabletAlias, error) {
// FindAndChangeSessionIfInSingleTxMode retrieves the ShardSession matching the given keyspace, shard, and tablet type.
// It performs additional checks and may modify the ShardSession in specific cases for single-mode transactions.
//
// Key behavior:
// 1. Retrieves the appropriate list of sessions (PreSessions, PostSessions, or default ShardSessions) based on the commit order.
// 2. Identifies a matching session by keyspace, shard, and tablet type.
// 3. If the session meets specific conditions (e.g., non-vindex-only, single transaction mode), it updates the session state:
// - Converts a vindex-only session to a standard session if required by the transaction type.
// - If a multi-shard transaction is detected in Single mode, marks the session for rollback and returns an error.
//
// Parameters:
// - keyspace: The keyspace of the target shard.
// - shard: The shard name of the target.
// - tabletType: The type of the tablet for the shard session.
// - txMode: The transaction mode (e.g., Single, Multi).
//
// Returns:
// - The matching ShardSession, if found and valid for the operation.
// - An error if a Single-mode transaction attempts to span multiple shards.
func (session *SafeSession) FindAndChangeSessionIfInSingleTxMode(keyspace, shard string, tabletType topodatapb.TabletType, txMode vtgatepb.TransactionMode) (*vtgatepb.Session_ShardSession, error) {
session.mu.Lock()
defer session.mu.Unlock()
sessions := session.ShardSessions

shardSession := session.findSessionLocked(keyspace, shard, tabletType)

if shardSession == nil {
return nil, nil
}

if !shardSession.VindexOnly {
return shardSession, nil
}

if err := session.singleModeErrorOnCrossShard(txMode, 0); err != nil {
return nil, err
}

// the shard session is now used by non-vindex query as well,
// so it is not an exclusive vindex only shard session anymore.
shardSession.VindexOnly = false
return shardSession, nil
}

func (session *SafeSession) findSessionLocked(keyspace, shard string, tabletType topodatapb.TabletType) *vtgatepb.Session_ShardSession {
// Select the appropriate session list based on the commit order.
var sessions []*vtgatepb.Session_ShardSession
switch session.commitOrder {
case vtgatepb.CommitOrder_PRE:
sessions = session.PreSessions
case vtgatepb.CommitOrder_POST:
sessions = session.PostSessions
default:
sessions = session.ShardSessions
}

// Find and return the matching shard session.
for _, shardSession := range sessions {
if keyspace == shardSession.Target.Keyspace && tabletType == shardSession.Target.TabletType && shard == shardSession.Target.Shard {
if txMode != vtgatepb.TransactionMode_SINGLE || !shardSession.VindexOnly || session.queryFromVindex {
return shardSession.TransactionId, shardSession.ReservedId, shardSession.TabletAlias, nil
}
count := actualNoOfShardSession(session.ShardSessions)
// If the count of shard session which are non vindex only is greater than 0, then it is a
if count > 0 {
session.mustRollback = true
return 0, 0, nil, vterrors.Errorf(vtrpcpb.Code_ABORTED, "multi-db transaction attempted: %v", session.ShardSessions)
}
// the shard session is now used by non-vindex query as well,
// so it is not an exclusive vindex only shard session anymore.
shardSession.VindexOnly = false
return shardSession.TransactionId, shardSession.ReservedId, shardSession.TabletAlias, nil
}
}
return 0, 0, nil, nil
}

func addOrUpdate(shardSession *vtgatepb.Session_ShardSession, sessions []*vtgatepb.Session_ShardSession) ([]*vtgatepb.Session_ShardSession, error) {
appendSession := true
for i, sess := range sessions {
targetedAtSameTablet := sess.Target.Keyspace == shardSession.Target.Keyspace &&
sess.Target.TabletType == shardSession.Target.TabletType &&
sess.Target.Shard == shardSession.Target.Shard
if targetedAtSameTablet {
if !proto.Equal(sess.TabletAlias, shardSession.TabletAlias) {
errorDetails := fmt.Sprintf("got non-matching aliases (%v vs %v) for the same target (keyspace: %v, tabletType: %v, shard: %v)",
sess.TabletAlias, shardSession.TabletAlias,
sess.Target.Keyspace, sess.Target.TabletType, sess.Target.Shard)
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, errorDetails)
}
// replace the old info with the new one
sessions[i] = shardSession
appendSession = false
break
if shardSession.Target.Keyspace == keyspace &&
shardSession.Target.Shard == shard &&
shardSession.Target.TabletType == tabletType {
return shardSession
}
}
if appendSession {
sessions = append(sessions, shardSession)
}

return sessions, nil
return nil
}

// AppendOrUpdate adds a new ShardSession, or updates an existing one if one already exists for the given shard session
func (session *SafeSession) AppendOrUpdate(shardSession *vtgatepb.Session_ShardSession, txMode vtgatepb.TransactionMode) error {
func (session *SafeSession) AppendOrUpdate(target *querypb.Target, info *shardActionInfo, existingSession *vtgatepb.Session_ShardSession, txMode vtgatepb.TransactionMode) error {
session.mu.Lock()
defer session.mu.Unlock()

// additional check of transaction id is required
// as now in autocommit mode there can be session due to reserved connection
// that needs to be stored as shard session.
if session.autocommitState == autocommitted && shardSession.TransactionId != 0 {
if session.autocommitState == autocommitted && info.transactionID != 0 {
// Should be unreachable
return vterrors.VT13001("unexpected 'autocommitted' state in transaction")
}
Expand All @@ -423,45 +428,62 @@ func (session *SafeSession) AppendOrUpdate(shardSession *vtgatepb.Session_ShardS
}
session.autocommitState = notAutocommittable

// Always append, in order for rollback to succeed.
switch session.commitOrder {
case vtgatepb.CommitOrder_NORMAL:
if session.queryFromVindex {
shardSession.VindexOnly = true
if existingSession != nil {
existingSession.TransactionId = info.transactionID
existingSession.ReservedId = info.reservedID
if !existingSession.RowsAffected {
existingSession.RowsAffected = info.rowsAffected
}
newSessions, err := addOrUpdate(shardSession, session.ShardSessions)
if err != nil {
if existingSession.VindexOnly {
existingSession.VindexOnly = session.queryFromVindex
}
if err := session.singleModeErrorOnCrossShard(txMode, 1); err != nil {
return err
}
session.ShardSessions = newSessions
return nil
}
newSession := &vtgatepb.Session_ShardSession{
Target: target,
TabletAlias: info.alias,
TransactionId: info.transactionID,
ReservedId: info.reservedID,
RowsAffected: info.rowsAffected,
VindexOnly: session.queryFromVindex,
}

if session.queryFromVindex {
break
}
// isSingle is enforced only for normal commit order operations.
if session.isSingleDB(txMode) && len(session.ShardSessions) > 1 {
count := actualNoOfShardSession(session.ShardSessions)
if count <= 1 {
break
}
session.mustRollback = true
return vterrors.Errorf(vtrpcpb.Code_ABORTED, "multi-db transaction attempted: %v", session.ShardSessions)
}
case vtgatepb.CommitOrder_PRE:
newSessions, err := addOrUpdate(shardSession, session.PreSessions)
if err != nil {
// Always append, in order for rollback to succeed.
switch session.commitOrder {
case vtgatepb.CommitOrder_NORMAL:
session.ShardSessions = append(session.ShardSessions, newSession)
if err := session.singleModeErrorOnCrossShard(txMode, 1); err != nil {
return err
}
session.PreSessions = newSessions
case vtgatepb.CommitOrder_PRE:
session.PreSessions = append(session.PreSessions, newSession)
case vtgatepb.CommitOrder_POST:
newSessions, err := addOrUpdate(shardSession, session.PostSessions)
if err != nil {
return err
}
session.PostSessions = newSessions
session.PostSessions = append(session.PostSessions, newSession)
default:
// Should be unreachable
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] SafeSession.AppendOrUpdate: unexpected commitOrder")
return vterrors.VT13001(fmt.Sprintf("unexpected commitOrder to append shard session: %v", session.commitOrder))
}

return nil
}

// singleModeErrorOnCrossShard checks if a transaction violates the Single mode constraint by spanning multiple shards.
func (session *SafeSession) singleModeErrorOnCrossShard(txMode vtgatepb.TransactionMode, exceedsCrossShard int) error {
// Skip the check if:
// 1. The query comes from a lookup vindex.
// 2. The transaction mode is not Single.
// 3. The transaction is not in the normal shard session.
if session.queryFromVindex || session.commitOrder != vtgatepb.CommitOrder_NORMAL || !session.isSingleDB(txMode) {
return nil
}

// If the transaction spans multiple shards, abort it.
if actualNoOfShardSession(session.ShardSessions) > exceedsCrossShard {
session.mustRollback = true // Mark the session for rollback.
return vterrors.Errorf(vtrpcpb.Code_ABORTED, "multi-db transaction attempted: %v", session.ShardSessions)
}

return nil
Expand Down
Loading

0 comments on commit 1cb39b0

Please sign in to comment.