Skip to content

Commit

Permalink
feat: abort early once we reach count = 0
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Dec 19, 2024
1 parent 193e007 commit 6db0700
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions go/vt/vtgate/engine/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"strconv"
"sync"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/evalengine"

Expand Down Expand Up @@ -100,15 +98,16 @@ func (l *Limit) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars

bindVars = copyBindVars(bindVars)

// When offset is present, we hijack the limit value so we can calculate
// the offset in memory from the result of the scatter query with count + offset.
// Adjust the upper limit so that the initial fetch includes both the offset and count.
// We do this because we want to skip the first `offset` rows locally rather than on the server side.
bindVars[UpperLimitStr] = sqltypes.Int64BindVariable(int64(count + offset))

var mu sync.Mutex
err = vcursor.StreamExecutePrimitive(ctx, l.Input, bindVars, wantfields, func(qr *sqltypes.Result) error {
mu.Lock()
defer mu.Unlock()
log.Errorf("LastInsertID: %d InsertIDChanged %t\n", qr.InsertID, qr.InsertIDChanged)

// If this is the first callback and fields are requested, send the fields immediately.
if wantfields && len(qr.Fields) != 0 {
if err := callback(&sqltypes.Result{Fields: qr.Fields}); err != nil {
return err
Expand All @@ -119,17 +118,32 @@ func (l *Limit) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars
return callback(qr)
}

// we've still not seen all rows we need to see before we can return anything to the client
// If we still need to skip `offset` rows before returning any to the client:
if offset > 0 {
if inputSize <= offset {
// not enough to return anything yet
// not enough to return anything yet, but we still want to pass on metadata such as last_insert_id
offset -= inputSize
return nil
qr.Rows = nil
return callback(qr)
}
// Skip `offset` rows from this batch and reset offset to 0.
qr.Rows = qr.Rows[offset:]
offset = 0
}

// At this point, we've dealt with the offset. Now handle the count (limit).
if count == 0 {
// If count is zero, we've fetched everything we need.
if !l.RequireCompleteInput && !vcursor.Session().InTransaction() {
return io.EOF
}

// If we require the complete input, or we are in a transaction, we cannot return io.EOF early.
// Instead, we return empty results as needed until input ends.
qr.Rows = nil
return callback(qr)
}

// reduce count till 0.
resultSize := len(qr.Rows)
if count > resultSize {
Expand All @@ -143,6 +157,8 @@ func (l *Limit) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars
return err
}

// If we required complete input or are in a transaction, we must not exit early.
// We'll return empty batches until the input is done.
if l.RequireCompleteInput || vcursor.Session().InTransaction() {
return nil
}
Expand Down

0 comments on commit 6db0700

Please sign in to comment.