From d5899e51fd0cde14086e518b1d65245434b1ce46 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 19 Dec 2024 18:01:11 +0100 Subject: [PATCH] addressed review comments Signed-off-by: Andres Taylor --- go/vt/proto/query/query_extra.go | 24 ------------------- go/vt/vtgate/engine/delete.go | 2 +- go/vt/vtgate/engine/dml.go | 2 +- go/vt/vtgate/engine/insert.go | 2 +- go/vt/vtgate/engine/limit.go | 18 ++++++++++---- go/vt/vtgate/engine/merge_sort.go | 15 ++++++++---- go/vt/vtgate/engine/merge_sort_test.go | 2 +- go/vt/vtgate/engine/route.go | 6 +++-- go/vt/vtgate/engine/set.go | 6 ++--- go/vt/vtgate/engine/shard_route.go | 2 +- go/vt/vtgate/engine/update.go | 2 +- go/vt/vttablet/tabletserver/query_executor.go | 4 ++-- 12 files changed, 40 insertions(+), 45 deletions(-) delete mode 100644 go/vt/proto/query/query_extra.go diff --git a/go/vt/proto/query/query_extra.go b/go/vt/proto/query/query_extra.go deleted file mode 100644 index 21cdb8c1efc..00000000000 --- a/go/vt/proto/query/query_extra.go +++ /dev/null @@ -1,24 +0,0 @@ -/* -Copyright 2024 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package query - -func (x *ExecuteOptions) ShouldFetchLastInsertID() bool { - if x == nil { - return false - } - return x.FetchLastInsertId -} diff --git a/go/vt/vtgate/engine/delete.go b/go/vt/vtgate/engine/delete.go index 25bf09e2509..14859fc2135 100644 --- a/go/vt/vtgate/engine/delete.go +++ b/go/vt/vtgate/engine/delete.go @@ -87,7 +87,7 @@ func (del *Delete) deleteVindexEntries(ctx context.Context, vcursor VCursor, bin for i := range rss { queries[i] = &querypb.BoundQuery{Sql: del.OwnedVindexQuery, BindVariables: bindVars} } - subQueryResults, errors := vcursor.ExecuteMultiShard(ctx, del, rss, queries, false, false, del.FetchLastInsertID) + subQueryResults, errors := vcursor.ExecuteMultiShard(ctx, del, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, del.FetchLastInsertID) for _, err := range errors { if err != nil { return err diff --git a/go/vt/vtgate/engine/dml.go b/go/vt/vtgate/engine/dml.go index 3d739a6667f..9a0a044a3c4 100644 --- a/go/vt/vtgate/engine/dml.go +++ b/go/vt/vtgate/engine/dml.go @@ -134,7 +134,7 @@ func allowOnlyPrimary(rss ...*srvtopo.ResolvedShard) error { func (dml *DML) execMultiShard(ctx context.Context, primitive Primitive, vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery) (*sqltypes.Result, error) { autocommit := (len(rss) == 1 || dml.MultiShardAutocommit) && vcursor.AutocommitApproval() - result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true, autocommit, dml.FetchLastInsertID) + result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true /*rollbackOnError*/, autocommit, dml.FetchLastInsertID) return result, vterrors.Aggregate(errs) } diff --git a/go/vt/vtgate/engine/insert.go b/go/vt/vtgate/engine/insert.go index cb822818d68..2cae911ccd5 100644 --- a/go/vt/vtgate/engine/insert.go +++ b/go/vt/vtgate/engine/insert.go @@ -171,7 +171,7 @@ func (ins *Insert) executeInsertQueries( if err != nil { return nil, err } - result, errs := vcursor.ExecuteMultiShard(ctx, ins, rss, queries, true, autocommit, ins.FetchLastInsertID) + result, errs := vcursor.ExecuteMultiShard(ctx, ins, rss, queries, true /*rollbackOnError*/, autocommit, ins.FetchLastInsertID) if errs != nil { return nil, vterrors.Aggregate(errs) } diff --git a/go/vt/vtgate/engine/limit.go b/go/vt/vtgate/engine/limit.go index 56cf0ab87eb..d50a0d4adde 100644 --- a/go/vt/vtgate/engine/limit.go +++ b/go/vt/vtgate/engine/limit.go @@ -33,12 +33,22 @@ import ( var _ Primitive = (*Limit)(nil) -// Limit is a primitive that performs the LIMIT operation. +// Limit performs the LIMIT operation, restricting the number of rows returned. type Limit struct { - Count evalengine.Expr - Offset evalengine.Expr + // Count specifies the maximum number of rows to return. + Count evalengine.Expr + + // Offset specifies the number of rows to skip before returning results. + Offset evalengine.Expr + + // RequireCompleteInput determines if all input rows must be fully retrieved. + // - If true, all Result structs are passed through, and the total rows are limited. + // - If false, Limit returns io.EOF once the limit is reached in streaming mode, + // signaling the tablet to stop sending data. RequireCompleteInput bool - Input Primitive + + // Input provides the input rows. + Input Primitive } var UpperLimitStr = "__upper_limit" diff --git a/go/vt/vtgate/engine/merge_sort.go b/go/vt/vtgate/engine/merge_sort.go index 8678ff49241..abf2a65f311 100644 --- a/go/vt/vtgate/engine/merge_sort.go +++ b/go/vt/vtgate/engine/merge_sort.go @@ -31,7 +31,7 @@ import ( // StreamExecutor is a subset of Primitive that MergeSort // requires its inputs to satisfy. type StreamExecutor interface { - StreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error + StreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, fetchLastInsertID bool, callback func(*sqltypes.Result) error) error } var _ Primitive = (*MergeSort)(nil) @@ -52,6 +52,7 @@ type MergeSort struct { Primitives []StreamExecutor OrderBy evalengine.Comparison ScatterErrorsAsWarnings bool + FetchLastInsertID bool } // RouteType satisfies Primitive. @@ -83,7 +84,7 @@ func (ms *MergeSort) TryStreamExecute(ctx context.Context, vcursor VCursor, bind gotFields := wantfields handles := make([]*streamHandle, len(ms.Primitives)) for i, input := range ms.Primitives { - handles[i] = runOneStream(ctx, vcursor, input, bindVars, gotFields) + handles[i] = runOneStream(ctx, vcursor, input, bindVars, gotFields, ms.FetchLastInsertID) if !ms.ScatterErrorsAsWarnings { // we only need the fields from the first input, unless we allow ScatterErrorsAsWarnings. // in that case, we need to ask all the inputs for fields - we don't know which will return anything @@ -221,7 +222,13 @@ type streamHandle struct { } // runOnestream starts a streaming query on one shard, and returns a streamHandle for it. -func runOneStream(ctx context.Context, vcursor VCursor, input StreamExecutor, bindVars map[string]*querypb.BindVariable, wantfields bool) *streamHandle { +func runOneStream( + ctx context.Context, + vcursor VCursor, + input StreamExecutor, + bindVars map[string]*querypb.BindVariable, + wantfields, fetchLastInsertID bool, +) *streamHandle { handle := &streamHandle{ fields: make(chan []*querypb.Field, 1), row: make(chan []sqltypes.Value, 10), @@ -231,7 +238,7 @@ func runOneStream(ctx context.Context, vcursor VCursor, input StreamExecutor, bi defer close(handle.fields) defer close(handle.row) - handle.err = input.StreamExecute(ctx, vcursor, bindVars, wantfields, func(qr *sqltypes.Result) error { + handle.err = input.StreamExecute(ctx, vcursor, bindVars, wantfields, fetchLastInsertID, func(qr *sqltypes.Result) error { if !handle.fieldSeen && len(qr.Fields) != 0 { handle.fieldSeen = true select { diff --git a/go/vt/vtgate/engine/merge_sort_test.go b/go/vt/vtgate/engine/merge_sort_test.go index 6b383e12572..60890283f89 100644 --- a/go/vt/vtgate/engine/merge_sort_test.go +++ b/go/vt/vtgate/engine/merge_sort_test.go @@ -411,7 +411,7 @@ type shardResult struct { sendErr error } -func (sr *shardResult) StreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { +func (sr *shardResult) StreamExecute(_ context.Context, _ VCursor, _ map[string]*querypb.BindVariable, _ bool, _ bool, callback func(*sqltypes.Result) error) error { for _, r := range sr.results { if err := callback(r); err != nil { return err diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index f585efea2e6..20ab7695f70 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -183,7 +183,7 @@ func (route *Route) executeShards( } queries := getQueries(route.Query, bvs) - result, errs := vcursor.ExecuteMultiShard(ctx, route, rss, queries, false, false, route.FetchLastInsertID) + result, errs := vcursor.ExecuteMultiShard(ctx, route, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, route.FetchLastInsertID) route.executeWarmingReplicaRead(ctx, vcursor, bindVars, queries) @@ -318,10 +318,12 @@ func (route *Route) mergeSort( primitive: route, }) } + ms := MergeSort{ Primitives: prims, OrderBy: route.OrderBy, ScatterErrorsAsWarnings: route.ScatterErrorsAsWarnings, + FetchLastInsertID: route.FetchLastInsertID, } return vcursor.StreamExecutePrimitive(ctx, &ms, bindVars, wantfields, func(qr *sqltypes.Result) error { return callback(qr.Truncate(route.TruncateColumnCount)) @@ -539,7 +541,7 @@ func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCurs return } - _, errs := replicaVCursor.ExecuteMultiShard(ctx, route, rss, queries, false, false, route.FetchLastInsertID) + _, errs := replicaVCursor.ExecuteMultiShard(ctx, route, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, route.FetchLastInsertID) if len(errs) > 0 { log.Warningf("Failed to execute warming replica read: %v", errs) } else { diff --git a/go/vt/vtgate/engine/set.go b/go/vt/vtgate/engine/set.go index e12a4029fcf..95fb5c87a32 100644 --- a/go/vt/vtgate/engine/set.go +++ b/go/vt/vtgate/engine/set.go @@ -308,7 +308,7 @@ func (svs *SysVarReservedConn) Execute(ctx context.Context, vcursor VCursor, env BindVariables: env.BindVars, } } - _, errs := vcursor.ExecuteMultiShard(ctx, nil, rss, queries, false, false, false) + _, errs := vcursor.ExecuteMultiShard(ctx, nil /*primitive*/, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, false /*fetchLastInsertID*/) return vterrors.Aggregate(errs) } @@ -320,7 +320,7 @@ func (svs *SysVarReservedConn) execSetStatement(ctx context.Context, vcursor VCu BindVariables: env.BindVars, } } - _, errs := vcursor.ExecuteMultiShard(ctx, nil, rss, queries, false, false, false) + _, errs := vcursor.ExecuteMultiShard(ctx, nil /*primitive*/, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, false /*fetchLastInsertID*/) return vterrors.Aggregate(errs) } @@ -333,7 +333,7 @@ func (svs *SysVarReservedConn) checkAndUpdateSysVar(ctx context.Context, vcursor if err != nil { return false, err } - qr, err := execShard(ctx, nil, vcursor, sysVarExprValidationQuery, res.BindVars, rss[0], false /* rollbackOnError */, false /* canAutocommit */, false) + qr, err := execShard(ctx, nil /*primitive*/, vcursor, sysVarExprValidationQuery, res.BindVars, rss[0], false /* rollbackOnError */, false /* canAutocommit */, false /*fetchLastInsertID*/) if err != nil { return false, err } diff --git a/go/vt/vtgate/engine/shard_route.go b/go/vt/vtgate/engine/shard_route.go index dadb7a36704..70df57f017f 100644 --- a/go/vt/vtgate/engine/shard_route.go +++ b/go/vt/vtgate/engine/shard_route.go @@ -37,7 +37,7 @@ type shardRoute struct { } // StreamExecute performs a streaming exec. -func (sr *shardRoute) StreamExecute(ctx context.Context, vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool, callback func(*sqltypes.Result) error) error { +func (sr *shardRoute) StreamExecute(ctx context.Context, vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool, fetchLastInsertID bool, callback func(*sqltypes.Result) error) error { // TODO rollback on error and autocommit should probably not be used like this errors := vcursor.StreamExecuteMulti(ctx, sr.primitive, sr.query, []*srvtopo.ResolvedShard{sr.rs}, []map[string]*querypb.BindVariable{sr.bv}, false /* rollbackOnError */, false /* autocommit */, false, callback) return vterrors.Aggregate(errors) diff --git a/go/vt/vtgate/engine/update.go b/go/vt/vtgate/engine/update.go index 563a032a409..49c8576fd84 100644 --- a/go/vt/vtgate/engine/update.go +++ b/go/vt/vtgate/engine/update.go @@ -102,7 +102,7 @@ func (upd *Update) updateVindexEntries(ctx context.Context, vcursor VCursor, bin for i := range rss { queries[i] = &querypb.BoundQuery{Sql: upd.OwnedVindexQuery, BindVariables: bindVars} } - subQueryResult, errors := vcursor.ExecuteMultiShard(ctx, upd, rss, queries, false, false, upd.FetchLastInsertID) + subQueryResult, errors := vcursor.ExecuteMultiShard(ctx, upd, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, upd.FetchLastInsertID) for _, err := range errors { if err != nil { return err diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 79bf9f0bc11..8a67844437e 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -1167,7 +1167,7 @@ func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string, } func (qre *QueryExecutor) resetLastInsertIDIfNeeded(ctx context.Context, conn *connpool.Conn) error { - if qre.options.ShouldFetchLastInsertID() { + if qre.options.GetFetchLastInsertId() { // if the query contains a last_insert_id(x) function, // we need to reset the last insert id to check if it was set by the query or not _, err := conn.Exec(ctx, resetLastIDQuery, 1, false) @@ -1179,7 +1179,7 @@ func (qre *QueryExecutor) resetLastInsertIDIfNeeded(ctx context.Context, conn *c } func (qre *QueryExecutor) fetchLastInsertID(ctx context.Context, conn *connpool.Conn, exec *sqltypes.Result) error { - if exec.InsertID != 0 || !qre.options.ShouldFetchLastInsertID() { + if exec.InsertID != 0 || !qre.options.GetFetchLastInsertId() { return nil }