Skip to content

Commit

Permalink
added insertID changed boolean to indicate if last_insert id value is…
Browse files Browse the repository at this point in the history
… modified

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Dec 17, 2024
1 parent fbc3dbb commit 0db81eb
Show file tree
Hide file tree
Showing 10 changed files with 820 additions and 699 deletions.
3 changes: 3 additions & 0 deletions go/sqltypes/proto3.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func ResultToProto3(qr *Result) *querypb.QueryResult {
Fields: qr.Fields,
RowsAffected: qr.RowsAffected,
InsertId: qr.InsertID,
InsertIdChanged: qr.InsertIDChanged,
Rows: RowsToProto3(qr.Rows),
Info: qr.Info,
SessionStateChanges: qr.SessionStateChanges,
Expand All @@ -119,6 +120,7 @@ func Proto3ToResult(qr *querypb.QueryResult) *Result {
Fields: qr.Fields,
RowsAffected: qr.RowsAffected,
InsertID: qr.InsertId,
InsertIDChanged: qr.InsertIdChanged,
Rows: proto3ToRows(qr.Fields, qr.Rows),
Info: qr.Info,
SessionStateChanges: qr.SessionStateChanges,
Expand All @@ -136,6 +138,7 @@ func CustomProto3ToResult(fields []*querypb.Field, qr *querypb.QueryResult) *Res
Fields: qr.Fields,
RowsAffected: qr.RowsAffected,
InsertID: qr.InsertId,
InsertIDChanged: qr.InsertIdChanged,
Rows: proto3ToRows(fields, qr.Rows),
Info: qr.Info,
SessionStateChanges: qr.SessionStateChanges,
Expand Down
9 changes: 8 additions & 1 deletion go/sqltypes/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Result struct {
Fields []*querypb.Field `json:"fields"`
RowsAffected uint64 `json:"rows_affected"`
InsertID uint64 `json:"insert_id"`
InsertIDChanged bool `json:"insert_id_changed"`
Rows []Row `json:"rows"`
SessionStateChanges string `json:"session_state_changes"`
StatusFlags uint16 `json:"status_flags"`
Expand Down Expand Up @@ -92,6 +93,7 @@ func (result *Result) Copy() *Result {
out := &Result{
RowsAffected: result.RowsAffected,
InsertID: result.InsertID,
InsertIDChanged: result.InsertIDChanged,
SessionStateChanges: result.SessionStateChanges,
StatusFlags: result.StatusFlags,
Info: result.Info,
Expand All @@ -116,6 +118,7 @@ func (result *Result) ShallowCopy() *Result {
return &Result{
Fields: result.Fields,
InsertID: result.InsertID,
InsertIDChanged: result.InsertIDChanged,
RowsAffected: result.RowsAffected,
Info: result.Info,
SessionStateChanges: result.SessionStateChanges,
Expand All @@ -129,6 +132,7 @@ func (result *Result) Metadata() *Result {
return &Result{
Fields: result.Fields,
InsertID: result.InsertID,
InsertIDChanged: result.InsertIDChanged,
RowsAffected: result.RowsAffected,
Info: result.Info,
SessionStateChanges: result.SessionStateChanges,
Expand All @@ -153,6 +157,7 @@ func (result *Result) Truncate(l int) *Result {

out := &Result{
InsertID: result.InsertID,
InsertIDChanged: result.InsertIDChanged,
RowsAffected: result.RowsAffected,
Info: result.Info,
SessionStateChanges: result.SessionStateChanges,
Expand Down Expand Up @@ -198,6 +203,7 @@ func (result *Result) Equal(other *Result) bool {
return FieldsEqual(result.Fields, other.Fields) &&
result.RowsAffected == other.RowsAffected &&
result.InsertID == other.InsertID &&
result.InsertIDChanged == other.InsertIDChanged &&
slices.EqualFunc(result.Rows, other.Rows, func(a, b Row) bool {
return RowEqual(a, b)
})
Expand Down Expand Up @@ -331,9 +337,10 @@ func (result *Result) AppendResult(src *Result) {
result.Fields = src.Fields
}
result.RowsAffected += src.RowsAffected
if src.InsertID != 0 {
if src.InsertID != 0 || src.InsertIDChanged {
result.InsertID = src.InsertID
}
result.InsertIDChanged = result.InsertIDChanged || src.InsertIDChanged
result.Rows = append(result.Rows, src.Rows...)
}

Expand Down
11 changes: 11 additions & 0 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ func TestCast(t *testing.T) {
mcmp.AssertMatches("select cast('3.2' as unsigned)", `[[UINT64(3)]]`)
}

// TestSetAndGetLastInsertID tests that the last_insert_id function works as intended when used with different arguments.
func TestSetAndGetLastInsertID(t *testing.T) {
mcmp, closer := start(t)
defer closer()

mcmp.Exec("select last_insert_id(42)")
mcmp.Exec("select last_insert_id()")
mcmp.Exec("select last_insert_id(0)")
mcmp.Exec("select last_insert_id()")
}

// TestVindexHints tests that vindex hints work as intended.
func TestVindexHints(t *testing.T) {
mcmp, closer := start(t)
Expand Down
1,379 changes: 695 additions & 684 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 14 additions & 12 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func (e *Executor) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
stmtType, result, err := e.execute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats)
logStats.Error = err
if result == nil {
saveSessionStats(safeSession, stmtType, 0, 0, 0, err)
saveSessionStats(safeSession, stmtType, false, 0, 0, 0, err)
} else {
saveSessionStats(safeSession, stmtType, result.RowsAffected, result.InsertID, len(result.Rows), err)
saveSessionStats(safeSession, stmtType, result.InsertIDChanged, result.InsertID, result.RowsAffected, len(result.Rows), err)
}
if result != nil && len(result.Rows) > warnMemoryRows {
warnings.Add("ResultsExceeded", 1)
Expand All @@ -262,22 +262,24 @@ func (e *Executor) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
}

type streaminResultReceiver struct {
mu sync.Mutex
stmtType sqlparser.StatementType
rowsAffected uint64
rowsReturned int
insertID uint64
callback func(*sqltypes.Result) error
mu sync.Mutex
stmtType sqlparser.StatementType
rowsAffected uint64
rowsReturned int
insertID uint64
insertIDChanged bool
callback func(*sqltypes.Result) error
}

func (s *streaminResultReceiver) storeResultStats(typ sqlparser.StatementType, qr *sqltypes.Result) error {
s.mu.Lock()
defer s.mu.Unlock()
s.rowsAffected += qr.RowsAffected
s.rowsReturned += len(qr.Rows)
if qr.InsertID != 0 {
if qr.InsertID != 0 || qr.InsertIDChanged {
s.insertID = qr.InsertID
}
s.insertIDChanged = s.insertIDChanged || qr.InsertIDChanged
s.stmtType = typ
return s.callback(qr)
}
Expand Down Expand Up @@ -379,7 +381,7 @@ func (e *Executor) StreamExecute(
err = e.newExecute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats, resultHandler, srr.storeResultStats)

logStats.Error = err
saveSessionStats(safeSession, srr.stmtType, srr.rowsAffected, srr.insertID, srr.rowsReturned, err)
saveSessionStats(safeSession, srr.stmtType, srr.insertIDChanged, srr.insertID, srr.rowsAffected, srr.rowsReturned, err)
if srr.rowsReturned > warnMemoryRows {
warnings.Add("ResultsExceeded", 1)
piiSafeSQL, err := e.env.Parser().RedactSQLQuery(sql)
Expand Down Expand Up @@ -412,15 +414,15 @@ func canReturnRows(stmtType sqlparser.StatementType) bool {
}
}

func saveSessionStats(safeSession *econtext.SafeSession, stmtType sqlparser.StatementType, rowsAffected, insertID uint64, rowsReturned int, err error) {
func saveSessionStats(safeSession *econtext.SafeSession, stmtType sqlparser.StatementType, insertIDChanged bool, insertID, rowsAffected uint64, rowsReturned int, err error) {
safeSession.RowCount = -1
if err != nil {
return
}
if !safeSession.IsFoundRowsHandled() {
safeSession.FoundRows = uint64(rowsReturned)
}
if insertID > 0 {
if insertID != 0 || insertIDChanged {
safeSession.LastInsertId = insertID
}
switch stmtType {
Expand Down
27 changes: 25 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ type QueryExecutor struct {
}

const (
streamRowsSize = 256
streamRowsSize = 256
resetLastIDQuery = "select last_insert_id(-4200)"
resetLastIDValue = 18446744073709547416
)

var (
Expand Down Expand Up @@ -1121,6 +1123,13 @@ func (qre *QueryExecutor) execDBConn(conn *connpool.Conn, sql string, wantfields
}
defer qre.tsv.statelessql.Remove(qd)

if qre.options.ShouldFetchLastInsertID() {
_, err = conn.Exec(ctx, resetLastIDQuery, 1, false)
if err != nil {
return nil, err
}
}

exec, err := conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields)
if err != nil {
return nil, err
Expand All @@ -1147,6 +1156,13 @@ func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string,
}
defer qre.tsv.statefulql.Remove(qd)

if qre.options.ShouldFetchLastInsertID() {
_, err = conn.Exec(ctx, resetLastIDQuery, 1, false)
if err != nil {
return nil, err
}
}

exec, err := conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields)
if err != nil {
return nil, err
Expand All @@ -1161,6 +1177,10 @@ func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string,
}

func (qre *QueryExecutor) fetchLastInsertID(ctx context.Context, conn *connpool.Conn, exec *sqltypes.Result) error {
if exec.InsertID != 0 {
return nil
}

result, err := conn.Exec(ctx, "select last_insert_id()", 1, false)
if err != nil {
return err
Expand All @@ -1171,7 +1191,10 @@ func (qre *QueryExecutor) fetchLastInsertID(ctx context.Context, conn *connpool.
if err != nil {
return err
}
exec.InsertID = insertID
if resetLastIDValue != insertID {
exec.InsertID = insertID
exec.InsertIDChanged = true
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions proto/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ message QueryResult {
repeated Row rows = 4;
string info = 6;
string session_state_changes = 7;
bool insert_id_changed=8;
}

// QueryWarning is used to convey out of band query execution warnings
Expand Down
6 changes: 6 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0db81eb

Please sign in to comment.