
refactor: refactor compact mod
* unify the argument between PutKeyValue and DeleteKeyValue
* move compact.rs into compact mod
* use a bounded channel in place of the unbounded channel (compact_task_tx, compact_task_rx); see the sketch below
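
A minimal sketch of what the third bullet changes (not the project code; tokio 1.x assumed, names illustrative): tokio's unbounded_channel never blocks the sender, so a slow compactor lets queued tasks grow without limit, while a bounded channel makes send an async call that applies backpressure once the buffer is full. This is also why recover and construct_underlying_storages become async in the diffs below.

use std::time::Duration;
use tokio::sync::mpsc;

const COMPACT_CHANNEL_SIZE: usize = 32;

#[tokio::main]
async fn main() {
    // Before: let (tx, rx) = mpsc::unbounded_channel(); -- send() is sync
    // and never waits. After: a bounded buffer of COMPACT_CHANNEL_SIZE tasks.
    let (tx, mut rx) = mpsc::channel::<(i64, Option<()>)>(COMPACT_CHANNEL_SIZE);

    tokio::spawn(async move {
        while let Some((revision, _event)) = rx.recv().await {
            // stand-in for real compaction work
            tokio::time::sleep(Duration::from_millis(10)).await;
            println!("compacted up to revision {revision}");
        }
    });

    // send() is now a future: the caller waits while the queue is full,
    // which is why every call site in this commit gains `.await`.
    tx.send((42, None)).await.expect("compactor exited");
}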

Signed-off-by: Phoeniix Zhao <[email protected]>
Phoenix500526 authored and mergify[bot] committed Jul 25, 2023
1 parent 2ee85ae commit a48b303
Showing 9 changed files with 65 additions and 69 deletions.
4 changes: 2 additions & 2 deletions xline/src/server/kv_server.rs
@@ -100,6 +100,7 @@ where
.iter()
.map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
.collect(),
RequestWrapper::CompactionRequest(ref _req) => Vec::new(),
_ => unreachable!("Other request should not be sent to this store"),
};
Command::new(key_ranges, wrapper, propose_id)
@@ -222,7 +223,7 @@ where
range_revision: i64,
compacted_revision: i64,
) -> Result<(), tonic::Status> {
(range_revision >= compacted_revision)
(range_revision <= 0 || range_revision >= compacted_revision)
.then_some(())
.ok_or(tonic::Status::invalid_argument(format!(
"required revision {range_revision} has been compacted, compacted revision is {compacted_revision}"
@@ -671,7 +672,6 @@ mod test {
.unwrap_err()
.to_string();
assert_eq!(message, expected_err_message);
assert_eq!(message, expected_err_message);
}

#[tokio::test]
8 changes: 4 additions & 4 deletions xline/src/server/watch_server.rs
@@ -436,8 +436,8 @@ mod test {
use crate::{
rpc::{PutRequest, RequestWithToken, WatchProgressRequest},
storage::{
db::DB, index::Index, kvwatcher::MockKvWatcherOps, lease_store::LeaseCollection,
KvStore,
compact::COMPACT_CHANNEL_SIZE, db::DB, index::Index, kvwatcher::MockKvWatcherOps,
lease_store::LeaseCollection, KvStore,
},
};

@@ -579,7 +579,7 @@ mod test {
#[tokio::test]
#[abort_on_panic]
async fn test_watch_prev_kv() {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE);
let index = Arc::new(Index::new());
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
@@ -752,7 +752,7 @@ mod test {

#[tokio::test]
async fn watch_compacted_revision_should_fail() {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE);
let index = Arc::new(Index::new());
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
15 changes: 8 additions & 7 deletions xline/src/server/xline_server.rs
@@ -5,7 +5,7 @@ use clippy_utilities::{Cast, OverflowArithmetic};
use curp::{members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator};
use event_listener::Event;
use jsonwebtoken::{DecodingKey, EncodingKey};
use tokio::{net::TcpListener, sync::mpsc::unbounded_channel};
use tokio::{net::TcpListener, sync::mpsc::channel};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::Server;
use tonic_health::ServingStatus;
@@ -32,7 +32,7 @@ use crate::{
},
state::State,
storage::{
compact::compactor,
compact::{compactor, COMPACT_CHANNEL_SIZE},
index::Index,
kvwatcher::KvWatcher,
lease_store::LeaseCollection,
@@ -112,7 +112,7 @@ impl XlineServer {
/// Construct underlying storages, including `KvStore`, `LeaseStore`, `AuthStore`
#[allow(clippy::type_complexity)] // it is easy to read
#[inline]
fn construct_underlying_storages<S: StorageApi>(
async fn construct_underlying_storages<S: StorageApi>(
&self,
persistent: Arc<S>,
lease_collection: Arc<LeaseCollection>,
@@ -125,9 +125,9 @@ impl XlineServer {
Arc<AuthStore<S>>,
Arc<KvWatcher<S>>,
)> {
let (compact_task_tx, compact_task_rx) = unbounded_channel();
let (compact_task_tx, compact_task_rx) = channel(COMPACT_CHANNEL_SIZE);
let index = Arc::new(Index::new());
let (kv_update_tx, kv_update_rx) = tokio::sync::mpsc::channel(CHANNEL_SIZE);
let (kv_update_tx, kv_update_rx) = channel(CHANNEL_SIZE);
let kv_storage = Arc::new(KvStore::new(
Arc::clone(&index),
Arc::clone(&persistent),
@@ -167,7 +167,7 @@ impl XlineServer {
);
// lease storage must recover before kv storage
lease_storage.recover()?;
kv_storage.recover()?;
kv_storage.recover().await?;
auth_storage.recover()?;
Ok((kv_storage, lease_storage, auth_storage, watcher))
}
@@ -299,7 +299,8 @@ impl XlineServer {
Arc::clone(&header_gen),
Arc::clone(&auth_revision_gen),
key_pair,
)?;
)
.await?;

let index_barrier = Arc::new(IndexBarrier::new());
let id_barrier = Arc::new(IdBarrier::new());
7 changes: 5 additions & 2 deletions xline/src/storage/compact.rs
@@ -1,21 +1,24 @@
use std::{sync::Arc, time::Duration};

use event_listener::Event;
use tokio::{sync::mpsc::UnboundedReceiver, time::sleep};
use tokio::{sync::mpsc::Receiver, time::sleep};

use super::{
index::{Index, IndexOperate},
storage_api::StorageApi,
KvStore,
};

/// compact task channel size
pub(crate) const COMPACT_CHANNEL_SIZE: usize = 32;

/// background compact executor
pub(crate) async fn compactor<DB>(
kv_store: Arc<KvStore<DB>>,
index: Arc<Index>,
batch_limit: usize,
interval: Duration,
mut compact_task_rx: UnboundedReceiver<(i64, Option<Arc<Event>>)>,
mut compact_task_rx: Receiver<(i64, Option<Arc<Event>>)>,
) where
DB: StorageApi,
{
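
The body of compactor is collapsed in this diff; the following is only a sketch, under the assumption that it drains the now-bounded receiver, compacts in batches, and notifies a waiter when the sender attached an event (compare the physical-compaction path in kv_store.rs below).

use std::{sync::Arc, time::Duration};
use event_listener::Event;
use tokio::{sync::mpsc::Receiver, time::sleep};

async fn compactor_sketch(
    mut compact_task_rx: Receiver<(i64, Option<Arc<Event>>)>,
    interval: Duration,
) {
    // recv() returns None once every bounded Sender has been dropped.
    while let Some((revision, event)) = compact_task_rx.recv().await {
        // ... compact the index and kv table up to `revision`, a bounded
        // batch at a time, pausing `interval` between batches ...
        sleep(interval).await;
        if let Some(event) = event {
            // wake the client blocked on event.listen().await (physical compaction)
            event.notify(usize::MAX);
        }
    }
}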
18 changes: 9 additions & 9 deletions xline/src/storage/db.rs
@@ -9,7 +9,7 @@ use super::{
kv_store::KV_TABLE,
lease_store::LEASE_TABLE,
storage_api::StorageApi,
ExecuteError, Revision,
ExecuteError,
};
use crate::{
rpc::{PbLease, Role, User},
@@ -133,8 +133,7 @@ impl StorageApi for DB {
.collect::<HashMap<_, _>>();
for op in ops {
let wop = match op {
WriteOp::PutKeyValue(rev, value) => {
let key = rev.encode_to_vec();
WriteOp::PutKeyValue(key, value) => {
WriteOperation::new_put(KV_TABLE, key, value.clone())
}
WriteOp::PutAppliedIndex(index) => WriteOperation::new_put(
@@ -198,7 +197,7 @@
#[non_exhaustive]
pub enum WriteOp<'a> {
/// Put a key-value pair to kv table
PutKeyValue(Revision, Vec<u8>),
PutKeyValue(Vec<u8>, Vec<u8>),
/// Put the applied index to meta table
PutAppliedIndex(u64),
/// Put a lease to lease table
@@ -231,14 +230,15 @@ mod test {
use test_macros::abort_on_panic;

use super::*;
use crate::storage::Revision;
#[tokio::test]
#[abort_on_panic]
async fn test_reset() -> Result<(), ExecuteError> {
let data_dir = PathBuf::from("/tmp/test_reset");
let db = DB::open(&StorageConfig::RocksDB(data_dir.clone()))?;

let revision = Revision::new(1, 1);
let key = revision.encode_to_vec();
let revision = Revision::new(1, 1).encode_to_vec();
let key = revision.clone();
let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())];
db.flush_ops(ops)?;
let res = db.get_value(KV_TABLE, &key)?;
@@ -264,8 +264,8 @@
let snapshot_path = dir.join("snapshot");
let origin_db = DB::open(&StorageConfig::RocksDB(origin_db_path))?;

let revision = Revision::new(1, 1);
let key = revision.encode_to_vec();
let revision = Revision::new(1, 1).encode_to_vec();
let key: Vec<u8> = revision.clone();
let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())];
origin_db.flush_ops(ops)?;

@@ -336,7 +336,7 @@ mod test {
};
let role_bytes = role.encode_to_vec();
let write_ops = vec![
WriteOp::PutKeyValue(Revision::new(1, 2), "value".into()),
WriteOp::PutKeyValue(Revision::new(1, 2).encode_to_vec(), "value".into()),
WriteOp::PutAppliedIndex(5),
WriteOp::PutLease(lease),
WriteOp::PutAuthEnable(true),
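
To illustrate the first bullet of the commit message: PutKeyValue (like DeleteKeyValue) now carries the key as pre-encoded bytes, so flush_ops no longer special-cases Revision. A minimal sketch follows; the Revision encoding shown is an assumption for illustration, not necessarily the project's exact layout.

struct Revision {
    revision: i64,
    sub_revision: i64,
}

impl Revision {
    fn encode_to_vec(&self) -> Vec<u8> {
        // big-endian so that byte-wise key order matches revision order
        let mut buf = Vec::with_capacity(16);
        buf.extend(self.revision.to_be_bytes());
        buf.extend(self.sub_revision.to_be_bytes());
        buf
    }
}

enum WriteOp {
    // before: PutKeyValue(Revision, Vec<u8>) -- encoded inside flush_ops
    // after: callers encode the key themselves, as in the tests above
    PutKeyValue(Vec<u8>, Vec<u8>),
}

fn main() {
    let key = Revision { revision: 1, sub_revision: 2 }.encode_to_vec();
    let _op = WriteOp::PutKeyValue(key, b"value".to_vec());
}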
48 changes: 16 additions & 32 deletions xline/src/storage/index.rs
@@ -299,44 +299,28 @@ impl IndexOperate for Index {
inner.index.iter_mut().for_each(|(key, revisions)| {
if let Some(revision) = revisions.first() {
if revision.mod_revision < at_rev {
match revisions.binary_search_by(|rev| rev.mod_revision.cmp(&at_rev)) {
Ok(idx) => {
let key_rev = revisions.get(idx).unwrap_or_else(|| {
unreachable!(
"{idx} is out of range, len of revisions is {}",
revisions.len()
)
});
let compact_revs = if key_rev.is_deleted() {
revisions.drain(..=idx)
} else {
revisions.drain(..idx)
};
revs.extend(compact_revs.into_iter());
}
Err(idx) => {
let compacted_last_idx = idx.overflow_sub(1);
let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| {
unreachable!(
"{idx} is out of range, len of revisions is {}",
revisions.len()
)
});
let compact_revs = if key_rev.is_deleted() {
revisions.drain(..=compacted_last_idx)
} else {
revisions.drain(..compacted_last_idx)
};
revs.extend(compact_revs.into_iter());
}
}
let pivot = revisions.partition_point(|rev| rev.mod_revision <= at_rev);
let compacted_last_idx = pivot.overflow_sub(1);
// There is at least 1 element in the first partition, so the key revision at `compacted_last_idx`
// must exist.
let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| {
unreachable!(
"Oops, the key revision at {compacted_last_idx} should not be None",
)
});
let compact_revs = if key_rev.is_deleted() {
revisions.drain(..=compacted_last_idx)
} else {
revisions.drain(..compacted_last_idx)
};
revs.extend(compact_revs.into_iter());

if revisions.is_empty() {
del_keys.push(key.clone());
}
}
}
});

for key in del_keys {
let _ignore = inner.index.remove(&key);
}
25 changes: 15 additions & 10 deletions xline/src/storage/kv_store.rs
@@ -56,7 +56,7 @@ where
/// KV update sender
kv_update_tx: mpsc::Sender<(i64, Vec<Event>)>,
/// Compact task submit sender
compact_task_tx: mpsc::UnboundedSender<(i64, Option<Arc<event_listener::Event>>)>,
compact_task_tx: mpsc::Sender<(i64, Option<Arc<event_listener::Event>>)>,
/// Lease collection
lease_collection: Arc<LeaseCollection>,
}
@@ -86,7 +86,7 @@
}

/// Recover data from persistent storage
pub(crate) fn recover(&self) -> Result<(), ExecuteError> {
pub(crate) async fn recover(&self) -> Result<(), ExecuteError> {
let mut key_to_lease: HashMap<Vec<u8>, i64> = HashMap::new();
let kvs = self.db.get_all(KV_TABLE)?;

@@ -131,7 +131,7 @@
"compacted revision corruption, which ({compacted_revision}) must belong to the range [-1, {current_rev}]"
);
self.update_compacted_revision(compacted_revision);
if let Err(e) = self.compact_task_tx.send((compacted_revision, None)) {
if let Err(e) = self.compact_task_tx.send((compacted_revision, None)).await {
panic!("the compactor exited unexpectedly: {e:?}");
}
}
@@ -154,7 +154,7 @@
db: Arc<DB>,
header_gen: Arc<HeaderGenerator>,
kv_update_tx: mpsc::Sender<(i64, Vec<Event>)>,
compact_task_tx: mpsc::UnboundedSender<(i64, Option<Arc<event_listener::Event>>)>,
compact_task_tx: mpsc::Sender<(i64, Option<Arc<event_listener::Event>>)>,
lease_collection: Arc<LeaseCollection>,
) -> Self {
Self {
@@ -655,12 +655,13 @@ where
if let Err(e) = self
.compact_task_tx
.send((revision, Some(Arc::clone(&event))))
.await
{
panic!("the compactor exited unexpectedly: {e:?}");
}
event.listen().await;
} else {
if let Err(e) = self.compact_task_tx.send((revision, None)) {
if let Err(e) = self.compact_task_tx.send((revision, None)).await {
panic!("the compactor exited unexpectedly: {e:?}");
}
}
@@ -747,7 +748,7 @@ where
.unwrap_or_else(|e| panic!("unexpected error from lease Attach: {e}"));
}
ops.push(WriteOp::PutKeyValue(
new_rev.as_revision(),
new_rev.as_revision().encode_to_vec(),
kv.encode_to_vec(),
));
let event = Event {
@@ -793,7 +794,7 @@ where
..KeyValue::default()
};
let value = del_kv.encode_to_vec();
WriteOp::PutKeyValue(new_rev, value)
WriteOp::PutKeyValue(new_rev.encode_to_vec(), value)
})
.collect()
}
@@ -850,7 +851,11 @@ mod test {
use crate::{
revision_number::RevisionNumberGenerator,
rpc::RequestOp,
storage::{compact::compactor, db::DB, kvwatcher::KvWatcher},
storage::{
compact::{compactor, COMPACT_CHANNEL_SIZE},
db::DB,
kvwatcher::KvWatcher,
},
};

const CHANNEL_SIZE: usize = 1024;
@@ -887,7 +892,7 @@
}

fn init_empty_store(db: Arc<DB>) -> Arc<KvStore<DB>> {
let (compact_tx, compact_rx) = mpsc::unbounded_channel();
let (compact_tx, compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE);
let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE);
let lease_collection = Arc::new(LeaseCollection::new(0));
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
@@ -1079,7 +1084,7 @@ mod test {
assert_eq!(res.kvs.len(), 0);
assert_eq!(new_store.compacted_revision(), -1);

new_store.recover()?;
new_store.recover().await?;

let res = new_store.handle_range_request(&range_req)?;
assert_eq!(res.kvs.len(), 1);
7 changes: 5 additions & 2 deletions xline/src/storage/kvwatcher.rs
@@ -551,11 +551,14 @@ mod test {
use crate::{
header_gen::HeaderGenerator,
rpc::{PutRequest, RequestWithToken},
storage::{db::DB, index::Index, lease_store::LeaseCollection, KvStore},
storage::{
compact::COMPACT_CHANNEL_SIZE, db::DB, index::Index, lease_store::LeaseCollection,
KvStore,
},
};

fn init_empty_store() -> (Arc<KvStore<DB>>, Arc<DB>, Arc<KvWatcher<DB>>) {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE);
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
let index = Arc::new(Index::new());
2 changes: 1 addition & 1 deletion xline/src/storage/revision.rs
@@ -18,7 +18,7 @@ pub(super) struct KeyRevision {

/// Revision
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct Revision {
pub(crate) struct Revision {
/// Main revision
revision: i64,
/// Sub revision in one transaction or range deletion
