Skip to content

Commit

Permalink
feat: pass it along for streaming queries
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Dec 17, 2024
1 parent 5b5be41 commit a872d14
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 28 deletions.
2 changes: 1 addition & 1 deletion go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (c *Conn) readHeaderFrom(r io.Reader) (int, error) {
return 0, vterrors.Wrapf(err, "io.ReadFull(header size) failed")
}

sequence := uint8(c.header[3])
sequence := c.header[3]
if sequence != c.sequence {
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence, expected %v got %v", c.sequence, sequence)
}
Expand Down
57 changes: 31 additions & 26 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,38 +133,43 @@ func TestCast(t *testing.T) {

// TestSetAndGetLastInsertID tests that the last_insert_id function works as intended when used with different arguments.
func TestSetAndGetLastInsertID(t *testing.T) {
mcmp, closer := start(t)
defer closer()

checkQuery := func(query string) {
mcmp.Run(query, func(mcmp *utils.MySQLCompare) {
mcmp.Exec(query)
mcmp.Exec("select last_insert_id()")
})
}
notZero := 1
for _, workload := range []string{"olap", "oltp"} {
mcmp, closer := start(t)
defer closer()

checkQuery("select last_insert_id(42)")
checkQuery("select last_insert_id(0)")
_, err := mcmp.VtConn.ExecuteFetch(fmt.Sprintf("set workload = %s", workload), 1000, false)
require.NoError(t, err)
checkQuery := func(i string) {
for _, val := range []int{0, notZero} {
query := fmt.Sprintf(i, val)
name := fmt.Sprintf("%s - %s", workload, query)
mcmp.Run(name, func(mcmp *utils.MySQLCompare) {
mcmp.Exec(query)
mcmp.Exec("select last_insert_id()")
})
}
notZero++
}

// Test within SELECT expressions
checkQuery("select last_insert_id(123), id1, id2 from t1 limit 1")
checkQuery("select id1, last_insert_id(999) as li, id2 from t1 where id1 > last_insert_id(0)")
checkQuery("select last_insert_id(%d)")

// Test in WHERE clauses
checkQuery("select id1 from t1 where id1 = last_insert_id(55)")
checkQuery("select id1 from t1 where last_insert_id(0) = 0")
// Test within SELECT expressions
checkQuery("select last_insert_id(%d), id1, id2 from t1 limit 1")
checkQuery("select last_insert_id(%d), id1, id2 from t1 where 1 = 2")
checkQuery("select 12 from t1 where last_insert_id(%d)")

// Add a row so we can test the last_insert_id in UPDATE statements
mcmp.Exec("insert into t1 (id1, id2) values (1, 10)")
// Add a row so we can test the last_insert_id in UPDATE statements
mcmp.Exec("insert into t1 (id1, id2) values (1, 10)")

// Test in UPDATE statements
checkQuery("update t1 set id2 = last_insert_id(777) where id1 = 1") // this should run
checkQuery("update t1 set id2 = last_insert_id(456) where id1 = 2") // this should not run
checkQuery("update t1 set id2 = 88 where id1 = last_insert_id(0)")
// Test in UPDATE statements
checkQuery("update t1 set id2 = last_insert_id(%d) where id1 = 1") // this should run
checkQuery("update t1 set id2 = last_insert_id(%d) where id1 = 2") // this should not run
checkQuery("update t1 set id2 = 88 where id1 = last_insert_id(%d)")

// Test in DELETE statements
checkQuery("delete from t1 where id1 = last_insert_id(456)")
checkQuery("delete from t1 where id1 = last_insert_id(1)")
// Test in DELETE statements
checkQuery("delete from t1 where id1 = last_insert_id(%d)")
}
}

// TestVindexHints tests that vindex hints work as intended.
Expand Down
15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,11 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction
trace.AnnotateSQL(span, sqlparser.Preview(sql))
callBackClosingSpan := func(result *sqltypes.Result) error {
defer span.Finish()

if err := qre.fetchLastInsertID(ctx, conn.Conn, result); err != nil {
return err
}

return callback(result)
}

Expand All @@ -1217,13 +1222,21 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction
// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
// once their grace period is over.
qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn)
if err := qre.resetLastInsertIDIfNeeded(ctx, conn.Conn); err != nil {
return err
}

if isTransaction {
err := qre.tsv.statefulql.Add(qd)
if err != nil {
return err
}
defer qre.tsv.statefulql.Remove(qd)
return conn.Conn.StreamOnce(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options))
err = conn.Conn.StreamOnce(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options))
if err != nil {
return err
}
return nil
}
err := qre.tsv.olapql.Add(qd)
if err != nil {
Expand Down

0 comments on commit a872d14

Please sign in to comment.