Skip to content

Commit

Permalink
feat: add auth wrapper
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Feb 17, 2024
1 parent 1f17be2 commit ac27079
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 23 deletions.
46 changes: 27 additions & 19 deletions crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ use curp_external_api::{
use prost::Message;
use serde::{Deserialize, Serialize};

pub(crate) use self::proto::{
commandpb::CurpError as CurpErrorWrapper,
inner_messagepb::{
inner_protocol_server::InnerProtocol, AppendEntriesRequest, AppendEntriesResponse,
InstallSnapshotRequest, InstallSnapshotResponse, TriggerShutdownRequest,
TriggerShutdownResponse, TryBecomeLeaderNowRequest, TryBecomeLeaderNowResponse,
VoteRequest, VoteResponse,
},
};
pub use self::proto::{
commandpb::{
cmd_result::Result as CmdResultInner,
Expand All @@ -15,37 +24,30 @@ pub use self::proto::{
fetch_read_state_response::{IdSet, ReadState},
propose_conf_change_request::{ConfChange, ConfChangeType},
protocol_client,
protocol_server::ProtocolServer,
protocol_server::{Protocol, ProtocolServer},
CmdResult,
FetchClusterRequest,
FetchClusterResponse,
FetchReadStateRequest,
FetchReadStateResponse,
LeaseKeepAliveMsg,
Member,
MoveLeaderRequest,
MoveLeaderResponse,
ProposeConfChangeRequest,
ProposeConfChangeResponse,
ProposeId as PbProposeId,
ProposeRequest,

ProposeResponse,
PublishRequest,
PublishResponse,
ShutdownRequest,
ShutdownResponse,
WaitSyncedRequest,
WaitSyncedResponse,
},
inner_messagepb::inner_protocol_server::InnerProtocolServer,
};
pub(crate) use self::proto::{
commandpb::{
protocol_server::Protocol, CurpError as CurpErrorWrapper, FetchReadStateRequest,
FetchReadStateResponse, LeaseKeepAliveMsg, ShutdownRequest, ShutdownResponse,
WaitSyncedRequest, WaitSyncedResponse,
},
inner_messagepb::{
inner_protocol_server::InnerProtocol, AppendEntriesRequest, AppendEntriesResponse,
InstallSnapshotRequest, InstallSnapshotResponse, TriggerShutdownRequest,
TriggerShutdownResponse, TryBecomeLeaderNowRequest, TryBecomeLeaderNowResponse,
VoteRequest, VoteResponse,
},
};
use crate::{cmd::Command, log_entry::LogEntry, members::ServerId, LogIndex};

/// Metrics
Expand Down Expand Up @@ -127,7 +129,8 @@ impl FetchClusterResponse {

impl ProposeRequest {
/// Create a new `Propose` request
pub(crate) fn new<C: Command>(propose_id: ProposeId, cmd: &C, cluster_version: u64) -> Self {
#[inline]
pub fn new<C: Command>(propose_id: ProposeId, cmd: &C, cluster_version: u64) -> Self {
Self {
propose_id: Some(propose_id.into()),
command: cmd.encode(),
Expand All @@ -136,15 +139,20 @@ impl ProposeRequest {
}

/// Get the propose id
pub(crate) fn propose_id(&self) -> ProposeId {
#[inline]
#[must_use]
pub fn propose_id(&self) -> ProposeId {
self.propose_id
.clone()
.unwrap_or_else(|| unreachable!("propose id must be set in ProposeRequest"))
.into()
}

/// Get command
pub(crate) fn cmd<C: Command>(&self) -> Result<C, PbSerializeError> {
/// # Errors
/// Return error if the command can't be decoded
#[inline]
pub fn cmd<C: Command>(&self) -> Result<C, PbSerializeError> {
C::decode(&self.command)
}
}
Expand Down Expand Up @@ -883,7 +891,7 @@ impl<C> From<Vec<ConfChange>> for PoolEntryInner<C> {
Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Ord, PartialOrd, Default,
)]
#[allow(clippy::exhaustive_structs)] // It is exhaustive
pub(crate) struct ProposeId(pub(crate) u64, pub(crate) u64);
pub struct ProposeId(pub(crate) u64, pub(crate) u64);

impl std::fmt::Display for ProposeId {
#[inline]
Expand Down
110 changes: 110 additions & 0 deletions crates/xline/src/server/auth_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::sync::Arc;

use curp::rpc::{
FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest, FetchReadStateResponse,
LeaseKeepAliveMsg, MoveLeaderRequest, MoveLeaderResponse, ProposeConfChangeRequest,
ProposeConfChangeResponse, ProposeRequest, ProposeResponse, Protocol, PublishRequest,
PublishResponse, ShutdownRequest, ShutdownResponse, WaitSyncedRequest, WaitSyncedResponse,
};
use tracing::debug;

use super::xline_server::CurpServer;
use crate::storage::{storage_api::StorageApi, AuthStore};

/// Auth wrapper
pub(crate) struct AuthWrapper<S>
where
S: StorageApi,
{
/// Curp server
curp_server: CurpServer<S>,
/// Auth store
#[allow(unused)] // TODO: this will be used in the future
auth_store: Arc<AuthStore<S>>,
}

impl<S> AuthWrapper<S>
where
S: StorageApi,
{
/// Create a new auth wrapper
pub(crate) fn new(curp_server: CurpServer<S>, auth_store: Arc<AuthStore<S>>) -> Self {
Self {
curp_server,
auth_store,
}
}
}

#[tonic::async_trait]
impl<S> Protocol for AuthWrapper<S>
where
S: StorageApi,
{
async fn propose(
&self,
request: tonic::Request<ProposeRequest>,
) -> Result<tonic::Response<ProposeResponse>, tonic::Status> {
debug!(
"AuthWrapper received propose request: {}",
request.get_ref().propose_id()
);
self.curp_server.propose(request).await
}

async fn shutdown(
&self,
request: tonic::Request<ShutdownRequest>,
) -> Result<tonic::Response<ShutdownResponse>, tonic::Status> {
self.curp_server.shutdown(request).await
}

async fn propose_conf_change(
&self,
request: tonic::Request<ProposeConfChangeRequest>,
) -> Result<tonic::Response<ProposeConfChangeResponse>, tonic::Status> {
self.curp_server.propose_conf_change(request).await
}

async fn publish(
&self,
request: tonic::Request<PublishRequest>,
) -> Result<tonic::Response<PublishResponse>, tonic::Status> {
self.curp_server.publish(request).await
}

async fn wait_synced(
&self,
request: tonic::Request<WaitSyncedRequest>,
) -> Result<tonic::Response<WaitSyncedResponse>, tonic::Status> {
self.curp_server.wait_synced(request).await
}

async fn fetch_cluster(
&self,
request: tonic::Request<FetchClusterRequest>,
) -> Result<tonic::Response<FetchClusterResponse>, tonic::Status> {
self.curp_server.fetch_cluster(request).await
}

async fn fetch_read_state(
&self,
request: tonic::Request<FetchReadStateRequest>,
) -> Result<tonic::Response<FetchReadStateResponse>, tonic::Status> {
self.curp_server.fetch_read_state(request).await
}

async fn move_leader(
&self,
request: tonic::Request<MoveLeaderRequest>,
) -> Result<tonic::Response<MoveLeaderResponse>, tonic::Status> {
self.curp_server.move_leader(request).await
}

async fn lease_keep_alive(
&self,
request: tonic::Request<tonic::Streaming<LeaseKeepAliveMsg>>,
) -> Result<tonic::Response<LeaseKeepAliveMsg>, tonic::Status> {
self.curp_server.lease_keep_alive(request).await
}
}
2 changes: 2 additions & 0 deletions crates/xline/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/// Xline auth server
mod auth_server;
/// Auth Wrapper
mod auth_wrapper;
/// Barriers for range requests
mod barriers;
/// Cluster server
Expand Down
14 changes: 10 additions & 4 deletions crates/xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use xlineapi::command::{Command, CurpClient};

use super::{
auth_server::AuthServer,
auth_wrapper::AuthWrapper,
barriers::{IdBarrier, IndexBarrier},
cluster_server::ClusterServer,
command::{Alarmer, CommandExecutor},
Expand Down Expand Up @@ -72,7 +73,7 @@ use crate::{
};

/// Rpc Server of curp protocol
type CurpServer<S> = Rpc<Command, State<S, Arc<CurpClient>>>;
pub(crate) type CurpServer<S> = Rpc<Command, State<S, Arc<CurpClient>>>;

/// Xline server
#[derive(Debug)]
Expand Down Expand Up @@ -330,6 +331,7 @@ impl XlineServer {
maintenance_server,
cluster_server,
curp_server,
auth_wrapper,
curp_client,
) = self.init_servers(persistent, key_pair).await?;
let mut builder = Server::builder();
Expand All @@ -345,7 +347,9 @@ impl XlineServer {
.add_service(RpcWatchServer::new(watch_server))
.add_service(RpcMaintenanceServer::new(maintenance_server))
.add_service(RpcClusterServer::new(cluster_server))
.add_service(ProtocolServer::new(curp_server.clone()))
.add_service(ProtocolServer::new(auth_wrapper))
// TODO: run origin curp server in a separate port
// .add_service(ProtocolServer::new(curp_server.clone()))
.add_service(InnerProtocolServer::new(curp_server));
#[cfg(not(madsim))]
let router = {
Expand Down Expand Up @@ -470,6 +474,7 @@ impl XlineServer {
MaintenanceServer<S>,
ClusterServer,
CurpServer<S>,
AuthWrapper<S>,
Arc<CurpClient>,
)> {
let (header_gen, id_gen) = Self::construct_generator(&self.cluster_info);
Expand Down Expand Up @@ -583,7 +588,7 @@ impl XlineServer {
),
LeaseServer::new(
lease_storage,
auth_storage,
Arc::clone(&auth_storage),
Arc::clone(&client),
id_gen,
Arc::clone(&self.cluster_info),
Expand All @@ -608,7 +613,8 @@ impl XlineServer {
alarm_storage,
),
ClusterServer::new(Arc::clone(&client), header_gen),
curp_server,
curp_server.clone(),
AuthWrapper::new(curp_server, auth_storage),
client,
))
}
Expand Down

0 comments on commit ac27079

Please sign in to comment.