Skip to content

Commit

Permalink
addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Dec 19, 2024
1 parent 9dc7651 commit 873d095
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 46 deletions.
24 changes: 0 additions & 24 deletions go/vt/proto/query/query_extra.go

This file was deleted.

2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (del *Delete) deleteVindexEntries(ctx context.Context, vcursor VCursor, bin
for i := range rss {
queries[i] = &querypb.BoundQuery{Sql: del.OwnedVindexQuery, BindVariables: bindVars}
}
subQueryResults, errors := vcursor.ExecuteMultiShard(ctx, del, rss, queries, false, false, del.FetchLastInsertID)
subQueryResults, errors := vcursor.ExecuteMultiShard(ctx, del, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, del.FetchLastInsertID)
for _, err := range errors {
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func allowOnlyPrimary(rss ...*srvtopo.ResolvedShard) error {

func (dml *DML) execMultiShard(ctx context.Context, primitive Primitive, vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || dml.MultiShardAutocommit) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true, autocommit, dml.FetchLastInsertID)
result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true /*rollbackOnError*/, autocommit, dml.FetchLastInsertID)
return result, vterrors.Aggregate(errs)
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (ins *Insert) executeInsertQueries(
if err != nil {
return nil, err
}
result, errs := vcursor.ExecuteMultiShard(ctx, ins, rss, queries, true, autocommit, ins.FetchLastInsertID)
result, errs := vcursor.ExecuteMultiShard(ctx, ins, rss, queries, true /*rollbackOnError*/, autocommit, ins.FetchLastInsertID)
if errs != nil {
return nil, vterrors.Aggregate(errs)
}
Expand Down
18 changes: 14 additions & 4 deletions go/vt/vtgate/engine/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,22 @@ import (

var _ Primitive = (*Limit)(nil)

// Limit is a primitive that performs the LIMIT operation.
// Limit performs the LIMIT operation, restricting the number of rows returned.
type Limit struct {
Count evalengine.Expr
Offset evalengine.Expr
// Count specifies the maximum number of rows to return.
Count evalengine.Expr

// Offset specifies the number of rows to skip before returning results.
Offset evalengine.Expr

// RequireCompleteInput determines if all input rows must be fully retrieved.
// - If true, all Result structs are passed through, and the total rows are limited.
// - If false, Limit returns io.EOF once the limit is reached in streaming mode,
// signaling the tablet to stop sending data.
RequireCompleteInput bool
Input Primitive

// Input provides the input rows.
Input Primitive
}

var UpperLimitStr = "__upper_limit"
Expand Down
15 changes: 11 additions & 4 deletions go/vt/vtgate/engine/merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// StreamExecutor is a subset of Primitive that MergeSort
// requires its inputs to satisfy.
type StreamExecutor interface {
StreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error
StreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, fetchLastInsertID bool, callback func(*sqltypes.Result) error) error
}

var _ Primitive = (*MergeSort)(nil)
Expand All @@ -52,6 +52,7 @@ type MergeSort struct {
Primitives []StreamExecutor
OrderBy evalengine.Comparison
ScatterErrorsAsWarnings bool
FetchLastInsertID bool
}

// RouteType satisfies Primitive.
Expand Down Expand Up @@ -83,7 +84,7 @@ func (ms *MergeSort) TryStreamExecute(ctx context.Context, vcursor VCursor, bind
gotFields := wantfields
handles := make([]*streamHandle, len(ms.Primitives))
for i, input := range ms.Primitives {
handles[i] = runOneStream(ctx, vcursor, input, bindVars, gotFields)
handles[i] = runOneStream(ctx, vcursor, input, bindVars, gotFields, ms.FetchLastInsertID)
if !ms.ScatterErrorsAsWarnings {
// we only need the fields from the first input, unless we allow ScatterErrorsAsWarnings.
// in that case, we need to ask all the inputs for fields - we don't know which will return anything
Expand Down Expand Up @@ -221,7 +222,13 @@ type streamHandle struct {
}

// runOnestream starts a streaming query on one shard, and returns a streamHandle for it.
func runOneStream(ctx context.Context, vcursor VCursor, input StreamExecutor, bindVars map[string]*querypb.BindVariable, wantfields bool) *streamHandle {
func runOneStream(
ctx context.Context,
vcursor VCursor,
input StreamExecutor,
bindVars map[string]*querypb.BindVariable,
wantfields, fetchLastInsertID bool,
) *streamHandle {
handle := &streamHandle{
fields: make(chan []*querypb.Field, 1),
row: make(chan []sqltypes.Value, 10),
Expand All @@ -231,7 +238,7 @@ func runOneStream(ctx context.Context, vcursor VCursor, input StreamExecutor, bi
defer close(handle.fields)
defer close(handle.row)

handle.err = input.StreamExecute(ctx, vcursor, bindVars, wantfields, func(qr *sqltypes.Result) error {
handle.err = input.StreamExecute(ctx, vcursor, bindVars, wantfields, fetchLastInsertID, func(qr *sqltypes.Result) error {
if !handle.fieldSeen && len(qr.Fields) != 0 {
handle.fieldSeen = true
select {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/merge_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ type shardResult struct {
sendErr error
}

func (sr *shardResult) StreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
func (sr *shardResult) StreamExecute(_ context.Context, _ VCursor, _ map[string]*querypb.BindVariable, _ bool, _ bool, callback func(*sqltypes.Result) error) error {
for _, r := range sr.results {
if err := callback(r); err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (route *Route) executeShards(
}

queries := getQueries(route.Query, bvs)
result, errs := vcursor.ExecuteMultiShard(ctx, route, rss, queries, false, false, route.FetchLastInsertID)
result, errs := vcursor.ExecuteMultiShard(ctx, route, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, route.FetchLastInsertID)

route.executeWarmingReplicaRead(ctx, vcursor, bindVars, queries)

Expand Down Expand Up @@ -318,10 +318,12 @@ func (route *Route) mergeSort(
primitive: route,
})
}

ms := MergeSort{
Primitives: prims,
OrderBy: route.OrderBy,
ScatterErrorsAsWarnings: route.ScatterErrorsAsWarnings,
FetchLastInsertID: route.FetchLastInsertID,
}
return vcursor.StreamExecutePrimitive(ctx, &ms, bindVars, wantfields, func(qr *sqltypes.Result) error {
return callback(qr.Truncate(route.TruncateColumnCount))
Expand Down Expand Up @@ -539,7 +541,7 @@ func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCurs
return
}

_, errs := replicaVCursor.ExecuteMultiShard(ctx, route, rss, queries, false, false, route.FetchLastInsertID)
_, errs := replicaVCursor.ExecuteMultiShard(ctx, route, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, route.FetchLastInsertID)
if len(errs) > 0 {
log.Warningf("Failed to execute warming replica read: %v", errs)
} else {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (svs *SysVarReservedConn) Execute(ctx context.Context, vcursor VCursor, env
BindVariables: env.BindVars,
}
}
_, errs := vcursor.ExecuteMultiShard(ctx, nil, rss, queries, false, false, false)
_, errs := vcursor.ExecuteMultiShard(ctx, nil /*primitive*/, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, false /*fetchLastInsertID*/)
return vterrors.Aggregate(errs)
}

Expand All @@ -320,7 +320,7 @@ func (svs *SysVarReservedConn) execSetStatement(ctx context.Context, vcursor VCu
BindVariables: env.BindVars,
}
}
_, errs := vcursor.ExecuteMultiShard(ctx, nil, rss, queries, false, false, false)
_, errs := vcursor.ExecuteMultiShard(ctx, nil /*primitive*/, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, false /*fetchLastInsertID*/)
return vterrors.Aggregate(errs)
}

Expand All @@ -333,7 +333,7 @@ func (svs *SysVarReservedConn) checkAndUpdateSysVar(ctx context.Context, vcursor
if err != nil {
return false, err
}
qr, err := execShard(ctx, nil, vcursor, sysVarExprValidationQuery, res.BindVars, rss[0], false /* rollbackOnError */, false /* canAutocommit */, false)
qr, err := execShard(ctx, nil /*primitive*/, vcursor, sysVarExprValidationQuery, res.BindVars, rss[0], false /* rollbackOnError */, false /* canAutocommit */, false /*fetchLastInsertID*/)
if err != nil {
return false, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/shard_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type shardRoute struct {
}

// StreamExecute performs a streaming exec.
func (sr *shardRoute) StreamExecute(ctx context.Context, vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool, callback func(*sqltypes.Result) error) error {
func (sr *shardRoute) StreamExecute(ctx context.Context, vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool, fetchLastInsertID bool, callback func(*sqltypes.Result) error) error {
// TODO rollback on error and autocommit should probably not be used like this
errors := vcursor.StreamExecuteMulti(ctx, sr.primitive, sr.query, []*srvtopo.ResolvedShard{sr.rs}, []map[string]*querypb.BindVariable{sr.bv}, false /* rollbackOnError */, false /* autocommit */, false, callback)
errors := vcursor.StreamExecuteMulti(ctx, sr.primitive, sr.query, []*srvtopo.ResolvedShard{sr.rs}, []map[string]*querypb.BindVariable{sr.bv}, false /* rollbackOnError */, false /* autocommit */, fetchLastInsertID, callback)
return vterrors.Aggregate(errors)
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (upd *Update) updateVindexEntries(ctx context.Context, vcursor VCursor, bin
for i := range rss {
queries[i] = &querypb.BoundQuery{Sql: upd.OwnedVindexQuery, BindVariables: bindVars}
}
subQueryResult, errors := vcursor.ExecuteMultiShard(ctx, upd, rss, queries, false, false, upd.FetchLastInsertID)
subQueryResult, errors := vcursor.ExecuteMultiShard(ctx, upd, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, upd.FetchLastInsertID)
for _, err := range errors {
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string,
}

func (qre *QueryExecutor) resetLastInsertIDIfNeeded(ctx context.Context, conn *connpool.Conn) error {
if qre.options.ShouldFetchLastInsertID() {
if qre.options.GetFetchLastInsertId() {
// if the query contains a last_insert_id(x) function,
// we need to reset the last insert id to check if it was set by the query or not
_, err := conn.Exec(ctx, resetLastIDQuery, 1, false)
Expand All @@ -1179,7 +1179,7 @@ func (qre *QueryExecutor) resetLastInsertIDIfNeeded(ctx context.Context, conn *c
}

func (qre *QueryExecutor) fetchLastInsertID(ctx context.Context, conn *connpool.Conn, exec *sqltypes.Result) error {
if exec.InsertID != 0 || !qre.options.ShouldFetchLastInsertID() {
if exec.InsertID != 0 || !qre.options.GetFetchLastInsertId() {
return nil
}

Expand Down

0 comments on commit 873d095

Please sign in to comment.