Skip to content

Commit

Permalink
chore: remove some redundant reference
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon authored and mergify[bot] committed Oct 19, 2023
1 parent d5aad1c commit aa83ea0
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 40 deletions.
53 changes: 25 additions & 28 deletions curp/src/server/cmd_board.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,46 +64,46 @@ impl<C: Command> CommandBoard<C> {
}

/// Insert er to internal buffer
pub(super) fn insert_er(&mut self, id: &ProposeId, er: Result<C::ER, C::Error>) {
pub(super) fn insert_er(&mut self, id: ProposeId, er: Result<C::ER, C::Error>) {
let er_ok = er.is_ok();
assert!(
self.er_buffer.insert(*id, er).is_none(),
self.er_buffer.insert(id, er).is_none(),
"er should not be inserted twice"
);

self.notify_er(id);
self.notify_er(&id);

// wait_synced response is also ready when execution fails
if !er_ok {
self.notify_asr(id);
self.notify_asr(&id);
}
}

/// Insert asr to internal buffer
pub(super) fn insert_asr(&mut self, id: &ProposeId, asr: Result<C::ASR, C::Error>) {
pub(super) fn insert_asr(&mut self, id: ProposeId, asr: Result<C::ASR, C::Error>) {
assert!(
self.asr_buffer.insert(*id, asr).is_none(),
self.asr_buffer.insert(id, asr).is_none(),
"asr should not be inserted twice"
);

self.notify_asr(id);
self.notify_asr(&id);
}

/// Insert conf change result to internal buffer
pub(super) fn insert_conf(&mut self, id: &ProposeId) {
pub(super) fn insert_conf(&mut self, id: ProposeId) {
assert!(
self.conf_buffer.insert(*id),
self.conf_buffer.insert(id),
"conf should not be inserted twice"
);

self.notify_conf(id);
self.notify_conf(&id);
}

/// Get a listener for execution result
fn er_listener(&mut self, id: &ProposeId) -> EventListener {
let event = self.er_notifiers.entry(*id).or_insert_with(Event::new);
fn er_listener(&mut self, id: ProposeId) -> EventListener {
let event = self.er_notifiers.entry(id).or_insert_with(Event::new);
let listener = event.listen();
if self.er_buffer.contains_key(id) {
if self.er_buffer.contains_key(&id) {
event.notify(usize::MAX);
}
listener
Expand All @@ -115,20 +115,20 @@ impl<C: Command> CommandBoard<C> {
}

/// Get a listener for after sync result
fn asr_listener(&mut self, id: &ProposeId) -> EventListener {
let event = self.asr_notifiers.entry(*id).or_insert_with(Event::new);
fn asr_listener(&mut self, id: ProposeId) -> EventListener {
let event = self.asr_notifiers.entry(id).or_insert_with(Event::new);
let listener = event.listen();
if self.asr_buffer.contains_key(id) {
if self.asr_buffer.contains_key(&id) {
event.notify(usize::MAX);
}
listener
}

/// Get a listener for conf change result
fn conf_listener(&mut self, id: &ProposeId) -> EventListener {
let event = self.conf_notifier.entry(*id).or_insert_with(Event::new);
fn conf_listener(&mut self, id: ProposeId) -> EventListener {
let event = self.conf_notifier.entry(id).or_insert_with(Event::new);
let listener = event.listen();
if self.conf_buffer.contains(id) {
if self.conf_buffer.contains(&id) {
event.notify(usize::MAX);
}
listener
Expand Down Expand Up @@ -161,12 +161,9 @@ impl<C: Command> CommandBoard<C> {
}

/// Wait for an execution result
pub(super) async fn wait_for_er(
cb: &CmdBoardRef<C>,
id: &ProposeId,
) -> Result<C::ER, C::Error> {
pub(super) async fn wait_for_er(cb: &CmdBoardRef<C>, id: ProposeId) -> Result<C::ER, C::Error> {
loop {
if let Some(er) = cb.map_read(|cb_r| cb_r.er_buffer.get(id).cloned()) {
if let Some(er) = cb.map_read(|cb_r| cb_r.er_buffer.get(&id).cloned()) {
return er;
}
let listener = cb.write().er_listener(id);
Expand All @@ -183,12 +180,12 @@ impl<C: Command> CommandBoard<C> {
/// Wait for an after sync result
pub(super) async fn wait_for_er_asr(
cb: &CmdBoardRef<C>,
id: &ProposeId,
id: ProposeId,
) -> (Result<C::ER, C::Error>, Option<Result<C::ASR, C::Error>>) {
loop {
{
let cb_r = cb.read();
match (cb_r.er_buffer.get(id), cb_r.asr_buffer.get(id)) {
match (cb_r.er_buffer.get(&id), cb_r.asr_buffer.get(&id)) {
(Some(er), None) if er.is_err() => return (er.clone(), None),
(Some(er), Some(asr)) => return (er.clone(), Some(asr.clone())),
_ => {}
Expand All @@ -200,9 +197,9 @@ impl<C: Command> CommandBoard<C> {
}

/// Wait for an conf change result
pub(super) async fn wait_for_conf(cb: &CmdBoardRef<C>, id: &ProposeId) {
pub(super) async fn wait_for_conf(cb: &CmdBoardRef<C>, id: ProposeId) {
loop {
if let Some(_ccr) = cb.map_read(|cb_r| cb_r.conf_buffer.get(id).copied()) {
if let Some(_ccr) = cb.map_read(|cb_r| cb_r.conf_buffer.get(&id).copied()) {
return;
}
let listener = cb.write().conf_listener(id);
Expand Down
6 changes: 3 additions & 3 deletions curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn worker_exe<
ce.execute(cmd, entry.index).await
};
let er_ok = er.is_ok();
cb.write().insert_er(&entry.id(), er);
cb.write().insert_er(entry.id(), er);
if !er_ok {
sp.lock().remove(&entry.id());
let _ig = ucp.lock().remove(&entry.id());
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn worker_as<
};
let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await;
let asr_ok = asr.is_ok();
cb.write().insert_asr(&entry.id(), asr);
cb.write().insert_asr(entry.id(), asr);
sp.lock().remove(&entry.id());
let _ig = ucp.lock().remove(&entry.id());
debug!("{id} cmd({}) after sync is called", entry.id());
Expand All @@ -170,7 +170,7 @@ async fn worker_as<
});
let shutdown_self =
change.change_type() == ConfChangeType::Remove && change.node_id == id;
cb.write().insert_conf(&entry.id());
cb.write().insert_conf(entry.id());
if shutdown_self {
curp.shutdown_trigger().self_shutdown();
}
Expand Down
6 changes: 3 additions & 3 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
let ((leader_id, term), result) = self.curp.handle_propose(Arc::clone(&cmd));
let resp = match result {
Ok(true) => {
let er_res = CommandBoard::wait_for_er(&self.cmd_board, &cmd.id()).await;
let er_res = CommandBoard::wait_for_er(&self.cmd_board, cmd.id()).await;
ProposeResponse::new_result::<C>(leader_id, term, &er_res)
}
Ok(false) => ProposeResponse::new_empty(leader_id, term),
Expand Down Expand Up @@ -171,7 +171,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
let ((leader_id, term), result) = self.curp.handle_propose_conf_change(req.into());
let error = match result {
Ok(()) => {
CommandBoard::wait_for_conf(&self.cmd_board, &id).await;
CommandBoard::wait_for_conf(&self.cmd_board, id).await;
None
}
Err(err) => Some(err),
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
let id = req.propose_id();
debug!("{} get wait synced request for cmd({id})", self.curp.id());

let (er, asr) = CommandBoard::wait_for_er_asr(&self.cmd_board, &id).await;
let (er, asr) = CommandBoard::wait_for_er_asr(&self.cmd_board, id).await;
let resp = WaitSyncedResponse::new_from_result::<C>(Some(er), asr);

debug!("{} wait synced for cmd({id}) finishes", self.curp.id());
Expand Down
6 changes: 3 additions & 3 deletions xline/src/server/barriers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ impl IdBarrier {
}

/// Trigger the barrier of the given id.
pub(crate) fn trigger(&self, id: &ProposeId) {
if let Some(event) = self.barriers.lock().remove(id) {
pub(crate) fn trigger(&self, id: ProposeId) {
if let Some(event) = self.barriers.lock().remove(&id) {
event.notify(usize::MAX);
}
}
Expand Down Expand Up @@ -120,7 +120,7 @@ mod test {
.collect::<Vec<_>>();
sleep(Duration::from_millis(10)).await;
for i in 0..5 {
id_barrier.trigger(&ProposeId(i, i));
id_barrier.trigger(ProposeId(i, i));
}
timeout(Duration::from_millis(100), join_all(barriers))
.await
Expand Down
6 changes: 3 additions & 3 deletions xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ where
) -> Result<<Command as CurpCommand>::PR, <Command as CurpCommand>::Error> {
let wrapper = cmd.request();
if let Err(e) = self.auth_storage.check_permission(wrapper) {
self.id_barrier.trigger(&cmd.id());
self.id_barrier.trigger(cmd.id());
self.index_barrier.trigger(index);
return Err(e);
}
Expand Down Expand Up @@ -346,7 +346,7 @@ where
match res {
Ok(res) => Ok(res),
Err(e) => {
self.id_barrier.trigger(&cmd.id());
self.id_barrier.trigger(cmd.id());
self.index_barrier.trigger(index);
Err(e)
}
Expand All @@ -372,7 +372,7 @@ where
self.kv_storage.insert_index(key_revisions);
}
self.lease_storage.mark_lease_synced(&wrapper.request);
self.id_barrier.trigger(&cmd.id());
self.id_barrier.trigger(cmd.id());
self.index_barrier.trigger(index);
Ok(res)
}
Expand Down

0 comments on commit aa83ea0

Please sign in to comment.