From 55fc90a963dd18411713f2257c91be3bcf7b1286 Mon Sep 17 00:00:00 2001 From: yongman Date: Tue, 9 Aug 2022 11:41:47 +0800 Subject: [PATCH 1/7] Update toolchain and fix clippy Signed-off-by: yongman --- rust-toolchain | 2 +- src/bin/server.rs | 27 ++++++++++++--------------- src/client.rs | 4 ++-- src/cmd/zcount.rs | 8 ++------ src/cmd/zincrby.rs | 2 +- src/cmd/zrangebyscore.rs | 16 ++++------------ src/connection.rs | 4 ++-- src/db.rs | 1 + src/gc.rs | 23 ++++++++++------------- src/parse.rs | 2 +- src/server.rs | 4 ++-- src/tikv/hash.rs | 10 ++++------ src/tikv/list.rs | 12 ++++-------- src/tikv/lua.rs | 4 ++-- src/tikv/set.rs | 10 ++++------ src/tikv/string.rs | 37 +++++++++++++++---------------------- src/tikv/zset.rs | 10 ++++------ 17 files changed, 71 insertions(+), 105 deletions(-) diff --git a/rust-toolchain b/rust-toolchain index b91c1b1..335cdaf 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2022-05-01 +nightly-2022-07-27 diff --git a/src/bin/server.rs b/src/bin/server.rs index ef595bd..e29d35e 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -24,21 +24,18 @@ pub async fn main() -> tikv_service::Result<()> { let mut config: Option = None; - match cli.config { - Some(config_file_name) => { - let config_content = - fs::read_to_string(config_file_name).expect("Failed to read config file"); - - // deserialize toml config - config = match toml::from_str(&config_content) { - Ok(d) => Some(d), - Err(e) => { - println!("Unable to load config file {}", e); - exit(1); - } - }; - } - None => (), + if let Some(config_file_name) = cli.config { + let config_content = + fs::read_to_string(config_file_name).expect("Failed to read config file"); + + // deserialize toml config + config = match toml::from_str(&config_content) { + Ok(d) => Some(d), + Err(e) => { + println!("Unable to load config file {}", e); + exit(1); + } + }; }; match &config { diff --git a/src/client.rs b/src/client.rs index 704f743..f14bdd0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -35,8 +35,8 @@ impl Client { name: "".to_owned(), fd: socket.as_raw_fd(), cmd: "".to_owned(), - local_addr: (&socket).local_addr().unwrap().to_string(), - peer_addr: (&socket).peer_addr().unwrap().to_string(), + local_addr: socket.local_addr().unwrap().to_string(), + peer_addr: socket.peer_addr().unwrap().to_string(), create_time: now, last_interaction: now, kill_tx, diff --git a/src/cmd/zcount.rs b/src/cmd/zcount.rs index dfb60da..c10cd48 100644 --- a/src/cmd/zcount.rs +++ b/src/cmd/zcount.rs @@ -48,18 +48,14 @@ impl Zcount { bmin.advance(1); min_inclusive = false; } - let min = String::from_utf8_lossy(&bmin.to_vec()) - .parse::() - .unwrap(); + let min = String::from_utf8_lossy(&bmin).parse::().unwrap(); let mut bmax = parse.next_bytes()?; if bmax[0] == b'(' { bmax.advance(1); max_inclusive = false; } - let max = String::from_utf8_lossy(&bmax.to_vec()) - .parse::() - .unwrap(); + let max = String::from_utf8_lossy(&bmax).parse::().unwrap(); let z = Zcount::new(&key, min, min_inclusive, max, max_inclusive); diff --git a/src/cmd/zincrby.rs b/src/cmd/zincrby.rs index cb5ff74..7cf717c 100644 --- a/src/cmd/zincrby.rs +++ b/src/cmd/zincrby.rs @@ -36,7 +36,7 @@ impl Zincrby { let step_byte = parse.next_bytes()?; let member = parse.next_string()?; - let step = String::from_utf8_lossy(&step_byte.to_vec()).parse::()?; + let step = String::from_utf8_lossy(&step_byte).parse::()?; Ok(Zincrby::new(&key, step, &member)) } diff --git a/src/cmd/zrangebyscore.rs b/src/cmd/zrangebyscore.rs index 5439982..a61a358 100644 --- a/src/cmd/zrangebyscore.rs +++ b/src/cmd/zrangebyscore.rs @@ -66,9 +66,7 @@ impl Zrangebyscore { } if min == 0f64 { - min = String::from_utf8_lossy(&bmin.to_vec()) - .parse::() - .unwrap(); + min = String::from_utf8_lossy(&bmin).parse::().unwrap(); } let mut bmax = parse.next_bytes()?; @@ -82,9 +80,7 @@ impl Zrangebyscore { } if max == 0f64 { - max = String::from_utf8_lossy(&bmax.to_vec()) - .parse::() - .unwrap(); + max = String::from_utf8_lossy(&bmax).parse::().unwrap(); } let mut withscores = false; @@ -128,9 +124,7 @@ impl Zrangebyscore { } if min == 0f64 { - min = String::from_utf8_lossy(&bmin.to_vec()) - .parse::() - .unwrap(); + min = String::from_utf8_lossy(&bmin).parse::().unwrap(); } let mut bmax = argv[2].clone(); @@ -144,9 +138,7 @@ impl Zrangebyscore { } if max == 0f64 { - max = String::from_utf8_lossy(&bmax.to_vec()) - .parse::() - .unwrap(); + max = String::from_utf8_lossy(&bmax).parse::().unwrap(); } let mut withscores = false; diff --git a/src/connection.rs b/src/connection.rs index ef9929c..578e648 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -46,8 +46,8 @@ impl Connection { pub fn new(socket: TcpStream) -> Connection { Connection { tls: false, - local_addr: (&socket).local_addr().unwrap().to_string(), - peer_addr: (&socket).peer_addr().unwrap().to_string(), + local_addr: socket.local_addr().unwrap().to_string(), + peer_addr: socket.peer_addr().unwrap().to_string(), w: Some(BufWriter::new(socket.clone())), r: Some(BufReader::new(socket)), diff --git a/src/db.rs b/src/db.rs index 04b2847..4667e53 100644 --- a/src/db.rs +++ b/src/db.rs @@ -254,6 +254,7 @@ impl Db { /// /// The returned `Receiver` is used to receive values broadcast by `PUBLISH` /// commands. + #[allow(clippy::significant_drop_in_scrutinee)] pub(crate) fn subscribe(&self, key: String) -> broadcast::Receiver { use std::collections::hash_map::Entry; diff --git a/src/gc.rs b/src/gc.rs index 4c626fd..989fa6b 100644 --- a/src/gc.rs +++ b/src/gc.rs @@ -315,20 +315,17 @@ impl GcWorker { // also delete gc key if version in gc key is same as task.version let gc_key = KEY_ENCODER.encode_txnkv_gc_key(&user_key); let version = task.version; - match txn.get(gc_key.clone()).await? { - Some(v) => { - let ver = u16::from_be_bytes(v[..2].try_into().unwrap()); - if ver == version { - debug!( - LOGGER, - "[GC] clean gc key for user key {} with version {}", - user_key, - version - ); - txn.delete(gc_key).await?; - } + if let Some(v) = txn.get(gc_key.clone()).await? { + let ver = u16::from_be_bytes(v[..2].try_into().unwrap()); + if ver == version { + debug!( + LOGGER, + "[GC] clean gc key for user key {} with version {}", + user_key, + version + ); + txn.delete(gc_key).await?; } - None => {} } Ok(()) } diff --git a/src/parse.rs b/src/parse.rs index d2ac27f..08bc18b 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -114,7 +114,7 @@ impl Parse { Err(_) => Err(MSG.into()), }, //Frame::Bulk(data) => atoi::(&data).ok_or_else(|| MSG.into()), - Frame::Bulk(data) => match String::from_utf8_lossy(&data.to_vec()).parse::() { + Frame::Bulk(data) => match String::from_utf8_lossy(&data).parse::() { Ok(value) => Ok(value), Err(_) => Err(MSG.into()), }, diff --git a/src/server.rs b/src/server.rs index c6e3dda..b953762 100644 --- a/src/server.rs +++ b/src/server.rs @@ -543,8 +543,8 @@ impl TlsListener { .await .insert(client_id, arc_client.clone()); - let local_addr = (&stream).local_addr().unwrap().to_string(); - let peer_addr = (&stream).peer_addr().unwrap().to_string(); + let local_addr = stream.local_addr().unwrap().to_string(); + let peer_addr = stream.peer_addr().unwrap().to_string(); // start tls handshake let handshake = acceptor.accept(stream); diff --git a/src/tikv/hash.rs b/src/tikv/hash.rs index 7c1412f..3fc2f57 100644 --- a/src/tikv/hash.rs +++ b/src/tikv/hash.rs @@ -775,7 +775,7 @@ impl<'a> HashCommandCtx { let key = key.to_owned(); let meta_key = KEY_ENCODER.encode_txnkv_meta_key(&key); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_arc| { async move { if self.txn.is_none() { @@ -831,8 +831,7 @@ impl<'a> HashCommandCtx { } .boxed() }) - .await; - resp + .await } pub async fn do_async_txnkv_hash_expire_if_needed(mut self, key: &str) -> AsyncResult { @@ -840,7 +839,7 @@ impl<'a> HashCommandCtx { let key = key.to_owned(); let meta_key = KEY_ENCODER.encode_txnkv_meta_key(&key); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_arc| { async move { if self.txn.is_none() { @@ -902,7 +901,6 @@ impl<'a> HashCommandCtx { } .boxed() }) - .await; - resp + .await } } diff --git a/src/tikv/list.rs b/src/tikv/list.rs index 5196617..bf47f98 100644 --- a/src/tikv/list.rs +++ b/src/tikv/list.rs @@ -951,7 +951,7 @@ impl<'a> ListCommandCtx { let key = key.to_owned(); let meta_key = KEY_ENCODER.encode_txnkv_meta_key(&key); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_rc| { async move { if self.txn.is_none() { @@ -996,9 +996,7 @@ impl<'a> ListCommandCtx { } .boxed() }) - .await; - - resp + .await } pub async fn do_async_txnkv_list_expire_if_needed(mut self, key: &str) -> AsyncResult { @@ -1006,7 +1004,7 @@ impl<'a> ListCommandCtx { let key = key.to_owned(); let meta_key = KEY_ENCODER.encode_txnkv_meta_key(&key); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_rc| { async move { if self.txn.is_none() { @@ -1058,8 +1056,6 @@ impl<'a> ListCommandCtx { } .boxed() }) - .await; - - resp + .await } } diff --git a/src/tikv/lua.rs b/src/tikv/lua.rs index 7481e8e..931b8fa 100644 --- a/src/tikv/lua.rs +++ b/src/tikv/lua.rs @@ -64,7 +64,7 @@ impl<'a> LuaCommandCtx<'a> { let txn_rc = txn_rc.clone(); // package arguments(without cmd) to argv async move { - if (&args).len() == 0 { + if args.len() == 0 { let table = _lua.create_table().unwrap(); table.raw_set("err", "Invalid arguments, please specify at least one argument for redis.call()").unwrap(); return Ok(LuaValue::Table(table)); @@ -242,7 +242,7 @@ impl<'a> LuaCommandCtx<'a> { match script { Some(script) => Ok(self .clone() - .do_async_eval_inner(&String::from_utf8_lossy(&script.to_vec()), keys, args) + .do_async_eval_inner(&String::from_utf8_lossy(&script), keys, args) .await?), None => Ok(resp_err(REDIS_NO_MATCHING_SCRIPT_ERR)), } diff --git a/src/tikv/set.rs b/src/tikv/set.rs index 4c0a6b0..08488e0 100644 --- a/src/tikv/set.rs +++ b/src/tikv/set.rs @@ -695,7 +695,7 @@ impl SetCommandCtx { let key = key.to_owned(); let meta_key = KEY_ENCODER.encode_txnkv_meta_key(&key); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_rc| { async move { if self.txn.is_none() { @@ -751,8 +751,7 @@ impl SetCommandCtx { } .boxed() }) - .await; - resp + .await } pub async fn do_async_txnkv_set_expire_if_needed(mut self, key: &str) -> AsyncResult { @@ -760,7 +759,7 @@ impl SetCommandCtx { let key = key.to_owned(); let meta_key = KEY_ENCODER.encode_txnkv_meta_key(&key); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_rc| { async move { if self.txn.is_none() { @@ -820,7 +819,6 @@ impl SetCommandCtx { } .boxed() }) - .await; - resp + .await } } diff --git a/src/tikv/string.rs b/src/tikv/string.rs index 0b7c538..ef07bca 100644 --- a/src/tikv/string.rs +++ b/src/tikv/string.rs @@ -174,7 +174,7 @@ impl StringCommandCtx { pub async fn do_async_rawkv_put(self, key: &str, val: &Bytes) -> AsyncResult { let client = get_client()?; let ekey = KEY_ENCODER.encode_rawkv_string(key); - let _ = client.put(ekey, val.to_vec()).await?; + client.put(ekey, val.to_vec()).await?; Ok(resp_ok()) } @@ -264,7 +264,7 @@ impl StringCommandCtx { pub async fn do_async_rawkv_batch_put(self, kvs: Vec) -> AsyncResult { let client = get_client()?; - let _ = client.batch_put(kvs).await?; + client.batch_put(kvs).await?; Ok(resp_ok()) } @@ -379,19 +379,16 @@ impl StringCommandCtx { for key in &keys { let ekey = KEY_ENCODER.encode_txnkv_string(key); let mut txn = txn_rc.lock().await; - match txn.get(ekey).await? { - Some(v) => { - let ttl = KeyDecoder::decode_key_ttl(&v); - if key_is_expired(ttl) { - drop(txn); - self.clone() - .do_async_txnkv_string_expire_if_needed(key) - .await?; - } else { - cnt += 1; - } + if let Some(v) = txn.get(ekey).await? { + let ttl = KeyDecoder::decode_key_ttl(&v); + if key_is_expired(ttl) { + drop(txn); + self.clone() + .do_async_txnkv_string_expire_if_needed(key) + .await?; + } else { + cnt += 1; } - None => {} } } Ok(resp_int(cnt as i64)) @@ -494,7 +491,7 @@ impl StringCommandCtx { let mut client = get_txn_client()?; let key = key.to_owned(); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_rc| { async move { if self.txn.is_none() { @@ -510,16 +507,14 @@ impl StringCommandCtx { } .boxed() }) - .await; - - resp + .await } pub async fn do_async_txnkv_string_expire_if_needed(mut self, key: &str) -> AsyncResult { let mut client = get_txn_client()?; let key = key.to_owned(); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_rc| { async move { if self.txn.is_none() { @@ -541,9 +536,7 @@ impl StringCommandCtx { } .boxed() }) - .await; - - resp + .await } pub async fn do_async_txnkv_expire(mut self, key: &str, timestamp: u64) -> AsyncResult { diff --git a/src/tikv/zset.rs b/src/tikv/zset.rs index b24df29..70929bf 100644 --- a/src/tikv/zset.rs +++ b/src/tikv/zset.rs @@ -1336,7 +1336,7 @@ impl ZsetCommandCtx { let key = key.to_owned(); let meta_key = KEY_ENCODER.encode_txnkv_meta_key(&key); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_rc| { async move { if self.txn.is_none() { @@ -1407,8 +1407,7 @@ impl ZsetCommandCtx { } .boxed() }) - .await; - resp + .await } pub async fn do_async_txnkv_zset_expire_if_needed(mut self, key: &str) -> AsyncResult { @@ -1416,7 +1415,7 @@ impl ZsetCommandCtx { let key = key.to_owned(); let meta_key = KEY_ENCODER.encode_txnkv_meta_key(&key); - let resp = client + client .exec_in_txn(self.txn.clone(), |txn_rc| { async move { if self.txn.is_none() { @@ -1496,7 +1495,6 @@ impl ZsetCommandCtx { } .boxed() }) - .await; - resp + .await } } From 8407e5a624359a7249eb12255602813dc5d58837 Mon Sep 17 00:00:00 2001 From: yongman Date: Tue, 9 Aug 2022 18:12:58 +0800 Subject: [PATCH 2/7] Add more kv client config Signed-off-by: yongman --- config.toml | 11 ++++-- src/config.rs | 93 +++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 7 ++++ src/tikv/mod.rs | 20 ++++++++--- 4 files changed, 125 insertions(+), 6 deletions(-) diff --git a/config.toml b/config.toml index 2dfe6a8..4a6dfe2 100644 --- a/config.toml +++ b/config.toml @@ -18,10 +18,17 @@ log_file = "tikv-service.log" use_txn_api = true use_async_commit = true try_one_pc_commit = true -use_pessimistic_txn = true -local_pool_number = 4 +use_pessimistic_txn = false +local_pool_number = 10 txn_retry_count = 10 txn_region_backoff_delay_ms = 2 txn_region_backoff_delay_attemps = 5 txn_lock_backoff_delay_ms = 2 txn_lock_backoff_delay_attemps = 5 + +completion_queue_size = 1 +grpc_keepalive_time = 10000 +grpc_keepalive_timeout = 2000 +allow_batch = false +max_batch_wait_time = 10 +max_batch_size = 10 diff --git a/src/config.rs b/src/config.rs index 04eafba..33c927d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -65,6 +65,16 @@ struct Backend { try_one_pc_commit: Option, use_pessimistic_txn: Option, local_pool_number: Option, + + // kv client config + completion_queue_size: Option, + grpc_keepalive_time: Option, + grpc_keepalive_timeout: Option, + allow_batch: Option, + overload_threshold: Option, + max_batch_wait_time: Option, + max_batch_size: Option, + txn_retry_count: Option, txn_region_backoff_delay_ms: Option, txn_region_backoff_delay_attemps: Option, @@ -722,3 +732,86 @@ pub fn conn_concurrency_or_default() -> usize { // default backend connection concurrency 10 } + +pub fn backend_completion_queue_size_or_default() -> usize { + unsafe { + if let Some(c) = &SERVER_CONFIG { + if let Some(b) = c.backend.completion_queue_size { + return b; + } + } + } + // default backend completion queue size + 1 +} + +pub fn backend_grpc_keepalive_time_or_default() -> u64 { + unsafe { + if let Some(c) = &SERVER_CONFIG { + if let Some(b) = c.backend.grpc_keepalive_time { + return b; + } + } + } + // default backend grpc keepalive time in ms + 10000 +} + +pub fn backend_grpc_keepalive_timeout_or_default() -> u64 { + unsafe { + if let Some(c) = &SERVER_CONFIG { + if let Some(b) = c.backend.grpc_keepalive_timeout { + return b; + } + } + } + // default backend grpc keepalive timeout in ms + 2000 +} + +pub fn backend_allow_batch_or_default() -> bool { + unsafe { + if let Some(c) = &SERVER_CONFIG { + if let Some(b) = c.backend.allow_batch { + return b; + } + } + } + // default backend not allow batch + false +} + +pub fn backend_overload_threshold_or_default() -> usize { + unsafe { + if let Some(c) = &SERVER_CONFIG { + if let Some(b) = c.backend.overload_threshold { + return b; + } + } + } + 100 +} + +pub fn backend_max_batch_wait_time_or_default() -> u64 { + unsafe { + if let Some(c) = &SERVER_CONFIG { + if let Some(b) = c.backend.max_batch_wait_time { + return b; + } + } + } + // default backend max batch wait time in ms + 10 +} + +pub fn backend_max_batch_size_or_default() -> usize { + unsafe { + if let Some(c) = &SERVER_CONFIG { + if let Some(b) = c.backend.max_batch_size { + return b; + } + } + } + // default backend max batch size + 8 +} diff --git a/src/lib.rs b/src/lib.rs index c2833d2..78353d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,9 +85,16 @@ pub use config::async_expire_zset_threshold_or_default; pub use config::async_gc_interval_or_default; pub use config::async_gc_worker_number_or_default; pub use config::async_gc_worker_queue_size_or_default; +pub use config::backend_allow_batch_or_default; pub use config::backend_ca_file_or_default; pub use config::backend_cert_file_or_default; +pub use config::backend_completion_queue_size_or_default; +pub use config::backend_grpc_keepalive_time_or_default; +pub use config::backend_grpc_keepalive_timeout_or_default; pub use config::backend_key_file_or_default; +pub use config::backend_max_batch_size_or_default; +pub use config::backend_max_batch_wait_time_or_default; +pub use config::backend_overload_threshold_or_default; pub use config::backend_timeout_or_default; pub use config::cmd_linsert_length_limit_or_default; pub use config::cmd_lrem_length_limit_or_default; diff --git a/src/tikv/mod.rs b/src/tikv/mod.rs index d7160fd..d49ea79 100644 --- a/src/tikv/mod.rs +++ b/src/tikv/mod.rs @@ -14,9 +14,12 @@ use crate::config::LOGGER; use crate::tikv::encoding::KeyEncoder; use crate::tikv::errors::REDIS_BACKEND_NOT_CONNECTED_ERR; use crate::{ - backend_ca_file_or_default, backend_cert_file_or_default, backend_key_file_or_default, - backend_timeout_or_default, config_meta_key_number_or_default, conn_concurrency_or_default, - fetch_idx_and_add, + backend_allow_batch_or_default, backend_ca_file_or_default, backend_cert_file_or_default, + backend_completion_queue_size_or_default, backend_grpc_keepalive_time_or_default, + backend_grpc_keepalive_timeout_or_default, backend_key_file_or_default, + backend_max_batch_size_or_default, backend_max_batch_wait_time_or_default, + backend_overload_threshold_or_default, backend_timeout_or_default, + config_meta_key_number_or_default, conn_concurrency_or_default, fetch_idx_and_add, }; use self::client::RawClientWrapper; @@ -125,7 +128,16 @@ pub async fn do_async_txn_connect(addrs: Vec) -> AsyncResult<()> { PD_ADDRS.write().unwrap().replace(addrs.clone()); let mut config = tikv_client::Config::default() - .with_timeout(Duration::from_millis(backend_timeout_or_default())); + .with_timeout(Duration::from_millis(backend_timeout_or_default())) + .with_kv_timeout(backend_timeout_or_default()) + .with_kv_allow_batch(backend_allow_batch_or_default()) + .with_kv_completion_queue_size(backend_completion_queue_size_or_default()) + .with_kv_grpc_keepalive_time(backend_grpc_keepalive_time_or_default()) + .with_kv_grpc_keepalive_timeout(backend_grpc_keepalive_timeout_or_default()) + .with_kv_allow_batch(backend_allow_batch_or_default()) + .with_kv_overload_threshold(backend_overload_threshold_or_default()) + .with_kv_max_batch_size(backend_max_batch_size_or_default()) + .with_kv_max_batch_wait_time(backend_max_batch_wait_time_or_default()); if !backend_ca_file_or_default().is_empty() || !backend_cert_file_or_default().is_empty() || !backend_key_file_or_default().is_empty() From 93c39f7e379ea3306c193918650dc4cc710a72bb Mon Sep 17 00:00:00 2001 From: yongman Date: Thu, 11 Aug 2022 17:10:14 +0800 Subject: [PATCH 3/7] Add load threshold and inflight request config for super batch Signed-off-by: yongman --- src/config.rs | 17 +++++++++++++++-- src/lib.rs | 1 + src/tikv/mod.rs | 6 ++++-- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index 33c927d..d3148eb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -71,9 +71,10 @@ struct Backend { grpc_keepalive_time: Option, grpc_keepalive_timeout: Option, allow_batch: Option, - overload_threshold: Option, + overload_threshold: Option, max_batch_wait_time: Option, max_batch_size: Option, + max_inflight_requests: Option, txn_retry_count: Option, txn_region_backoff_delay_ms: Option, @@ -781,7 +782,7 @@ pub fn backend_allow_batch_or_default() -> bool { false } -pub fn backend_overload_threshold_or_default() -> usize { +pub fn backend_overload_threshold_or_default() -> u64 { unsafe { if let Some(c) = &SERVER_CONFIG { if let Some(b) = c.backend.overload_threshold { @@ -815,3 +816,15 @@ pub fn backend_max_batch_size_or_default() -> usize { // default backend max batch size 8 } + +pub fn backend_max_inflight_requests_or_default() -> usize { + unsafe { + if let Some(c) = &SERVER_CONFIG { + if let Some(b) = c.backend.max_inflight_requests { + return b; + } + } + } + // default backend max inflight requests + 100 +} diff --git a/src/lib.rs b/src/lib.rs index 78353d0..d1428e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,6 +94,7 @@ pub use config::backend_grpc_keepalive_timeout_or_default; pub use config::backend_key_file_or_default; pub use config::backend_max_batch_size_or_default; pub use config::backend_max_batch_wait_time_or_default; +pub use config::backend_max_inflight_requests_or_default; pub use config::backend_overload_threshold_or_default; pub use config::backend_timeout_or_default; pub use config::cmd_linsert_length_limit_or_default; diff --git a/src/tikv/mod.rs b/src/tikv/mod.rs index d49ea79..e2829af 100644 --- a/src/tikv/mod.rs +++ b/src/tikv/mod.rs @@ -18,8 +18,9 @@ use crate::{ backend_completion_queue_size_or_default, backend_grpc_keepalive_time_or_default, backend_grpc_keepalive_timeout_or_default, backend_key_file_or_default, backend_max_batch_size_or_default, backend_max_batch_wait_time_or_default, - backend_overload_threshold_or_default, backend_timeout_or_default, - config_meta_key_number_or_default, conn_concurrency_or_default, fetch_idx_and_add, + backend_max_inflight_requests_or_default, backend_overload_threshold_or_default, + backend_timeout_or_default, config_meta_key_number_or_default, conn_concurrency_or_default, + fetch_idx_and_add, }; use self::client::RawClientWrapper; @@ -137,6 +138,7 @@ pub async fn do_async_txn_connect(addrs: Vec) -> AsyncResult<()> { .with_kv_allow_batch(backend_allow_batch_or_default()) .with_kv_overload_threshold(backend_overload_threshold_or_default()) .with_kv_max_batch_size(backend_max_batch_size_or_default()) + .with_kv_max_inflight_requests(backend_max_inflight_requests_or_default()) .with_kv_max_batch_wait_time(backend_max_batch_wait_time_or_default()); if !backend_ca_file_or_default().is_empty() || !backend_cert_file_or_default().is_empty() From 5046465c6357ad3d310e8818af4c20a29fb1ce96 Mon Sep 17 00:00:00 2001 From: yongman Date: Fri, 12 Aug 2022 10:04:02 +0800 Subject: [PATCH 4/7] Revert "Fix client leak after connection disconnect (#55)" This reverts commit 9ed8a3b2cafbef60b2dd50a5360de11559236007. --- src/cmd/fake.rs | 111 ++++++++++++++++++++++++------------------------ src/cmd/mod.rs | 10 ++--- src/server.rs | 28 +++++------- 3 files changed, 70 insertions(+), 79 deletions(-) diff --git a/src/cmd/fake.rs b/src/cmd/fake.rs index 9dc51b5..c24451d 100644 --- a/src/cmd/fake.rs +++ b/src/cmd/fake.rs @@ -9,7 +9,7 @@ use crate::tikv::errors::{ REDIS_INVALID_CLIENT_ID_ERR, REDIS_NOT_SUPPORTED_ERR, REDIS_NO_SUCH_CLIENT_ERR, REDIS_VALUE_IS_NOT_INTEGER_ERR, }; -use crate::utils::resp_int; +use crate::utils::{resp_int, resp_str}; use crate::{ config::LOGGER, tikv::errors::REDIS_UNKNOWN_SUBCOMMAND, @@ -42,7 +42,7 @@ impl Fake { self, command: &str, dst: &mut Connection, - cur_client: u64, + cur_client: Arc>, clients: Arc>>>>, ) -> crate::Result<()> { let response = self.do_apply(command, cur_client, clients).await; @@ -63,7 +63,7 @@ impl Fake { async fn do_apply( self, command: &str, - cur_client: u64, + cur_client: Arc>, clients: Arc>>>>, ) -> Frame { if !self.valid { @@ -73,14 +73,16 @@ impl Fake { "READWRITE" => resp_ok(), "READONLY" => resp_ok(), "CLIENT" => { + // TODO client more management will be added later match self.args[0].clone().to_uppercase().as_str() { - "ID" => resp_int(cur_client as i64), + "ID" => resp_int(cur_client.lock().await.id() as i64), "LIST" => { if self.args.len() == 1 { - let clients_guard = clients.lock().await; return resp_bulk( - encode_clients_info(clients_guard.clone().into_values().collect()) - .await, + encode_clients_info( + clients.lock().await.clone().into_values().collect(), + ) + .await, ); } @@ -113,19 +115,23 @@ impl Fake { // three arguments format (old format) if self.args.len() == 2 { let mut target_client = None; - let clients_guard = clients.lock().await; - for client in clients_guard.values() { - // make sure get the client guard with clients aguard obtained - let client = client.lock().await; - if client.peer_addr() == self.args[1] { - target_client = Some(client.clone()); - break; + { + let lk_clients = clients.lock().await; + for client in lk_clients.values() { + let lk_client = client.lock().await; + if lk_client.peer_addr() == self.args[1] { + target_client = Some(client.clone()); + break; + } } } return match target_client { Some(client) => { - client.kill().await; + let lk_client = client.lock().await; + let mut lk_clients = clients.lock().await; + lk_client.kill().await; + lk_clients.remove(&lk_client.id()); resp_ok() } None => resp_err(REDIS_NO_SUCH_CLIENT_ERR), @@ -163,34 +169,39 @@ impl Fake { } // retrieve current client id in advance for preventing dead lock during clients traverse + let cur_client_id = cur_client.lock().await.id(); let mut eligible_clients: Vec>> = vec![]; - let clients_guard = clients.lock().await; - for client in clients_guard.values() { - let client_guard = client.lock().await; - if !filter_peer_addr.is_empty() - && client_guard.peer_addr() != filter_peer_addr - { - continue; - } - if !filter_local_addr.is_empty() - && client_guard.local_addr() != filter_local_addr - { - continue; - } - if filter_id != 0 && client_guard.id() != filter_id { - continue; - } - if cur_client == client_guard.id() && filter_skipme { - continue; - } + { + let lk_clients = clients.lock().await; + for client in lk_clients.values() { + let lk_client = client.lock().await; + if !filter_peer_addr.is_empty() + && lk_client.peer_addr() != filter_peer_addr + { + continue; + } + if !filter_local_addr.is_empty() + && lk_client.local_addr() != filter_local_addr + { + continue; + } + if filter_id != 0 && lk_client.id() != filter_id { + continue; + } + if cur_client_id == lk_client.id() && filter_skipme { + continue; + } - eligible_clients.push(client.clone()); + eligible_clients.push(client.clone()); + } } let killed = eligible_clients.len() as i64; + let mut lk_clients = clients.lock().await; for eligible_client in eligible_clients { - // make sure get the client guard with clients aguard obtained - eligible_client.lock().await.kill().await; + let lk_eligible_client = eligible_client.lock().await; + lk_eligible_client.kill().await; + lk_clients.remove(&lk_eligible_client.id()); } resp_int(killed) @@ -200,30 +211,18 @@ impl Fake { return resp_invalid_arguments(); } - let clients_guard = clients.lock().await; - clients_guard - .get(&cur_client) - .unwrap() - .lock() - .await - .set_name(&self.args[1]); - + let mut w_cur_client = cur_client.lock().await; + w_cur_client.set_name(&self.args[1]); resp_ok() } "GETNAME" => { - let clients_guard = clients.lock().await; - let name = clients_guard - .get(&cur_client) - .unwrap() - .lock() - .await - .name() - .to_owned(); + let r_cur_client = cur_client.lock().await; + let name = r_cur_client.name(); if name.is_empty() { return resp_nil(); } - resp_bulk(name.into_bytes()) + resp_str(name) } _ => resp_err(REDIS_UNKNOWN_SUBCOMMAND), } @@ -249,8 +248,8 @@ impl Fake { async fn encode_clients_info(clients: Vec>>) -> Vec { let mut resp_list = String::new(); for client in clients { - let client = client.lock().await; - resp_list.push_str(&client.to_string()); + let r_client = client.lock().await; + resp_list.push_str(&r_client.to_string()); resp_list.push('\n'); } diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 856aee6..a79e11b 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -681,7 +681,7 @@ impl Command { db: &Db, topo: &Topo, dst: &mut Connection, - cur_client_id: u64, + cur_client: Arc>, clients: Arc>>>>, lua: &mut Option, shutdown: &mut Shutdown, @@ -766,10 +766,10 @@ impl Command { Debug(cmd) => cmd.apply(dst).await, Cluster(cmd) => cmd.apply(topo, dst).await, - ReadWrite(cmd) => cmd.apply("readwrite", dst, cur_client_id, clients).await, - ReadOnly(cmd) => cmd.apply("readonly", dst, cur_client_id, clients).await, - Client(cmd) => cmd.apply("client", dst, cur_client_id, clients).await, - Info(cmd) => cmd.apply("info", dst, cur_client_id, clients).await, + ReadWrite(cmd) => cmd.apply("readwrite", dst, cur_client, clients).await, + ReadOnly(cmd) => cmd.apply("readonly", dst, cur_client, clients).await, + Client(cmd) => cmd.apply("client", dst, cur_client, clients).await, + Info(cmd) => cmd.apply("info", dst, cur_client, clients).await, Scan(cmd) => cmd.apply(dst).await, Xscan(cmd) => cmd.apply(dst).await, diff --git a/src/server.rs b/src/server.rs index b953762..92364a5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -132,7 +132,7 @@ struct Handler { db: Db, topo: Cluster, - cur_client_id: u64, + cur_client: Arc>, clients: Arc>>>>, /// The TCP connection decorated with the redis protocol encoder / decoder @@ -449,7 +449,7 @@ impl Listener { db: self.db_holder.db(), topo: self.topo_holder.clone(), - cur_client_id: client_id, + cur_client: arc_client.clone(), clients: self.clients.clone(), // Initialize the connection state. This allocates read/write @@ -475,15 +475,13 @@ impl Listener { // dropped. _shutdown_complete: self.shutdown_complete_tx.clone(), }; - - local_pool.spawn_pinned(move || async move { + local_pool.spawn_pinned(|| async move { // Process the connection. If an error is encountered, log it. CURRENT_CONNECTION_COUNTER.inc(); TOTAL_CONNECTION_PROCESSED.inc(); if let Err(err) = handler.run().await { error!(LOGGER, "connection error {:?}", err); } - handler.clients.lock().await.remove(&client_id); CURRENT_CONNECTION_COUNTER.dec(); }); } @@ -566,7 +564,7 @@ impl TlsListener { let mut handler = Handler { db: self.db_holder.db(), topo: self.topo_holder.clone(), - cur_client_id: client_id, + cur_client: arc_client.clone(), clients: self.clients.clone(), connection: Connection::new_tls(&local_addr, &peer_addr, tls_stream), inner_txn: false, @@ -577,14 +575,13 @@ impl TlsListener { _shutdown_complete: self.tls_shutdown_complete_tx.clone(), }; - local_pool.spawn_pinned(move || async move { + local_pool.spawn_pinned(|| async move { // Process the connection. If an error is encountered, log it. CURRENT_TLS_CONNECTION_COUNTER.inc(); TOTAL_CONNECTION_PROCESSED.inc(); if let Err(err) = handler.run().await { error!(LOGGER, "tls connection error {:?}", err); } - handler.clients.lock().await.remove(&client_id); CURRENT_TLS_CONNECTION_COUNTER.dec(); }); } @@ -709,15 +706,10 @@ impl Handler { let cmd = Command::from_frame(frame)?; let cmd_name = cmd.get_name().to_owned(); - let clients_guard = self.clients.lock().await; - // make sure get the client guard with clients aguard obtained - clients_guard - .get(&self.cur_client_id) - .unwrap() - .lock() - .await - .interact(&cmd_name); - drop(clients_guard); + { + let mut w_client = self.cur_client.lock().await; + w_client.interact(&cmd_name); + } let start_at = Instant::now(); REQUEST_COUNTER.inc(); @@ -827,7 +819,7 @@ impl Handler { &self.db, &self.topo, &mut self.connection, - self.cur_client_id, + self.cur_client.clone(), self.clients.clone(), &mut self.lua, &mut self.shutdown, From 8b037cb4e7634b7b0868fe54ff170d60e5acaf8a Mon Sep 17 00:00:00 2001 From: yongman Date: Fri, 12 Aug 2022 10:30:32 +0800 Subject: [PATCH 5/7] Fix client leak after connection disconnect Signed-off-by: yongman --- src/cmd/fake.rs | 11 +++-------- src/server.rs | 10 ++++++++++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/cmd/fake.rs b/src/cmd/fake.rs index c24451d..7b8a149 100644 --- a/src/cmd/fake.rs +++ b/src/cmd/fake.rs @@ -9,11 +9,10 @@ use crate::tikv::errors::{ REDIS_INVALID_CLIENT_ID_ERR, REDIS_NOT_SUPPORTED_ERR, REDIS_NO_SUCH_CLIENT_ERR, REDIS_VALUE_IS_NOT_INTEGER_ERR, }; -use crate::utils::{resp_int, resp_str}; use crate::{ config::LOGGER, tikv::errors::REDIS_UNKNOWN_SUBCOMMAND, - utils::{resp_bulk, resp_err, resp_invalid_arguments, resp_nil, resp_ok}, + utils::{resp_bulk, resp_err, resp_int, resp_invalid_arguments, resp_nil, resp_ok}, Connection, Frame, Parse, }; @@ -129,9 +128,7 @@ impl Fake { return match target_client { Some(client) => { let lk_client = client.lock().await; - let mut lk_clients = clients.lock().await; lk_client.kill().await; - lk_clients.remove(&lk_client.id()); resp_ok() } None => resp_err(REDIS_NO_SUCH_CLIENT_ERR), @@ -197,11 +194,9 @@ impl Fake { } let killed = eligible_clients.len() as i64; - let mut lk_clients = clients.lock().await; for eligible_client in eligible_clients { let lk_eligible_client = eligible_client.lock().await; lk_eligible_client.kill().await; - lk_clients.remove(&lk_eligible_client.id()); } resp_int(killed) @@ -217,12 +212,12 @@ impl Fake { } "GETNAME" => { let r_cur_client = cur_client.lock().await; - let name = r_cur_client.name(); + let name = r_cur_client.name().to_owned(); if name.is_empty() { return resp_nil(); } - resp_str(name) + resp_bulk(name.into_bytes()) } _ => resp_err(REDIS_UNKNOWN_SUBCOMMAND), } diff --git a/src/server.rs b/src/server.rs index 92364a5..65882da 100644 --- a/src/server.rs +++ b/src/server.rs @@ -482,6 +482,11 @@ impl Listener { if let Err(err) = handler.run().await { error!(LOGGER, "connection error {:?}", err); } + handler + .clients + .lock() + .await + .remove(&handler.cur_client.lock().await.id()); CURRENT_CONNECTION_COUNTER.dec(); }); } @@ -582,6 +587,11 @@ impl TlsListener { if let Err(err) = handler.run().await { error!(LOGGER, "tls connection error {:?}", err); } + handler + .clients + .lock() + .await + .remove(&handler.cur_client.lock().await.id()); CURRENT_TLS_CONNECTION_COUNTER.dec(); }); } From 85aa0b2453fa274da8d35e8d1a932cff0b40ed0f Mon Sep 17 00:00:00 2001 From: yongman Date: Fri, 12 Aug 2022 10:34:12 +0800 Subject: [PATCH 6/7] Update overload_threshold batch default config Signed-off-by: yongman --- config.toml | 5 +++-- src/config.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/config.toml b/config.toml index 4a6dfe2..4c4edc1 100644 --- a/config.toml +++ b/config.toml @@ -29,6 +29,7 @@ txn_lock_backoff_delay_attemps = 5 completion_queue_size = 1 grpc_keepalive_time = 10000 grpc_keepalive_timeout = 2000 -allow_batch = false +allow_batch = true max_batch_wait_time = 10 -max_batch_size = 10 +max_batch_size = 20 +max_inflight_requests = 10000 diff --git a/src/config.rs b/src/config.rs index d3148eb..8076f04 100644 --- a/src/config.rs +++ b/src/config.rs @@ -790,7 +790,7 @@ pub fn backend_overload_threshold_or_default() -> u64 { } } } - 100 + 0 } pub fn backend_max_batch_wait_time_or_default() -> u64 { From d7602e04c4cc46b9b84f70aecbcb9e4511463574 Mon Sep 17 00:00:00 2001 From: yongman Date: Fri, 12 Aug 2022 10:34:53 +0800 Subject: [PATCH 7/7] Update cargo Signed-off-by: yongman --- Cargo.lock | 34 ++++++++++++++++++---------------- Cargo.toml | 2 +- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4344517..af52c27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -57,9 +57,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.60" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c794e162a5eff65c72ef524dfe393eb923c354e350bb78b9c7383df13f3bc142" +checksum = "508b352bb5c066aac251f6daf6b36eccd03e8a88e8081cd44959ea277a3af9a8" [[package]] name = "arrayvec" @@ -72,9 +72,9 @@ dependencies = [ [[package]] name = "async-channel" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b31b87a3367ed04dbcbc252bce3f2a8172fef861d47177524c503c908dff2c6" +checksum = "e14485364214912d3b19cc3435dde4df66065127f05fa0d75c712f36f12c2f28" dependencies = [ "concurrent-queue", "event-listener", @@ -1069,12 +1069,12 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.41" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1779539f58004e5dba1c1f093d44325ebeb244bfc04b791acdc0aaeca9c04570" +checksum = "808cf7d67cf4a22adc5be66e75ebdf769b3f2ea032041437a7061f97a63dad4b" dependencies = [ "android_system_properties", - "core-foundation", + "core-foundation-sys", "js-sys", "wasm-bindgen", "winapi", @@ -1187,9 +1187,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.129" +version = "0.2.131" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64de3cc433455c14174d42e554d4027ee631c4d046d43e3ecc6efc4636cdc7a7" +checksum = "04c3b4822ccebfa39c02fc03d1534441b22ead323fa0f48bb7ddd8e6ba076a40" [[package]] name = "libloading" @@ -1265,9 +1265,9 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "memmap2" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a79b39c93a7a5a27eeaf9a23b5ff43f1b9e0ad6b1cdd441140ae53c35613fc7" +checksum = "8e2e4455be2010e8c5e77f0d10234b30f3a636a5305725609b5a71ad00d22577" dependencies = [ "libc", ] @@ -2409,7 +2409,7 @@ dependencies = [ [[package]] name = "tikv-client" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=perfomance-optimize#7411b2b82d6a5e6ab152f3306dd7655189e9afcc" +source = "git+https://github.com/yongman/client-rust.git?branch=super-batch#795772cc4b94cd3212b7c54253b44a81b3857e30" dependencies = [ "async-recursion", "async-trait", @@ -2440,7 +2440,7 @@ dependencies = [ [[package]] name = "tikv-client-common" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=perfomance-optimize#7411b2b82d6a5e6ab152f3306dd7655189e9afcc" +source = "git+https://github.com/yongman/client-rust.git?branch=super-batch#795772cc4b94cd3212b7c54253b44a81b3857e30" dependencies = [ "futures 0.3.21", "grpcio", @@ -2456,7 +2456,7 @@ dependencies = [ [[package]] name = "tikv-client-pd" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=perfomance-optimize#7411b2b82d6a5e6ab152f3306dd7655189e9afcc" +source = "git+https://github.com/yongman/client-rust.git?branch=super-batch#795772cc4b94cd3212b7c54253b44a81b3857e30" dependencies = [ "async-trait", "futures 0.3.21", @@ -2469,7 +2469,7 @@ dependencies = [ [[package]] name = "tikv-client-proto" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=perfomance-optimize#7411b2b82d6a5e6ab152f3306dd7655189e9afcc" +source = "git+https://github.com/yongman/client-rust.git?branch=super-batch#795772cc4b94cd3212b7c54253b44a81b3857e30" dependencies = [ "futures 0.3.21", "grpcio", @@ -2483,13 +2483,15 @@ dependencies = [ [[package]] name = "tikv-client-store" version = "0.1.0" -source = "git+https://github.com/yongman/client-rust.git?branch=perfomance-optimize#7411b2b82d6a5e6ab152f3306dd7655189e9afcc" +source = "git+https://github.com/yongman/client-rust.git?branch=super-batch#795772cc4b94cd3212b7c54253b44a81b3857e30" dependencies = [ "async-trait", "derive-new", "futures 0.3.21", "grpcio", "log", + "serde", + "serde_derive", "tikv-client-common", "tikv-client-proto", ] diff --git a/Cargo.toml b/Cargo.toml index 2aaaff9..2b99aea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ slog-term = { version = "2.4" } tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" tokio-util = { version = "0.7.1", features = ["rt"] } -tikv-client = { git = "https://github.com/yongman/client-rust.git", branch = "perfomance-optimize" } +tikv-client = { git = "https://github.com/yongman/client-rust.git", branch = "super-batch" } #tikv-client = { path = "../client-rust" } lazy_static = "1.4.0" thiserror = "1"