Skip to content

Commit

Permalink
refactor: revision fallback
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Jul 25, 2024
1 parent dac47fc commit 64eafda
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 88 deletions.
51 changes: 41 additions & 10 deletions crates/xline/src/revision_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,34 @@ use std::sync::atomic::{AtomicI64, Ordering};

/// Revision number
#[derive(Debug)]
pub(crate) struct RevisionNumberGenerator(AtomicI64);
pub(crate) struct RevisionNumberGenerator {
current: AtomicI64,
}

impl RevisionNumberGenerator {
/// Create a new revision
pub(crate) fn new(rev: i64) -> Self {
Self(AtomicI64::new(rev))
Self {
current: AtomicI64::new(rev),
}
}

/// Get the revision number
/// Get the current revision number
pub(crate) fn get(&self) -> i64 {
self.0.load(Ordering::Relaxed)
}

/// Get the next revision number
pub(crate) fn next(&self) -> i64 {
self.0.fetch_add(1, Ordering::Relaxed).wrapping_add(1)
self.current.load(Ordering::Acquire)
}

/// Set the revision number
pub(crate) fn set(&self, rev: i64) {
self.0.store(rev, Ordering::Relaxed);
self.current.store(rev, Ordering::Release);
}

/// Gets a temporary state
pub(crate) fn state(&self) -> RevisionNumberGeneratorState {
RevisionNumberGeneratorState {
current: &self.current,
next: AtomicI64::new(self.get()),
}
}
}

Expand All @@ -32,3 +39,27 @@ impl Default for RevisionNumberGenerator {
RevisionNumberGenerator::new(1)
}
}

/// Revision generator with temporary state
pub(crate) struct RevisionNumberGeneratorState<'a> {
current: &'a AtomicI64,
next: AtomicI64,
}

impl RevisionNumberGeneratorState<'_> {
/// Get the current revision number
pub(crate) fn get(&self) -> i64 {
self.next.load(Ordering::Acquire)
}

/// Increases the next revision number
pub(crate) fn next(&self) -> i64 {
self.next.fetch_add(1, Ordering::Release).wrapping_add(1)
}

/// Commit the revision number
pub(crate) fn commit(&self) {
self.current
.store(self.next.load(Ordering::Acquire), Ordering::Release)
}
}
36 changes: 30 additions & 6 deletions crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
})
.collect::<Result<_, _>>()?;

let index = self.kv_storage.index();
let mut index_state = index.state();
let general_revision_gen = self.kv_storage.revision_gen();
let auth_revision_gen = self.auth_storage.revision_gen();
let general_revision_state = general_revision_gen.state();
let auth_revision_state = auth_revision_gen.state();

let txn_db = self.db.transaction();
txn_db.write_op(WriteOp::PutAppliedIndex(highest_index))?;

Expand All @@ -323,7 +330,9 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
let wrapper = cmd.request();
let er = to_execute
.then(|| match wrapper.backend() {
RequestBackend::Kv => self.kv_storage.execute(wrapper, Some(&txn_db)),
RequestBackend::Kv => self
.kv_storage
.execute(wrapper, Some((&txn_db, &mut index_state))),
RequestBackend::Auth => self.auth_storage.execute(wrapper),
RequestBackend::Lease => self.lease_storage.execute(wrapper),
RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)),
Expand All @@ -334,10 +343,23 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
tracing::info!("execute in after sync for: {cmd:?}");
}
let (asr, wr_ops) = match wrapper.backend() {
RequestBackend::Kv => (self.kv_storage.after_sync(wrapper, &txn_db).await?, vec![]),
RequestBackend::Auth => self.auth_storage.after_sync(wrapper)?,
RequestBackend::Lease => self.lease_storage.after_sync(wrapper).await?,
RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper),
RequestBackend::Kv => (
self.kv_storage
.after_sync(wrapper, &txn_db, &index_state, &general_revision_state)
.await?,
vec![],
),
RequestBackend::Auth => self
.auth_storage
.after_sync(wrapper, &auth_revision_state)?,
RequestBackend::Lease => {
self.lease_storage
.after_sync(wrapper, &general_revision_state)
.await?
}
RequestBackend::Alarm => self
.alarm_storage
.after_sync(wrapper, &general_revision_state),
};
txn_db.write_ops(wr_ops)?;
resps.push((asr, er));
Expand All @@ -359,10 +381,12 @@ impl CurpCommandExecutor<Command> for CommandExecutor {

self.lease_storage.mark_lease_synced(wrapper);
}
// FIXME: revision needs to fallback when commit failed
txn_db
.commit()
.map_err(|e| ExecuteError::DbError(e.to_string()))?;
index_state.commit();
general_revision_state.commit();
auth_revision_state.commit();

if !quota_enough {
if let Some(alarmer) = self.alarmer.read().clone() {
Expand Down
10 changes: 9 additions & 1 deletion crates/xline/src/server/watch_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,15 @@ mod test {
});

let txn = store.db().transaction();
store.after_sync(&req, &txn).await.unwrap();
store
.after_sync(
&req,
&txn,
&store.index().state(),
&store.revision_gen().state(),
)
.await
.unwrap();
}

#[tokio::test]
Expand Down
13 changes: 7 additions & 6 deletions crates/xline/src/storage/alarm_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ use xlineapi::{
};

use super::db::{WriteOp, DB};
use crate::{header_gen::HeaderGenerator, revision_number::RevisionNumberGenerator};
use crate::{header_gen::HeaderGenerator, revision_number::RevisionNumberGeneratorState};

/// Alarm store
#[derive(Debug)]
pub(crate) struct AlarmStore {
/// Revision
revision: Arc<RevisionNumberGenerator>,
/// Header generator
header_gen: Arc<HeaderGenerator>,
/// Persistent storage
Expand Down Expand Up @@ -63,7 +61,11 @@ impl AlarmStore {
}

/// sync a alarm request
pub(crate) fn after_sync(&self, request: &RequestWrapper) -> (SyncResponse, Vec<WriteOp>) {
pub(crate) fn after_sync(
&self,
request: &RequestWrapper,
revision_gen: &RevisionNumberGeneratorState<'_>,
) -> (SyncResponse, Vec<WriteOp>) {
#[allow(clippy::wildcard_enum_match_arm)]
let ops = match *request {
RequestWrapper::AlarmRequest(ref req) => match req.action() {
Expand All @@ -75,7 +77,7 @@ impl AlarmStore {
unreachable!("Other request should not be sent to this store");
}
};
(SyncResponse::new(self.revision.get()), ops)
(SyncResponse::new(revision_gen.get()), ops)
}

/// Recover data form persistent storage
Expand All @@ -96,7 +98,6 @@ impl AlarmStore {
/// Create a new alarm store
pub(crate) fn new(header_gen: Arc<HeaderGenerator>, db: Arc<DB>) -> Self {
Self {
revision: header_gen.general_revision_arc(),
header_gen,
db,
types: RwLock::new(HashMap::new()),
Expand Down
19 changes: 15 additions & 4 deletions crates/xline/src/storage/auth_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use super::{
};
use crate::{
header_gen::HeaderGenerator,
revision_number::RevisionNumberGenerator,
revision_number::{RevisionNumberGenerator, RevisionNumberGeneratorState},
rpc::{
AuthDisableRequest, AuthDisableResponse, AuthEnableRequest, AuthEnableResponse,
AuthRoleAddRequest, AuthRoleAddResponse, AuthRoleDeleteRequest, AuthRoleDeleteResponse,
Expand Down Expand Up @@ -527,11 +527,12 @@ impl AuthStore {
pub(crate) fn after_sync<'a>(
&self,
request: &'a RequestWrapper,
revision_gen: &RevisionNumberGeneratorState,
) -> Result<(SyncResponse, Vec<WriteOp<'a>>), ExecuteError> {
let revision = if request.skip_auth_revision() {
self.revision.get()
revision_gen.get()
} else {
self.revision.next()
revision_gen.next()
};
#[allow(clippy::wildcard_enum_match_arm)]
let ops = match *request {
Expand Down Expand Up @@ -1165,6 +1166,13 @@ fn get_cn<T>(request: &tonic::Request<T>) -> Option<String> {
cert.subject_common_name()
}

impl AuthStore {
/// Gets the auth revision generator
pub(crate) fn revision_gen(&self) -> Arc<RevisionNumberGenerator> {
Arc::clone(&self.revision)
}
}

#[cfg(test)]
mod test {
use std::collections::HashMap;
Expand Down Expand Up @@ -1398,7 +1406,10 @@ mod test {
req: &RequestWrapper,
) -> Result<(CommandResponse, SyncResponse), ExecuteError> {
let cmd_res = store.execute(req)?;
let (sync_res, ops) = store.after_sync(req)?;
let rev_gen = store.revision_gen();
let rev_gen_state = rev_gen.state();
let (sync_res, ops) = store.after_sync(req, &rev_gen_state)?;
rev_gen_state.commit();
store.backend.flush_ops(ops)?;
Ok((cmd_res, sync_res))
}
Expand Down
7 changes: 4 additions & 3 deletions crates/xline/src/storage/compact/revision_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,17 @@ mod test {
let mut compactable = MockCompactable::new();
compactable.expect_compact().times(3).returning(Ok);
let revision_gen = Arc::new(RevisionNumberGenerator::new(110));
let revision_gen_state = revision_gen.state();
let revision_compactor = RevisionCompactor::new_arc(true, Arc::clone(&revision_gen), 100);
revision_compactor.set_compactable(compactable).await;
// auto_compactor works successfully
assert_eq!(revision_compactor.do_compact(None).await, Some(10));
revision_gen.next(); // current revision: 111
revision_gen_state.next(); // current revision: 111
assert_eq!(revision_compactor.do_compact(Some(10)).await, Some(11));
revision_compactor.pause();
revision_gen.next(); // current revision 112
revision_gen_state.next(); // current revision 112
assert!(revision_compactor.do_compact(Some(11)).await.is_none());
revision_gen.next(); // current revision 113
revision_gen_state.next(); // current revision 113
assert!(revision_compactor.do_compact(Some(11)).await.is_none());
revision_compactor.resume();
assert_eq!(revision_compactor.do_compact(Some(11)).await, Some(13));
Expand Down
Loading

0 comments on commit 64eafda

Please sign in to comment.