Skip to content

Commit

Permalink
chore(cli): set default timeout for cli commands (#5021)
Browse files Browse the repository at this point in the history
* chore(cli): set default timeout for cli commands

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: update comments

* fix: treats `None` as `0s` to disable server-side default timeout

* chore: update comments
  • Loading branch information
WenyXu authored Nov 20, 2024
1 parent 6a958e2 commit 027284e
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 15 deletions.
15 changes: 7 additions & 8 deletions src/cmd/src/cli/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ pub(crate) struct DatabaseClient {
addr: String,
catalog: String,
auth_header: Option<String>,
timeout: Option<Duration>,
timeout: Duration,
}

impl DatabaseClient {
pub fn new(
addr: String,
catalog: String,
auth_basic: Option<String>,
timeout: Option<Duration>,
timeout: Duration,
) -> Self {
let auth_header = if let Some(basic) = auth_basic {
let encoded = general_purpose::STANDARD.encode(basic);
Expand Down Expand Up @@ -73,12 +73,11 @@ impl DatabaseClient {
if let Some(ref auth) = self.auth_header {
request = request.header("Authorization", auth);
}
if let Some(ref timeout) = self.timeout {
request = request.header(
GREPTIME_DB_HEADER_TIMEOUT,
format_duration(*timeout).to_string(),
);
}

request = request.header(
GREPTIME_DB_HEADER_TIMEOUT,
format_duration(self.timeout).to_string(),
);

let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
reason: format!("bad url: {}", url),
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ pub struct ExportCommand {
auth_basic: Option<String>,

/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
Expand All @@ -98,7 +101,8 @@ impl ExportCommand {
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
self.timeout,
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
);

Ok(Instance::new(
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/src/cli/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ pub struct ImportCommand {
auth_basic: Option<String>,

/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
Expand All @@ -82,7 +85,8 @@ impl ImportCommand {
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
self.timeout,
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
);

Ok(Instance::new(
Expand Down
23 changes: 23 additions & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,29 @@ mod test {
assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
let elapsed = now.elapsed();
assert!(elapsed > Duration::from_millis(15));

tokio::time::timeout(
Duration::from_millis(15),
client
.get("/test/timeout")
.header(GREPTIME_DB_HEADER_TIMEOUT, "0s")
.send(),
)
.await
.unwrap_err();

tokio::time::timeout(
Duration::from_millis(15),
client
.get("/test/timeout")
.header(
GREPTIME_DB_HEADER_TIMEOUT,
humantime::format_duration(Duration::default()).to_string(),
)
.send(),
)
.await
.unwrap_err();
}

#[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/http/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ impl RequestBuilder {
/// This is convenient for tests where panics are what you want. For access to
/// non-panicking versions or the complete `Response` API use `into_inner()` or
/// `as_ref()`.
#[derive(Debug)]
pub struct TestResponse {
response: reqwest::Response,
}
Expand Down
18 changes: 13 additions & 5 deletions src/servers/src/http/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use axum::body::Body;
use axum::http::Request;
use axum::response::Response;
use pin_project::pin_project;
use tokio::time::Sleep;
use tokio::time::{Instant, Sleep};
use tower::timeout::error::Elapsed;
use tower::{BoxError, Layer, Service};

Expand Down Expand Up @@ -128,17 +128,25 @@ where
}

fn call(&mut self, request: Request<Body>) -> Self::Future {
let user_timeout = request
let timeout = request
.headers()
.get(GREPTIME_DB_HEADER_TIMEOUT)
.and_then(|value| {
value
.to_str()
.ok()
.and_then(|value| humantime::parse_duration(value).ok())
});
})
.unwrap_or(self.default_timeout);
let response = self.inner.call(request);
let sleep = tokio::time::sleep(user_timeout.unwrap_or(self.default_timeout));
ResponseFuture::new(response, sleep)

if timeout.is_zero() {
// 30 years. See `Instant::far_future`.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
ResponseFuture::new(response, tokio::time::sleep_until(far_future))
} else {
let sleep = tokio::time::sleep(timeout);
ResponseFuture::new(response, sleep)
}
}
}

0 comments on commit 027284e

Please sign in to comment.