Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]: move token authentication to the network layer #648

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub trait ClientApi {
async fn propose(
&self,
cmd: &Self::Cmd,
token: Option<&String>, // TODO: Allow external custom interceptors, do not pass token in parameters
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved
use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, Self::Error>;

Expand Down Expand Up @@ -126,6 +127,7 @@ trait RepeatableClientApi: ClientApi {
&self,
propose_id: ProposeId,
cmd: &Self::Cmd,
token: Option<&String>,
use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, Self::Error>;

Expand Down
3 changes: 2 additions & 1 deletion crates/curp/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,14 @@ where
async fn propose(
&self,
cmd: &Self::Cmd,
token: Option<&String>,
use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, tonic::Status> {
let propose_id = self
.retry::<_, _>(RepeatableClientApi::gen_propose_id)
.await?;
self.retry::<_, _>(|client| {
RepeatableClientApi::propose(client, propose_id, cmd, use_fast_path)
RepeatableClientApi::propose(client, propose_id, cmd, token, use_fast_path)
})
.await
}
Expand Down
211 changes: 115 additions & 96 deletions crates/curp/src/client/tests.rs

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions crates/curp/src/client/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl<C: Command> Unary<C> {
&self,
propose_id: ProposeId,
cmd: &C,
token: Option<&String>,
) -> Result<Result<C::ER, C::Error>, CurpError> {
let req = ProposeRequest::new(propose_id, cmd, self.state.cluster_version().await);
let timeout = self.config.propose_timeout;
Expand All @@ -91,7 +92,8 @@ impl<C: Command> Unary<C> {
.state
.for_each_server(|conn| {
let req_c = req.clone();
async move { (conn.id(), conn.propose(req_c, timeout).await) }
let token_c = token.cloned();
async move { (conn.id(), conn.propose(req_c, token_c, timeout).await) }
})
.await;
let super_quorum = super_quorum(responses.len());
Expand Down Expand Up @@ -209,9 +211,14 @@ impl<C: Command> ClientApi for Unary<C> {

/// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered
/// requests (event the requests are commutative).
async fn propose(&self, cmd: &C, use_fast_path: bool) -> Result<ProposeResponse<C>, CurpError> {
async fn propose(
&self,
cmd: &C,
token: Option<&String>,
use_fast_path: bool,
) -> Result<ProposeResponse<C>, CurpError> {
let propose_id = self.gen_propose_id().await?;
RepeatableClientApi::propose(self, propose_id, cmd, use_fast_path).await
RepeatableClientApi::propose(self, propose_id, cmd, token, use_fast_path).await
}

/// Send propose configuration changes to the cluster
Expand Down Expand Up @@ -393,10 +400,11 @@ impl<C: Command> RepeatableClientApi for Unary<C> {
&self,
propose_id: ProposeId,
cmd: &Self::Cmd,
token: Option<&String>,
use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, Self::Error> {
tokio::pin! {
let fast_round = self.fast_round(propose_id, cmd);
let fast_round = self.fast_round(propose_id, cmd, token);
let slow_round = self.slow_round(propose_id);
}

Expand Down
9 changes: 9 additions & 0 deletions crates/curp/src/rpc/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ pub(crate) trait ConnectApi: Send + Sync + 'static {
async fn propose(
&self,
request: ProposeRequest,
token: Option<String>,
timeout: Duration,
) -> Result<tonic::Response<ProposeResponse>, CurpError>;

Expand Down Expand Up @@ -385,12 +386,16 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
async fn propose(
&self,
request: ProposeRequest,
token: Option<String>,
timeout: Duration,
) -> Result<tonic::Response<ProposeResponse>, CurpError> {
let mut client = self.rpc_connect.clone();
let mut req = tonic::Request::new(request);
req.set_timeout(timeout);
req.metadata_mut().inject_current();
if let Some(token) = token {
_ = req.metadata_mut().insert("token", token.parse()?);
}
client.propose(req).await.map_err(Into::into)
}

Expand Down Expand Up @@ -667,11 +672,15 @@ where
async fn propose(
&self,
request: ProposeRequest,
token: Option<String>,
_timeout: Duration,
) -> Result<tonic::Response<ProposeResponse>, CurpError> {
let mut req = tonic::Request::new(request);
req.metadata_mut().inject_bypassed();
req.metadata_mut().inject_current();
if let Some(token) = token {
_ = req.metadata_mut().insert("token", token.parse()?);
}
self.server.propose(req).await.map_err(Into::into)
}

Expand Down
1 change: 1 addition & 0 deletions crates/curp/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ impl Snapshot {
}

/// Get the inner snapshot ref
#[cfg(feature = "client-metrics")]
pub(crate) fn inner(&self) -> &EngineSnapshot {
&self.inner
}
Expand Down
7 changes: 6 additions & 1 deletion crates/curp/tests/it/read_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ async fn read_state() {
let put_cmd = TestCommand::new_put(vec![0], 0).set_exe_dur(Duration::from_millis(100));
tokio::spawn(async move {
assert_eq!(
put_client.propose(&put_cmd, true).await.unwrap().unwrap().0,
put_client
.propose(&put_cmd, None, true)
.await
.unwrap()
.unwrap()
.0,
TestCommandResult::default(),
);
});
Expand Down
33 changes: 19 additions & 14 deletions crates/curp/tests/it/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ async fn basic_propose() {
init_logger();

let group = CurpGroup::new(3).await;

let client = group.new_client().await;

assert_eq!(
client
.propose(&TestCommand::new_put(vec![0], 0), true)
.propose(&TestCommand::new_put(vec![0], 0), None, true)
.await
.unwrap()
.unwrap()
Expand All @@ -40,7 +41,7 @@ async fn basic_propose() {
);
assert_eq!(
client
.propose(&TestCommand::new_get(vec![0]), true)
.propose(&TestCommand::new_get(vec![0]), None, true)
.await
.unwrap()
.unwrap()
Expand All @@ -58,7 +59,7 @@ async fn synced_propose() {
let client = group.new_client().await;
let cmd = TestCommand::new_get(vec![0]);

let (er, index) = client.propose(&cmd, false).await.unwrap().unwrap();
let (er, index) = client.propose(&cmd, None, false).await.unwrap().unwrap();
assert_eq!(er, TestCommandResult::new(vec![], vec![]));
assert_eq!(index.unwrap(), 1.into()); // log[0] is a fake one

Expand All @@ -84,7 +85,7 @@ async fn exe_exact_n_times() {
let client = group.new_client().await;
let cmd = TestCommand::new_get(vec![0]);

let er = client.propose(&cmd, true).await.unwrap().unwrap().0;
let er = client.propose(&cmd, None, true).await.unwrap().unwrap().0;
assert_eq!(er, TestCommandResult::new(vec![], vec![]));

for exe_rx in group.exe_rxs() {
Expand Down Expand Up @@ -216,7 +217,7 @@ async fn concurrent_cmd_order() {

assert_eq!(
client
.propose(&TestCommand::new_get(vec![1]), true)
.propose(&TestCommand::new_get(vec![1]), None, true)
.await
.unwrap()
.unwrap()
Expand All @@ -240,7 +241,11 @@ async fn concurrent_cmd_order_should_have_correct_revision() {
for i in sample_range.clone() {
let rand_dur = Duration::from_millis(thread_rng().gen_range(0..500).numeric_cast());
let _er = client
.propose(&TestCommand::new_put(vec![i], i).set_as_dur(rand_dur), true)
.propose(
&TestCommand::new_put(vec![i], i).set_as_dur(rand_dur),
None,
true,
)
.await
.unwrap()
.unwrap();
Expand All @@ -249,7 +254,7 @@ async fn concurrent_cmd_order_should_have_correct_revision() {
for i in sample_range {
assert_eq!(
client
.propose(&TestCommand::new_get(vec![i]), true)
.propose(&TestCommand::new_get(vec![i]), None, true)
.await
.unwrap()
.unwrap()
Expand All @@ -272,7 +277,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
let mut collection = vec![];
for i in 0..10 {
let cmd = TestCommand::new_put(vec![i], i);
let res = req_client.propose(&cmd, true).await;
let res = req_client.propose(&cmd, None, true).await;
if res.is_ok() && res.unwrap().is_ok() {
collection.push(i);
}
Expand All @@ -284,7 +289,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
client.propose_shutdown().await.unwrap();

let res = client
.propose(&TestCommand::new_put(vec![888], 1), false)
.propose(&TestCommand::new_put(vec![888], 1), None, false)
.await;
assert!(matches!(
CurpError::from(res.unwrap_err()),
Expand All @@ -299,7 +304,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
let client = group.new_client().await;
for i in collection {
let res = client
.propose(&TestCommand::new_get(vec![i]), true)
.propose(&TestCommand::new_get(vec![i]), None, true)
.await
.unwrap();
assert_eq!(res.unwrap().0.values, vec![i]);
Expand Down Expand Up @@ -346,7 +351,7 @@ async fn propose_remove_follower_should_success() {
.is_finished());
// check if the old client can propose to the new cluster
client
.propose(&TestCommand::new_get(vec![1]), true)
.propose(&TestCommand::new_get(vec![1]), None, true)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -375,7 +380,7 @@ async fn propose_remove_leader_should_success() {
assert_ne!(new_leader_id, leader_id);
// check if the old client can propose to the new cluster
client
.propose(&TestCommand::new_get(vec![1]), true)
.propose(&TestCommand::new_get(vec![1]), None, true)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -448,7 +453,7 @@ async fn check_new_node(is_learner: bool) {
let mut group = CurpGroup::new(3).await;
let client = group.new_client().await;
let req = TestCommand::new_put(vec![123], 123);
let _res = client.propose(&req, true).await.unwrap().unwrap();
let _res = client.propose(&req, None, true).await.unwrap().unwrap();

let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
let addr = listener.local_addr().unwrap().to_string();
Expand Down Expand Up @@ -505,7 +510,7 @@ async fn check_new_node(is_learner: bool) {

// 5. check if the old client can propose to the new cluster
client
.propose(&TestCommand::new_get(vec![1]), true)
.propose(&TestCommand::new_get(vec![1]), None, true)
.await
.unwrap()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/simulation/src/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl<C: Command> SimClient<C> {
) -> Result<Result<(C::ER, Option<C::ASR>), C::Error>, tonic::Status> {
let inner = self.inner.clone();
self.handle
.spawn(async move { inner.propose(&cmd, use_fast_path).await })
.spawn(async move { inner.propose(&cmd, None, use_fast_path).await })
.await
.unwrap()
}
Expand Down
24 changes: 13 additions & 11 deletions crates/xline-client/src/clients/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use pbkdf2::{
};
use tonic::transport::Channel;
use xlineapi::{
command::command_from_request_wrapper, AuthDisableResponse, AuthEnableResponse,
AuthRoleAddResponse, AuthRoleDeleteResponse, AuthRoleGetResponse,
AuthRoleGrantPermissionResponse, AuthRoleListResponse, AuthRoleRevokePermissionResponse,
AuthStatusResponse, AuthUserAddResponse, AuthUserChangePasswordResponse,
AuthUserDeleteResponse, AuthUserGetResponse, AuthUserGrantRoleResponse, AuthUserListResponse,
AuthUserRevokeRoleResponse, AuthenticateResponse, RequestWithToken, RequestWrapper,
ResponseWrapper,
command::Command, AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse,
AuthRoleDeleteResponse, AuthRoleGetResponse, AuthRoleGrantPermissionResponse,
AuthRoleListResponse, AuthRoleRevokePermissionResponse, AuthStatusResponse,
AuthUserAddResponse, AuthUserChangePasswordResponse, AuthUserDeleteResponse,
AuthUserGetResponse, AuthUserGrantRoleResponse, AuthUserListResponse,
AuthUserRevokeRoleResponse, AuthenticateResponse, RequestWrapper, ResponseWrapper,
};

use crate::{
Expand Down Expand Up @@ -720,14 +719,17 @@ impl AuthClient {
request: Req,
use_fast_path: bool,
) -> Result<Res> {
let request = RequestWithToken::new_with_token(request.into(), self.token.clone());
let cmd = command_from_request_wrapper(request);
let request = request.into();
let cmd = Command::new(request.keys(), request);

let res_wrapper = if use_fast_path {
let (cmd_res, _sync_error) = self.curp_client.propose(&cmd, true).await??;
let (cmd_res, _sync_error) = self
.curp_client
.propose(&cmd, self.token.as_ref(), true)
.await??;
cmd_res.into_inner()
} else {
let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd, false).await?? else {
let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(),false).await?? else {
unreachable!("sync_res is always Some when use_fast_path is false");
};
let mut res_wrapper = cmd_res.into_inner();
Expand Down
Loading
Loading