Skip to content

Commit

Permalink
refactor: remove DB generic
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Jun 17, 2024
1 parent e54d83f commit d836804
Show file tree
Hide file tree
Showing 21 changed files with 264 additions and 376 deletions.
21 changes: 6 additions & 15 deletions crates/xline/src/server/auth_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,15 @@ use crate::{
AuthUserRevokeRoleRequest, AuthUserRevokeRoleResponse, AuthenticateRequest,
AuthenticateResponse, RequestWrapper, ResponseWrapper,
},
storage::{storage_api::StorageApi, AuthStore},
storage::AuthStore,
};

/// Auth Server
pub(crate) struct AuthServer<S>
where
S: StorageApi,
{
pub(crate) struct AuthServer {
/// Consensus client
client: Arc<CurpClient>,
/// Auth Store
auth_store: Arc<AuthStore<S>>,
auth_store: Arc<AuthStore>,
}

/// Get token from metadata
Expand All @@ -44,12 +41,9 @@ pub(crate) fn get_token(metadata: &MetadataMap) -> Option<String> {
.and_then(|v| v.to_str().map(String::from).ok())
}

impl<S> AuthServer<S>
where
S: StorageApi,
{
impl AuthServer {
/// New `AuthServer`
pub(crate) fn new(client: Arc<CurpClient>, auth_store: Arc<AuthStore<S>>) -> Self {
pub(crate) fn new(client: Arc<CurpClient>, auth_store: Arc<AuthStore>) -> Self {
Self { client, auth_store }
}

Expand Down Expand Up @@ -89,10 +83,7 @@ where
}

#[tonic::async_trait]
impl<S> Auth for AuthServer<S>
where
S: StorageApi,
{
impl Auth for AuthServer {
async fn auth_enable(
&self,
request: tonic::Request<AuthEnableRequest>,
Expand Down
23 changes: 7 additions & 16 deletions crates/xline/src/server/auth_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,19 @@ use tracing::debug;
use xlineapi::command::Command;

use super::xline_server::CurpServer;
use crate::storage::{storage_api::StorageApi, AuthStore};
use crate::storage::AuthStore;

/// Auth wrapper
pub(crate) struct AuthWrapper<S>
where
S: StorageApi,
{
pub(crate) struct AuthWrapper {
/// Curp server
curp_server: CurpServer<S>,
curp_server: CurpServer,
/// Auth store
auth_store: Arc<AuthStore<S>>,
auth_store: Arc<AuthStore>,
}

impl<S> AuthWrapper<S>
where
S: StorageApi,
{
impl AuthWrapper {
/// Create a new auth wrapper
pub(crate) fn new(curp_server: CurpServer<S>, auth_store: Arc<AuthStore<S>>) -> Self {
pub(crate) fn new(curp_server: CurpServer, auth_store: Arc<AuthStore>) -> Self {
Self {
curp_server,
auth_store,
Expand All @@ -40,10 +34,7 @@ where
}

#[tonic::async_trait]
impl<S> Protocol for AuthWrapper<S>
where
S: StorageApi,
{
impl Protocol for AuthWrapper {
async fn propose(
&self,
mut request: tonic::Request<ProposeRequest>,
Expand Down
85 changes: 33 additions & 52 deletions crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use super::barriers::{IdBarrier, IndexBarrier};
use crate::{
revision_number::RevisionNumberGenerator,
rpc::{RequestBackend, RequestWrapper},
storage::{db::WriteOp, storage_api::StorageApi, AlarmStore, AuthStore, KvStore, LeaseStore},
storage::{
db::{WriteOp, DB},
AlarmStore, AuthStore, KvStore, LeaseStore,
},
};

/// Key of applied index
Expand Down Expand Up @@ -59,20 +62,17 @@ impl RangeType {

/// Command Executor
#[derive(Debug)]
pub(crate) struct CommandExecutor<S>
where
S: StorageApi,
{
pub(crate) struct CommandExecutor {
/// Kv Storage
kv_storage: Arc<KvStore<S>>,
kv_storage: Arc<KvStore>,
/// Auth Storage
auth_storage: Arc<AuthStore<S>>,
auth_storage: Arc<AuthStore>,
/// Lease Storage
lease_storage: Arc<LeaseStore<S>>,
lease_storage: Arc<LeaseStore>,
/// Alarm Storage
alarm_storage: Arc<AlarmStore<S>>,
alarm_storage: Arc<AlarmStore>,
/// persistent storage
persistent: Arc<S>,
db: Arc<DB>,
/// Barrier for applied index
index_barrier: Arc<IndexBarrier>,
/// Barrier for propose id
Expand All @@ -97,14 +97,11 @@ pub(crate) trait QuotaChecker: Sync + Send + Debug {

/// Quota checker for `Command`
#[derive(Debug)]
struct CommandQuotaChecker<S>
where
S: StorageApi,
{
struct CommandQuotaChecker {
/// Quota size
quota: u64,
/// persistent storage
persistent: Arc<S>,
db: Arc<DB>,
}

/// functions used to estimate request write size
Expand Down Expand Up @@ -160,27 +157,21 @@ mod size_estimate {
}
}

impl<S> CommandQuotaChecker<S>
where
S: StorageApi,
{
impl CommandQuotaChecker {
/// Create a new `CommandQuotaChecker`
fn new(quota: u64, persistent: Arc<S>) -> Self {
Self { quota, persistent }
fn new(quota: u64, db: Arc<DB>) -> Self {
Self { quota, db }
}
}

impl<S> QuotaChecker for CommandQuotaChecker<S>
where
S: StorageApi,
{
impl QuotaChecker for CommandQuotaChecker {
fn check(&self, cmd: &Command) -> bool {
if !cmd.need_check_quota() {
return true;
}
let cmd_size = size_estimate::cmd_size(cmd.request());
if self.persistent.estimated_file_size().overflow_add(cmd_size) > self.quota {
let Ok(file_size) = self.persistent.file_size() else {
if self.db.estimated_file_size().overflow_add(cmd_size) > self.quota {
let Ok(file_size) = self.db.file_size() else {
return false;
};
if file_size.overflow_add(cmd_size) > self.quota {
Expand Down Expand Up @@ -225,18 +216,15 @@ impl Alarmer {
}
}

impl<S> CommandExecutor<S>
where
S: StorageApi,
{
impl CommandExecutor {
/// New `CommandExecutor`
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
kv_storage: Arc<KvStore<S>>,
auth_storage: Arc<AuthStore<S>>,
lease_storage: Arc<LeaseStore<S>>,
alarm_storage: Arc<AlarmStore<S>>,
persistent: Arc<S>,
kv_storage: Arc<KvStore>,
auth_storage: Arc<AuthStore>,
lease_storage: Arc<LeaseStore>,
alarm_storage: Arc<AlarmStore>,
db: Arc<DB>,
index_barrier: Arc<IndexBarrier>,
id_barrier: Arc<IdBarrier>,
general_rev: Arc<RevisionNumberGenerator>,
Expand All @@ -245,13 +233,13 @@ where
quota: u64,
) -> Self {
let alarmer = RwLock::new(None);
let quota_checker = Arc::new(CommandQuotaChecker::new(quota, Arc::clone(&persistent)));
let quota_checker = Arc::new(CommandQuotaChecker::new(quota, Arc::clone(&db)));
Self {
kv_storage,
auth_storage,
lease_storage,
alarm_storage,
persistent,
db,
index_barrier,
id_barrier,
general_rev,
Expand Down Expand Up @@ -293,10 +281,7 @@ where
}

#[async_trait::async_trait]
impl<S> CurpCommandExecutor<Command> for CommandExecutor<S>
where
S: StorageApi,
{
impl CurpCommandExecutor<Command> for CommandExecutor {
fn prepare(
&self,
cmd: &Command,
Expand Down Expand Up @@ -368,7 +353,7 @@ where
}
};
ops.append(&mut wr_ops);
let key_revisions = self.persistent.flush_ops(ops)?;
let key_revisions = self.db.flush_ops(ops)?;
if !key_revisions.is_empty() {
self.kv_storage.insert_index(key_revisions);
}
Expand All @@ -393,30 +378,26 @@ where
snapshot: Option<(Snapshot, LogIndex)>,
) -> Result<(), <Command as CurpCommand>::Error> {
let s = if let Some((snapshot, index)) = snapshot {
_ = self
.persistent
.flush_ops(vec![WriteOp::PutAppliedIndex(index)])?;
_ = self.db.flush_ops(vec![WriteOp::PutAppliedIndex(index)])?;
Some(snapshot)
} else {
None
};
self.persistent.reset(s).await
self.db.reset(s).await
}

async fn snapshot(&self) -> Result<Snapshot, <Command as CurpCommand>::Error> {
let path = format!("/tmp/snapshot-{}", uuid::Uuid::new_v4());
self.persistent.get_snapshot(path)
self.db.get_snapshot(path)
}

fn set_last_applied(&self, index: LogIndex) -> Result<(), <Command as CurpCommand>::Error> {
_ = self
.persistent
.flush_ops(vec![WriteOp::PutAppliedIndex(index)])?;
_ = self.db.flush_ops(vec![WriteOp::PutAppliedIndex(index)])?;
Ok(())
}

fn last_applied(&self) -> Result<LogIndex, <Command as CurpCommand>::Error> {
let Some(index_bytes) = self.persistent.get_value(META_TABLE, APPLIED_INDEX_KEY)? else {
let Some(index_bytes) = self.db.get_value(META_TABLE, APPLIED_INDEX_KEY)? else {
return Ok(0);
};
let buf: [u8; 8] = index_bytes
Expand Down
25 changes: 8 additions & 17 deletions crates/xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,15 @@ use crate::{
PutRequest, PutResponse, RangeRequest, RangeResponse, RequestWrapper, Response, ResponseOp,
TxnRequest, TxnResponse,
},
storage::{storage_api::StorageApi, AuthStore, KvStore},
storage::{AuthStore, KvStore},
};

/// KV Server
pub(crate) struct KvServer<S>
where
S: StorageApi,
{
pub(crate) struct KvServer {
/// KV storage
kv_storage: Arc<KvStore<S>>,
kv_storage: Arc<KvStore>,
/// Auth storage
auth_storage: Arc<AuthStore<S>>,
auth_storage: Arc<AuthStore>,
/// Barrier for applied index
index_barrier: Arc<IndexBarrier>,
/// Barrier for propose id
Expand All @@ -56,15 +53,12 @@ where
next_compact_id: AtomicU64,
}

impl<S> KvServer<S>
where
S: StorageApi,
{
impl KvServer {
/// New `KvServer`
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
kv_storage: Arc<KvStore<S>>,
auth_storage: Arc<AuthStore<S>>,
kv_storage: Arc<KvStore>,
auth_storage: Arc<AuthStore>,
index_barrier: Arc<IndexBarrier>,
id_barrier: Arc<IdBarrier>,
range_retry_timeout: Duration,
Expand Down Expand Up @@ -193,10 +187,7 @@ where
}

#[tonic::async_trait]
impl<S> Kv for KvServer<S>
where
S: StorageApi,
{
impl Kv for KvServer {
/// Range gets the keys in the range from the key-value store.
#[instrument(skip_all)]
async fn range(
Expand Down
27 changes: 9 additions & 18 deletions crates/xline/src/server/lease_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,18 @@ use crate::{
LeaseKeepAliveResponse, LeaseLeasesRequest, LeaseLeasesResponse, LeaseRevokeRequest,
LeaseRevokeResponse, LeaseTimeToLiveRequest, LeaseTimeToLiveResponse, RequestWrapper,
},
storage::{storage_api::StorageApi, AuthStore, LeaseStore},
storage::{AuthStore, LeaseStore},
};

/// Default Lease Request Time
const DEFAULT_LEASE_REQUEST_TIME: Duration = Duration::from_millis(500);

/// Lease Server
pub(crate) struct LeaseServer<S>
where
S: StorageApi,
{
pub(crate) struct LeaseServer {
/// Lease storage
lease_storage: Arc<LeaseStore<S>>,
lease_storage: Arc<LeaseStore>,
/// Auth storage
auth_storage: Arc<AuthStore<S>>,
auth_storage: Arc<AuthStore>,
/// Consensus client
client: Arc<CurpClient>,
/// Id generator
Expand All @@ -55,14 +52,11 @@ where
task_manager: Arc<TaskManager>,
}

impl<S> LeaseServer<S>
where
S: StorageApi,
{
impl LeaseServer {
/// New `LeaseServer`
pub(crate) fn new(
lease_storage: Arc<LeaseStore<S>>,
auth_storage: Arc<AuthStore<S>>,
lease_storage: Arc<LeaseStore>,
auth_storage: Arc<AuthStore>,
client: Arc<CurpClient>,
id_gen: Arc<IdGenerator>,
cluster_info: Arc<ClusterInfo>,
Expand All @@ -87,7 +81,7 @@ where
/// Task of revoke expired leases
#[allow(clippy::arithmetic_side_effects, clippy::ignored_unit_patterns)] // Introduced by tokio::select!
async fn revoke_expired_leases_task(
lease_server: Arc<LeaseServer<S>>,
lease_server: Arc<LeaseServer>,
shutdown_listener: Listener,
) {
loop {
Expand Down Expand Up @@ -258,10 +252,7 @@ fn build_endpoints(
}

#[tonic::async_trait]
impl<S> Lease for LeaseServer<S>
where
S: StorageApi,
{
impl Lease for LeaseServer {
/// LeaseGrant creates a lease which expires if the server does not receive a keepAlive
/// within a given time to live period. All keys attached to the lease will be expired and
/// deleted if the lease expires. Each expired key generates a delete event in the event history.
Expand Down
Loading

0 comments on commit d836804

Please sign in to comment.