diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 840d5920a4d..a7a96b8199d 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -217,7 +217,9 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c tsv.exporter.NewGaugesFuncWithMultiLabels("TabletServerState", "Tablet server state labeled by state name", []string{"name"}, func() map[string]int64 { return map[string]int64{tsv.sm.IsServingString(): 1} }) - tsv.exporter.NewGaugeDurationFunc("QueryTimeout", "Tablet server query timeout", tsv.loadQueryTimeout) + tsv.exporter.NewGaugeDurationFunc("QueryTimeout", "Tablet server query timeout", func() time.Duration { + return time.Duration(tsv.QueryTimeout.Load()) + }) tsv.registerHealthzHealthHandler() tsv.registerDebugHealthHandler() @@ -237,6 +239,35 @@ func (tsv *TabletServer) loadQueryTimeout() time.Duration { return time.Duration(tsv.QueryTimeout.Load()) } +func (tsv *TabletServer) loadQueryTimeoutWithTx(txID int64, options *querypb.ExecuteOptions) time.Duration { + timeout := tsv.loadQueryTimeoutWithOptions(options) + + if txID == 0 { + return timeout + } + + // fetch the transaction timeout. + txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP) + + // Use the smaller of the two values (0 means infinity). + return smallerTimeout(timeout, txTimeout) +} + +func (tsv *TabletServer) loadQueryTimeoutWithOptions(options *querypb.ExecuteOptions) time.Duration { + authoritativeTimeout := loadAuthoritativeTimeout(options) + if authoritativeTimeout >= 0 { + return authoritativeTimeout + } + return time.Duration(tsv.QueryTimeout.Load()) +} + +func loadAuthoritativeTimeout(options *querypb.ExecuteOptions) time.Duration { + if options == nil || options.GetTimeout() == nil { + return -1 + } + return time.Duration(options.GetAuthoritativeTimeout()) +} + // onlineDDLExecutorToggleTableBuffer is called by onlineDDLExecutor as a callback function. onlineDDLExecutor // uses it to start/stop query buffering for a given table. // It is onlineDDLExecutor's responsibility to make sure buffering is stopped after some definite amount of time. @@ -489,7 +520,7 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, savepointQueries []string, reservedID int64, settings []string, options *querypb.ExecuteOptions) (state queryservice.TransactionState, err error) { state.TabletAlias = tsv.alias err = tsv.execRequest( - ctx, tsv.loadQueryTimeout(), + ctx, tsv.loadQueryTimeoutWithOptions(options), "Begin", "begin", nil, target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { @@ -775,17 +806,8 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq } func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, reservedID int64, settings []string, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) { - allowOnShutdown := false - timeout := tsv.loadQueryTimeout() - if transactionID != 0 { - allowOnShutdown = true - // Execute calls happen for OLTP only, so we can directly fetch the - // OLTP TX timeout. - txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP) - // Use the smaller of the two values (0 means infinity). - // TODO(sougou): Assign deadlines to each transaction and set query timeout accordingly. - timeout = smallerTimeout(timeout, txTimeout) - } + allowOnShutdown := transactionID != 0 + timeout := tsv.loadQueryTimeoutWithTx(transactionID, options) err = tsv.execRequest( ctx, timeout, "Execute", sql, bindVariables, @@ -1002,7 +1024,7 @@ func (tsv *TabletServer) beginWaitForSameRangeTransactions(ctx context.Context, err := tsv.execRequest( // Use (potentially longer) -queryserver-config-query-timeout and not // -queryserver-config-txpool-timeout (defaults to 1s) to limit the waiting. - ctx, tsv.loadQueryTimeout(), + ctx, tsv.loadQueryTimeoutWithOptions(options), "", "waitForSameRangeTransactions", nil, target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { @@ -1221,7 +1243,7 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp state.TabletAlias = tsv.alias err = tsv.execRequest( - ctx, tsv.loadQueryTimeout(), + ctx, tsv.loadQueryTimeoutWithOptions(options), "ReserveBegin", "begin", bindVariables, target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { @@ -1286,16 +1308,8 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar // needs a reserved connection to execute the query. state.TabletAlias = tsv.alias - allowOnShutdown := false - timeout := tsv.loadQueryTimeout() - if transactionID != 0 { - allowOnShutdown = true - // ReserveExecute is for OLTP only, so we can directly fetch the OLTP - // TX timeout. - txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP) - // Use the smaller of the two values (0 means infinity). - timeout = smallerTimeout(timeout, txTimeout) - } + allowOnShutdown := transactionID != 0 + timeout := tsv.loadQueryTimeoutWithTx(transactionID, options) err = tsv.execRequest( ctx, timeout,