diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index efef24ff9fe..97dc035f461 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -85,7 +85,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { } for _, query := range conn.TxProperties().Queries { - qr := dte.qe.queryRuleSources.FilterByPlan(query, 0) + qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...) if qr != nil { act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{}) if act != rules.QRContinue { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 02b8dd9171a..70d9f67d0a2 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -670,7 +670,6 @@ func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) { newLast += cache } query = fmt.Sprintf("update %s set next_id = %d where id = 0", sqlparser.String(tableName), newLast) - conn.TxProperties().RecordQuery(query) _, err = qre.execStatefulConn(conn, query, false) if err != nil { return nil, err @@ -807,7 +806,7 @@ func (qre *QueryExecutor) txFetch(conn *StatefulConnection, record bool) (*sqlty } // Only record successful queries. if record { - conn.TxProperties().RecordQuery(sql) + conn.TxProperties().RecordQuery(sql, qre.plan.PlanID, qre.plan.TableNames()) } return qr, nil } diff --git a/go/vt/vttablet/tabletserver/twopc.go b/go/vt/vttablet/tabletserver/twopc.go index b3c5ab628c3..ffb4bae39a5 100644 --- a/go/vt/vttablet/tabletserver/twopc.go +++ b/go/vt/vttablet/tabletserver/twopc.go @@ -169,7 +169,7 @@ func (tpc *TwoPC) Close() { } // SaveRedo saves the statements in the redo log using the supplied connection. -func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid string, queries []string) error { +func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid string, queries []tx.Query) error { bindVars := map[string]*querypb.BindVariable{ "dtid": sqltypes.StringBindVariable(dtid), "state": sqltypes.Int64BindVariable(RedoStatePrepared), @@ -185,7 +185,7 @@ func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid s rows[i] = []sqltypes.Value{ sqltypes.NewVarBinary(dtid), sqltypes.NewInt64(int64(i + 1)), - sqltypes.NewVarBinary(query), + sqltypes.NewVarBinary(query.Sql), } } extras := map[string]sqlparser.Encodable{ diff --git a/go/vt/vttablet/tabletserver/tx/api.go b/go/vt/vttablet/tabletserver/tx/api.go index a392e530ffa..a52357c3cb8 100644 --- a/go/vt/vttablet/tabletserver/tx/api.go +++ b/go/vt/vttablet/tabletserver/tx/api.go @@ -25,17 +25,18 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" ) type ( // ConnID as type int64 ConnID = int64 - //DTID as type string + // DTID as type string DTID = string - //EngineStateMachine is used to control the state the transactional engine - - //whether new connections and/or transactions are allowed or not. + // EngineStateMachine is used to control the state the transactional engine - + // whether new connections and/or transactions are allowed or not. EngineStateMachine interface { Init() error AcceptReadWrite() error @@ -46,14 +47,14 @@ type ( // ReleaseReason as type int ReleaseReason int - //Properties contains all information that is related to the currently running - //transaction on the connection + // Properties contains all information that is related to the currently running + // transaction on the connection Properties struct { EffectiveCaller *vtrpcpb.CallerID ImmediateCaller *querypb.VTGateCallerID StartTime time.Time EndTime time.Time - Queries []string + Queries []Query Autocommit bool Conclusion string LogToFile bool @@ -62,6 +63,12 @@ type ( } ) +type Query struct { + Sql string + PlanType planbuilder.PlanType + Tables []string +} + const ( // TxClose - connection released on close. TxClose ReleaseReason = iota @@ -115,11 +122,15 @@ var txNames = map[ReleaseReason]string{ } // RecordQuery records the query against this transaction. -func (p *Properties) RecordQuery(query string) { +func (p *Properties) RecordQuery(query string, planType planbuilder.PlanType, tables []string) { if p == nil { return } - p.Queries = append(p.Queries, query) + p.Queries = append(p.Queries, Query{ + Sql: query, + PlanType: planType, + Tables: tables, + }) } // InTransaction returns true as soon as this struct is not nil @@ -134,10 +145,11 @@ func (p *Properties) String(sanitize bool, parser *sqlparser.Parser) string { printQueries := func() string { sb := strings.Builder{} for _, query := range p.Queries { + sql := query.Sql if sanitize { - query, _ = parser.RedactSQLQuery(query) + sql, _ = parser.RedactSQLQuery(sql) } - sb.WriteString(query) + sb.WriteString(sql) sb.WriteString(";") } return sb.String() diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index ea4e0b1e41d..8ad4c359456 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -427,7 +427,6 @@ outer: continue } for _, stmt := range preparedTx.Queries { - conn.TxProperties().RecordQuery(stmt) _, err := conn.Exec(ctx, stmt, 1, false) if err != nil { allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) diff --git a/go/vt/vttablet/tabletserver/txlogz_test.go b/go/vt/vttablet/tabletserver/txlogz_test.go index 319669a0023..8faec74d07b 100644 --- a/go/vt/vttablet/tabletserver/txlogz_test.go +++ b/go/vt/vttablet/tabletserver/txlogz_test.go @@ -60,7 +60,7 @@ func testHandler(req *http.Request, t *testing.T) { ImmediateCaller: callerid.NewImmediateCallerID("immediate-caller"), StartTime: time.Now(), Conclusion: "unknown", - Queries: []string{"select * from test"}, + Queries: []tx.Query{{Sql: "select * from test"}}, }, } txConn.txProps.EndTime = txConn.txProps.StartTime