Skip to content

Commit

Permalink
refactor: refactor compact mod
Browse files Browse the repository at this point in the history
* move compact.rs into compact mod
* using a bounded channel to replace the unbounded channel (compact_task_tx, compact_task_rx)

Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Jul 25, 2023
1 parent 28cadab commit fa7e0f5
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 23 deletions.
1 change: 0 additions & 1 deletion xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,6 @@ mod test {
.unwrap_err()
.to_string();
assert_eq!(message, expected_err_message);
assert_eq!(message, expected_err_message);
}

#[tokio::test]
Expand Down
6 changes: 3 additions & 3 deletions xline/src/server/watch_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ mod test {
rpc::{PutRequest, RequestWithToken, WatchProgressRequest},
storage::{
db::DB, index::Index, kvwatcher::MockKvWatcherOps, lease_store::LeaseCollection,
KvStore,
KvStore, compact::COMPACT_CHANNEL_SIZE,
},
};

Expand Down Expand Up @@ -579,7 +579,7 @@ mod test {
#[tokio::test]
#[abort_on_panic]
async fn test_watch_prev_kv() {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE);
let index = Arc::new(Index::new());
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
Expand Down Expand Up @@ -752,7 +752,7 @@ mod test {

#[tokio::test]
async fn watch_compacted_revision_should_fail() {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE);
let index = Arc::new(Index::new());
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
Expand Down
14 changes: 7 additions & 7 deletions xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use clippy_utilities::{Cast, OverflowArithmetic};
use curp::{members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator};
use event_listener::Event;
use jsonwebtoken::{DecodingKey, EncodingKey};
use tokio::{net::TcpListener, sync::mpsc::unbounded_channel};
use tokio::{net::TcpListener, sync::mpsc::channel};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::Server;
use tonic_health::ServingStatus;
Expand All @@ -32,7 +32,7 @@ use crate::{
},
state::State,
storage::{
compact::compactor,
compact::{compactor, COMPACT_CHANNEL_SIZE},
index::Index,
kvwatcher::KvWatcher,
lease_store::LeaseCollection,
Expand Down Expand Up @@ -112,7 +112,7 @@ impl XlineServer {
/// Construct underlying storages, including `KvStore`, `LeaseStore`, `AuthStore`
#[allow(clippy::type_complexity)] // it is easy to read
#[inline]
fn construct_underlying_storages<S: StorageApi>(
async fn construct_underlying_storages<S: StorageApi>(
&self,
persistent: Arc<S>,
lease_collection: Arc<LeaseCollection>,
Expand All @@ -125,9 +125,9 @@ impl XlineServer {
Arc<AuthStore<S>>,
Arc<KvWatcher<S>>,
)> {
let (compact_task_tx, compact_task_rx) = unbounded_channel();
let (compact_task_tx, compact_task_rx) = channel(COMPACT_CHANNEL_SIZE);
let index = Arc::new(Index::new());
let (kv_update_tx, kv_update_rx) = tokio::sync::mpsc::channel(CHANNEL_SIZE);
let (kv_update_tx, kv_update_rx) = channel(CHANNEL_SIZE);
let kv_storage = Arc::new(KvStore::new(
Arc::clone(&index),
Arc::clone(&persistent),
Expand Down Expand Up @@ -167,7 +167,7 @@ impl XlineServer {
);
// lease storage must recover before kv storage
lease_storage.recover()?;
kv_storage.recover()?;
kv_storage.recover().await?;
auth_storage.recover()?;
Ok((kv_storage, lease_storage, auth_storage, watcher))
}
Expand Down Expand Up @@ -299,7 +299,7 @@ impl XlineServer {
Arc::clone(&header_gen),
Arc::clone(&auth_revision_gen),
key_pair,
)?;
).await?;

let index_barrier = Arc::new(IndexBarrier::new());
let id_barrier = Arc::new(IdBarrier::new());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
use std::{sync::Arc, time::Duration};

use event_listener::Event;
use tokio::{sync::mpsc::UnboundedReceiver, time::sleep};
use tokio::{sync::mpsc::Receiver, time::sleep};

use super::{
index::{Index, IndexOperate},
storage_api::StorageApi,
KvStore,
};

/// compact task channel size
pub(crate) const COMPACT_CHANNEL_SIZE: usize = 32;

/// background compact executor
pub(crate) async fn compactor<DB>(
kv_store: Arc<KvStore<DB>>,
index: Arc<Index>,
batch_limit: usize,
interval: Duration,
mut compact_task_rx: UnboundedReceiver<(i64, Option<Arc<Event>>)>,
mut compact_task_rx: Receiver<(i64, Option<Arc<Event>>)>,
) where
DB: StorageApi,
{
Expand Down
17 changes: 9 additions & 8 deletions xline/src/storage/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
/// KV update sender
kv_update_tx: mpsc::Sender<(i64, Vec<Event>)>,
/// Compact task submit sender
compact_task_tx: mpsc::UnboundedSender<(i64, Option<Arc<event_listener::Event>>)>,
compact_task_tx: mpsc::Sender<(i64, Option<Arc<event_listener::Event>>)>,
/// Lease collection
lease_collection: Arc<LeaseCollection>,
}
Expand Down Expand Up @@ -86,7 +86,7 @@ where
}

/// Recover data from persistent storage
pub(crate) fn recover(&self) -> Result<(), ExecuteError> {
pub(crate) async fn recover(&self) -> Result<(), ExecuteError> {
let mut key_to_lease: HashMap<Vec<u8>, i64> = HashMap::new();
let kvs = self.db.get_all(KV_TABLE)?;

Expand Down Expand Up @@ -131,7 +131,7 @@ where
"compacted revision corruption, which ({compacted_revision}) must belong to the range [-1, {current_rev}]"
);
self.update_compacted_revision(compacted_revision);
if let Err(e) = self.compact_task_tx.send((compacted_revision, None)) {
if let Err(e) = self.compact_task_tx.send((compacted_revision, None)).await {
panic!("the compactor exited unexpectedly: {e:?}");
}
}
Expand All @@ -154,7 +154,7 @@ where
db: Arc<DB>,
header_gen: Arc<HeaderGenerator>,
kv_update_tx: mpsc::Sender<(i64, Vec<Event>)>,
compact_task_tx: mpsc::UnboundedSender<(i64, Option<Arc<event_listener::Event>>)>,
compact_task_tx: mpsc::Sender<(i64, Option<Arc<event_listener::Event>>)>,
lease_collection: Arc<LeaseCollection>,
) -> Self {
Self {
Expand Down Expand Up @@ -655,12 +655,13 @@ where
if let Err(e) = self
.compact_task_tx
.send((revision, Some(Arc::clone(&event))))
.await
{
panic!("the compactor exited unexpectedly: {e:?}");
}
event.listen().await;
} else {
if let Err(e) = self.compact_task_tx.send((revision, None)) {
if let Err(e) = self.compact_task_tx.send((revision, None)).await {
panic!("the compactor exited unexpectedly: {e:?}");
}
}
Expand Down Expand Up @@ -850,7 +851,7 @@ mod test {
use crate::{
revision_number::RevisionNumberGenerator,
rpc::RequestOp,
storage::{compact::compactor, db::DB, kvwatcher::KvWatcher},
storage::{compact::{compactor, COMPACT_CHANNEL_SIZE}, db::DB, kvwatcher::KvWatcher},
};

const CHANNEL_SIZE: usize = 1024;
Expand Down Expand Up @@ -887,7 +888,7 @@ mod test {
}

fn init_empty_store(db: Arc<DB>) -> Arc<KvStore<DB>> {
let (compact_tx, compact_rx) = mpsc::unbounded_channel();
let (compact_tx, compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE);
let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE);
let lease_collection = Arc::new(LeaseCollection::new(0));
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
Expand Down Expand Up @@ -1079,7 +1080,7 @@ mod test {
assert_eq!(res.kvs.len(), 0);
assert_eq!(new_store.compacted_revision(), -1);

new_store.recover()?;
new_store.recover().await?;

let res = new_store.handle_range_request(&range_req)?;
assert_eq!(res.kvs.len(), 1);
Expand Down
4 changes: 2 additions & 2 deletions xline/src/storage/kvwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,11 @@ mod test {
use crate::{
header_gen::HeaderGenerator,
rpc::{PutRequest, RequestWithToken},
storage::{db::DB, index::Index, lease_store::LeaseCollection, KvStore},
storage::{db::DB, index::Index, lease_store::LeaseCollection, KvStore, compact::COMPACT_CHANNEL_SIZE},
};

fn init_empty_store() -> (Arc<KvStore<DB>>, Arc<DB>, Arc<KvWatcher<DB>>) {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let (compact_tx, _compact_rx) = mpsc::channel(COMPACT_CHANNEL_SIZE);
let db = DB::open(&StorageConfig::Memory).unwrap();
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
let index = Arc::new(Index::new());
Expand Down

0 comments on commit fa7e0f5

Please sign in to comment.