diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index 1c5e6a0c1b2..f15799a5e71 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -798,6 +798,24 @@ func TestRowCountExceed(t *testing.T) { utils.AssertContainsError(t, conn, "select id1 from t1 where id1 < 1000", `Row count exceeded 100`) } +func TestDDLTargeted(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + utils.Exec(t, conn, "use `ks/-80`") + utils.Exec(t, conn, `begin`) + utils.Exec(t, conn, `create table ddl_targeted (id bigint primary key)`) + // implicit commit on ddl would have closed the open transaction + // so this execution should happen as autocommit. + utils.Exec(t, conn, `insert into ddl_targeted (id) values (1)`) + // this will have not impact and the row would have inserted. + utils.Exec(t, conn, `rollback`) + // validating the row + utils.AssertMatches(t, conn, `select id from ddl_targeted`, `[[INT64(1)]]`) +} + func TestLookupErrorMetric(t *testing.T) { conn, closer := start(t) defer closer() diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index 31c9e9e0eb0..6867ff543af 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -47,6 +47,8 @@ type Send struct { // IsDML specifies how to deal with autocommit behaviour IsDML bool + IsDDL bool + // SingleShardOnly specifies that the query must be send to only single shard SingleShardOnly bool @@ -94,6 +96,10 @@ func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[str ctx, cancelFunc := addQueryTimeout(ctx, vcursor, s.QueryTimeout) defer cancelFunc() + if err := s.commitIfDDL(ctx, vcursor); err != nil { + return nil, err + } + rss, err := s.checkAndReturnShards(ctx, vcursor) if err != nil { return nil, err @@ -158,6 +164,10 @@ func copyBindVars(in map[string]*querypb.BindVariable) map[string]*querypb.BindV // TryStreamExecute implements Primitive interface func (s *Send) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + if err := s.commitIfDDL(ctx, vcursor); err != nil { + return err + } + rss, err := s.checkAndReturnShards(ctx, vcursor) if err != nil { return err @@ -204,3 +214,11 @@ func (s *Send) description() PrimitiveDescription { Other: other, } } + +// commitIfDDL commits any open transaction before executing the ddl query. +func (s *Send) commitIfDDL(ctx context.Context, vcursor VCursor) error { + if s.IsDDL { + return vcursor.Session().Commit(ctx) + } + return nil +} diff --git a/go/vt/vtgate/planbuilder/ddl.go b/go/vt/vtgate/planbuilder/ddl.go index 4c4b3791c20..f4b8ab6976f 100644 --- a/go/vt/vtgate/planbuilder/ddl.go +++ b/go/vt/vtgate/planbuilder/ddl.go @@ -45,7 +45,7 @@ func (fk *fkContraint) FkWalk(node sqlparser.SQLNode) (kontinue bool, err error) // and which chooses which of the two to invoke at runtime. func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser.DDLStatement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) { if vschema.Destination() != nil { - return buildByPassPlan(sql, vschema) + return buildByPassPlan(sql, vschema, true) } normalDDLPlan, onlineDDLPlan, err := buildDDLPlans(ctx, sql, ddlStatement, reservedVars, vschema, enableOnlineDDL, enableDirectDDL) if err != nil { @@ -80,7 +80,7 @@ func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser return newPlanResult(eddl, tc.getTables()...), nil } -func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, error) { +func buildByPassPlan(sql string, vschema plancontext.VSchema, isDDL bool) (*planResult, error) { keyspace, err := vschema.DefaultKeyspace() if err != nil { return nil, err @@ -89,6 +89,7 @@ func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, erro Keyspace: keyspace, TargetDestination: vschema.Destination(), Query: sql, + IsDDL: isDDL, } return newPlanResult(send), nil } diff --git a/go/vt/vtgate/planbuilder/show.go b/go/vt/vtgate/planbuilder/show.go index f5a5282ceab..82035adaa87 100644 --- a/go/vt/vtgate/planbuilder/show.go +++ b/go/vt/vtgate/planbuilder/show.go @@ -45,7 +45,7 @@ const ( func buildShowPlan(sql string, stmt *sqlparser.Show, _ *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error) { if vschema.Destination() != nil { - return buildByPassPlan(sql, vschema) + return buildByPassPlan(sql, vschema, false) } var prim engine.Primitive diff --git a/go/vt/vttablet/endtoend/settings_test.go b/go/vt/vttablet/endtoend/settings_test.go index a459ad15844..0ccaf958737 100644 --- a/go/vt/vttablet/endtoend/settings_test.go +++ b/go/vt/vttablet/endtoend/settings_test.go @@ -138,16 +138,9 @@ func TestDDLNoConnectionReservationOnSettings(t *testing.T) { query := "create table temp(c_date datetime default '0000-00-00')" setting := "set sql_mode='TRADITIONAL'" - for _, withTx := range []bool{false, true} { - if withTx { - err := client.Begin(false) - require.NoError(t, err) - } - _, err := client.ReserveExecute(query, []string{setting}, nil) - require.Error(t, err, "create table should have failed with TRADITIONAL mode") - require.Contains(t, err.Error(), "Invalid default value") - assert.Zero(t, client.ReservedID()) - } + _, err := client.ReserveExecute(query, []string{setting}, nil) + assert.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode") + assert.Zero(t, client.ReservedID()) } func TestDMLNoConnectionReservationOnSettings(t *testing.T) { @@ -156,7 +149,10 @@ func TestDMLNoConnectionReservationOnSettings(t *testing.T) { _, err := client.Execute("create table temp(c_date datetime)", nil) require.NoError(t, err) - defer client.Execute("drop table temp", nil) + defer func() { + client.Rollback() + client.Execute("drop table temp", nil) + }() _, err = client.Execute("insert into temp values ('2022-08-25')", nil) require.NoError(t, err) @@ -211,9 +207,8 @@ func TestDDLNoConnectionReservationOnSettingsWithTx(t *testing.T) { query := "create table temp(c_date datetime default '0000-00-00')" setting := "set sql_mode='TRADITIONAL'" - _, err := client.ReserveBeginExecute(query, []string{setting}, nil, nil) - require.Error(t, err, "create table should have failed with TRADITIONAL mode") - require.Contains(t, err.Error(), "Invalid default value") + _, err := client.ReserveExecute(query, []string{setting}, nil) + require.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode") assert.Zero(t, client.ReservedID()) } @@ -297,12 +292,6 @@ func TestTempTableOnReserveExecute(t *testing.T) { require.NoError(t, client.Release()) - _, err = client.ReserveBeginExecute(tempTblQuery, nil, nil, nil) - require.NoError(t, err) - assert.NotZero(t, client.ReservedID()) - require.NoError(t, - client.Release()) - // drop the table _, err = client.Execute("drop table if exists temp", nil) require.NoError(t, err) @@ -318,13 +307,6 @@ func TestTempTableOnReserveExecute(t *testing.T) { assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables") require.NoError(t, client.Release()) - - _, err = client.ReserveBeginExecute(tempTblQuery, []string{setting}, nil, nil) - require.Error(t, err, "create table should have failed with TRADITIONAL mode") - require.Contains(t, err.Error(), "Invalid default value") - assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables") - require.NoError(t, - client.Release()) } func TestInfiniteSessions(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index e89dee889dc..1318f2b90ab 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -29,12 +29,14 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/pools/smartconnpool" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/callinfo" "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/tableacl" @@ -45,10 +47,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" eschema "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" ) // QueryExecutor is used for executing a query request. @@ -192,8 +191,10 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { return qr, nil case p.PlanOtherRead, p.PlanOtherAdmin, p.PlanFlush, p.PlanSavepoint, p.PlanRelease, p.PlanSRollback: return qre.execOther() - case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanInsertMessage, p.PlanDDL, p.PlanLoad: + case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanInsertMessage, p.PlanLoad: return qre.execAutocommit(qre.txConnExec) + case p.PlanDDL: + return qre.execDDL(nil) case p.PlanUpdateLimit, p.PlanDeleteLimit: return qre.execAsTransaction(qre.txConnExec) case p.PlanCallProc: @@ -538,7 +539,7 @@ func (qre *QueryExecutor) checkAccess(authorized *tableacl.ACLResult, tableName return nil } -func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, error) { +func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (result *sqltypes.Result, err error) { // Let's see if this is a normal DDL statement or an Online DDL statement. // An Online DDL statement is identified by /*vt+ .. */ comment with expected directives, like uuid etc. if onlineDDL, err := schema.OnlineDDLFromCommentedStatement(qre.plan.FullStmt); err == nil { @@ -549,6 +550,21 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e } } + if conn == nil { + conn, err = qre.tsv.te.txPool.createConn(qre.ctx, qre.options, qre.setting) + if err != nil { + return nil, err + } + defer conn.Release(tx.ConnRelease) + } + + // A DDL statement should commit the current transaction in the VTGate. + // The change was made in PR: https://github.com/vitessio/vitess/pull/14110 in v18. + // DDL statement received by vttablet will be outside of a transaction. + if conn.txProps != nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "DDL statement executed inside a transaction") + } + isTemporaryTable := false if ddlStmt, ok := qre.plan.FullStmt.(sqlparser.DDLStatement); ok { isTemporaryTable = ddlStmt.IsTemporary() @@ -580,19 +596,7 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e return nil, err } } - result, err := qre.execStatefulConn(conn, sql, true) - if err != nil { - return nil, err - } - // Only perform this operation when the connection has transaction open. - // TODO: This actually does not retain the old transaction. We should see how to provide correct behaviour to client. - if conn.txProps != nil { - err = qre.BeginAgain(qre.ctx, conn) - if err != nil { - return nil, err - } - } - return result, nil + return qre.execStatefulConn(conn, sql, true) } func (qre *QueryExecutor) execLoad(conn *StatefulConnection) (*sqltypes.Result, error) { @@ -603,20 +607,6 @@ func (qre *QueryExecutor) execLoad(conn *StatefulConnection) (*sqltypes.Result, return result, nil } -// BeginAgain commits the existing transaction and begins a new one -func (*QueryExecutor) BeginAgain(ctx context.Context, dc *StatefulConnection) error { - if dc.IsClosed() || dc.TxProperties().Autocommit { - return nil - } - if _, err := dc.Exec(ctx, "commit", 1, false); err != nil { - return err - } - if _, err := dc.Exec(ctx, "begin", 1, false); err != nil { - return err - } - return nil -} - func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) { env := evalengine.NewExpressionEnv(qre.ctx, qre.bindVars, evalengine.NewEmptyVCursor(qre.tsv.Environment(), time.Local)) result, err := env.Evaluate(qre.plan.NextCount) diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index cc72c629ddb..78daad2e616 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -87,7 +87,8 @@ func TestQueryExecutorPlans(t *testing.T) { // If empty, then we should expect the same as logWant. inTxWant string // errorWant is the error we expect to get, if any, and should be nil if no error should be returned - errorWant error + errorWant string + onlyInTxErr bool // TxThrottler allows the test case to override the transaction throttler txThrottler txthrottler.TxThrottler }{{ @@ -196,9 +197,11 @@ func TestQueryExecutorPlans(t *testing.T) { query: "alter table test_table add column zipcode int", result: dmlResult, }}, - resultWant: dmlResult, - planWant: "DDL", - logWant: "alter table test_table add column zipcode int", + resultWant: dmlResult, + planWant: "DDL", + logWant: "alter table test_table add column zipcode int", + onlyInTxErr: true, + errorWant: "DDL statement executed inside a transaction", }, { input: "savepoint a", dbResponses: []dbResponse{{ @@ -215,20 +218,24 @@ func TestQueryExecutorPlans(t *testing.T) { query: "alter table `user` add key a (id)", result: emptyResult, }}, - resultWant: emptyResult, - planWant: "DDL", - logWant: "alter table `user` add key a (id)", - inTxWant: "alter table `user` add key a (id)", + resultWant: emptyResult, + planWant: "DDL", + logWant: "alter table `user` add key a (id)", + inTxWant: "alter table `user` add key a (id)", + onlyInTxErr: true, + errorWant: "DDL statement executed inside a transaction", }, { input: "create index a on user(id1 + id2)", dbResponses: []dbResponse{{ query: "create index a on user(id1 + id2)", result: emptyResult, }}, - resultWant: emptyResult, - planWant: "DDL", - logWant: "create index a on user(id1 + id2)", - inTxWant: "create index a on user(id1 + id2)", + resultWant: emptyResult, + planWant: "DDL", + logWant: "create index a on user(id1 + id2)", + inTxWant: "create index a on user(id1 + id2)", + onlyInTxErr: true, + errorWant: "DDL statement executed inside a transaction", }, { input: "ROLLBACK work to SAVEPOINT a", dbResponses: []dbResponse{{ @@ -282,7 +289,7 @@ func TestQueryExecutorPlans(t *testing.T) { query: "update test_table set a = 1 limit 10001", result: dmlResult, }}, - errorWant: errTxThrottled, + errorWant: "Transaction throttled", txThrottler: &mockTxThrottler{true}, }, { input: "update test_table set a=1", @@ -291,7 +298,7 @@ func TestQueryExecutorPlans(t *testing.T) { query: "update test_table set a = 1 limit 10001", result: dmlResult, }}, - errorWant: errTxThrottled, + errorWant: "Transaction throttled", txThrottler: &mockTxThrottler{true}, }, } @@ -315,13 +322,13 @@ func TestQueryExecutorPlans(t *testing.T) { // Test outside a transaction. qre := newTestQueryExecutor(ctx, tsv, tcase.input, 0) got, err := qre.Execute() - if tcase.errorWant == nil { + if tcase.errorWant != "" && !tcase.onlyInTxErr { + assert.EqualError(t, err, tcase.errorWant) + } else { require.NoError(t, err, tcase.input) assert.Equal(t, tcase.resultWant, got, tcase.input) assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input) assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input) - } else { - assert.True(t, vterrors.Equals(err, tcase.errorWant)) } // Wait for the existing query to be processed by the cache time.Sleep(100 * time.Millisecond) @@ -329,25 +336,29 @@ func TestQueryExecutorPlans(t *testing.T) { // Test inside a transaction. target := tsv.sm.Target() state, err := tsv.Begin(ctx, target, nil) - if tcase.errorWant == nil { - require.NoError(t, err) - require.NotNil(t, state.TabletAlias, "alias should not be nil") - assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin") - defer tsv.Commit(ctx, target, state.TransactionID) - - qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID) - got, err = qre.Execute() - require.NoError(t, err, tcase.input) - assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input) - assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input) - want := tcase.logWant - if tcase.inTxWant != "" { - want = tcase.inTxWant - } - assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input) - } else { - assert.True(t, vterrors.Equals(err, tcase.errorWant)) + if tcase.errorWant != "" && !tcase.onlyInTxErr { + require.EqualError(t, err, tcase.errorWant) + return + } + require.NoError(t, err) + require.NotNil(t, state.TabletAlias, "alias should not be nil") + assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin") + defer tsv.Commit(ctx, target, state.TransactionID) + + qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID) + got, err = qre.Execute() + if tcase.onlyInTxErr { + require.EqualError(t, err, tcase.errorWant) + return } + require.NoError(t, err, tcase.input) + assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input) + assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input) + want := tcase.logWant + if tcase.inTxWant != "" { + want = tcase.inTxWant + } + assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input) }) } }