Skip to content

Commit

Permalink
refactor: remove read state from kv_server
Browse files Browse the repository at this point in the history
refactor: disable fast path in etcd competible layer

WIP: fix rebase kv server
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Jun 24, 2024
1 parent bf5140a commit 7dd07b4
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 239 deletions.
2 changes: 1 addition & 1 deletion crates/curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ where
async fn reset(&self, snapshot: Option<(Snapshot, LogIndex)>) -> Result<(), C::Error>;

/// Trigger the barrier of the given trigger id (based on propose id) and log index.
fn trigger(&self, id: InflightId, index: LogIndex);
fn trigger(&self, id: InflightId);
}

/// Codec for encoding and decoding data into/from the Protobuf format
Expand Down
2 changes: 1 addition & 1 deletion crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ impl CommandExecutor<TestCommand> for TestCE {
Ok(())
}

fn trigger(&self, _id: InflightId, _index: LogIndex) {}
fn trigger(&self, _id: InflightId) {}
}

impl TestCE {
Expand Down
2 changes: 1 addition & 1 deletion crates/curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ impl<C: Command, CE: CommandExecutor<C>> Filter<C, CE> {
None
}
Err(err) => {
self.cmd_executor.trigger(entry.inflight_id(), entry.index);
self.cmd_executor.trigger(entry.inflight_id());
Some(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async fn worker_exe<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
| EntryData::SetNodeState(_, _, _) => true,
};
if !success {
ce.trigger(entry.inflight_id(), entry.index);
ce.trigger(entry.inflight_id());
}
success
}
Expand Down Expand Up @@ -240,7 +240,7 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
}
EntryData::Empty => true,
};
ce.trigger(entry.inflight_id(), entry.index);
ce.trigger(entry.inflight_id());
success
}

Expand Down
113 changes: 0 additions & 113 deletions crates/xline/src/server/barriers.rs

This file was deleted.

8 changes: 1 addition & 7 deletions crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use xlineapi::{
AlarmAction, AlarmRequest, AlarmType,
};

use super::barriers::IndexBarrier;
use crate::{
revision_number::RevisionNumberGenerator,
rpc::{RequestBackend, RequestWrapper},
Expand Down Expand Up @@ -73,8 +72,6 @@ pub(crate) struct CommandExecutor {
alarm_storage: Arc<AlarmStore>,
/// persistent storage
db: Arc<DB>,
/// Barrier for applied index
index_barrier: Arc<IndexBarrier>,
/// Barrier for propose id
id_barrier: Arc<IdBarrier<InflightId>>,
/// Revision Number generator for KV request and Lease request
Expand Down Expand Up @@ -225,7 +222,6 @@ impl CommandExecutor {
lease_storage: Arc<LeaseStore>,
alarm_storage: Arc<AlarmStore>,
db: Arc<DB>,
index_barrier: Arc<IndexBarrier>,
id_barrier: Arc<IdBarrier<InflightId>>,
general_rev: Arc<RevisionNumberGenerator>,
auth_rev: Arc<RevisionNumberGenerator>,
Expand All @@ -240,7 +236,6 @@ impl CommandExecutor {
lease_storage,
alarm_storage,
db,
index_barrier,
id_barrier,
general_rev,
auth_rev,
Expand Down Expand Up @@ -406,9 +401,8 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
Ok(u64::from_le_bytes(buf))
}

fn trigger(&self, id: InflightId, index: LogIndex) {
fn trigger(&self, id: InflightId) {
self.id_barrier.trigger(&id);
self.index_barrier.trigger(index);
}
}

Expand Down
Loading

0 comments on commit 7dd07b4

Please sign in to comment.