From 3eb7a47d86ce6b85f5c8471b67cced927dd1dd85 Mon Sep 17 00:00:00 2001 From: themanforfree Date: Tue, 6 Feb 2024 10:19:00 +0800 Subject: [PATCH] feat: move token authentication to the network layer Signed-off-by: themanforfree --- crates/curp/src/client/mod.rs | 2 + crates/curp/src/client/retry.rs | 3 +- crates/curp/src/client/tests.rs | 211 ++++++++++--------- crates/curp/src/client/unary.rs | 16 +- crates/curp/src/rpc/connect.rs | 9 + crates/curp/tests/it/read_state.rs | 7 +- crates/curp/tests/it/server.rs | 33 +-- crates/simulation/src/curp_group.rs | 2 +- crates/xline-client/src/clients/auth.rs | 24 ++- crates/xline-client/src/clients/kv.rs | 62 +++--- crates/xline-client/src/clients/lease.rs | 28 +-- crates/xline-client/src/clients/lock.rs | 16 +- crates/xline/src/server/auth_server.rs | 63 +++--- crates/xline/src/server/auth_wrapper.rs | 31 ++- crates/xline/src/server/command.rs | 34 ++- crates/xline/src/server/kv_server.rs | 81 ++++--- crates/xline/src/server/lease_server.rs | 15 +- crates/xline/src/server/lock_server.rs | 69 +++--- crates/xline/src/server/maintenance.rs | 22 +- crates/xline/src/server/watch_server.rs | 16 +- crates/xline/src/server/xline_server.rs | 8 +- crates/xline/src/storage/alarm_store.rs | 11 +- crates/xline/src/storage/auth_store/perms.rs | 12 +- crates/xline/src/storage/auth_store/store.rs | 204 ++++++++---------- crates/xline/src/storage/compact/mod.rs | 16 +- crates/xline/src/storage/kv_store.rs | 164 +++++++------- crates/xline/src/storage/kvwatcher.rs | 16 +- crates/xline/src/storage/lease_store/mod.rs | 36 ++-- crates/xlineapi/proto | 2 +- crates/xlineapi/src/command.rs | 199 +++++++---------- crates/xlineapi/src/lib.rs | 130 +++++++----- 31 files changed, 791 insertions(+), 751 deletions(-) diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 494bb15d7..34c02c1c1 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -64,6 +64,7 @@ pub trait ClientApi { async fn propose( &self, cmd: &Self::Cmd, + token: Option<&String>, // TODO: Allow external custom interceptors, do not pass token in parameters use_fast_path: bool, ) -> Result, Self::Error>; @@ -126,6 +127,7 @@ trait RepeatableClientApi: ClientApi { &self, propose_id: ProposeId, cmd: &Self::Cmd, + token: Option<&String>, use_fast_path: bool, ) -> Result, Self::Error>; diff --git a/crates/curp/src/client/retry.rs b/crates/curp/src/client/retry.rs index 77482c23d..db350e36d 100644 --- a/crates/curp/src/client/retry.rs +++ b/crates/curp/src/client/retry.rs @@ -198,13 +198,14 @@ where async fn propose( &self, cmd: &Self::Cmd, + token: Option<&String>, use_fast_path: bool, ) -> Result, tonic::Status> { let propose_id = self .retry::<_, _>(RepeatableClientApi::gen_propose_id) .await?; self.retry::<_, _>(|client| { - RepeatableClientApi::propose(client, propose_id, cmd, use_fast_path) + RepeatableClientApi::propose(client, propose_id, cmd, token, use_fast_path) }) .await } diff --git a/crates/curp/src/client/tests.rs b/crates/curp/src/client/tests.rs index 50f64d7c9..dc58d7003 100644 --- a/crates/curp/src/client/tests.rs +++ b/crates/curp/src/client/tests.rs @@ -262,19 +262,22 @@ async fn test_unary_fetch_clusters_linearizable_failed() { #[tokio::test] async fn test_unary_fast_round_works() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose().return_once(move |_req, _timeout| { - let resp = match id { - 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), - 1 | 2 | 3 => ProposeResponse::new_empty(), - 4 => 
return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), - }; - Ok(tonic::Response::new(resp)) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok( + TestCommandResult::default(), + )), + 1 | 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); let unary = init_unary_client(connects, None, None, 0, 0, None); let res = unary - .fast_round(ProposeId(0, 0), &TestCommand::default()) + .fast_round(ProposeId(0, 0), &TestCommand::default(), None) .await .unwrap() .unwrap(); @@ -300,14 +303,15 @@ async fn test_unary_fast_round_return_early_err() { let connects = init_mocked_connects(3, |_id, conn| { let counter_c = Arc::clone(&counter); let err = early_err.clone(); - conn.expect_propose().return_once(move |_req, _timeout| { - counter_c.lock().unwrap().add_assign(1); - Err(err) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); }); let unary = init_unary_client(connects, None, None, 0, 0, None); let err = unary - .fast_round(ProposeId(0, 0), &TestCommand::default()) + .fast_round(ProposeId(0, 0), &TestCommand::default(), None) .await .unwrap_err(); assert_eq!(err, early_err); @@ -319,19 +323,22 @@ async fn test_unary_fast_round_return_early_err() { #[tokio::test] async fn test_unary_fast_round_less_quorum() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose().return_once(move |_req, _timeout| { - let resp = match id { - 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), - 1 | 2 => ProposeResponse::new_empty(), - 3 | 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), - }; - Ok(tonic::Response::new(resp)) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok( + TestCommandResult::default(), + )), + 1 | 2 => ProposeResponse::new_empty(), + 3 | 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); let unary = init_unary_client(connects, None, None, 0, 0, None); let err = unary - .fast_round(ProposeId(0, 0), &TestCommand::default()) + .fast_round(ProposeId(0, 0), &TestCommand::default(), None) .await .unwrap_err(); assert_eq!(err, CurpError::KeyConflict(())); @@ -344,29 +351,29 @@ async fn test_unary_fast_round_less_quorum() { #[should_panic] async fn test_unary_fast_round_with_two_leader() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose().return_once(move |_req, _timeout| { - let resp = match id { - // The execution result has been returned, indicating that server(0) has also recorded the command. - 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::new( - vec![1], - vec![1], - ))), - // imagine that server(1) is the new leader - 1 => ProposeResponse::new_result::(&Ok(TestCommandResult::new( - vec![2], - vec![2], - ))), - 2 | 3 => ProposeResponse::new_empty(), - 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), - }; - Ok(tonic::Response::new(resp)) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + let resp = + match id { + // The execution result has been returned, indicating that server(0) has also recorded the command. 
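// (Throughout these tests, the mocked `expect_propose` closures take a third
// `_token` argument, mirroring the new `ConnectApi::propose(request, token,
// timeout)` signature, and the client-side `fast_round`/`propose` calls gain a
// `None` token parameter.)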
+ 0 => ProposeResponse::new_result::(&Ok( + TestCommandResult::new(vec![1], vec![1]), + )), + // imagine that server(1) is the new leader + 1 => ProposeResponse::new_result::(&Ok( + TestCommandResult::new(vec![2], vec![2]), + )), + 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); // old local leader(0), term 1 let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let res = unary - .fast_round(ProposeId(0, 0), &TestCommand::default()) + .fast_round(ProposeId(0, 0), &TestCommand::default(), None) .await .unwrap() .unwrap(); @@ -379,18 +386,19 @@ async fn test_unary_fast_round_with_two_leader() { #[tokio::test] async fn test_unary_fast_round_without_leader() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose().return_once(move |_req, _timeout| { - let resp = match id { - 0 | 1 | 2 | 3 | 4 => ProposeResponse::new_empty(), - _ => unreachable!("there are only 5 nodes"), - }; - Ok(tonic::Response::new(resp)) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + let resp = match id { + 0 | 1 | 2 | 3 | 4 => ProposeResponse::new_empty(), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); }); // old local leader(0), term 1 let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let res = unary - .fast_round(ProposeId(0, 0), &TestCommand::default()) + .fast_round(ProposeId(0, 0), &TestCommand::default(), None) .await .unwrap_err(); // quorum: server(0, 1, 2, 3) @@ -444,15 +452,18 @@ async fn test_unary_slow_round_fetch_leader_first() { #[tokio::test] async fn test_unary_propose_fast_path_works() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose().return_once(move |_req, _timeout| { - let resp = match id { - 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), - 1 | 2 | 3 => ProposeResponse::new_empty(), - 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), - }; - Ok(tonic::Response::new(resp)) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok( + TestCommandResult::default(), + )), + 1 | 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); conn.expect_wait_synced() .return_once(move |_req, _timeout| { assert!(id == 0, "wait synced should send to leader"); @@ -467,7 +478,7 @@ async fn test_unary_propose_fast_path_works() { }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let res = unary - .propose(&TestCommand::default(), true) + .propose(&TestCommand::default(), None, true) .await .unwrap() .unwrap(); @@ -478,15 +489,18 @@ async fn test_unary_propose_fast_path_works() { #[tokio::test] async fn test_unary_propose_slow_path_works() { let connects = init_mocked_connects(5, |id, conn| { - conn.expect_propose().return_once(move |_req, _timeout| { - let resp = match id { - 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), - 1 | 2 | 3 => ProposeResponse::new_empty(), - 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), - }; - Ok(tonic::Response::new(resp)) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + let resp = match id { + 0 => 
ProposeResponse::new_result::(&Ok( + TestCommandResult::default(), + )), + 1 | 2 | 3 => ProposeResponse::new_empty(), + 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); conn.expect_wait_synced() .return_once(move |_req, _timeout| { assert!(id == 0, "wait synced should send to leader"); @@ -502,7 +516,7 @@ async fn test_unary_propose_slow_path_works() { let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let start_at = Instant::now(); let res = unary - .propose(&TestCommand::default(), false) + .propose(&TestCommand::default(), None, false) .await .unwrap() .unwrap(); @@ -523,17 +537,20 @@ async fn test_unary_propose_fast_path_fallback_slow_path() { let counter = Arc::new(Mutex::new(0)); let connects = init_mocked_connects(5, |id, conn| { let counter_c = Arc::clone(&counter); - conn.expect_propose().return_once(move |_req, _timeout| { - counter_c.lock().unwrap().add_assign(1); - // insufficient quorum - let resp = match id { - 0 => ProposeResponse::new_result::(&Ok(TestCommandResult::default())), - 1 | 2 => ProposeResponse::new_empty(), - 3 | 4 => return Err(CurpError::key_conflict()), - _ => unreachable!("there are only 5 nodes"), - }; - Ok(tonic::Response::new(resp)) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + counter_c.lock().unwrap().add_assign(1); + // insufficient quorum + let resp = match id { + 0 => ProposeResponse::new_result::(&Ok( + TestCommandResult::default(), + )), + 1 | 2 => ProposeResponse::new_empty(), + 3 | 4 => return Err(CurpError::key_conflict()), + _ => unreachable!("there are only 5 nodes"), + }; + Ok(tonic::Response::new(resp)) + }); conn.expect_wait_synced() .return_once(move |_req, _timeout| { assert!(id == 0, "wait synced should send to leader"); @@ -549,7 +566,7 @@ async fn test_unary_propose_fast_path_fallback_slow_path() { let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let start_at = Instant::now(); let res = unary - .propose(&TestCommand::default(), true) + .propose(&TestCommand::default(), None, true) .await .unwrap() .unwrap(); @@ -583,10 +600,11 @@ async fn test_unary_propose_return_early_err() { let connects = init_mocked_connects(5, |id, conn| { let err = early_err.clone(); let counter_c = Arc::clone(&counter); - conn.expect_propose().return_once(move |_req, _timeout| { - counter_c.lock().unwrap().add_assign(1); - Err(err) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); let err = early_err.clone(); let counter_c = Arc::clone(&counter); conn.expect_wait_synced() @@ -598,7 +616,7 @@ async fn test_unary_propose_return_early_err() { }); let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let err = unary - .propose(&TestCommand::default(), true) + .propose(&TestCommand::default(), None, true) .await .unwrap_err(); assert_eq!(err, early_err); @@ -623,10 +641,11 @@ async fn test_retry_propose_return_no_retry_error() { let connects = init_mocked_connects(5, |id, conn| { let err = early_err.clone(); let counter_c = Arc::clone(&counter); - conn.expect_propose().return_once(move |_req, _timeout| { - counter_c.lock().unwrap().add_assign(1); - Err(err) - }); + conn.expect_propose() + .return_once(move |_req, _token, _timeout| { + counter_c.lock().unwrap().add_assign(1); + Err(err) + }); let err = early_err.clone(); let counter_c = Arc::clone(&counter); conn.expect_wait_synced() @@ -639,7 +658,7 @@ async fn 
test_retry_propose_return_no_retry_error() { let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let retry = Retry::new(unary, RetryConfig::new_fixed(Duration::from_millis(100), 5)); let err = retry - .propose(&TestCommand::default(), false) + .propose(&TestCommand::default(), None, false) .await .unwrap_err(); assert_eq!(err.message(), tonic::Status::from(early_err).message()); @@ -676,7 +695,7 @@ async fn test_retry_propose_return_retry_error() { })) }); conn.expect_propose() - .returning(move |_req, _timeout| Err(err.clone())); + .returning(move |_req, _token, _timeout| Err(err.clone())); if id == 0 { let err = early_err.clone(); conn.expect_wait_synced() @@ -687,7 +706,7 @@ async fn test_retry_propose_return_retry_error() { let unary = init_unary_client(connects, None, Some(0), 1, 0, None); let retry = Retry::new(unary, RetryConfig::new_fixed(Duration::from_millis(10), 5)); let err = retry - .propose(&TestCommand::default(), false) + .propose(&TestCommand::default(), None, false) .await .unwrap_err(); assert!(err.message().contains("request timeout")); diff --git a/crates/curp/src/client/unary.rs b/crates/curp/src/client/unary.rs index 3e8e8505f..fb423f814 100644 --- a/crates/curp/src/client/unary.rs +++ b/crates/curp/src/client/unary.rs @@ -83,6 +83,7 @@ impl Unary { &self, propose_id: ProposeId, cmd: &C, + token: Option<&String>, ) -> Result, CurpError> { let req = ProposeRequest::new(propose_id, cmd, self.state.cluster_version().await); let timeout = self.config.propose_timeout; @@ -91,7 +92,8 @@ impl Unary { .state .for_each_server(|conn| { let req_c = req.clone(); - async move { (conn.id(), conn.propose(req_c, timeout).await) } + let token_c = token.cloned(); + async move { (conn.id(), conn.propose(req_c, token_c, timeout).await) } }) .await; let super_quorum = super_quorum(responses.len()); @@ -209,9 +211,14 @@ impl ClientApi for Unary { /// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered /// requests (event the requests are commutative). - async fn propose(&self, cmd: &C, use_fast_path: bool) -> Result, CurpError> { + async fn propose( + &self, + cmd: &C, + token: Option<&String>, + use_fast_path: bool, + ) -> Result, CurpError> { let propose_id = self.gen_propose_id().await?; - RepeatableClientApi::propose(self, propose_id, cmd, use_fast_path).await + RepeatableClientApi::propose(self, propose_id, cmd, token, use_fast_path).await } /// Send propose configuration changes to the cluster @@ -393,10 +400,11 @@ impl RepeatableClientApi for Unary { &self, propose_id: ProposeId, cmd: &Self::Cmd, + token: Option<&String>, use_fast_path: bool, ) -> Result, Self::Error> { tokio::pin! 
{ - let fast_round = self.fast_round(propose_id, cmd); + let fast_round = self.fast_round(propose_id, cmd, token); let slow_round = self.slow_round(propose_id); } diff --git a/crates/curp/src/rpc/connect.rs b/crates/curp/src/rpc/connect.rs index 91614e4ac..82714558e 100644 --- a/crates/curp/src/rpc/connect.rs +++ b/crates/curp/src/rpc/connect.rs @@ -161,6 +161,7 @@ pub(crate) trait ConnectApi: Send + Sync + 'static { async fn propose( &self, request: ProposeRequest, + token: Option, timeout: Duration, ) -> Result, CurpError>; @@ -385,12 +386,16 @@ impl ConnectApi for Connect> { async fn propose( &self, request: ProposeRequest, + token: Option, timeout: Duration, ) -> Result, CurpError> { let mut client = self.rpc_connect.clone(); let mut req = tonic::Request::new(request); req.set_timeout(timeout); req.metadata_mut().inject_current(); + if let Some(token) = token { + _ = req.metadata_mut().insert("token", token.parse()?); + } client.propose(req).await.map_err(Into::into) } @@ -667,11 +672,15 @@ where async fn propose( &self, request: ProposeRequest, + token: Option, _timeout: Duration, ) -> Result, CurpError> { let mut req = tonic::Request::new(request); req.metadata_mut().inject_bypassed(); req.metadata_mut().inject_current(); + if let Some(token) = token { + _ = req.metadata_mut().insert("token", token.parse()?); + } self.server.propose(req).await.map_err(Into::into) } diff --git a/crates/curp/tests/it/read_state.rs b/crates/curp/tests/it/read_state.rs index 7e2acd410..f47dd303a 100644 --- a/crates/curp/tests/it/read_state.rs +++ b/crates/curp/tests/it/read_state.rs @@ -18,7 +18,12 @@ async fn read_state() { let put_cmd = TestCommand::new_put(vec![0], 0).set_exe_dur(Duration::from_millis(100)); tokio::spawn(async move { assert_eq!( - put_client.propose(&put_cmd, true).await.unwrap().unwrap().0, + put_client + .propose(&put_cmd, None, true) + .await + .unwrap() + .unwrap() + .0, TestCommandResult::default(), ); }); diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index fe9fa1243..603d17730 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -27,11 +27,12 @@ async fn basic_propose() { init_logger(); let group = CurpGroup::new(3).await; + sleep_secs(1).await; let client = group.new_client().await; assert_eq!( client - .propose(&TestCommand::new_put(vec![0], 0), true) + .propose(&TestCommand::new_put(vec![0], 0), None, true) .await .unwrap() .unwrap() @@ -40,7 +41,7 @@ async fn basic_propose() { ); assert_eq!( client - .propose(&TestCommand::new_get(vec![0]), true) + .propose(&TestCommand::new_get(vec![0]), None, true) .await .unwrap() .unwrap() @@ -58,7 +59,7 @@ async fn synced_propose() { let client = group.new_client().await; let cmd = TestCommand::new_get(vec![0]); - let (er, index) = client.propose(&cmd, false).await.unwrap().unwrap(); + let (er, index) = client.propose(&cmd, None, false).await.unwrap().unwrap(); assert_eq!(er, TestCommandResult::new(vec![], vec![])); assert_eq!(index.unwrap(), 1.into()); // log[0] is a fake one @@ -84,7 +85,7 @@ async fn exe_exact_n_times() { let client = group.new_client().await; let cmd = TestCommand::new_get(vec![0]); - let er = client.propose(&cmd, true).await.unwrap().unwrap().0; + let er = client.propose(&cmd, None, true).await.unwrap().unwrap().0; assert_eq!(er, TestCommandResult::new(vec![], vec![])); for exe_rx in group.exe_rxs() { @@ -216,7 +217,7 @@ async fn concurrent_cmd_order() { assert_eq!( client - .propose(&TestCommand::new_get(vec![1]), true) + 
.propose(&TestCommand::new_get(vec![1]), None, true) .await .unwrap() .unwrap() @@ -240,7 +241,11 @@ async fn concurrent_cmd_order_should_have_correct_revision() { for i in sample_range.clone() { let rand_dur = Duration::from_millis(thread_rng().gen_range(0..500).numeric_cast()); let _er = client - .propose(&TestCommand::new_put(vec![i], i).set_as_dur(rand_dur), true) + .propose( + &TestCommand::new_put(vec![i], i).set_as_dur(rand_dur), + None, + true, + ) .await .unwrap() .unwrap(); @@ -249,7 +254,7 @@ async fn concurrent_cmd_order_should_have_correct_revision() { for i in sample_range { assert_eq!( client - .propose(&TestCommand::new_get(vec![i]), true) + .propose(&TestCommand::new_get(vec![i]), None, true) .await .unwrap() .unwrap() @@ -272,7 +277,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() { let mut collection = vec![]; for i in 0..10 { let cmd = TestCommand::new_put(vec![i], i); - let res = req_client.propose(&cmd, true).await; + let res = req_client.propose(&cmd, None, true).await; if res.is_ok() && res.unwrap().is_ok() { collection.push(i); } @@ -284,7 +289,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() { client.propose_shutdown().await.unwrap(); let res = client - .propose(&TestCommand::new_put(vec![888], 1), false) + .propose(&TestCommand::new_put(vec![888], 1), None, false) .await; assert!(matches!( CurpError::from(res.unwrap_err()), @@ -299,7 +304,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() { let client = group.new_client().await; for i in collection { let res = client - .propose(&TestCommand::new_get(vec![i]), true) + .propose(&TestCommand::new_get(vec![i]), None, true) .await .unwrap(); assert_eq!(res.unwrap().0.values, vec![i]); @@ -346,7 +351,7 @@ async fn propose_remove_follower_should_success() { .is_finished()); // check if the old client can propose to the new cluster client - .propose(&TestCommand::new_get(vec![1]), true) + .propose(&TestCommand::new_get(vec![1]), None, true) .await .unwrap() .unwrap(); @@ -375,7 +380,7 @@ async fn propose_remove_leader_should_success() { assert_ne!(new_leader_id, leader_id); // check if the old client can propose to the new cluster client - .propose(&TestCommand::new_get(vec![1]), true) + .propose(&TestCommand::new_get(vec![1]), None, true) .await .unwrap() .unwrap(); @@ -448,7 +453,7 @@ async fn check_new_node(is_learner: bool) { let mut group = CurpGroup::new(3).await; let client = group.new_client().await; let req = TestCommand::new_put(vec![123], 123); - let _res = client.propose(&req, true).await.unwrap().unwrap(); + let _res = client.propose(&req, None, true).await.unwrap().unwrap(); let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); let addr = listener.local_addr().unwrap().to_string(); @@ -505,7 +510,7 @@ async fn check_new_node(is_learner: bool) { // 5. 
check if the old client can propose to the new cluster client - .propose(&TestCommand::new_get(vec![1]), true) + .propose(&TestCommand::new_get(vec![1]), None, true) .await .unwrap() .unwrap(); diff --git a/crates/simulation/src/curp_group.rs b/crates/simulation/src/curp_group.rs index 204928988..68488723f 100644 --- a/crates/simulation/src/curp_group.rs +++ b/crates/simulation/src/curp_group.rs @@ -457,7 +457,7 @@ impl SimClient { ) -> Result), C::Error>, tonic::Status> { let inner = self.inner.clone(); self.handle - .spawn(async move { inner.propose(&cmd, use_fast_path).await }) + .spawn(async move { inner.propose(&cmd, None, use_fast_path).await }) .await .unwrap() } diff --git a/crates/xline-client/src/clients/auth.rs b/crates/xline-client/src/clients/auth.rs index b5a3293c4..d9b16cbe3 100644 --- a/crates/xline-client/src/clients/auth.rs +++ b/crates/xline-client/src/clients/auth.rs @@ -6,13 +6,12 @@ use pbkdf2::{ }; use tonic::transport::Channel; use xlineapi::{ - command::command_from_request_wrapper, AuthDisableResponse, AuthEnableResponse, - AuthRoleAddResponse, AuthRoleDeleteResponse, AuthRoleGetResponse, - AuthRoleGrantPermissionResponse, AuthRoleListResponse, AuthRoleRevokePermissionResponse, - AuthStatusResponse, AuthUserAddResponse, AuthUserChangePasswordResponse, - AuthUserDeleteResponse, AuthUserGetResponse, AuthUserGrantRoleResponse, AuthUserListResponse, - AuthUserRevokeRoleResponse, AuthenticateResponse, RequestWithToken, RequestWrapper, - ResponseWrapper, + command::Command, AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse, + AuthRoleDeleteResponse, AuthRoleGetResponse, AuthRoleGrantPermissionResponse, + AuthRoleListResponse, AuthRoleRevokePermissionResponse, AuthStatusResponse, + AuthUserAddResponse, AuthUserChangePasswordResponse, AuthUserDeleteResponse, + AuthUserGetResponse, AuthUserGrantRoleResponse, AuthUserListResponse, + AuthUserRevokeRoleResponse, AuthenticateResponse, RequestWrapper, ResponseWrapper, }; use crate::{ @@ -720,14 +719,17 @@ impl AuthClient { request: Req, use_fast_path: bool, ) -> Result { - let request = RequestWithToken::new_with_token(request.into(), self.token.clone()); - let cmd = command_from_request_wrapper(request); + let request = request.into(); + let cmd = Command::new(request.keys(), request); let res_wrapper = if use_fast_path { - let (cmd_res, _sync_error) = self.curp_client.propose(&cmd, true).await??; + let (cmd_res, _sync_error) = self + .curp_client + .propose(&cmd, self.token.as_ref(), true) + .await??; cmd_res.into_inner() } else { - let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd, false).await?? else { + let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(),false).await?? 
else { unreachable!("sync_res is always Some when use_fast_path is false"); }; let mut res_wrapper = cmd_res.into_inner(); diff --git a/crates/xline-client/src/clients/kv.rs b/crates/xline-client/src/clients/kv.rs index 12859a345..f11f173e4 100644 --- a/crates/xline-client/src/clients/kv.rs +++ b/crates/xline-client/src/clients/kv.rs @@ -2,9 +2,8 @@ use std::{fmt::Debug, sync::Arc}; use tonic::transport::Channel; use xlineapi::{ - command::{command_from_request_wrapper, Command}, - CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken, - TxnResponse, + command::Command, CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, + RequestWrapper, TxnResponse, }; use crate::{ @@ -84,12 +83,12 @@ impl KvClient { /// ``` #[inline] pub async fn put(&self, request: PutRequest) -> Result { - let request = RequestWithToken::new_with_token( - xlineapi::PutRequest::from(request).into(), - self.token.clone(), - ); - let cmd = command_from_request_wrapper(request); - let (cmd_res, _sync_res) = self.curp_client.propose(&cmd, true).await??; + let request = RequestWrapper::from(xlineapi::PutRequest::from(request)); + let cmd = Command::new(request.keys(), request); + let (cmd_res, _sync_res) = self + .curp_client + .propose(&cmd, self.token.as_ref(), true) + .await??; Ok(cmd_res.into_inner().into()) } @@ -128,12 +127,12 @@ impl KvClient { /// ``` #[inline] pub async fn range(&self, request: RangeRequest) -> Result { - let request = RequestWithToken::new_with_token( - xlineapi::RangeRequest::from(request).into(), - self.token.clone(), - ); - let cmd = command_from_request_wrapper(request); - let (cmd_res, _sync_res) = self.curp_client.propose(&cmd, true).await??; + let request = RequestWrapper::from(xlineapi::RangeRequest::from(request)); + let cmd = Command::new(request.keys(), request); + let (cmd_res, _sync_res) = self + .curp_client + .propose(&cmd, self.token.as_ref(), true) + .await??; Ok(cmd_res.into_inner().into()) } @@ -165,12 +164,12 @@ impl KvClient { /// ``` #[inline] pub async fn delete(&self, request: DeleteRangeRequest) -> Result { - let request = RequestWithToken::new_with_token( - xlineapi::DeleteRangeRequest::from(request).into(), - self.token.clone(), - ); - let cmd = command_from_request_wrapper(request); - let (cmd_res, _sync_res) = self.curp_client.propose(&cmd, true).await??; + let request = RequestWrapper::from(xlineapi::DeleteRangeRequest::from(request)); + let cmd = Command::new(request.keys(), request); + let (cmd_res, _sync_res) = self + .curp_client + .propose(&cmd, self.token.as_ref(), true) + .await??; Ok(cmd_res.into_inner().into()) } @@ -213,12 +212,9 @@ impl KvClient { /// ``` #[inline] pub async fn txn(&self, request: TxnRequest) -> Result { - let request = RequestWithToken::new_with_token( - xlineapi::TxnRequest::from(request).into(), - self.token.clone(), - ); - let cmd = command_from_request_wrapper(request); - let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd, false).await?? else { + let request = RequestWrapper::from(xlineapi::TxnRequest::from(request)); + let cmd = Command::new(request.keys(), request); + let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(), false).await?? 
else { unreachable!("sync_res is always Some when use_fast_path is false"); }; let mut res_wrapper = cmd_res.into_inner(); @@ -272,12 +268,12 @@ impl KvClient { .map(tonic::Response::into_inner) .map_err(Into::into); } - let request = RequestWithToken::new_with_token( - xlineapi::CompactionRequest::from(request).into(), - self.token.clone(), - ); - let cmd = Command::new(vec![], request); - let (cmd_res, _sync_res) = self.curp_client.propose(&cmd, true).await??; + let request = RequestWrapper::from(xlineapi::CompactionRequest::from(request)); + let cmd = Command::new(request.keys(), request); + let (cmd_res, _sync_res) = self + .curp_client + .propose(&cmd, self.token.as_ref(), true) + .await??; Ok(cmd_res.into_inner().into()) } } diff --git a/crates/xline-client/src/clients/lease.rs b/crates/xline-client/src/clients/lease.rs index 72dc93ec0..3189630f5 100644 --- a/crates/xline-client/src/clients/lease.rs +++ b/crates/xline-client/src/clients/lease.rs @@ -3,8 +3,8 @@ use std::{fmt::Debug, sync::Arc}; use futures::channel::mpsc::channel; use tonic::{transport::Channel, Streaming}; use xlineapi::{ - command::command_from_request_wrapper, LeaseGrantResponse, LeaseKeepAliveResponse, - LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveResponse, RequestWithToken, + command::Command, LeaseGrantResponse, LeaseKeepAliveResponse, LeaseLeasesResponse, + LeaseRevokeResponse, LeaseTimeToLiveResponse, RequestWrapper, }; use crate::{ @@ -99,12 +99,12 @@ impl LeaseClient { if request.inner.id == 0 { request.inner.id = self.id_gen.next(); } - let request = RequestWithToken::new_with_token( - xlineapi::LeaseGrantRequest::from(request).into(), - self.token.clone(), - ); - let cmd = command_from_request_wrapper(request); - let (cmd_res, _sync_res) = self.curp_client.propose(&cmd, true).await??; + let request = RequestWrapper::from(xlineapi::LeaseGrantRequest::from(request)); + let cmd = Command::new(request.keys(), request); + let (cmd_res, _sync_res) = self + .curp_client + .propose(&cmd, self.token.as_ref(), true) + .await??; Ok(cmd_res.into_inner().into()) } @@ -274,12 +274,12 @@ impl LeaseClient { /// ``` #[inline] pub async fn leases(&self) -> Result { - let request = RequestWithToken::new_with_token( - xlineapi::LeaseLeasesRequest {}.into(), - self.token.clone(), - ); - let cmd = command_from_request_wrapper(request); - let (cmd_res, _sync_res) = self.curp_client.propose(&cmd, true).await??; + let request = RequestWrapper::from(xlineapi::LeaseLeasesRequest {}); + let cmd = Command::new(request.keys(), request); + let (cmd_res, _sync_res) = self + .curp_client + .propose(&cmd, self.token.as_ref(), true) + .await??; Ok(cmd_res.into_inner().into()) } } diff --git a/crates/xline-client/src/clients/lock.rs b/crates/xline-client/src/clients/lock.rs index bec122071..ed17b52c2 100644 --- a/crates/xline-client/src/clients/lock.rs +++ b/crates/xline-client/src/clients/lock.rs @@ -11,11 +11,11 @@ use clippy_utilities::OverflowArithmetic; use futures::{Future, FutureExt}; use tonic::transport::Channel; use xlineapi::{ - command::{command_from_request_wrapper, CommandResponse, KeyRange, SyncResponse}, + command::{Command, CommandResponse, KeyRange, SyncResponse}, Compare, CompareResult, CompareTarget, DeleteRangeRequest, DeleteRangeResponse, EventType, - LockResponse, PutRequest, RangeRequest, RangeResponse, Request, RequestOp, RequestWithToken, - RequestWrapper, Response, ResponseHeader, SortOrder, SortTarget, TargetUnion, TxnRequest, - TxnResponse, UnlockResponse, + LockResponse, PutRequest, RangeRequest, 
RangeResponse, Request, RequestOp, RequestWrapper, + Response, ResponseHeader, SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse, + UnlockResponse, }; use crate::{ @@ -254,12 +254,10 @@ impl LockClient { where T: Into, { - let request_with_token = - RequestWithToken::new_with_token(request.into(), self.token.clone()); - - let cmd = command_from_request_wrapper(request_with_token); + let request = request.into(); + let cmd = Command::new(request.keys(), request); self.curp_client - .propose(&cmd, use_fast_path) + .propose(&cmd, self.token.as_ref(), use_fast_path) .await? .map_err(Into::into) } diff --git a/crates/xline/src/server/auth_server.rs b/crates/xline/src/server/auth_server.rs index efd70f05b..5b32298cd 100644 --- a/crates/xline/src/server/auth_server.rs +++ b/crates/xline/src/server/auth_server.rs @@ -7,29 +7,36 @@ use pbkdf2::{ use tonic::metadata::MetadataMap; use tracing::debug; use xlineapi::{ - command::{command_from_request_wrapper, CommandResponse, CurpClient, SyncResponse}, + command::{Command, CommandResponse, CurpClient, SyncResponse}, request_validation::RequestValidator, - RequestWithToken, }; -use crate::rpc::{ - Auth, AuthDisableRequest, AuthDisableResponse, AuthEnableRequest, AuthEnableResponse, - AuthRoleAddRequest, AuthRoleAddResponse, AuthRoleDeleteRequest, AuthRoleDeleteResponse, - AuthRoleGetRequest, AuthRoleGetResponse, AuthRoleGrantPermissionRequest, - AuthRoleGrantPermissionResponse, AuthRoleListRequest, AuthRoleListResponse, - AuthRoleRevokePermissionRequest, AuthRoleRevokePermissionResponse, AuthStatusRequest, - AuthStatusResponse, AuthUserAddRequest, AuthUserAddResponse, AuthUserChangePasswordRequest, - AuthUserChangePasswordResponse, AuthUserDeleteRequest, AuthUserDeleteResponse, - AuthUserGetRequest, AuthUserGetResponse, AuthUserGrantRoleRequest, AuthUserGrantRoleResponse, - AuthUserListRequest, AuthUserListResponse, AuthUserRevokeRoleRequest, - AuthUserRevokeRoleResponse, AuthenticateRequest, AuthenticateResponse, RequestWrapper, - ResponseWrapper, +use crate::{ + rpc::{ + Auth, AuthDisableRequest, AuthDisableResponse, AuthEnableRequest, AuthEnableResponse, + AuthRoleAddRequest, AuthRoleAddResponse, AuthRoleDeleteRequest, AuthRoleDeleteResponse, + AuthRoleGetRequest, AuthRoleGetResponse, AuthRoleGrantPermissionRequest, + AuthRoleGrantPermissionResponse, AuthRoleListRequest, AuthRoleListResponse, + AuthRoleRevokePermissionRequest, AuthRoleRevokePermissionResponse, AuthStatusRequest, + AuthStatusResponse, AuthUserAddRequest, AuthUserAddResponse, AuthUserChangePasswordRequest, + AuthUserChangePasswordResponse, AuthUserDeleteRequest, AuthUserDeleteResponse, + AuthUserGetRequest, AuthUserGetResponse, AuthUserGrantRoleRequest, + AuthUserGrantRoleResponse, AuthUserListRequest, AuthUserListResponse, + AuthUserRevokeRoleRequest, AuthUserRevokeRoleResponse, AuthenticateRequest, + AuthenticateResponse, RequestWrapper, ResponseWrapper, + }, + storage::{storage_api::StorageApi, AuthStore}, }; /// Auth Server -pub(crate) struct AuthServer { +pub(crate) struct AuthServer +where + S: StorageApi, +{ /// Consensus client client: Arc, + /// Auth Store + auth_store: Arc>, } /// Get token from metadata @@ -40,10 +47,13 @@ pub(crate) fn get_token(metadata: &MetadataMap) -> Option { .and_then(|v| v.to_str().map(String::from).ok()) } -impl AuthServer { +impl AuthServer +where + S: StorageApi, +{ /// New `AuthServer` - pub(crate) fn new(client: Arc) -> Self { - Self { client } + pub(crate) fn new(client: Arc, auth_store: Arc>) -> Self { + Self { client, auth_store } } 
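// The server now resolves authentication itself: `propose` below reads the
// "token" metadata entry, verifies it against the local `AuthStore` to obtain
// an `AuthInfo`, and attaches that to the command via
// `Command::new_with_auth_info`. The curp client is then invoked with a `None`
// token, because the token has already been consumed at this layer.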
/// Propose request and get result with fast/slow path @@ -55,11 +65,13 @@ impl AuthServer { where T: Into, { - let token = get_token(request.metadata()); - let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let cmd = command_from_request_wrapper(wrapper); - - let res = self.client.propose(&cmd, use_fast_path).await??; + let auth_info = match get_token(request.metadata()) { + Some(token) => Some(self.auth_store.verify(&token)?), + None => None, + }; + let request = request.into_inner().into(); + let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); + let res = self.client.propose(&cmd, None, use_fast_path).await??; Ok(res) } @@ -92,7 +104,10 @@ impl AuthServer { } #[tonic::async_trait] -impl Auth for AuthServer { +impl Auth for AuthServer +where + S: StorageApi, +{ async fn auth_enable( &self, request: tonic::Request, diff --git a/crates/xline/src/server/auth_wrapper.rs b/crates/xline/src/server/auth_wrapper.rs index e06d782f5..cdbc57d27 100644 --- a/crates/xline/src/server/auth_wrapper.rs +++ b/crates/xline/src/server/auth_wrapper.rs @@ -1,15 +1,22 @@ use std::sync::Arc; -use curp::rpc::{ - FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest, FetchReadStateResponse, - LeaseKeepAliveMsg, MoveLeaderRequest, MoveLeaderResponse, ProposeConfChangeRequest, - ProposeConfChangeResponse, ProposeRequest, ProposeResponse, Protocol, PublishRequest, - PublishResponse, ShutdownRequest, ShutdownResponse, WaitSyncedRequest, WaitSyncedResponse, +use curp::{ + cmd::PbCodec, + rpc::{ + FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest, FetchReadStateResponse, + LeaseKeepAliveMsg, MoveLeaderRequest, MoveLeaderResponse, ProposeConfChangeRequest, + ProposeConfChangeResponse, ProposeRequest, ProposeResponse, Protocol, PublishRequest, + PublishResponse, ShutdownRequest, ShutdownResponse, WaitSyncedRequest, WaitSyncedResponse, + }, }; use tracing::debug; +use xlineapi::command::Command; use super::xline_server::CurpServer; -use crate::storage::{storage_api::StorageApi, AuthStore}; +use crate::{ + server::auth_server::get_token, + storage::{storage_api::StorageApi, AuthStore}, +}; /// Auth wrapper pub(crate) struct AuthWrapper @@ -19,7 +26,6 @@ where /// Curp server curp_server: CurpServer, /// Auth store - #[allow(unused)] // TODO: this will be used in the future auth_store: Arc>, } @@ -43,12 +49,21 @@ where { async fn propose( &self, - request: tonic::Request, + mut request: tonic::Request, ) -> Result, tonic::Status> { debug!( "AuthWrapper received propose request: {}", request.get_ref().propose_id() ); + if let Some(token) = get_token(request.metadata()) { + let auth_info = self.auth_store.verify(&token)?; + let mut command: Command = request + .get_ref() + .cmd() + .map_err(|e| tonic::Status::internal(e.to_string()))?; + command.set_auth_info(auth_info); + request.get_mut().command = command.encode(); + }; self.curp_server.propose(request).await } diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs index 0ba402f01..754c4d49d 100644 --- a/crates/xline/src/server/command.rs +++ b/crates/xline/src/server/command.rs @@ -21,7 +21,7 @@ use xlineapi::{ use super::barriers::{IdBarrier, IndexBarrier}; use crate::{ revision_number::RevisionNumberGenerator, - rpc::{RequestBackend, RequestWithToken, RequestWrapper}, + rpc::{RequestBackend, RequestWrapper}, storage::{db::WriteOp, storage_api::StorageApi, AlarmStore, AuthStore, KvStore, LeaseStore}, }; @@ -178,7 +178,7 @@ where if !cmd.need_check_quota() { 
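// Quota-exempt commands pass immediately. Below, `cmd_size` is now computed
// from the bare `RequestWrapper` returned by `cmd.request()`; the removed
// `RequestWithToken` wrapper (and its `.request` field) no longer exists.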
return true; } - let cmd_size = size_estimate::cmd_size(&cmd.request().request); + let cmd_size = size_estimate::cmd_size(cmd.request()); if self.persistent.estimated_file_size().overflow_add(cmd_size) > self.quota { let Ok(file_size) = self.persistent.file_size() else { return false @@ -218,12 +218,9 @@ impl Alarmer { /// Propose alarm request to other nodes async fn alarm(&self, action: AlarmAction, alarm: AlarmType) -> Result<(), tonic::Status> { - let alarm_req = AlarmRequest::new(action, self.id, alarm); - let cmd = Command::new( - vec![], - RequestWithToken::new(RequestWrapper::AlarmRequest(alarm_req)), - ); - let _ig = self.client.propose(&cmd, true).await?; + let request = RequestWrapper::from(AlarmRequest::new(action, self.id, alarm)); + let cmd = Command::new(request.keys(), request); + let _ig = self.client.propose(&cmd, None, true).await?; Ok(()) } } @@ -273,7 +270,7 @@ where /// Check if the alarm is activated fn check_alarm(&self, cmd: &Command) -> Result<(), ExecuteError> { #[allow(clippy::wildcard_enum_match_arm)] - match cmd.request().request { + match *cmd.request() { RequestWrapper::PutRequest(_) | RequestWrapper::TxnRequest(_) | RequestWrapper::LeaseGrantRequest(_) => match self.alarm_storage.current_alarm() { @@ -306,17 +303,18 @@ where ) -> Result<::PR, ::Error> { self.check_alarm(cmd)?; let wrapper = cmd.request(); - self.auth_storage.check_permission(wrapper)?; - let revision = match wrapper.request.backend() { + let auth_info = cmd.auth_info(); + self.auth_storage.check_permission(wrapper, auth_info)?; + let revision = match wrapper.backend() { RequestBackend::Auth => { - if wrapper.request.skip_auth_revision() { + if wrapper.skip_auth_revision() { -1 } else { self.auth_rev.next() } } RequestBackend::Kv | RequestBackend::Lease => { - if wrapper.request.skip_general_revision() { + if wrapper.skip_general_revision() { -1 } else { self.general_rev.next() @@ -332,7 +330,7 @@ where cmd: &Command, ) -> Result<::ER, ::Error> { let wrapper = cmd.request(); - match wrapper.request.backend() { + match wrapper.backend() { RequestBackend::Kv => self.kv_storage.execute(wrapper), RequestBackend::Auth => self.auth_storage.execute(wrapper), RequestBackend::Lease => self.lease_storage.execute(wrapper), @@ -349,20 +347,20 @@ where let quota_enough = self.quota_checker.check(cmd); let mut ops = vec![WriteOp::PutAppliedIndex(index)]; let wrapper = cmd.request(); - let (res, mut wr_ops) = match wrapper.request.backend() { + let (res, mut wr_ops) = match wrapper.backend() { RequestBackend::Kv => self.kv_storage.after_sync(wrapper, revision).await?, RequestBackend::Auth => self.auth_storage.after_sync(wrapper, revision)?, RequestBackend::Lease => self.lease_storage.after_sync(wrapper, revision).await?, RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, revision), }; - if let RequestWrapper::CompactionRequest(ref compact_req) = wrapper.request { + if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper { if compact_req.physical { if let Some(n) = self.compact_events.get(&cmd.compact_id()) { n.notify(usize::MAX); } } }; - if let RequestWrapper::CompactionRequest(ref compact_req) = wrapper.request { + if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper { if compact_req.physical { if let Some(n) = self.compact_events.get(&cmd.compact_id()) { n.notify(usize::MAX); @@ -374,7 +372,7 @@ where if !key_revisions.is_empty() { self.kv_storage.insert_index(key_revisions); } - self.lease_storage.mark_lease_synced(&wrapper.request); + 
self.lease_storage.mark_lease_synced(wrapper); if !quota_enough { if let Some(alarmer) = self.alarmer.read().clone() { let _ig = tokio::spawn(async move { diff --git a/crates/xline/src/server/kv_server.rs b/crates/xline/src/server/kv_server.rs index 20ea383e5..2d1ea9881 100644 --- a/crates/xline/src/server/kv_server.rs +++ b/crates/xline/src/server/kv_server.rs @@ -14,10 +14,10 @@ use futures::future::{join_all, Either}; use tokio::time::timeout; use tracing::{debug, instrument}; use xlineapi::{ - command::{command_from_request_wrapper, Command, CommandResponse, CurpClient, SyncResponse}, + command::{Command, CommandResponse, CurpClient, SyncResponse}, execute_error::ExecuteError, request_validation::RequestValidator, - RequestWithToken, ResponseWrapper, + AuthInfo, ResponseWrapper, }; use super::{ @@ -99,27 +99,26 @@ where } /// serializable execute request in current node - fn do_serializable(&self, wrapper: &RequestWithToken) -> Result { - self.auth_storage.check_permission(wrapper)?; - let cmd_res = self.kv_storage.execute(wrapper)?; - + fn do_serializable(&self, command: &Command) -> Result { + self.auth_storage + .check_permission(command.request(), command.auth_info())?; + let cmd_res = self.kv_storage.execute(command.request())?; Ok(Self::parse_response_op(cmd_res.into_inner().into())) } /// Propose request and get result with fast/slow path async fn propose( &self, - request: tonic::Request, + request: T, + auth_info: Option, use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into + Debug, { - let token = get_token(request.metadata()); - let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let cmd = command_from_request_wrapper(wrapper); - - let res = self.client.propose(&cmd, use_fast_path).await??; + let request = request.into(); + let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); + let res = self.client.propose(&cmd, None, use_fast_path).await??; Ok(res) } @@ -215,11 +214,14 @@ where self.kv_storage.compacted_revision(), self.kv_storage.revision(), )?; + let auth_info = match get_token(request.metadata()) { + Some(token) => Some(self.auth_storage.verify(&token)?), + None => None, + }; let range_required_revision = range_req.revision; let is_serializable = range_req.serializable; - let token = get_token(request.metadata()); - let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let cmd = command_from_request_wrapper(wrapper); + let request = RequestWrapper::from(request.into_inner()); + let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); if !is_serializable { self.wait_read_state(&cmd).await?; // Double check whether the range request is compacted or not since the compaction request @@ -231,7 +233,7 @@ where )?; } - let res = self.do_serializable(cmd.request())?; + let res = self.do_serializable(&cmd)?; if let Response::ResponseRange(response) = res { Ok(tonic::Response::new(response)) } else { @@ -250,9 +252,14 @@ where let put_req: &PutRequest = request.get_ref(); put_req.validation()?; debug!("Receive grpc request: {:?}", put_req); + let auth_info = match get_token(request.metadata()) { + Some(token) => Some(self.auth_storage.verify(&token)?), + None => None, + }; let is_fast_path = true; - let (cmd_res, sync_res) = self.propose(request, is_fast_path).await?; - + let (cmd_res, sync_res) = self + .propose(request.into_inner(), auth_info, is_fast_path) + .await?; let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if 
let Some(sync_res) = sync_res { let revision = sync_res.revision(); @@ -277,9 +284,14 @@ where let delete_range_req = request.get_ref(); delete_range_req.validation()?; debug!("Receive grpc request: {:?}", delete_range_req); + let auth_info = match get_token(request.metadata()) { + Some(token) => Some(self.auth_storage.verify(&token)?), + None => None, + }; let is_fast_path = true; - let (cmd_res, sync_res) = self.propose(request, is_fast_path).await?; - + let (cmd_res, sync_res) = self + .propose(request.into_inner(), auth_info, is_fast_path) + .await?; let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { let revision = sync_res.revision(); @@ -309,21 +321,24 @@ where self.kv_storage.compacted_revision(), self.kv_storage.revision(), )?; - + let auth_info = match get_token(request.metadata()) { + Some(token) => Some(self.auth_storage.verify(&token)?), + None => None, + }; let res = if txn_req.is_read_only() { debug!("TxnRequest is read only"); let is_serializable = txn_req.is_serializable(); - let token = get_token(request.metadata()); - let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let cmd = command_from_request_wrapper(wrapper); + let request = RequestWrapper::from(request.into_inner()); + let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); if !is_serializable { self.wait_read_state(&cmd).await?; } - self.do_serializable(cmd.request())? + self.do_serializable(&cmd)? } else { let is_fast_path = true; - let (cmd_res, sync_res) = self.propose(request, is_fast_path).await?; - + let (cmd_res, sync_res) = self + .propose(request.into_inner(), auth_info, is_fast_path) + .await?; let mut res = Self::parse_response_op(cmd_res.into_inner().into()); if let Some(sync_res) = sync_res { let revision = sync_res.revision(); @@ -352,10 +367,13 @@ where let current_revision = self.kv_storage.revision(); let req = request.get_ref(); req.check_revision(compacted_revision, current_revision)?; - + let auth_info = match get_token(request.metadata()) { + Some(token) => Some(self.auth_storage.verify(&token)?), + None => None, + }; let physical = req.physical; - let token = get_token(request.metadata()); - let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); + let request = RequestWrapper::from(request.into_inner()); + let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); let compact_id = self.next_compact_id.fetch_add(1, Ordering::Relaxed); let compact_physical_fut = if physical { let event = Arc::new(Event::new()); @@ -365,8 +383,7 @@ where } else { Either::Right(async {}) }; - let cmd = command_from_request_wrapper(wrapper); - let (cmd_res, _sync_res) = self.client.propose(&cmd, !physical).await??; + let (cmd_res, _sync_res) = self.client.propose(&cmd, None, !physical).await??; let resp = cmd_res.into_inner(); if timeout(self.compact_timeout, compact_physical_fut) .await diff --git a/crates/xline/src/server/lease_server.rs b/crates/xline/src/server/lease_server.rs index 367280c57..98cd16d93 100644 --- a/crates/xline/src/server/lease_server.rs +++ b/crates/xline/src/server/lease_server.rs @@ -18,7 +18,6 @@ use utils::{ use xlineapi::{ command::{Command, CommandResponse, CurpClient, KeyRange, SyncResponse}, execute_error::ExecuteError, - RequestWithToken, }; use super::auth_server::get_token; @@ -132,10 +131,13 @@ where where T: Into, { - let token = get_token(request.metadata()); - let request_inner = request.into_inner().into(); + let auth_info = 
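// Same boundary pattern as the KV, lock, and maintenance servers:
// metadata token -> AuthStore::verify -> Option<AuthInfo>.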
match get_token(request.metadata()) { + Some(token) => Some(self.auth_storage.verify(&token)?), + None => None, + }; + let request = request.into_inner().into(); let keys = { - if let RequestWrapper::LeaseRevokeRequest(ref req) = request_inner { + if let RequestWrapper::LeaseRevokeRequest(ref req) = request { self.lease_storage .get_keys(req.id) .into_iter() @@ -145,9 +147,8 @@ where vec![] } }; - let wrapper = RequestWithToken::new_with_token(request_inner, token); - let cmd = Command::new(keys, wrapper); - let res = self.client.propose(&cmd, use_fast_path).await??; + let cmd = Command::new_with_auth_info(keys, request, auth_info); + let res = self.client.propose(&cmd, None, use_fast_path).await??; Ok(res) } diff --git a/crates/xline/src/server/lock_server.rs b/crates/xline/src/server/lock_server.rs index ff2669780..d04f1f623 100644 --- a/crates/xline/src/server/lock_server.rs +++ b/crates/xline/src/server/lock_server.rs @@ -10,9 +10,9 @@ use utils::build_endpoint; #[cfg(madsim)] use utils::ClientTlsConfig; use xlineapi::{ - command::{command_from_request_wrapper, CommandResponse, CurpClient, KeyRange, SyncResponse}, + command::{Command, CommandResponse, CurpClient, KeyRange, SyncResponse}, execute_error::ExecuteError, - EventType, RequestWithToken, + AuthInfo, EventType, }; use super::auth_server::get_token; @@ -25,25 +25,35 @@ use crate::{ ResponseHeader, SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse, UnlockRequest, UnlockResponse, WatchClient, WatchCreateRequest, WatchRequest, }, + storage::{storage_api::StorageApi, AuthStore}, }; /// Default session ttl const DEFAULT_SESSION_TTL: i64 = 60; /// Lock Server -pub(super) struct LockServer { +pub(super) struct LockServer +where + S: StorageApi, +{ /// Consensus client client: Arc, + /// Auth store + auth_store: Arc>, /// Id Generator id_gen: Arc, /// Server addresses addrs: Vec, } -impl LockServer { +impl LockServer +where + S: StorageApi, +{ /// New `LockServer` pub(super) fn new( client: Arc, + auth_store: Arc>, id_gen: Arc, addrs: &[String], client_tls_config: Option<&ClientTlsConfig>, @@ -57,6 +67,7 @@ impl LockServer { .collect(); Self { client, + auth_store, id_gen, addrs, } @@ -66,16 +77,15 @@ impl LockServer { async fn propose( &self, request: T, - token: Option, + auth_info: Option, use_fast_path: bool, ) -> Result<(CommandResponse, Option), tonic::Status> where T: Into, { - let wrapper = RequestWithToken::new_with_token(request.into(), token); - let cmd = command_from_request_wrapper(wrapper); - - let res = self.client.propose(&cmd, use_fast_path).await??; + let request = request.into(); + let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); + let res = self.client.propose(&cmd, None, use_fast_path).await??; Ok(res) } @@ -128,7 +138,7 @@ impl LockServer { &self, pfx: String, my_rev: i64, - token: Option<&String>, + auth_info: Option<&AuthInfo>, ) -> Result<(), tonic::Status> { let rev = my_rev.overflow_sub(1); let mut watch_client = @@ -145,7 +155,7 @@ impl LockServer { max_create_revision: rev, ..Default::default() }; - let (cmd_res, _sync_res) = self.propose(get_req, token.cloned(), false).await?; + let (cmd_res, _sync_res) = self.propose(get_req, auth_info.cloned(), false).await?; let response = Into::::into(cmd_res.into_inner()); let last_key = match response.kvs.first() { Some(kv) => kv.key.clone(), @@ -177,32 +187,35 @@ impl LockServer { async fn delete_key( &self, key: &[u8], - token: Option, + auth_info: Option, ) -> Result, tonic::Status> { let del_req = DeleteRangeRequest { key: 
key.into(), ..Default::default() }; - let (cmd_res, _) = self.propose(del_req, token, true).await?; + let (cmd_res, _) = self.propose(del_req, auth_info, true).await?; let res = Into::::into(cmd_res.into_inner()); Ok(res.header) } /// Lease grant - async fn lease_grant(&self, token: Option) -> Result { + async fn lease_grant(&self, auth_info: Option) -> Result { let lease_id = self.id_gen.next(); let lease_grant_req = LeaseGrantRequest { ttl: DEFAULT_SESSION_TTL, id: lease_id, }; - let (cmd_res, _) = self.propose(lease_grant_req, token, true).await?; + let (cmd_res, _) = self.propose(lease_grant_req, auth_info, true).await?; let res = Into::::into(cmd_res.into_inner()); Ok(res.id) } } #[tonic::async_trait] -impl Lock for LockServer { +impl Lock for LockServer +where + S: StorageApi, +{ /// Lock acquires a distributed shared lock on a given named lock. /// On success, it will return a unique key that exists so long as the /// lock is held by the caller. This key can be used in conjunction with @@ -214,10 +227,13 @@ impl Lock for LockServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive LockRequest {:?}", request); - let token = get_token(request.metadata()); + let auth_info = match get_token(request.metadata()) { + Some(token) => Some(self.auth_store.verify(&token)?), + None => None, + }; let lock_req = request.into_inner(); let lease_id = if lock_req.lease == 0 { - self.lease_grant(token.clone()).await? + self.lease_grant(auth_info.clone()).await? } else { lock_req.lease }; @@ -226,7 +242,7 @@ impl Lock for LockServer { let key = format!("{prefix}{lease_id:x}"); let txn = Self::create_acquire_txn(&prefix, lease_id); - let (cmd_res, sync_res) = self.propose(txn, token.clone(), false).await?; + let (cmd_res, sync_res) = self.propose(txn, auth_info.clone(), false).await?; let mut txn_res = Into::::into(cmd_res.into_inner()); #[allow(clippy::unwrap_used)] // sync_res always has value when use slow path let my_rev = sync_res.unwrap().revision(); @@ -250,15 +266,15 @@ impl Lock for LockServer { { owner_res.header } else { - if let Err(e) = self.wait_delete(prefix, my_rev, token.as_ref()).await { - let _ignore = self.delete_key(key.as_bytes(), token).await; + if let Err(e) = self.wait_delete(prefix, my_rev, auth_info.as_ref()).await { + let _ignore = self.delete_key(key.as_bytes(), auth_info).await; return Err(e); } let range_req = RangeRequest { key: key.as_bytes().to_vec(), ..Default::default() }; - let result = self.propose(range_req, token.clone(), true).await; + let result = self.propose(range_req, auth_info.clone(), true).await; match result { Ok(res) => { let res = Into::::into(res.0.into_inner()); @@ -268,7 +284,7 @@ impl Lock for LockServer { res.header } Err(e) => { - let _ignore = self.delete_key(key.as_bytes(), token).await; + let _ignore = self.delete_key(key.as_bytes(), auth_info).await; return Err(e); } } @@ -288,8 +304,11 @@ impl Lock for LockServer { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Receive UnlockRequest {:?}", request); - let token = get_token(request.metadata()); - let header = self.delete_key(&request.get_ref().key, token).await?; + let auth_info = match get_token(request.metadata()) { + Some(token) => Some(self.auth_store.verify(&token)?), + None => None, + }; + let header = self.delete_key(&request.get_ref().key, auth_info).await?; Ok(tonic::Response::new(UnlockResponse { header })) } } diff --git a/crates/xline/src/server/maintenance.rs b/crates/xline/src/server/maintenance.rs index 3ac3aa66a..17741e8ea 100644 --- 
a/crates/xline/src/server/maintenance.rs +++ b/crates/xline/src/server/maintenance.rs @@ -9,8 +9,8 @@ use futures::stream::Stream; use sha2::{Digest, Sha256}; use tracing::{debug, error}; use xlineapi::{ - command::{command_from_request_wrapper, Command, CommandResponse, CurpClient, SyncResponse}, - RequestWithToken, RequestWrapper, + command::{Command, CommandResponse, CurpClient, SyncResponse}, + RequestWrapper, }; use super::{auth_server::get_token, command::CommandExecutor}; @@ -23,7 +23,7 @@ use crate::{ StatusResponse, }, state::State, - storage::{storage_api::StorageApi, AlarmStore, KvStore}, + storage::{storage_api::StorageApi, AlarmStore, AuthStore, KvStore}, }; /// Minimum page size @@ -38,6 +38,8 @@ where { /// Kv Storage kv_store: Arc>, + /// Auth Storage + auth_store: Arc>, /// persistent storage persistent: Arc, // TODO: `persistent` is not a good name, rename it in a better way /// Header generator @@ -62,6 +64,7 @@ where #[allow(clippy::too_many_arguments)] // Consistent with other servers pub(crate) fn new( kv_store: Arc>, + auth_store: Arc>, client: Arc, persistent: Arc, header_gen: Arc, @@ -72,6 +75,7 @@ where ) -> Self { Self { kv_store, + auth_store, persistent, header_gen, client, @@ -91,11 +95,13 @@ where where T: Into + Debug, { - let token = get_token(request.metadata()); - let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); - let cmd = command_from_request_wrapper(wrapper); - - let res = self.client.propose(&cmd, use_fast_path).await??; + let auth_info = match get_token(request.metadata()) { + Some(token) => Some(self.auth_store.verify(&token)?), + None => None, + }; + let request = request.into_inner().into(); + let cmd = Command::new_with_auth_info(request.keys(), request, auth_info); + let res = self.client.propose(&cmd, None, use_fast_path).await??; Ok(res) } } diff --git a/crates/xline/src/server/watch_server.rs b/crates/xline/src/server/watch_server.rs index 3f89759c2..2b071aa62 100644 --- a/crates/xline/src/server/watch_server.rs +++ b/crates/xline/src/server/watch_server.rs @@ -436,10 +436,11 @@ mod test { time::{sleep, timeout}, }; use utils::config::{default_watch_progress_notify_interval, EngineConfig}; + use xlineapi::RequestWrapper; use super::*; use crate::{ - rpc::{PutRequest, RequestWithToken, WatchProgressRequest}, + rpc::{PutRequest, WatchProgressRequest}, storage::{ compact::COMPACT_CHANNEL_SIZE, db::DB, index::Index, kv_store::KvStoreInner, kvwatcher::MockKvWatcherOps, lease_store::LeaseCollection, KvStore, @@ -461,14 +462,11 @@ mod test { value: impl Into>, revision: i64, ) { - let req = RequestWithToken::new( - PutRequest { - key: key.into(), - value: value.into(), - ..Default::default() - } - .into(), - ); + let req = RequestWrapper::from(PutRequest { + key: key.into(), + value: value.into(), + ..Default::default() + }); let (_sync_res, ops) = store.after_sync(&req, revision).await.unwrap(); let key_revisions = db.flush_ops(ops).unwrap(); store.insert_index(key_revisions); diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index 015b49961..a6277d517 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -440,9 +440,9 @@ impl XlineServer { key_pair: Option<(EncodingKey, DecodingKey)>, ) -> Result<( KvServer, - LockServer, + LockServer, Arc>, - AuthServer, + AuthServer, WatchServer, MaintenanceServer, ClusterServer, @@ -555,6 +555,7 @@ impl XlineServer { ), LockServer::new( Arc::clone(&client), + Arc::clone(&auth_storage), 
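(Aside, not part of the patch: the server wiring above reduces to the stand-alone Rust sketch below. All type names are stand-ins rather than the real `XlineServer` signatures; the point it illustrates is that one `Arc<AuthStore>` is created once and cloned into every RPC-facing server, so they all verify tokens against the same auth state.)

use std::sync::Arc;

/// Stand-in for `storage::AuthStore<S>` (illustrative only).
struct AuthStore;

/// Stand-ins for the RPC-facing servers that now hold the store.
struct LockServer {
    auth_store: Arc<AuthStore>,
}
struct MaintenanceServer {
    auth_store: Arc<AuthStore>,
}

fn init_servers() -> (LockServer, MaintenanceServer) {
    // One store, shared by reference counting; mirrors the repeated
    // `Arc::clone(&auth_storage)` calls threaded through `XlineServer` above.
    let auth_storage = Arc::new(AuthStore);
    (
        LockServer { auth_store: Arc::clone(&auth_storage) },
        MaintenanceServer { auth_store: Arc::clone(&auth_storage) },
    )
}

fn main() {
    let (lock, maintenance) = init_servers();
    // Both servers see the very same store instance.
    assert!(Arc::ptr_eq(&lock.auth_store, &maintenance.auth_store));
}

Because only the `Arc` is cloned, the token manager and the auth revision counter live in exactly one place.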
Arc::clone(&id_gen), &self.cluster_info.self_peer_urls(), self.client_tls_config.as_ref(), @@ -568,7 +569,7 @@ impl XlineServer { self.client_tls_config.clone(), &self.task_manager, ), - AuthServer::new(Arc::clone(&client)), + AuthServer::new(Arc::clone(&client), Arc::clone(&auth_storage)), WatchServer::new( watcher, Arc::clone(&header_gen), @@ -577,6 +578,7 @@ impl XlineServer { ), MaintenanceServer::new( kv_storage, + Arc::clone(&auth_storage), Arc::clone(&client), persistent, Arc::clone(&header_gen), diff --git a/crates/xline/src/storage/alarm_store.rs b/crates/xline/src/storage/alarm_store.rs index 3c047f5c2..cacff460e 100644 --- a/crates/xline/src/storage/alarm_store.rs +++ b/crates/xline/src/storage/alarm_store.rs @@ -13,8 +13,7 @@ use utils::table_names::ALARM_TABLE; use xlineapi::{ command::{CommandResponse, SyncResponse}, execute_error::ExecuteError, - AlarmAction, AlarmMember, AlarmResponse, AlarmType, RequestWithToken, RequestWrapper, - ResponseWrapper, + AlarmAction, AlarmMember, AlarmResponse, AlarmType, RequestWrapper, ResponseWrapper, }; use super::{db::WriteOp, storage_api::StorageApi}; @@ -41,9 +40,9 @@ where DB: StorageApi, { /// execute a alarm request - pub(crate) fn execute(&self, request: &RequestWithToken) -> CommandResponse { + pub(crate) fn execute(&self, request: &RequestWrapper) -> CommandResponse { #[allow(clippy::wildcard_enum_match_arm)] - let alarms = match request.request { + let alarms = match *request { RequestWrapper::AlarmRequest(ref req) => match req.action() { AlarmAction::Get => self.handle_alarm_get(req.alarm()), AlarmAction::Activate => { @@ -68,11 +67,11 @@ where /// sync a alarm request pub(crate) fn after_sync( &self, - request: &RequestWithToken, + request: &RequestWrapper, revision: i64, ) -> (SyncResponse, Vec) { #[allow(clippy::wildcard_enum_match_arm)] - let ops = match request.request { + let ops = match *request { RequestWrapper::AlarmRequest(ref req) => match req.action() { AlarmAction::Get => vec![], AlarmAction::Activate => self.sync_alarm_activate(req.member_id, req.alarm()), diff --git a/crates/xline/src/storage/auth_store/perms.rs b/crates/xline/src/storage/auth_store/perms.rs index 6848bf17a..afb29952b 100644 --- a/crates/xline/src/storage/auth_store/perms.rs +++ b/crates/xline/src/storage/auth_store/perms.rs @@ -6,7 +6,7 @@ use jsonwebtoken::{ use merged_range::MergedRange; use serde::{Deserialize, Serialize}; use utils::timestamp; -use xlineapi::command::KeyRange; +use xlineapi::{command::KeyRange, AuthInfo}; use crate::rpc::{Permission, Type}; @@ -24,6 +24,16 @@ pub(super) struct TokenClaims { exp: u64, } +impl From for AuthInfo { + #[inline] + fn from(value: TokenClaims) -> Self { + Self { + username: value.username, + auth_revision: value.revision, + } + } +} + /// Operations of token manager pub(super) trait TokenOperate { /// Claims type diff --git a/crates/xline/src/storage/auth_store/store.rs b/crates/xline/src/storage/auth_store/store.rs index 08b15a1fe..c98a251c0 100644 --- a/crates/xline/src/storage/auth_store/store.rs +++ b/crates/xline/src/storage/auth_store/store.rs @@ -1,3 +1,5 @@ +#![allow(unused)] // TODO: remove this when refactoring is done + use std::{ cmp::Ordering, collections::{HashMap, VecDeque}, @@ -20,6 +22,7 @@ use utils::parking_lot_lock::RwLockMap; use xlineapi::{ command::{CommandResponse, KeyRange, SyncResponse}, execute_error::ExecuteError, + AuthInfo, }; use super::{ @@ -41,8 +44,7 @@ use crate::{ AuthUserGrantRoleResponse, AuthUserListRequest, AuthUserListResponse, AuthUserRevokeRoleRequest, 
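(Aside, not part of the patch: the `From<TokenClaims>` impl above is the whole bridge between the JWT layer and the new `AuthInfo`. A minimal stand-alone sketch with local stand-in types; only the two field mappings are taken from the patch, everything else is illustrative.)

/// Stand-in for the JWT claims decoded by the token manager.
struct TokenClaims {
    username: String,
    revision: i64,
}

/// Stand-in for the `AuthInfo` that now travels with the command.
#[derive(Debug)]
struct AuthInfo {
    username: String,
    auth_revision: i64,
}

impl From<TokenClaims> for AuthInfo {
    fn from(value: TokenClaims) -> Self {
        // Only the identity and the auth revision survive the conversion; the
        // raw token string (and its expiry) stay behind at the network layer.
        Self {
            username: value.username,
            auth_revision: value.revision,
        }
    }
}

fn main() {
    let claims = TokenClaims { username: "xline".to_owned(), revision: 5 };
    let info = AuthInfo::from(claims);
    // check_permission later compares this against the store's current auth
    // revision and rejects the command if it is stale.
    assert_eq!(info.username, "xline");
    assert_eq!(info.auth_revision, 5);
    println!("{info:?}");
}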
AuthUserRevokeRoleResponse, AuthenticateRequest, AuthenticateResponse,
         DeleteRangeRequest, LeaseRevokeRequest, Permission, PutRequest,
-        RangeRequest, Request, RequestOp, RequestWithToken, RequestWrapper, Role, TxnRequest, Type,
-        User,
+        RangeRequest, Request, RequestOp, RequestWrapper, Role, TxnRequest, Type, User,
     },
     storage::{
         auth_store::backend::AuthStoreBackend,
@@ -121,10 +123,11 @@ where
     }
 
     /// verify token
-    fn verify_token(&self, token: &str) -> Result<TokenClaims, ExecuteError> {
+    pub(crate) fn verify(&self, token: &str) -> Result<AuthInfo, ExecuteError> {
         match self.token_manager {
             Some(ref token_manager) => token_manager
                 .verify(token)
+                .map(Into::into)
                 .map_err(|_ignore| ExecuteError::InvalidAuthToken),
             None => Err(ExecuteError::TokenManagerNotInit),
         }
     }
@@ -172,10 +175,10 @@ where
     /// execute an auth request
     pub(crate) fn execute(
         &self,
-        request: &RequestWithToken,
+        request: &RequestWrapper,
     ) -> Result<CommandResponse, ExecuteError> {
         #[allow(clippy::wildcard_enum_match_arm)]
-        let res = match request.request {
+        let res = match *request {
             RequestWrapper::AuthEnableRequest(ref req) => {
                 self.handle_auth_enable_request(req).map(Into::into)
             }
@@ -509,11 +512,11 @@ where
     /// sync an auth request
     pub(crate) fn after_sync<'a>(
         &self,
-        request: &'a RequestWithToken,
+        request: &'a RequestWrapper,
         revision: i64,
     ) -> Result<(SyncResponse, Vec<WriteOp<'a>>), ExecuteError> {
         #[allow(clippy::wildcard_enum_match_arm)]
-        let ops = match request.request {
+        let ops = match *request {
             RequestWrapper::AuthEnableRequest(ref req) => {
                 debug!("Sync AuthEnableRequest {:?}", req);
                 self.sync_auth_enable_request(req)?
             }
@@ -888,9 +891,9 @@ where
     }
 
     /// Check if the request needs admin permission
-    fn need_admin_permission(wrapper: &RequestWithToken) -> bool {
+    fn need_admin_permission(wrapper: &RequestWrapper) -> bool {
         matches!(
-            wrapper.request,
+            *wrapper,
             RequestWrapper::AuthEnableRequest(_)
                 | RequestWrapper::AuthDisableRequest(_)
                 | RequestWrapper::AuthStatusRequest(_)
@@ -914,49 +917,53 @@ where
     }
 
     /// check if the request is permitted
-    pub(crate) fn check_permission(&self, wrapper: &RequestWithToken) -> Result<(), ExecuteError> {
+    pub(crate) fn check_permission(
+        &self,
+        wrapper: &RequestWrapper,
+        auth_info: Option<&AuthInfo>,
+    ) -> Result<(), ExecuteError> {
         if !self.is_enabled() {
             return Ok(());
         }
-        if let RequestWrapper::AuthenticateRequest(_) = wrapper.request {
+        if let RequestWrapper::AuthenticateRequest(_) = *wrapper {
             return Ok(());
         }
-        let claims = match wrapper.token {
-            Some(ref token) => self.verify_token(token)?,
-            None => {
-                // TODO: some requests are allowed without token when auth is enabled
-                return Err(ExecuteError::TokenNotProvided);
-            }
+        let Some(auth_info) = auth_info else {
+            // TODO: some requests are allowed without token when auth is enabled
+            return Err(ExecuteError::TokenNotProvided);
         };
         let cur_rev = self.revision();
-        if claims.revision < cur_rev {
-            return Err(ExecuteError::TokenOldRevision(claims.revision, cur_rev));
+        if auth_info.auth_revision < cur_rev {
+            return Err(ExecuteError::TokenOldRevision(
+                auth_info.auth_revision,
+                cur_rev,
+            ));
         }
-        let username = claims.username;
+        let username = &auth_info.username;
         if Self::need_admin_permission(wrapper) {
-            self.check_admin_permission(&username)?;
+            self.check_admin_permission(username)?;
         } else {
             #[allow(clippy::wildcard_enum_match_arm)]
-            match wrapper.request {
+            match *wrapper {
                 RequestWrapper::RangeRequest(ref range_req) => {
-                    self.check_range_permission(&username, range_req)?;
+                    self.check_range_permission(username, range_req)?;
                 }
                 RequestWrapper::PutRequest(ref put_req) => {
-
self.check_put_permission(&username, put_req)?; + self.check_put_permission(username, put_req)?; } RequestWrapper::DeleteRangeRequest(ref del_range_req) => { - self.check_delete_permission(&username, del_range_req)?; + self.check_delete_permission(username, del_range_req)?; } RequestWrapper::TxnRequest(ref txn_req) => { - self.check_txn_permission(&username, txn_req)?; + self.check_txn_permission(username, txn_req)?; } RequestWrapper::LeaseRevokeRequest(ref lease_revoke_req) => { - self.check_lease_revoke_permission(&username, lease_revoke_req)?; + self.check_lease_revoke_permission(username, lease_revoke_req)?; } RequestWrapper::AuthUserGetRequest(ref user_get_req) => { - self.check_admin_permission(&username).map_or_else( + self.check_admin_permission(username).map_or_else( |e| { - if user_get_req.name == username { + if user_get_req.name == *username { Ok(()) } else { Err(e) @@ -966,9 +973,9 @@ where )?; } RequestWrapper::AuthRoleGetRequest(ref role_get_req) => { - self.check_admin_permission(&username).map_or_else( + self.check_admin_permission(username).map_or_else( |e| { - let user = self.backend.get_user(&username)?; + let user = self.backend.get_user(username)?; if user.has_role(&role_get_req.role) { Ok(()) } else { @@ -1161,27 +1168,24 @@ mod test { let store = init_auth_store(db); let current_revision = store.revision(); let token = store.assign("xline").unwrap(); - let claim = store.verify_token(token.as_str()).unwrap(); - assert_eq!(claim.revision, current_revision); - assert_eq!(claim.username, "xline"); + let auth_info = store.verify(token.as_str()).unwrap(); + assert_eq!(auth_info.auth_revision, current_revision); + assert_eq!(auth_info.username, "xline"); } #[test] fn test_role_grant_permission() -> Result<(), ExecuteError> { let db = DB::open(&EngineConfig::Memory)?; let store = init_auth_store(db); - let req = RequestWithToken::new( - AuthRoleGrantPermissionRequest { - name: "r".to_owned(), - perm: Some(Permission { + let req = RequestWrapper::from(AuthRoleGrantPermissionRequest { + name: "r".to_owned(), + perm: Some(Permission { #[allow(clippy::as_conversions)] // This cast is always valid perm_type: Type::Write as i32, key: "fop".into(), range_end: "foz".into(), }), - } - .into(), - ); + }); assert!(exe_and_sync(&store, &req, 6).is_ok()); assert_eq!( store.permission_cache(), @@ -1206,14 +1210,11 @@ mod test { fn test_role_revoke_permission() -> Result<(), ExecuteError> { let db = DB::open(&EngineConfig::Memory)?; let store = init_auth_store(db); - let req = RequestWithToken::new( - AuthRoleRevokePermissionRequest { - role: "r".to_owned(), - key: "foo".into(), - range_end: "".into(), - } - .into(), - ); + let req = RequestWrapper::from(AuthRoleRevokePermissionRequest { + role: "r".to_owned(), + key: "foo".into(), + range_end: "".into(), + }); assert!(exe_and_sync(&store, &req, 6).is_ok()); assert_eq!( store.permission_cache(), @@ -1229,12 +1230,9 @@ mod test { fn test_role_delete() -> Result<(), ExecuteError> { let db = DB::open(&EngineConfig::Memory)?; let store = init_auth_store(db); - let req = RequestWithToken::new( - AuthRoleDeleteRequest { - role: "r".to_owned(), - } - .into(), - ); + let req = RequestWrapper::from(AuthRoleDeleteRequest { + role: "r".to_owned(), + }); assert!(exe_and_sync(&store, &req, 6).is_ok()); assert_eq!( store.permission_cache(), @@ -1250,12 +1248,9 @@ mod test { fn test_user_delete() -> Result<(), ExecuteError> { let db = DB::open(&EngineConfig::Memory)?; let store = init_auth_store(db); - let req = RequestWithToken::new( - 
AuthUserDeleteRequest { - name: "u".to_owned(), - } - .into(), - ); + let req = RequestWrapper::from(AuthUserDeleteRequest { + name: "u".to_owned(), + }); assert!(exe_and_sync(&store, &req, 6).is_ok()); assert_eq!( store.permission_cache(), @@ -1274,36 +1269,27 @@ mod test { let revision = store.revision(); let rev_gen = Arc::clone(&store.revision); assert!(!store.is_enabled()); - let enable_req = RequestWithToken::new(AuthEnableRequest {}.into()); + let enable_req = RequestWrapper::from(AuthEnableRequest {}); // AuthEnableRequest won't increase the auth revision, but AuthDisableRequest will assert!(exe_and_sync(&store, &enable_req, store.revision()).is_err()); - let req_1 = RequestWithToken::new( - AuthUserAddRequest { - name: "root".to_owned(), - password: String::new(), - hashed_password: "123".to_owned(), - options: None, - } - .into(), - ); + let req_1 = RequestWrapper::from(AuthUserAddRequest { + name: "root".to_owned(), + password: String::new(), + hashed_password: "123".to_owned(), + options: None, + }); assert!(exe_and_sync(&store, &req_1, rev_gen.next()).is_ok()); - let req_2 = RequestWithToken::new( - AuthRoleAddRequest { - name: "root".to_owned(), - } - .into(), - ); + let req_2 = RequestWrapper::from(AuthRoleAddRequest { + name: "root".to_owned(), + }); assert!(exe_and_sync(&store, &req_2, rev_gen.next()).is_ok()); - let req_3 = RequestWithToken::new( - AuthUserGrantRoleRequest { - user: "root".to_owned(), - role: "root".to_owned(), - } - .into(), - ); + let req_3 = RequestWrapper::from(AuthUserGrantRoleRequest { + user: "root".to_owned(), + role: "root".to_owned(), + }); assert!(exe_and_sync(&store, &req_3, rev_gen.next()).is_ok()); assert_eq!(store.revision(), revision + 3); @@ -1311,7 +1297,7 @@ mod test { assert_eq!(store.revision(), 8); assert!(store.is_enabled()); - let disable_req = RequestWithToken::new(AuthDisableRequest {}.into()); + let disable_req = RequestWrapper::from(AuthDisableRequest {}); assert!(exe_and_sync(&store, &disable_req, rev_gen.next()).is_ok()); assert_eq!(store.revision(), revision + 4); @@ -1335,43 +1321,31 @@ mod test { fn init_auth_store(db: Arc) -> AuthStore { let store = init_empty_store(db); let rev = Arc::clone(&store.revision); - let req1 = RequestWithToken::new( - AuthRoleAddRequest { - name: "r".to_owned(), - } - .into(), - ); + let req1 = RequestWrapper::from(AuthRoleAddRequest { + name: "r".to_owned(), + }); assert!(exe_and_sync(&store, &req1, rev.next()).is_ok()); - let req2 = RequestWithToken::new( - AuthUserAddRequest { - name: "u".to_owned(), - password: String::new(), - hashed_password: "123".to_owned(), - options: None, - } - .into(), - ); + let req2 = RequestWrapper::from(AuthUserAddRequest { + name: "u".to_owned(), + password: String::new(), + hashed_password: "123".to_owned(), + options: None, + }); assert!(exe_and_sync(&store, &req2, rev.next()).is_ok()); - let req3 = RequestWithToken::new( - AuthUserGrantRoleRequest { - user: "u".to_owned(), - role: "r".to_owned(), - } - .into(), - ); + let req3 = RequestWrapper::from(AuthUserGrantRoleRequest { + user: "u".to_owned(), + role: "r".to_owned(), + }); assert!(exe_and_sync(&store, &req3, rev.next()).is_ok()); - let req4 = RequestWithToken::new( - AuthRoleGrantPermissionRequest { - name: "r".to_owned(), - perm: Some(Permission { + let req4 = RequestWrapper::from(AuthRoleGrantPermissionRequest { + name: "r".to_owned(), + perm: Some(Permission { #[allow(clippy::as_conversions)] // This cast is always valid perm_type: Type::Readwrite as i32, key: b"foo".to_vec(), range_end: vec![], 
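(Aside, not part of the patch: the test churn in these hunks is mechanical. With `RequestWithToken` gone, tests wrap a concrete request in `RequestWrapper` directly via `From`. A stand-alone sketch of that shape, with stand-in types; the real enum is generated from protobuf.)

/// Stand-in for one concrete request type.
struct AuthRoleDeleteRequest {
    role: String,
}

/// Stand-in for the request enum.
enum RequestWrapper {
    AuthRoleDelete(AuthRoleDeleteRequest),
}

impl From<AuthRoleDeleteRequest> for RequestWrapper {
    fn from(req: AuthRoleDeleteRequest) -> Self {
        Self::AuthRoleDelete(req)
    }
}

fn main() {
    // Before: RequestWithToken::new(AuthRoleDeleteRequest { .. }.into())
    // After:  a bare wrapper; no token is attached at this layer any more.
    let req = RequestWrapper::from(AuthRoleDeleteRequest { role: "r".to_owned() });
    let RequestWrapper::AuthRoleDelete(inner) = req;
    assert_eq!(inner.role, "r");
}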
}), - } - .into(), - ); + }); assert!(exe_and_sync(&store, &req4, rev.next()).is_ok()); assert_eq!( store.permission_cache(), @@ -1398,7 +1372,7 @@ mod test { fn exe_and_sync( store: &AuthStore, - req: &RequestWithToken, + req: &RequestWrapper, revision: i64, ) -> Result<(CommandResponse, SyncResponse), ExecuteError> { let cmd_res = store.execute(req)?; diff --git a/crates/xline/src/storage/compact/mod.rs b/crates/xline/src/storage/compact/mod.rs index 778bbb7a0..772a76088 100644 --- a/crates/xline/src/storage/compact/mod.rs +++ b/crates/xline/src/storage/compact/mod.rs @@ -10,17 +10,14 @@ use utils::{ config::AutoCompactConfig, task_manager::{tasks::TaskName, Listener, TaskManager}, }; -use xlineapi::{command::Command, execute_error::ExecuteError}; +use xlineapi::{command::Command, execute_error::ExecuteError, RequestWrapper}; use super::{ index::{Index, IndexOperate}, storage_api::StorageApi, KvStore, }; -use crate::{ - revision_number::RevisionNumberGenerator, - rpc::{CompactionRequest, RequestWithToken}, -}; +use crate::{revision_number::RevisionNumberGenerator, rpc::CompactionRequest}; /// mod revision compactor; mod revision_compactor; @@ -57,13 +54,12 @@ impl Compactable for Arc + Sync + Send + 'static> { async fn compact(&self, revision: i64) -> Result { - let request = CompactionRequest { + let request = RequestWrapper::from(CompactionRequest { revision, physical: false, - }; - let request_wrapper = RequestWithToken::new_with_token(request.into(), None); - let cmd = Command::new(vec![], request_wrapper); - let err = match self.propose(&cmd, true).await? { + }); + let cmd = Command::new(request.keys(), request); + let err = match self.propose(&cmd, None, true).await? { Ok(_) => return Ok(revision), Err(err) => err, }; diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index 8e0c79343..d3029aa0d 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -31,8 +31,8 @@ use crate::{ rpc::{ CompactionRequest, CompactionResponse, Compare, CompareResult, CompareTarget, DeleteRangeRequest, DeleteRangeResponse, Event, EventType, KeyValue, PutRequest, - PutResponse, RangeRequest, RangeResponse, Request, RequestWithToken, RequestWrapper, - ResponseWrapper, SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse, + PutResponse, RangeRequest, RangeResponse, Request, RequestWrapper, ResponseWrapper, + SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse, }, storage::db::{WriteOp, FINISHED_COMPACT_REVISION}, }; @@ -189,19 +189,18 @@ where /// execute a kv request pub(crate) fn execute( &self, - request: &RequestWithToken, + request: &RequestWrapper, ) -> Result { - self.handle_kv_requests(&request.request) - .map(CommandResponse::new) + self.handle_kv_requests(request).map(CommandResponse::new) } /// sync a kv request pub(crate) async fn after_sync( &self, - request: &RequestWithToken, + request: &RequestWrapper, revision: i64, ) -> Result<(SyncResponse, Vec), ExecuteError> { - self.sync_request(&request.request, revision) + self.sync_request(request, revision) .await .map(|(rev, ops)| (SyncResponse::new(rev), ops)) } @@ -994,14 +993,11 @@ mod test { let vals = vec!["a", "b", "c", "d", "e", "z1", "z2", "z3"]; let revision = RevisionNumberGenerator::default(); for (key, val) in keys.into_iter().zip(vals.into_iter()) { - let req = RequestWithToken::new( - PutRequest { - key: key.into(), - value: val.into(), - ..Default::default() - } - .into(), - ); + let req = RequestWrapper::from(PutRequest { + key: key.into(), 
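(Aside, not part of the patch: a stand-alone sketch of how callers drive a store now that `execute`/`after_sync` take the bare `RequestWrapper`. The real methods live on `KvStore`/`LeaseStore`/`AuthStore`, `after_sync` is async and also returns write ops; the stand-ins below are synchronous and keep only the execute-then-sync shape.)

/// Stand-in request enum with a single variant.
enum RequestWrapper {
    Put { key: Vec<u8>, value: Vec<u8> },
}

/// Stand-in responses: the speculative result and the synced revision.
struct CommandResponse(usize);
struct SyncResponse(i64);

struct Store;

impl Store {
    /// Speculative execution: compute a response, touch no state.
    fn execute(&self, req: &RequestWrapper) -> CommandResponse {
        let RequestWrapper::Put { key, value } = req;
        CommandResponse(key.len() + value.len())
    }

    /// Apply the request at `revision` once consensus has ordered it.
    fn after_sync(&self, _req: &RequestWrapper, revision: i64) -> SyncResponse {
        SyncResponse(revision)
    }
}

fn main() {
    let store = Store;
    let req = RequestWrapper::Put { key: b"foo".to_vec(), value: b"bar".to_vec() };
    let CommandResponse(bytes) = store.execute(&req); // fast path, nothing persisted
    assert_eq!(bytes, 6);
    let SyncResponse(rev) = store.after_sync(&req, 2); // applied at revision 2
    assert_eq!(rev, 2);
}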
+ value: val.into(), + ..Default::default() + }); exe_as_and_flush(&store, &req, revision.next()).await?; } Ok((store, revision)) @@ -1043,7 +1039,7 @@ mod test { async fn exe_as_and_flush( store: &Arc>, - request: &RequestWithToken, + request: &RequestWrapper, revision: i64, ) -> Result<(), ExecuteError> { let (_sync_res, ops) = store.after_sync(request, revision).await?; @@ -1215,44 +1211,41 @@ mod test { #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn test_txn() -> Result<(), ExecuteError> { - let txn_req = RequestWithToken::new( - TxnRequest { - compare: vec![Compare { - result: CompareResult::Equal as i32, - target: CompareTarget::Value as i32, - key: "a".into(), - range_end: vec![], - target_union: Some(TargetUnion::Value("a".into())), - }], - success: vec![RequestOp { - request: Some(Request::RequestTxn(TxnRequest { - compare: vec![Compare { - result: CompareResult::Equal as i32, - target: CompareTarget::Value as i32, - key: "b".into(), - range_end: vec![], - target_union: Some(TargetUnion::Value("b".into())), - }], - success: vec![RequestOp { - request: Some(Request::RequestPut(PutRequest { - key: "success".into(), - value: "1".into(), - ..Default::default() - })), - }], - failure: vec![], - })), - }], - failure: vec![RequestOp { - request: Some(Request::RequestPut(PutRequest { - key: "success".into(), - value: "0".into(), - ..Default::default() - })), - }], - } - .into(), - ); + let txn_req = RequestWrapper::from(TxnRequest { + compare: vec![Compare { + result: CompareResult::Equal as i32, + target: CompareTarget::Value as i32, + key: "a".into(), + range_end: vec![], + target_union: Some(TargetUnion::Value("a".into())), + }], + success: vec![RequestOp { + request: Some(Request::RequestTxn(TxnRequest { + compare: vec![Compare { + result: CompareResult::Equal as i32, + target: CompareTarget::Value as i32, + key: "b".into(), + range_end: vec![], + target_union: Some(TargetUnion::Value("b".into())), + }], + success: vec![RequestOp { + request: Some(Request::RequestPut(PutRequest { + key: "success".into(), + value: "1".into(), + ..Default::default() + })), + }], + failure: vec![], + })), + }], + failure: vec![RequestOp { + request: Some(Request::RequestPut(PutRequest { + key: "success".into(), + value: "0".into(), + ..Default::default() + })), + }], + }); let db = DB::open(&EngineConfig::Memory)?; let (store, rev) = init_store(db).await?; exe_as_and_flush(&store, &txn_req, rev.next()).await?; @@ -1278,14 +1271,11 @@ mod test { let store = Arc::clone(&store); async move { for i in 0..100_u8 { - let req = RequestWithToken::new( - PutRequest { - key: "foo".into(), - value: vec![i], - ..Default::default() - } - .into(), - ); + let req = RequestWrapper::from(PutRequest { + key: "foo".into(), + value: vec![i], + ..Default::default() + }); exe_as_and_flush(&store, &req, revision.next()) .await .unwrap(); @@ -1311,37 +1301,25 @@ mod test { // sample requests: (a, 1) (b, 2) (a, 3) (del a) // their revisions: 2 3 4 5 let requests = vec![ - RequestWithToken::new( - PutRequest { - key: "a".into(), - value: "1".into(), - ..Default::default() - } - .into(), - ), - RequestWithToken::new( - PutRequest { - key: "b".into(), - value: "2".into(), - ..Default::default() - } - .into(), - ), - RequestWithToken::new( - PutRequest { - key: "a".into(), - value: "3".into(), - ..Default::default() - } - .into(), - ), - RequestWithToken::new( - DeleteRangeRequest { - key: "a".into(), - ..Default::default() - } - .into(), - ), + RequestWrapper::from(PutRequest { + key: "a".into(), + value: 
"1".into(), + ..Default::default() + }), + RequestWrapper::from(PutRequest { + key: "b".into(), + value: "2".into(), + ..Default::default() + }), + RequestWrapper::from(PutRequest { + key: "a".into(), + value: "3".into(), + ..Default::default() + }), + RequestWrapper::from(DeleteRangeRequest { + key: "a".into(), + ..Default::default() + }), ]; for req in requests { diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 8518050a9..174ee0a9e 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -594,11 +594,12 @@ mod test { use test_macros::abort_on_panic; use tokio::time::{sleep, timeout}; use utils::config::EngineConfig; + use xlineapi::RequestWrapper; use super::*; use crate::{ header_gen::HeaderGenerator, - rpc::{PutRequest, RequestWithToken}, + rpc::PutRequest, storage::{ compact::COMPACT_CHANNEL_SIZE, db::DB, index::Index, lease_store::LeaseCollection, KvStore, @@ -760,14 +761,11 @@ mod test { value: impl Into>, revision: i64, ) { - let req = RequestWithToken::new( - PutRequest { - key: key.into(), - value: value.into(), - ..Default::default() - } - .into(), - ); + let req = RequestWrapper::from(PutRequest { + key: key.into(), + value: value.into(), + ..Default::default() + }); let (_sync_res, ops) = store.after_sync(&req, revision).await.unwrap(); let key_revisions = db.flush_ops(ops).unwrap(); store.insert_index(key_revisions); diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index 424fd06f5..efedf8e27 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -30,8 +30,8 @@ use crate::{ header_gen::HeaderGenerator, rpc::{ Event, LeaseGrantRequest, LeaseGrantResponse, LeaseLeasesRequest, LeaseLeasesResponse, - LeaseRevokeRequest, LeaseRevokeResponse, LeaseStatus, PbLease, RequestWithToken, - RequestWrapper, ResponseHeader, ResponseWrapper, + LeaseRevokeRequest, LeaseRevokeResponse, LeaseStatus, PbLease, RequestWrapper, + ResponseHeader, ResponseWrapper, }, storage::KvStore, }; @@ -91,19 +91,19 @@ where /// execute a lease request pub(crate) fn execute( &self, - request: &RequestWithToken, + request: &RequestWrapper, ) -> Result { - self.handle_lease_requests(&request.request) + self.handle_lease_requests(request) .map(CommandResponse::new) } /// sync a lease request pub(crate) async fn after_sync( &self, - request: &RequestWithToken, + request: &RequestWrapper, revision: i64, ) -> Result<(SyncResponse, Vec), ExecuteError> { - self.sync_request(&request.request, revision) + self.sync_request(request, revision) .await .map(|(rev, ops)| (SyncResponse::new(rev), ops)) } @@ -385,7 +385,7 @@ mod test { let lease_store = init_store(db); let revision_gen = lease_store.header_gen.general_revision_arc(); - let req1 = RequestWithToken::new(LeaseGrantRequest { ttl: 10, id: 1 }.into()); + let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 }); let _ignore1 = exe_and_sync_req(&lease_store, &req1, -1).await?; let lo = lease_store.look_up(1).unwrap(); @@ -399,15 +399,15 @@ mod test { assert!(attach_existing_lease.is_ok()); lease_store.lease_collection.detach(1, "key".as_bytes())?; - let req2 = RequestWithToken::new(LeaseRevokeRequest { id: 1 }.into()); + let req2 = RequestWrapper::from(LeaseRevokeRequest { id: 1 }); let _ignore2 = exe_and_sync_req(&lease_store, &req2, revision_gen.next()).await?; assert!(lease_store.look_up(1).is_none()); assert!(lease_store.leases().is_empty()); - let req3 = 
RequestWithToken::new(LeaseGrantRequest { ttl: 10, id: 3 }.into()); - let req4 = RequestWithToken::new(LeaseGrantRequest { ttl: 10, id: 4 }.into()); - let req5 = RequestWithToken::new(LeaseRevokeRequest { id: 3 }.into()); - let req6 = RequestWithToken::new(LeaseLeasesRequest {}.into()); + let req3 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 3 }); + let req4 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 4 }); + let req5 = RequestWrapper::from(LeaseRevokeRequest { id: 3 }); + let req6 = RequestWrapper::from(LeaseLeasesRequest {}); let _ignore3 = exe_and_sync_req(&lease_store, &req3, -1).await?; let _ignore4 = exe_and_sync_req(&lease_store, &req4, -1).await?; let resp_1 = exe_and_sync_req(&lease_store, &req6, -1).await?; @@ -430,7 +430,7 @@ mod test { let lease_store = init_store(db); let wait_duration = Duration::from_millis(1); - let req1 = RequestWithToken::new(LeaseGrantRequest { ttl: 10, id: 1 }.into()); + let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 }); let _ignore1 = lease_store.execute(&req1)?; assert!( @@ -442,7 +442,7 @@ mod test { let (_ignore, ops) = lease_store.after_sync(&req1, -1).await?; _ = lease_store.db.flush_ops(ops)?; - lease_store.mark_lease_synced(&req1.request); + lease_store.mark_lease_synced(&req1); assert!( tokio::time::timeout(wait_duration, lease_store.wait_synced(1)) @@ -451,7 +451,7 @@ mod test { "the future should complete immediately after the lease is synced" ); - let req2 = RequestWithToken::new(LeaseRevokeRequest { id: 1 }.into()); + let req2 = RequestWrapper::from(LeaseRevokeRequest { id: 1 }); let _ignore2 = lease_store.execute(&req2)?; assert!( @@ -463,7 +463,7 @@ mod test { let (_ignore, ops) = lease_store.after_sync(&req2, -1).await?; _ = lease_store.db.flush_ops(ops)?; - lease_store.mark_lease_synced(&req2.request); + lease_store.mark_lease_synced(&req2); assert!( tokio::time::timeout(wait_duration, lease_store.wait_synced(1)) @@ -481,7 +481,7 @@ mod test { let db = DB::open(&EngineConfig::Memory)?; let store = init_store(Arc::clone(&db)); - let req1 = RequestWithToken::new(LeaseGrantRequest { ttl: 10, id: 1 }.into()); + let req1 = RequestWrapper::from(LeaseGrantRequest { ttl: 10, id: 1 }); let _ignore1 = exe_and_sync_req(&store, &req1, -1).await?; store.lease_collection.attach(1, "key".into())?; @@ -510,7 +510,7 @@ mod test { async fn exe_and_sync_req( ls: &LeaseStore, - req: &RequestWithToken, + req: &RequestWrapper, revision: i64, ) -> Result { let cmd_res = ls.execute(req)?; diff --git a/crates/xlineapi/proto b/crates/xlineapi/proto index e1dd8f2a0..b5abfb6af 160000 --- a/crates/xlineapi/proto +++ b/crates/xlineapi/proto @@ -1 +1 @@ -Subproject commit e1dd8f2a0419c5e8ccf3317037e9bf705b82287a +Subproject commit b5abfb6af1a3dd848ed5657ce2d727c149d760b4 diff --git a/crates/xlineapi/src/command.rs b/crates/xlineapi/src/command.rs index 067f841ec..bc6c44e14 100644 --- a/crates/xlineapi/src/command.rs +++ b/crates/xlineapi/src/command.rs @@ -10,9 +10,8 @@ use prost::Message; use serde::{Deserialize, Serialize}; use crate::{ - execute_error::ExecuteError, DeleteRangeRequest, PbCommand, PbCommandResponse, PbKeyRange, - PbSyncResponse, PutRequest, RangeRequest, Request, RequestWithToken, RequestWrapper, - ResponseWrapper, TxnRequest, + execute_error::ExecuteError, AuthInfo, PbCommand, PbCommandResponse, PbKeyRange, + PbSyncResponse, Request, RequestWrapper, ResponseWrapper, }; /// The curp client trait object on the command of xline @@ -213,12 +212,14 @@ impl From for PbKeyRange { #[cfg_attr(test, 
derive(PartialEq))]
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Command {
+    /// Request data
+    request: RequestWrapper,
     /// Keys of request
     keys: Vec<KeyRange>,
-    /// Request data
-    request: RequestWithToken,
     /// Compact Id
     compact_id: u64,
+    /// Auth info
+    auth_info: Option<AuthInfo>,
 }
 
 /// get all lease ids in the request wrapper
@@ -285,8 +286,8 @@ fn get_lease_ids(wrapper: &RequestWrapper) -> HashSet<i64> {
 impl ConflictCheck for Command {
     #[inline]
     fn is_conflict(&self, other: &Self) -> bool {
-        let this_req = &self.request.request;
-        let other_req = &other.request.request;
+        let this_req = &self.request;
+        let other_req = &other.request;
         // auth read request will not conflict with any request except the auth write request
         if (this_req.is_auth_read_request() && other_req.is_auth_read_request())
             || (this_req.is_kv_request() && other_req.is_auth_read_request())
@@ -357,11 +358,28 @@ impl Command {
     /// New `Command`
     #[must_use]
     #[inline]
-    pub fn new(keys: Vec<KeyRange>, request: RequestWithToken) -> Self {
+    pub fn new(keys: Vec<KeyRange>, request: RequestWrapper) -> Self {
         Self {
+            request,
             keys,
+            compact_id: 0,
+            auth_info: None,
+        }
+    }
+
+    /// New `Command` with auth info
+    #[must_use]
+    #[inline]
+    pub fn new_with_auth_info(
+        keys: Vec<KeyRange>,
+        request: RequestWrapper,
+        auth_info: Option<AuthInfo>,
+    ) -> Self {
+        Self {
             request,
+            keys,
             compact_id: 0,
+            auth_info,
         }
     }
 
@@ -383,16 +401,29 @@ impl Command {
     /// get request
     #[must_use]
     #[inline]
-    pub fn request(&self) -> &RequestWithToken {
+    pub fn request(&self) -> &RequestWrapper {
         &self.request
     }
 
+    /// get auth_info
+    #[must_use]
+    #[inline]
+    pub fn auth_info(&self) -> Option<&AuthInfo> {
+        self.auth_info.as_ref()
+    }
+
+    /// set auth_info
+    #[inline]
+    pub fn set_auth_info(&mut self, auth_info: AuthInfo) {
+        self.auth_info = Some(auth_info)
+    }
+
     /// need check quota
     #[must_use]
     #[inline]
     pub fn need_check_quota(&self) -> bool {
         matches!(
-            self.request.request,
+            self.request,
             RequestWrapper::LeaseGrantRequest(_)
                 | RequestWrapper::PutRequest(_)
                 | RequestWrapper::TxnRequest(_)
@@ -515,11 +546,11 @@ impl CurpCommand for Command {
 impl PbCodec for Command {
     #[inline]
     fn encode(&self) -> Vec<u8> {
-        let cmd = self.clone();
         let rpc_cmd = PbCommand {
-            keys: cmd.keys.into_iter().map(Into::into).collect(),
-            request: Some(cmd.request.into()),
-            compact_id: cmd.compact_id,
+            keys: self.keys.iter().cloned().map(Into::into).collect(),
+            compact_id: self.compact_id,
+            auth_info: self.auth_info.clone(),
+            request_wrapper: Some(self.request.clone()),
         };
         rpc_cmd.encode_to_vec()
     }
@@ -529,98 +560,22 @@ impl PbCodec for Command {
         let rpc_cmd = PbCommand::decode(buf)?;
         Ok(Self {
             keys: rpc_cmd.keys.into_iter().map(Into::into).collect(),
-            request: rpc_cmd
-                .request
-                .ok_or(PbSerializeError::EmptyField)?
- .try_into()?, compact_id: rpc_cmd.compact_id, + auth_info: rpc_cmd.auth_info, + request: rpc_cmd + .request_wrapper + .ok_or(PbSerializeError::EmptyField)?, }) } } -/// Get command keys from a Request for conflict check -pub trait CommandKeys { - /// Key ranges - fn keys(&self) -> Vec; -} - -impl CommandKeys for RangeRequest { - fn keys(&self) -> Vec { - vec![KeyRange::new( - self.key.as_slice(), - self.range_end.as_slice(), - )] - } -} - -impl CommandKeys for PutRequest { - fn keys(&self) -> Vec { - vec![KeyRange::new_one_key(self.key.as_slice())] - } -} - -impl CommandKeys for DeleteRangeRequest { - fn keys(&self) -> Vec { - vec![KeyRange::new( - self.key.as_slice(), - self.range_end.as_slice(), - )] - } -} - -impl CommandKeys for TxnRequest { - fn keys(&self) -> Vec { - let mut keys: Vec<_> = self - .compare - .iter() - .map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice())) - .collect(); - - for op in self - .success - .iter() - .chain(self.failure.iter()) - .map(|op| &op.request) - .flatten() - { - match *op { - Request::RequestRange(ref req) => { - keys.push(KeyRange::new(req.key.as_slice(), req.range_end.as_slice())); - } - Request::RequestPut(ref req) => { - keys.push(KeyRange::new_one_key(req.key.as_slice())) - } - Request::RequestDeleteRange(ref req) => { - keys.push(KeyRange::new(req.key.as_slice(), req.range_end.as_slice())) - } - Request::RequestTxn(ref req) => keys.append(&mut req.keys()), - } - } - - keys - } -} - -/// Generate `Command` proposal from `Request` -pub fn command_from_request_wrapper(wrapper: RequestWithToken) -> Command { - #[allow(clippy::wildcard_enum_match_arm)] - let keys = match wrapper.request { - RequestWrapper::RangeRequest(ref req) => req.keys(), - RequestWrapper::PutRequest(ref req) => req.keys(), - RequestWrapper::DeleteRangeRequest(ref req) => req.keys(), - RequestWrapper::TxnRequest(ref req) => req.keys(), - _ => vec![], - }; - Command::new(keys, wrapper) -} - #[cfg(test)] mod test { use super::*; use crate::{ - AuthEnableRequest, AuthStatusRequest, CompactionRequest, Compare, LeaseGrantRequest, - LeaseLeasesRequest, LeaseRevokeRequest, PutRequest, PutResponse, RangeRequest, RequestOp, - TxnRequest, + AuthEnableRequest, AuthStatusRequest, CommandKeys, CompactionRequest, Compare, + LeaseGrantRequest, LeaseLeasesRequest, LeaseRevokeRequest, PutRequest, PutResponse, + RangeRequest, RequestOp, TxnRequest, }; #[test] @@ -659,57 +614,45 @@ mod test { fn test_command_conflict() { let cmd1 = Command::new( vec![KeyRange::new("a", "e")], - RequestWithToken::new(RequestWrapper::PutRequest(PutRequest::default())), + RequestWrapper::PutRequest(PutRequest::default()), ); let cmd2 = Command::new( vec![], - RequestWithToken::new(RequestWrapper::AuthStatusRequest( - AuthStatusRequest::default(), - )), + RequestWrapper::AuthStatusRequest(AuthStatusRequest::default()), ); let cmd3 = Command::new( vec![KeyRange::new("c", "g")], - RequestWithToken::new(RequestWrapper::PutRequest(PutRequest::default())), + RequestWrapper::PutRequest(PutRequest::default()), ); let cmd4 = Command::new( vec![], - RequestWithToken::new(RequestWrapper::AuthEnableRequest( - AuthEnableRequest::default(), - )), + RequestWrapper::AuthEnableRequest(AuthEnableRequest::default()), ); let cmd5 = Command::new( vec![], - RequestWithToken::new(RequestWrapper::LeaseGrantRequest(LeaseGrantRequest { - ttl: 1, - id: 1, - })), + RequestWrapper::LeaseGrantRequest(LeaseGrantRequest { ttl: 1, id: 1 }), ); let cmd6 = Command::new( vec![], - 
RequestWithToken::new(RequestWrapper::LeaseRevokeRequest(LeaseRevokeRequest { - id: 1, - })), + RequestWrapper::LeaseRevokeRequest(LeaseRevokeRequest { id: 1 }), ); let lease_grant_cmd = Command::new( vec![], - RequestWithToken::new(RequestWrapper::LeaseGrantRequest(LeaseGrantRequest { - ttl: 1, - id: 123, - })), + RequestWrapper::LeaseGrantRequest(LeaseGrantRequest { ttl: 1, id: 123 }), ); let put_with_lease_cmd = Command::new( vec![KeyRange::new_one_key("foo")], - RequestWithToken::new(RequestWrapper::PutRequest(PutRequest { + RequestWrapper::PutRequest(PutRequest { key: b"key".to_vec(), value: b"value".to_vec(), lease: 123, ..Default::default() - })), + }), ); let txn_with_lease_id_cmd = Command::new( vec![KeyRange::new_one_key("key")], - RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest { + RequestWrapper::TxnRequest(TxnRequest { compare: vec![], success: vec![RequestOp { request: Some(Request::RequestPut(PutRequest { @@ -720,11 +663,11 @@ mod test { })), }], failure: vec![], - })), + }), ); let lease_leases_cmd = Command::new( vec![], - RequestWithToken::new(RequestWrapper::LeaseLeasesRequest(LeaseLeasesRequest {})), + RequestWrapper::LeaseLeasesRequest(LeaseLeasesRequest {}), ); assert!(lease_grant_cmd.is_conflict(&put_with_lease_cmd)); // lease id @@ -746,11 +689,11 @@ mod test { ) -> Command { Command::new( keys, - RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest { + RequestWrapper::TxnRequest(TxnRequest { compare, success, failure, - })), + }), ) } @@ -758,18 +701,18 @@ mod test { fn test_compaction_txn_conflict() { let compaction_cmd_1 = Command::new( vec![], - RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest { + RequestWrapper::CompactionRequest(CompactionRequest { revision: 3, physical: false, - })), + }), ); let compaction_cmd_2 = Command::new( vec![], - RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest { + RequestWrapper::CompactionRequest(CompactionRequest { revision: 5, physical: false, - })), + }), ); let txn_with_lease_id_cmd = generate_txn_command( @@ -838,7 +781,7 @@ mod test { fn command_serialization_is_ok() { let cmd = Command::new( vec![KeyRange::new("a", "e")], - RequestWithToken::new(RequestWrapper::PutRequest(PutRequest::default())), + RequestWrapper::PutRequest(PutRequest::default()), ); let decoded_cmd = ::decode(&cmd.encode()).expect("decode should success"); diff --git a/crates/xlineapi/src/lib.rs b/crates/xlineapi/src/lib.rs index 3ce65bc65..35b9e2893 100644 --- a/crates/xlineapi/src/lib.rs +++ b/crates/xlineapi/src/lib.rs @@ -199,15 +199,15 @@ mod errorpb { use std::fmt::Display; -use curp_external_api::cmd::PbSerializeError; -use serde::{Deserialize, Serialize}; +use command::KeyRange; pub use self::{ authpb::{permission::Type, Permission, Role, User, UserAddOptions}, commandpb::{ - command_response::ResponseWrapper, request_with_token::RequestWrapper, + command::{AuthInfo, RequestWrapper}, + command_response::ResponseWrapper, Command as PbCommand, CommandResponse as PbCommandResponse, KeyRange as PbKeyRange, - RequestWithToken as PbRequestWithToken, SyncResponse as PbSyncResponse, + SyncResponse as PbSyncResponse, }, errorpb::{ execute_error::Error as PbExecuteError, ExecuteError as PbExecuteErrorOuter, @@ -271,39 +271,6 @@ impl User { } } -/// Wrapper for requests -#[derive(PartialEq)] // used only in tests, doesn't work with a `cfg_attr` macro. 
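(Aside, not part of the patch: what replaces the `RequestWithToken` wrapper removed below. The command itself now carries the request and an optional `AuthInfo` side by side. Stand-in types here; the real `Command` in `xlineapi::command` also tracks keys and a compact id.)

/// Stand-ins for the pieces a command carries after this patch.
struct AuthInfo {
    username: String,
    auth_revision: i64,
}

enum RequestWrapper {
    AuthStatus,
}

struct Command {
    request: RequestWrapper,
    auth_info: Option<AuthInfo>,
}

impl Command {
    /// Mirrors the shape of `Command::new_with_auth_info` (keys omitted here).
    fn new_with_auth_info(request: RequestWrapper, auth_info: Option<AuthInfo>) -> Self {
        Self { request, auth_info }
    }
}

fn main() {
    let info = AuthInfo { username: "root".to_owned(), auth_revision: 7 };
    let cmd = Command::new_with_auth_info(RequestWrapper::AuthStatus, Some(info));
    // The server-side executor reads the attached auth info for permission
    // checks; no token string is deserialized on this path any more.
    let Command { request, auth_info } = cmd;
    assert!(matches!(request, RequestWrapper::AuthStatus));
    let info = auth_info.expect("auth info attached");
    assert_eq!((info.username.as_str(), info.auth_revision), ("root", 7));
}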
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct RequestWithToken {
-    /// token for authentication
-    pub token: Option<String>,
-    /// Internal request
-    pub request: RequestWrapper,
-}
-
-impl TryFrom<PbRequestWithToken> for RequestWithToken {
-    type Error = PbSerializeError;
-
-    #[inline]
-    fn try_from(req: PbRequestWithToken) -> Result<Self, Self::Error> {
-        let request = req.request_wrapper.ok_or(PbSerializeError::EmptyField)?;
-        Ok(Self {
-            token: req.token,
-            request,
-        })
-    }
-}
-
-impl From<RequestWithToken> for PbRequestWithToken {
-    #[inline]
-    fn from(req: RequestWithToken) -> Self {
-        PbRequestWithToken {
-            token: req.token,
-            request_wrapper: Some(req.request),
-        }
-    }
-}
-
 impl ResponseWrapper {
     /// Update response revision
     pub fn update_revision(&mut self, revision: i64) {
@@ -354,7 +321,81 @@ pub enum RequestBackend {
     Alarm,
 }
 
+/// Get command keys from a Request for conflict check
+pub trait CommandKeys {
+    /// Key ranges
+    fn keys(&self) -> Vec<KeyRange>;
+}
+
+impl CommandKeys for RangeRequest {
+    fn keys(&self) -> Vec<KeyRange> {
+        vec![KeyRange::new(
+            self.key.as_slice(),
+            self.range_end.as_slice(),
+        )]
+    }
+}
+
+impl CommandKeys for PutRequest {
+    fn keys(&self) -> Vec<KeyRange> {
+        vec![KeyRange::new_one_key(self.key.as_slice())]
+    }
+}
+
+impl CommandKeys for DeleteRangeRequest {
+    fn keys(&self) -> Vec<KeyRange> {
+        vec![KeyRange::new(
+            self.key.as_slice(),
+            self.range_end.as_slice(),
+        )]
+    }
+}
+
+impl CommandKeys for TxnRequest {
+    fn keys(&self) -> Vec<KeyRange> {
+        let mut keys: Vec<_> = self
+            .compare
+            .iter()
+            .map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
+            .collect();
+
+        for op in self
+            .success
+            .iter()
+            .chain(self.failure.iter())
+            .map(|op| &op.request)
+            .flatten()
+        {
+            match *op {
+                Request::RequestRange(ref req) => {
+                    keys.push(KeyRange::new(req.key.as_slice(), req.range_end.as_slice()));
+                }
+                Request::RequestPut(ref req) => {
+                    keys.push(KeyRange::new_one_key(req.key.as_slice()))
+                }
+                Request::RequestDeleteRange(ref req) => {
+                    keys.push(KeyRange::new(req.key.as_slice(), req.range_end.as_slice()))
+                }
+                Request::RequestTxn(ref req) => keys.append(&mut req.keys()),
+            }
+        }
+
+        keys
+    }
+}
+
 impl RequestWrapper {
+    /// Get keys of the request
+    pub fn keys(&self) -> Vec<KeyRange> {
+        match *self {
+            RequestWrapper::RangeRequest(ref req) => req.keys(),
+            RequestWrapper::PutRequest(ref req) => req.keys(),
+            RequestWrapper::DeleteRangeRequest(ref req) => req.keys(),
+            RequestWrapper::TxnRequest(ref req) => req.keys(),
+            _ => vec![],
+        }
+    }
+
     /// Get the backend of the request
     pub fn backend(&self) -> RequestBackend {
         match *self {
@@ -588,21 +629,6 @@ impl From<ResponseWrapper> for ResponseOp {
     }
 }
 
-impl RequestWithToken {
-    /// New `RequestWithToken`
-    pub fn new(request: RequestWrapper) -> Self {
-        RequestWithToken {
-            token: None,
-            request,
-        }
-    }
-
-    /// New `RequestWithToken` with token
-    pub fn new_with_token(request: RequestWrapper, token: Option<String>) -> Self {
-        RequestWithToken { token, request }
-    }
-}
-
 impl Event {
     pub fn is_create(&self) -> bool {
         let kv = self