Skip to content

Commit

Permalink
feat: use authoritative timeout from execute options if provided
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Sep 9, 2024
1 parent f598f70 commit 5a693ff
Showing 1 changed file with 39 additions and 25 deletions.
64 changes: 39 additions & 25 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c
tsv.exporter.NewGaugesFuncWithMultiLabels("TabletServerState", "Tablet server state labeled by state name", []string{"name"}, func() map[string]int64 {
return map[string]int64{tsv.sm.IsServingString(): 1}
})
tsv.exporter.NewGaugeDurationFunc("QueryTimeout", "Tablet server query timeout", tsv.loadQueryTimeout)
tsv.exporter.NewGaugeDurationFunc("QueryTimeout", "Tablet server query timeout", func() time.Duration {
return time.Duration(tsv.QueryTimeout.Load())
})

tsv.registerHealthzHealthHandler()
tsv.registerDebugHealthHandler()
Expand All @@ -237,6 +239,35 @@ func (tsv *TabletServer) loadQueryTimeout() time.Duration {
return time.Duration(tsv.QueryTimeout.Load())
}

func (tsv *TabletServer) loadQueryTimeoutWithTx(txID int64, options *querypb.ExecuteOptions) time.Duration {
timeout := tsv.loadQueryTimeoutWithOptions(options)

if txID == 0 {
return timeout
}

// fetch the transaction timeout.
txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP)

// Use the smaller of the two values (0 means infinity).
return smallerTimeout(timeout, txTimeout)
}

func (tsv *TabletServer) loadQueryTimeoutWithOptions(options *querypb.ExecuteOptions) time.Duration {
authoritativeTimeout := loadAuthoritativeTimeout(options)
if authoritativeTimeout >= 0 {
return authoritativeTimeout
}
return time.Duration(tsv.QueryTimeout.Load())
}

func loadAuthoritativeTimeout(options *querypb.ExecuteOptions) time.Duration {
if options == nil || options.GetTimeout() == nil {
return -1
}
return time.Duration(options.GetAuthoritativeTimeout())
}

// onlineDDLExecutorToggleTableBuffer is called by onlineDDLExecutor as a callback function. onlineDDLExecutor
// uses it to start/stop query buffering for a given table.
// It is onlineDDLExecutor's responsibility to make sure buffering is stopped after some definite amount of time.
Expand Down Expand Up @@ -489,7 +520,7 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti
func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, savepointQueries []string, reservedID int64, settings []string, options *querypb.ExecuteOptions) (state queryservice.TransactionState, err error) {
state.TabletAlias = tsv.alias
err = tsv.execRequest(
ctx, tsv.loadQueryTimeout(),
ctx, tsv.loadQueryTimeoutWithOptions(options),
"Begin", "begin", nil,
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
Expand Down Expand Up @@ -775,17 +806,8 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq
}

func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, reservedID int64, settings []string, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) {
allowOnShutdown := false
timeout := tsv.loadQueryTimeout()
if transactionID != 0 {
allowOnShutdown = true
// Execute calls happen for OLTP only, so we can directly fetch the
// OLTP TX timeout.
txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP)
// Use the smaller of the two values (0 means infinity).
// TODO(sougou): Assign deadlines to each transaction and set query timeout accordingly.
timeout = smallerTimeout(timeout, txTimeout)
}
allowOnShutdown := transactionID != 0
timeout := tsv.loadQueryTimeoutWithTx(transactionID, options)
err = tsv.execRequest(
ctx, timeout,
"Execute", sql, bindVariables,
Expand Down Expand Up @@ -1002,7 +1024,7 @@ func (tsv *TabletServer) beginWaitForSameRangeTransactions(ctx context.Context,
err := tsv.execRequest(
// Use (potentially longer) -queryserver-config-query-timeout and not
// -queryserver-config-txpool-timeout (defaults to 1s) to limit the waiting.
ctx, tsv.loadQueryTimeout(),
ctx, tsv.loadQueryTimeoutWithOptions(options),
"", "waitForSameRangeTransactions", nil,
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
Expand Down Expand Up @@ -1221,7 +1243,7 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp
state.TabletAlias = tsv.alias

err = tsv.execRequest(
ctx, tsv.loadQueryTimeout(),
ctx, tsv.loadQueryTimeoutWithOptions(options),
"ReserveBegin", "begin", bindVariables,
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
Expand Down Expand Up @@ -1286,16 +1308,8 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar
// needs a reserved connection to execute the query.
state.TabletAlias = tsv.alias

allowOnShutdown := false
timeout := tsv.loadQueryTimeout()
if transactionID != 0 {
allowOnShutdown = true
// ReserveExecute is for OLTP only, so we can directly fetch the OLTP
// TX timeout.
txTimeout := tsv.config.TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP)
// Use the smaller of the two values (0 means infinity).
timeout = smallerTimeout(timeout, txTimeout)
}
allowOnShutdown := transactionID != 0
timeout := tsv.loadQueryTimeoutWithTx(transactionID, options)

err = tsv.execRequest(
ctx, timeout,
Expand Down

0 comments on commit 5a693ff

Please sign in to comment.