Skip to content

Commit

Permalink
feat(compactor): implement compactor task
Browse files Browse the repository at this point in the history
Refs: #188
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 authored and mergify[bot] committed Jul 25, 2023
1 parent 8aa1f3b commit 6ae4a29
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 31 deletions.
21 changes: 14 additions & 7 deletions xline/src/server/watch_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,10 @@ mod test {

use parking_lot::Mutex;
use test_macros::abort_on_panic;
use tokio::time::{sleep, timeout};
use tokio::{
sync::mpsc,
time::{sleep, timeout},
};
use utils::config::{default_watch_progress_notify_interval, StorageConfig};

use super::*;
Expand Down Expand Up @@ -576,18 +579,20 @@ mod test {
#[tokio::test]
#[abort_on_panic]
async fn test_watch_prev_kv() {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let index = Arc::new(Index::new());
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
let lease_collection = Arc::new(LeaseCollection::new(0));
let next_id_gen = Arc::new(WatchIdGenerator::new(1));
let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE);
let kv_store = Arc::new(KvStore::new(
index,
Arc::clone(&db),
Arc::clone(&header_gen),
kv_update_tx,
compact_tx,
lease_collection,
Arc::clone(&header_gen),
Arc::clone(&db),
index,
));
let shutdown_trigger = Arc::new(event_listener::Event::new());
let kv_watcher = KvWatcher::new_arc(
Expand Down Expand Up @@ -747,18 +752,20 @@ mod test {

#[tokio::test]
async fn watch_compacted_revision_should_fail() {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let index = Arc::new(Index::new());
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
let lease_collection = Arc::new(LeaseCollection::new(0));
let next_id_gen = Arc::new(WatchIdGenerator::new(1));
let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE);
let kv_store = Arc::new(KvStore::new(
index,
Arc::clone(&db),
Arc::clone(&header_gen),
kv_update_tx,
compact_tx,
lease_collection,
Arc::clone(&header_gen),
Arc::clone(&db),
index,
));
let shutdown_trigger = Arc::new(event_listener::Event::new());
let kv_watcher = KvWatcher::new_arc(
Expand Down
15 changes: 12 additions & 3 deletions xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use clippy_utilities::{Cast, OverflowArithmetic};
use curp::{members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator};
use event_listener::Event;
use jsonwebtoken::{DecodingKey, EncodingKey};
use tokio::net::TcpListener;
use tokio::{net::TcpListener, sync::mpsc::unbounded_channel};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::Server;
use tonic_health::ServingStatus;
Expand All @@ -32,6 +32,7 @@ use crate::{
},
state::State,
storage::{
compact::compactor,
index::Index,
kvwatcher::KvWatcher,
lease_store::LeaseCollection,
Expand Down Expand Up @@ -120,15 +121,23 @@ impl XlineServer {
Arc<AuthStore<S>>,
Arc<KvWatcher<S>>,
)> {
let (compact_task_tx, compact_task_rx) = unbounded_channel();
let index = Arc::new(Index::new());
let (kv_update_tx, kv_update_rx) = tokio::sync::mpsc::channel(CHANNEL_SIZE);
let kv_storage = Arc::new(KvStore::new(
Arc::clone(&index),
Arc::clone(&persistent),
Arc::clone(&header_gen),
kv_update_tx.clone(),
compact_task_tx,
Arc::clone(&lease_collection),
Arc::clone(&header_gen),
Arc::clone(&persistent),
));
let _hd = tokio::spawn(compactor(
Arc::clone(&kv_storage),
Arc::clone(&index),
compact_task_rx,
));
// TODO: Boot up the compact policy scheduler
let lease_storage = Arc::new(LeaseStore::new(
Arc::clone(&lease_collection),
Arc::clone(&header_gen),
Expand Down
39 changes: 39 additions & 0 deletions xline/src/storage/compact.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::sync::Arc;

use event_listener::Event;
use tokio::{sync::mpsc::UnboundedReceiver, time::sleep};

use super::{
index::{Index, IndexOperate},
storage_api::StorageApi,
KvStore,
};

/// background compact executor
pub(crate) async fn compactor<DB>(
kv_store: Arc<KvStore<DB>>,
index: Arc<Index>,
mut compact_task_rx: UnboundedReceiver<(i64, Option<Arc<Event>>)>,
) where
DB: StorageApi,
{
// TODO: make compact_interval and compact_batch_limit configurable
let compact_interval = std::time::Duration::from_millis(10);
let compact_batch_limit = 1000;
while let Some((revision, listener)) = compact_task_rx.recv().await {
let target_revisions = index
.compact(revision)
.into_iter()
.map(|key_rev| key_rev.as_revision().encode_to_vec())
.collect::<Vec<Vec<_>>>();
for revision_chunk in target_revisions.chunks(compact_batch_limit) {
if let Err(e) = kv_store.compact(revision_chunk) {
panic!("failed to compact revision chunk {revision_chunk:?} due to {e}");
}
sleep(compact_interval).await;
}
if let Some(notifier) = listener {
notifier.notify(usize::MAX);
}
}
}
48 changes: 30 additions & 18 deletions xline/src/storage/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::{
pub(crate) const KV_TABLE: &str = "kv";

/// KV store
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) struct KvStore<DB>
where
Expand All @@ -52,6 +53,8 @@ where
header_gen: Arc<HeaderGenerator>,
/// KV update sender
kv_update_tx: mpsc::Sender<(i64, Vec<Event>)>,
/// Compact task submit sender
compact_task_tx: mpsc::UnboundedSender<(i64, Option<Arc<event_listener::Event>>)>,
/// Lease collection
lease_collection: Arc<LeaseCollection>,
}
Expand Down Expand Up @@ -131,11 +134,12 @@ where
{
/// New `KvStore`
pub(crate) fn new(
index: Arc<Index>,
db: Arc<DB>,
header_gen: Arc<HeaderGenerator>,
kv_update_tx: mpsc::Sender<(i64, Vec<Event>)>,
compact_task_tx: mpsc::UnboundedSender<(i64, Option<Arc<event_listener::Event>>)>,
lease_collection: Arc<LeaseCollection>,
header_gen: Arc<HeaderGenerator>,
db: Arc<DB>,
index: Arc<Index>,
) -> Self {
Self {
index,
Expand All @@ -144,6 +148,7 @@ where
revision: header_gen.revision_arc(),
header_gen,
kv_update_tx,
compact_task_tx,
lease_collection,
}
}
Expand Down Expand Up @@ -334,16 +339,9 @@ where
}

/// Compact kv storage
#[allow(dead_code)]
fn compact(&self, at_rev: i64) -> Result<(), ExecuteError> {
let compacted_rev = self
.index
.compact(at_rev)
.into_iter()
.map(|key_rev| key_rev.as_revision().encode_to_vec())
.collect::<Vec<Vec<_>>>();
pub(crate) fn compact(&self, revisions: &[Vec<u8>]) -> Result<(), ExecuteError> {
let mut ops = Vec::new();
compacted_rev
revisions
.iter()
.for_each(|rev| ops.push(WriteOp::DeleteKeyValue(rev.as_ref())));
self.db.flush_ops(ops)?;
Expand Down Expand Up @@ -823,16 +821,18 @@ mod test {
}

fn init_empty_store(db: Arc<DB>) -> Arc<KvStore<DB>> {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE);
let lease_collection = Arc::new(LeaseCollection::new(0));
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
let index = Arc::new(Index::new());
let storage = Arc::new(KvStore::new(
index,
db,
header_gen,
kv_update_tx,
compact_tx,
lease_collection,
header_gen,
db,
index,
));
let shutdown_trigger = Arc::new(event_listener::Event::new());
let _watcher = KvWatcher::new_arc(
Expand All @@ -855,6 +855,15 @@ mod test {
Ok(())
}

fn index_compact(store: &Arc<KvStore<DB>>, at_rev: i64) -> Vec<Vec<u8>> {
store
.index
.compact(at_rev)
.into_iter()
.map(|key_rev| key_rev.as_revision().encode_to_vec())
.collect::<Vec<Vec<_>>>()
}

#[tokio::test]
#[abort_on_panic]
async fn test_keys_only() -> Result<(), ExecuteError> {
Expand Down Expand Up @@ -1126,7 +1135,8 @@ mod test {
.unwrap();
}

store.compact(3)?;
let target_revisions = index_compact(&store, 3);
store.compact(target_revisions.as_ref())?;
assert_eq!(
store.get_range(b"a", b"", 2).unwrap().len(),
1,
Expand All @@ -1138,7 +1148,8 @@ mod test {
"(b, 2) should not be removed"
);

store.compact(4)?;
let target_revisions = index_compact(&store, 4);
store.compact(target_revisions.as_ref())?;
assert!(
store.get_range(b"a", b"", 2).unwrap().is_empty(),
"(a, 1) should be removed"
Expand All @@ -1154,7 +1165,8 @@ mod test {
"(a, 3) should not be removed"
);

store.compact(5)?;
let target_revisions = index_compact(&store, 5);
store.compact(target_revisions.as_ref())?;
assert!(
store.get_range(b"a", b"", 2).unwrap().is_empty(),
"(a, 1) should be removed"
Expand Down
8 changes: 5 additions & 3 deletions xline/src/storage/kvwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,17 +555,19 @@ mod test {
};

fn init_empty_store() -> (Arc<KvStore<DB>>, Arc<DB>, Arc<KvWatcher<DB>>) {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
let index = Arc::new(Index::new());
let lease_collection = Arc::new(LeaseCollection::new(0));
let (kv_update_tx, kv_update_rx) = mpsc::channel(128);
let store = Arc::new(KvStore::new(
index,
Arc::clone(&db),
header_gen,
kv_update_tx,
compact_tx,
lease_collection,
header_gen,
Arc::clone(&db),
index,
));
let shutdown_trigger = Arc::new(event_listener::Event::new());
let sync_victims_interval = Duration::from_millis(10);
Expand Down
2 changes: 2 additions & 0 deletions xline/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/// Storage for Auth
pub(crate) mod auth_store;
/// Compactor
pub(crate) mod compact;
/// Database module
pub mod db;
/// Execute error
Expand Down

0 comments on commit 6ae4a29

Please sign in to comment.